1.1. Pipeline Designer UI
The Pipeline Designer UI includes the following areas and icons when you configure a pipeline:
| AREA/ICON | NAME | DESCRIPTION |
| --- | --- | --- |
| 1 | Pipeline canvas | Displays the pipeline. Use to configure the pipeline data flow. |
| 2 | Pipeline Creation Help Bar | Offers lists of stages to help complete the pipeline. You can use the help bar to connect a stage to an open node. You can also add a stage between linked stages by clicking the link. |
| 3 | Stage list | Lists the stages in the pipeline. Use to select a stage to configure. You can also select the stage in the canvas. |
| 4 | Properties panel | Displays the properties of the pipeline or selected stage when you configure a pipeline. |
| 5 | Stage library panel | Lists the available stages. Use to add stages to the pipeline. You can drag a stage to a location on the canvas or click a stage to add it to the end of the pipeline. You can view all stages, stages by type, or stages by library. You can also search for a stage by name. |
| Icon | Pipeline name display | Displays the name of the pipeline in the canvas. |
| Icon | Pipeline version display and selection | Displays the version of the pipeline in the canvas. To select a different version, click the icon and select the version to view. |
| Icon | Publish icon | Publishes the pipeline or fragment in the canvas. Displays for pipelines only when the pipeline passes implicit validation. When the pipeline has already been published, the Edit icon displays in the same location. Publish a pipeline to enable creating a job for the pipeline. Publish a fragment to enable using the fragment in a pipeline. |
| Icon | Edit icon | Enables editing the pipeline or fragment. Displays when the pipeline or fragment has already been published and is being viewed in read only mode. When the pipeline or fragment is already in edit mode, the Publish icon displays in the same location. |
| Icon | Compare with Previous Version icon | Compares the pipeline or fragment in the canvas with a previous version. |
| Icon | History icon | Displays the history of the pipeline or fragment in the canvas. |
| Icon | Validation Errors icon | Lists the number of validation errors for implicit validation. Click to view the error messages. |
| Icon | More icon | Provides additional actions to take. Use to delete or export a pipeline, or to update the stage libraries used in the pipeline. Use to import or export a pipeline fragment. |
| Icon | Delete icon | Deletes the selected item in the canvas. |
| Icon | Duplicate Stage icon | Duplicates the selected stage. |
| Icon | Auto Arrange icon | Automatically arranges the stages on the canvas. |
| Icon | Authoring Data Collector icon | Displays the authoring Data Collector associated with the pipeline. You can click the icon and select a different Data Collector to use. |
| Icon | Validate icon | Validates the pipeline. Performs explicit validation. Displays when the pipeline is in edit mode and passes implicit validation. |
| Icon | Preview icon | Starts data preview. Available for valid pipelines when using a registered Data Collector as the authoring Data Collector. Not available when using the system Data Collector for authoring. |
| Icon | Share icon | Shares the pipeline or fragment with users and groups. Use to configure permissions for the pipeline or fragment. |
| Icon | Create Job icon | Creates a job based on a published pipeline. |
| Icon | Stage Library icon | Toggles the display of the Stage Library panel. |
| Icon | Expand Fragments icon | Expands all pipeline fragments in the pipeline, displaying all stages in the fragments. |
| Icon | Collapse Fragments icon | Collapses expanded pipeline fragments, displaying a single fragment stage for each fragment. |
| Icon | Stream link icon | Indicates the flow of data through the pipeline or fragment. Select to configure data rules and alerts. Darker icons indicate that a data rule is configured for the link. |
| Icon | Error icon | Indicates that one or more required properties are not defined. Can display on a stage for stage properties or in the canvas for pipeline properties. Related error messages display when you hover over the icon. You can also view the messages in the Validation Errors list. The icon can also display on tabs in the properties panel to indicate the location of missing properties. |
Note: Some icons and options might not display. The items that display are based on the task that you are performing and roles assigned to your user account.
For example, the Create Job icon displays only for published pipelines when you log in with the Job Operator role. Or, if you log in with only the Pipeline User role, configuration-related icons are not available.
1.2. Pipeline Designer Tips
The Control Hub Pipeline Designer is closely based on the Data Collector pipeline configuration canvas. Some functional differences are described below:
Authoring Data Collector
When configuring a pipeline in Pipeline Designer, you select the authoring Data Collector to use. You can use the system Data Collector provided with Control Hub for exploration and light development. To perform explicit validation or data preview, use one of your registered Data Collectors as the authoring Data Collector.
Choose a registered Data Collector that is the same version as the Data Collectors that you intend to use to run the pipeline in production. Using a different Data Collector version can result in developing a pipeline that is invalid for production Data Collectors.
For example, if you use a more recent version of Data Collector for development than for production, you might include a stage, stage library, or stage functionality that does not exist in the production Data Collector.
The registered Data Collector must meet certain requirements to be used as the authoring Data Collector. For more information, see Authoring Data Collectors.
Create a Pipeline
When you create a pipeline, you specify whether the pipeline will run on Data Collector or SDC Edge, and you can start with a blank canvas or an existing template.
Pipeline Designer provides several Data Collector and SDC Edge templates as sample pipelines. You can use them to familiarize yourself with Pipeline Designer and pipeline designs or you can use them as a basis for pipeline development.
Edit a Published Pipeline
When viewing a published pipeline, Pipeline Designer displays the pipeline in read only mode. The pipeline mode displays above the canvas.
To edit a published pipeline, click the Edit icon. The pipeline then enters edit mode.
Update Stage Libraries for the Pipeline
When editing a pipeline, you can use the Update Stage Libraries dialog box to update the stage libraries for multiple stages in the pipeline.
This allows you to update all necessary stage libraries at one time when you change the authoring Data Collector for the pipeline. When preferred, you can also change stage libraries individually by editing each stage.
The stage libraries that display for each stage depend on the authoring Data Collector selected for the pipeline. For example, if the authoring Data Collector has the MapR 5.0 and 5.1 stage libraries installed, then those are the stage libraries that display for a MapR FS destination or MapR FS File Metadata executor.
To update multiple stage libraries at one time, click the More icon, then select Update Stage Libraries.
The Update Stage Libraries dialog box displays the stage name and type for each stage in the pipeline. On the right is the corresponding list of stage libraries for the stage that are available on the authoring Data Collector.
Update the stage libraries as needed, then click Update to save your changes.
Work with Versions
When you have multiple versions of a pipeline, Pipeline Designer indicates the version of the pipeline that you are viewing. You can click the pipeline version to select a different version to view.
When you first create a pipeline in Pipeline Designer, the pipeline is in a draft state so that you can add and configure stages in the pipeline. Draft pipelines are versioned as <version>-DRAFT, for example, v1-DRAFT or v2-DRAFT. When you finish editing a draft pipeline, publish the pipeline to indicate that the pipeline is final and is available to be added to a job.
Published pipelines are versioned by number, for example, v1 or v2. You cannot edit published pipelines. When viewing a published version, you can click edit to create a new version based on the pipeline in the canvas.
For more information about working with versions, see Comparing Pipeline Versions.
Publish a Pipeline
When you have completed work on a pipeline, you publish the pipeline. After you publish the pipeline, you can create and run jobs based on the pipeline.
Use the Publish icon to publish a valid pipeline. Enter a commit message stating what changed in this pipeline version so that you can track the commit history of the pipeline.
After you publish the pipeline, it enters read only mode and can be used to create a job.
Create a Job
After you publish a pipeline, you can create a job.
You can create a job using the Create Job icon in Pipeline Designer. Or, you can create a job from the Jobs view.
Data Preview Requirement
You can preview data when the pipeline uses a registered Data Collector as the authoring Data Collector.
If the pipeline uses the system Data Collector or a selected registered Data Collector that is not accessible, the Preview Pipeline icon is disabled.
Validation Requirement
You can perform explicit validation when the pipeline uses a registered Data Collector as the authoring Data Collector.
Use the Validate icon to perform explicit validation.
Compare Versions
When there are multiple versions of the pipeline, you can use the Compare with Previous Version icon to compare the current version with a previous version.
In the Comparison window, you can select different pipeline versions in each window and navigate through the pipelines.
View Pipeline History
You can view the history of the pipeline to see the commit history and the tags associated with the pipeline.
Use the History icon to view the pipeline history.
For more information about working with the pipeline history, see Pipeline History.
Deleting a Pipeline Version
When a pipeline has multiple versions, you can delete the pipeline version that you are viewing. Or, you can delete all versions of the pipeline. Use the Delete Pipeline option available in the More menu.
You can only delete pipelines or pipeline versions when they are not used in jobs.
1.3. Retrying the Pipeline
By default, when Data Collector encounters a stage-level error that might cause a standalone pipeline to fail, it retries the pipeline. That is, it waits a period of time, and then tries again to run the pipeline.
A stage-level error might include a stage not being able to connect to an external system because the system or network is down.
You can define the maximum number of pipeline retries that Data Collector attempts.
With the default value, -1, Data Collector retries the pipeline an infinite number of times. This allows the pipeline to continue running as soon as any temporary connection or system failures resolve.
If you want the pipeline to stop after a given number of retries, you can define the maximum number of retries to perform.
The wait time between retries begins at 15 seconds and doubles in length until reaching a maximum of 5 minutes. For example, successive retries wait 15, 30, 60, 120, and 240 seconds, and then 5 minutes for each retry after that.
1.4. Pipeline Memory
When you enable the monitor.memory Data Collector configuration property, you can use the Max Pipeline Memory pipeline property to define the maximum amount of memory Data Collector uses when it runs the pipeline.
The monitor.memory property should only be enabled during development and testing. For more information, see Configuring Data Collector in the Data Collector documentation.
When you configure the Max Pipeline Memory pipeline property, best practice is to configure the pipeline to use up to 65% of the Data Collector Java heap size. For example, with the default Data Collector Java heap size of 1024 MB, the highest setting for the pipeline memory should be 665 MB.
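For example, a minimal sketch, assuming you enable the property directly in the Data Collector configuration file during development and testing and then restart Data Collector:
monitor.memory=true
With the default 1024 MB Java heap, you would then set the Max Pipeline Memory pipeline property to no more than 665 MB, as calculated above.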
You can also configure the action to take when pipeline memory usage exceeds the configured amount.
Tip: To avoid errors, you might create an alert to indicate when the memory usage approaches the configured limit.
For information about configuring the Data Collector heap size, see Java Heap Size in the Data Collector documentation.
1.5. Rate Limit
You can limit the rate at which a pipeline processes records by defining the maximum number of records that the pipeline can read in a second.
By default, a pipeline has no rate limit. You might want to limit the rate for the following reasons:
The pipeline reads records faster than it can write them to the destination system
Because a pipeline processes one batch at a time, pipeline performance slows when a pipeline reads records faster than it can process them or write them to the destination system. The pipeline must wait until a batch is committed to the destination system before reading the next batch, causing a delay before the next batch is read and preventing the pipeline from reading at a steady rate. Reading data at a steady rate provides better performance than reading sporadically.
You can limit the rate at which the pipeline reads records to decrease the delay between reads from the origin system.
The origin system requires that the data be read at a slower rate
If the origin system is being used for other purposes, it might not be able to handle the rate at which the pipeline reads records. You can limit the rate to meet the origin system requirements.
Use the Rate Limit pipeline property to define the maximum number of records that the pipeline can read in a second.
1.6. Simple and Bulk Edit Mode
Some pipeline and pipeline stage properties include an Add icon ( ) where you add additional configurations. You can add the configurations in simple or bulk edit mode. By default, each property uses simple edit mode.
For example, in the Expression Evaluator, you can click the Add icon in simple edit mode to add multiple expressions to the processor. The stage displays text boxes for you to enter configuration values.
Or, you can switch to bulk edit mode and enter the list of configurations in JSON format.
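For illustration, a hedged sketch of what the bulk edit JSON for two Expression Evaluator expressions might look like - treat the field names (fieldToSet, expression) and values as placeholders, and keep whatever field names Data Collector displays when you switch modes:
[
  {
    "fieldToSet": "/fullName",
    "expression": "${str:concat(record:value('/firstName'), record:value('/lastName'))}"
  },
  {
    "fieldToSet": "/processedAt",
    "expression": "${time:now()}"
  }
]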
When you switch to bulk edit mode, Data Collector displays the correct field names and any values that you entered in simple edit mode. Modify the values, but be sure not to modify the field names. If you modify the field names by mistake, Data Collector displays a validation error message.
In bulk edit mode, you can add additional configurations by entering text in JSON format. Or, you can click the Add icon to add another configuration with empty values and then enter the values in the JSON text.
Bulk edit mode can be particularly useful when you have a large number of configurations to enter. For example, you might want to configure the OPC UA Client origin to subscribe to hundreds of nodes in an OPC UA server. In this case, use the Add icon to add one node configuration. Then, switch to bulk edit mode and copy the JSON text for that node configuration into a text file. Use the same format and required field names to configure additional nodes in the text file. Then simply copy the node configurations from the JSON text file into bulk edit mode, rather than individually adding each node with the Add icon.
1.7. Runtime Values
Runtime values are values that you define outside of the pipeline and use for stage and pipeline properties. You can change the values for each pipeline run without having to edit the pipeline.
You can use runtime values for any pipeline property that allows the use of the expression language. You can, for example, use runtime values to represent batch sizes, timeouts, directories, and URIs. You cannot use runtime values to represent fields.
You can use the following methods of passing runtime values to pipelines:
Runtime parameters
Use runtime parameters when you want to specify the values for pipeline properties when you start the pipeline.
You define runtime parameters when you configure the pipeline, and then you call the parameters from within that pipeline. When you start the pipeline, you specify the parameter values to use.
Runtime parameters are defined for a single pipeline - only that pipeline can call them.
Note: In Data Collector versions earlier than 2.5.0.0, pipeline runtime parameters were named pipeline constants.
Runtime properties
Use runtime properties when you want to define values for multiple pipeline properties in a file.
You define runtime properties in a file local to the Data Collector, and then you call the properties from within a pipeline. At runtime, Data Collector loads the property values from the file. A runtime properties file can contain multiple properties.
Runtime properties are defined for the entire Data Collector - any pipeline can call them.
Runtime resources
Use runtime resources when you want to secure sensitive information in files with restricted permissions.
You define runtime resources in a file local to the Data Collector, and then you call the resources from within a pipeline. You can restrict the permissions for the resource files to secure sensitive information. A resource file must contain one piece of information.
Runtime resources are defined for the entire Data Collector - any pipeline can call them.
Note: When configuring runtime values for pipelines run by StreamSets Control Hub, all Data Collectors that run the pipeline must have the values locally defined in the expected locations.
1.7.1. Using Runtime Parameters
Runtime parameters are parameters that you define in a pipeline and then call from within that same pipeline. When you start the pipeline, you specify the parameter values to use. Use runtime parameters to specify values for pipeline properties when you start the pipeline.
Use runtime parameters to define values for stage and pipeline properties. For example, you can define an error directory parameter that points to different directories on a test machine and a production machine. Or you can define a connection parameter that points to different database connections for an origin in the test and production environments.
When you define a runtime parameter, you enter the default value to use. When you start the pipeline, you can specify another value to override the default. When the pipeline runs, the value replaces the name of the runtime parameter.
Note: If you shut down and then restart the Data Collector without stopping the pipeline, the pipeline continues running with the last set parameter values.
To implement runtime parameters, perform the following steps:
- Define runtime parameters.
- Use an expression in the pipeline to call a runtime parameter.
- Start the pipeline with parameter values.
Step 1. Define Runtime Parameters
Define runtime parameters when you configure the pipeline.
- In the pipeline properties, click the Parameters tab.
- Using simple or bulk edit mode, click the Add icon and define the name and the default value for each parameter.
For example, define a parameter named JDBCConnectionString with the default value of jdbc:mysql://localhost:3306/sample.
Step 2. Call the Runtime Parameter
Use an expression to call a runtime parameter. You can use runtime parameters to represent any stage or pipeline property that allows the use of the expression language.
To call a runtime parameter in a stage or pipeline property, use the following syntax:
${<parameter name>}
For example, to use the JDBCConnectionString runtime parameter for the JDBC Multitable Consumer origin, enter the following syntax for the JDBC Connection String property:
${JDBCConnectionString}
You can call a runtime parameter from within an expression language function by simply entering the parameter name. For example, the following expression returns the value of the JDBCConnectionString runtime parameter:
${record:value(JDBCConnectionString)}
You can use a runtime parameter to represent a part of a property. For example, you could use a RootDir runtime parameter and append the rest of the directory in the property as follows:
${RootDir}/logfiles
You can also call a runtime parameter in the code developed for a scripting processor. The method you use to call the runtime parameter depends on the following scripting processor types:
JavaScript Evaluator or Jython Evaluator processor
Use the following syntax in any of the processor scripts: ${<parameter name>}. For example, the following line of JavaScript code assigns the value of the NewFieldValue parameter to a map field:
records[i].value.V = ${NewFieldValue}
Groovy Evaluator processor
Use the sdcFunctions.pipelineParameters() method in any of the processor scripts to return a map of all runtime parameters defined for the pipeline. For example, the following line of Groovy code assigns the value of the CompanyParam parameter to the Company Name field:
record.value['Company Name'] = sdcFunctions.pipelineParameters()['CompanyParam']
1.7.2. Using Runtime Properties
Runtime properties are properties that you define in a file local to the Data Collector and call from within a pipeline. With runtime properties, you can define different sets of values for different Data Collectors.
Use runtime properties to define values for stage and pipeline properties. For example, you can define an error directory runtime property that points to different directories on a test machine and a production machine. Similarly, you might create test and production runtime properties for the origin and destination stages.
When defining a runtime property, you can use a static value or an environment variable.
When calling the runtime property, you can use it as part of a larger property definition. For example, you can set a runtime property to the HOME environment variable, which would differ on different machines, and then use the runtime property as a base directory for a longer directory.
To implement runtime properties, perform the following steps:
- Define runtime properties.
- Use an expression in the pipeline to call a runtime property.
Step 1. Define Runtime Properties
You can define runtime properties in the Data Collector configuration file, sdc.properties, or in a separate runtime properties file.
If you define the properties in a separate runtime properties file, use the required procedure for your installation type.
Data Collector configuration file
Use the following steps to define runtime properties in the Data Collector configuration file:
- In the $SDC_CONF/sdc.properties file, configure the runtime.conf.location property as follows:
runtime.conf.location=embedded
- To define the runtime properties in the $SDC_CONF/sdc.properties file, use either of the following formats:
- To use a value for a runtime property, use the following format:
runtime.conf_<property name>=<value>
For example, the following runtime property defines a Hadoop FS directory template:
runtime.conf_HDFSDirTemplate=/HDFS/DirectoryTemplate
- To use an environment variable for a runtime property, use the following format:
runtime.conf_<property name>=${env("<environment_variable>")}
For example, the following runtime property defines a base directory, setting it to the HOME environment variable:
runtime.conf_BaseDir=${env("HOME")}
- Restart Data Collector to enable the changes.
Separate runtime properties file for RPM and tarball
Use the following steps to define runtime properties in a separate runtime properties file for an RPM or tarball installation:
- Create a text file and save it in a directory relative to the $SDC_CONF directory.
- To define runtime properties in a separate text file, use either of the following formats:
- To use a value for a runtime property, use the following format:
<property name>=<value>
For example, the following runtime property defines a Hadoop FS directory template:
HDFSDirTemplate=/HDFS/DirectoryTemplate
- To use an environment variable for a runtime property, use the following format:
<property name>=${env("<environment_variable>")}
For example, the following runtime property defines a base directory, setting it to the HOME environment variable:
BaseDir=${env("HOME")}
- In the Data Collector configuration file, $SDC_CONF/sdc.properties, configure the runtime.conf.location property to point to the relative location of the separate runtime properties file.
For example, the following separate runtime properties file is located in a runtime directory that is relative to the $SDC_CONF directory:
runtime.conf.location=runtime/test-runtime.properties
- Restart Data Collector to enable the changes.
Separate runtime properties file for Cloudera Manager
Use the following steps to define runtime properties in a separate runtime properties file for a Cloudera Manager installation:
- Create a text file and define the runtime properties in the text file using either of the following formats:
- To use a value for a runtime property, use the following format:
<property name>=<value>
For example, the following runtime property defines a Hadoop FS directory template:
HDFSDirTemplate=/HDFS/DirectoryTemplate
- To use an environment variable for a runtime property, use the following format:
<property name>=${env("<environment_variable>")}
For example, the following runtime property defines a base directory, setting it to the HOME environment variable:
BaseDir=${env("HOME")}
- Save the text file in the same directory on every node that runs Data Collector.
- In Cloudera Manager, select the StreamSets service and click Configuration.
- On the Configuration page, in the Data Collector Advanced Configuration Snippet (Safety Valve) for sdc-env.sh field, add the following line to define the runtime properties file directory:
ln -sf /<directory>/runtime.properties "${CONF_DIR}/runtime.properties"
For example:
ln -sf /opt/sdc-runtime/runtime.properties "${CONF_DIR}/runtime.properties"
- In the Data Collector Advanced Configuration Snippet (Safety Valve) for sdc.properties field, configure the runtime.conf.location property to point to the separate runtime properties file by adding the following line:
runtime.conf.location=runtime.properties
- Restart Data Collector to enable the changes.
For more information, see Configuring Data Collector in the Data Collector documentation.
Step 2. Call the Runtime Property
Use the runtime:conf function to call a runtime property. You can use runtime properties to represent any stage or pipeline property that allows the use of the expression language.
To call a runtime property, use the following syntax:
${runtime:conf(<property name>)}
Note: If you defined the runtime properties in the Data Collector configuration file, enter just <property name> and not runtime.conf_<property name>.
For example, to use the HDFSDirTemplate runtime property for the Hadoop FS destination, enter the following syntax for the Directory Template property:
${runtime:conf('HDFSDirTemplate')}
You can use a runtime property to represent a part of a property. For example, you could use a RootDir runtime property and append the rest of the directory in the property as follows:
${runtime:conf('RootDir')}/logfiles
1.7.3. Using Runtime Resources
Similar to runtime properties, runtime resources are values that you define in a file local to the Data Collector and call from within a pipeline. But with runtime resources, you can restrict the permissions for the files to secure sensitive information. Use runtime resources to load sensitive information from files at runtime.
Use runtime resources to define sensitive values for stage and pipeline properties. You can, for example, use runtime resources to represent user names and passwords, or OAuth authentication information.
Tip: To more securely define sensitive values, use credential stores.
To implement runtime resources, perform the following steps:
- Define each runtime resource.
- Use an expression in the pipeline to call a runtime resource.
Step 1. Define Runtime Resources
Use the following steps to define runtime resources:
- For each resource, create a text file and save it in the $SDC_RESOURCES directory.
A file must contain one piece of information to be used when the resource is called.
- Optionally, restrict the permissions for the file.
Generally, anyone can read a file. To restrict permissions, configure the file so only the owner has read or write permissions for the file - in octals, that's 600 or 400. And the owner must be the system user that runs the Data Collector.
When you use the resource in the pipeline, you specify whether the file is restricted.
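For example, a minimal sketch on Linux, assuming the JDBCpassword.txt resource file used in the example below, a placeholder password, and sdc as the system user that runs Data Collector:
echo -n "myJDBCpassword" > $SDC_RESOURCES/JDBCpassword.txt
chmod 600 $SDC_RESOURCES/JDBCpassword.txt
chown sdc:sdc $SDC_RESOURCES/JDBCpassword.txt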
Step 2. Call the Runtime Resource
Use the runtime:loadResource or runtime:loadResourceRaw function to call a runtime resource. You can use runtime resources to represent sensitive information in any stage or pipeline property that allows the use of the expression language.
Note: In most cases, you'll use the runtime:loadResource function which trims any leading or trailing whitespace characters from the file. However, if needed, you can also use the runtime:loadResourceRaw function which includes any leading or trailing whitespace characters in the file.
To call a runtime resource, use the following syntax:
${runtime:loadResource(<file name>, <restricted: true | false>)}
For example, the following expression returns the contents of the JDBCpassword.txt file, trimming any leading or trailing whitespace characters. The file contains a password and is restricted so only the owner can read the file:
${runtime:loadResource("JDBCpassword.txt", true)}
1.8. Event Generation
The event framework generates events for standalone pipelines when the pipeline starts and stops. You can pass the event to an executor or to another pipeline for additional processing. By default, these events are discarded. For more information about pipeline events, see Pipeline Event Generation.
For general information about the event framework, see Dataflow Triggers Overview.
1.8.1. Pipeline Event Records
Pipeline event records have the following event-related record header attributes. Record header attributes are stored as String values:
| Record Header Attribute | Description |
| --- | --- |
| sdc.event.type | Event type. Uses one of the following types: start or stop. |
| sdc.event.version | An integer that indicates the version of the event record type. |
| sdc.event.creation_timestamp | Epoch timestamp when the stage created the event. |
The event framework generates the following types of pipeline events:
Start
The event framework generates start events as the pipeline initializes, immediately after it starts and before individual stages are initialized.
The start event record has the sdc.event.type set to "start", and the following fields:
| Pipeline Start Event Field | Description |
| --- | --- |
| pipelineId | The ID of the pipeline that started. |
| pipelineName | The user-defined name of the pipeline that started. |
| user | The user who started the pipeline. |
| parameters | Any parameters used when starting the pipeline. |
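For illustration only, a start event record might look roughly like the following simplified rendering - the header attribute and field values are placeholders based on the tables above, not actual output:
Record header attributes:
sdc.event.type = start
sdc.event.version = 1
sdc.event.creation_timestamp = 1533242052123
Record fields:
{
  "pipelineId": "MyPipelinefbb6894c-08ec-421e-9e3c-4e6bbb2f5baf",
  "pipelineName": "My Pipeline",
  "user": "admin",
  "parameters": {
    "JDBCConnectionString": "jdbc:mysql://localhost:3306/sample"
  }
}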
Stop
The event framework generates stop events as the pipeline stops, either manually, programmatically, or due to a failure. The stop event is generated after all stages have completed processing and cleaning up temporary resources, such as removing temporary files.
The stop event record has the sdc.event.type set to "stop", and the following fields:
| Pipeline Stop Event Field | Description |
| --- | --- |
| pipelineId | The ID of the pipeline that stopped. |
| pipelineName | The user-defined name of the pipeline that stopped. |
| reason | The reason why the pipeline stopped, such as a manual stop, a programmatic stop, or a failure. |
| user | The user who stopped the pipeline, when relevant. |
1.9. Webhooks
You can configure a pipeline to use webhooks. A webhook is a user-defined HTTP callback - an HTTP request that the pipeline sends automatically when certain actions occur. You can use webhooks to automatically trigger external tasks based on an HTTP request. Tasks can be as simple as sending a message through an application API or as powerful as passing commands to the Data Collector command line interface.
You can configure the following types of webhooks:
| Trigger | Description | Configuration Location |
| --- | --- | --- |
| Alert | The pipeline sends all alert webhooks each time an alert is triggered. For example, if your text message application has a webhook API, you can have the pipeline send texts when alerts are triggered. For details on how to configure an alert webhook, see Configuring an Alert Webhook. | Pipeline Rules tab > Webhook tab |
| State notification | The pipeline sends all state notification webhooks when the pipeline transitions to the selected pipeline states. For example, you can send an HTTP request to the Data Collector REST API to start a different pipeline when the current pipeline transitions to a Finished state. For details on how to configure a state notification webhook, see the Notifications properties in Configuring a Pipeline. | Pipeline Configuration tab > Notifications tab |
The flexibility of webhooks enables you to automatically trigger a wide range of tasks, depending on the ability of external systems to support webhooks or process HTTP requests.
Important: You must configure webhooks as expected by the receiving system. For details on how to configure incoming webhooks, check the receiving system's documentation. You might also need to enable webhook usage within that system.
When you configure a webhook, you specify the URL to send the request and the HTTP method to use. Some HTTP methods allow you to include a request body or payload. In the payload, you can use parameters to include information about the cause of the trigger, such as the text of the alert or the latest pipeline state. You can also include request headers, content type, authentication type, username and password as needed.
1.9.1. Request Method
You can use the following methods in webhooks:
- GET
- PUT
- POST
- DELETE
- HEAD
1.9.2. Payload and Parameters
You can include a request body or payload for PUT, POST, and DELETE request webhooks. Depending on the receiving system, the payload might not be used. For example, when using the Data Collector REST API, you simply include all required information in the URL.
When you configure a payload, you can use any valid content type, then specify the content type in the webhook properties. When defining a message, be sure to consider when the pipeline sends the webhook and include the information that the recipient needs.
You can use parameters in the payload to include information about the action that triggered the webhook. For example, when configuring a webhook that sends a text message when a pipeline stops, you might include the pipeline name, pipeline state, and the time parameters in the message.
Enclose parameters in double curly brackets, as follows:
{{<parameter name>}}
You can use the following parameters in webhooks:
state notification parameters
When configuring a state notification webhook, you can use the following parameters:
- PIPELINE_TITLE - The pipeline title or name.
- PIPELINE_URL - The direct URL to the pipeline.
- PIPELINE_STATE - The current state of the pipeline.
- TIME - The time of the triggered request.
- PIPELINE_STATE_MESSAGE - The pipeline state and any related information, such as the last-saved offset or the error message for error states.
- PIPELINE_INPUT_RECORDS_COUNT - The number of records read.
- PIPELINE_OUTPUT_RECORDS_COUNT - The number of records written.
- PIPELINE_ERROR_RECORDS_COUNT - The number of error records.
- PIPELINE_ERROR_MESSAGES_COUNT - The number of error messages.
- PIPELINE_RUNTIME_PARAMETERS - The number of pipeline parameters.
- PIPELINE_METRICS - Metrics data for the pipeline.
For example, say you configure the pipeline to send a webhook only when the pipeline transitions to the Stopped state - that is, when someone stops the pipeline. You might use the following message in a JSON request body:
{
"text":"At {{TIME}}, a user stopped the {{PIPELINE_TITLE}} pipeline. \n <To see the pipeline, click here: {{PIPELINE_URL}}"
}
However, if the pipeline is configured to send webhooks when the pipeline changes to several different states, you might use a more generic message and include the pipeline state in the message. For example:
{
"text":"The '{{PIPELINE_TITLE}}' pipeline changed state to {{PIPELINE_STATE}} at {{TIME}}. \n Pipeline status message: {{PIPELINE_STATE_MESSAGE}} \n <{{PIPELINE_URL}}|Click here for details.>"
}
alert parameters
When configuring an alert webhook, you can use the following parameters:
- ALERT_CONDITION - The condition of the rule associated with the alert.
- ALERT_NAME - The alert label or name.
- ALERT_TEXT - The text configured for the alert.
- ALERT_VALUE - The value that triggered the condition. For example, if the alert is configured to trigger upon reaching 1000 error records, the alert value will be 1000.
- PIPELINE_TITLE - The pipeline title.
- PIPELINE_URL - The direct URL to the pipeline.
- TIME - Time of the triggered request.
For example, say you configure a pipeline to send a webhook alert each time an alert triggers. To include some key information in the JSON request body, you might try this:
{
"text":"{{ALERT_TEXT}}: At {{TIME}}, {{ALERT_NAME}} was triggered by {{ALERT_VALUE}} for the following condition: {{ALERT_CONDITION}}. \n This is for the {{PIPELINE_TITLE}} pipeline, at {{PIPELINE_URL}}"
}
1.9.3. Examples
Here are some examples of how you might use webhooks:
Send webhooks to a Slack channel
You can configure a pipeline to send webhooks to a Slack channel. For example, you could have all alerts sent to an Operations Slack channel so someone on your operations team can respond to the alert.
To do this, you perform the following steps:
- Configure Slack for incoming webhooks for the channel you want to use.
At the time of writing, you can enable webhooks in Slack starting from this page. When you enable webhooks, Slack generates a URL for you to use. If you have already enabled webhooks, check your account information for the Slack URL.
- Copy the URL that Slack generates and use this to configure the pipeline.
The URL looks something like this:
https://hooks.slack.com/services/<random string>/<random string>/<random string>
Slack also offers instructions on how to configure the payload. At this time, they suggest a text field with the contents of the message, like the Data Collector default payload.
- Configure the webhook in the pipeline.
For Slack, you can just enter the URL and accept the defaults for everything else.
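As a rough sketch, the Slack request body can be as simple as a single text field built from the alert parameters described earlier - the wording here is illustrative:
{
  "text":"{{ALERT_NAME}} triggered at {{TIME}} for the {{PIPELINE_TITLE}} pipeline: {{ALERT_TEXT}} \n <{{PIPELINE_URL}}|Click here for details.>"
}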
Start another pipeline
You can start a pipeline after the first pipeline completes all processing using dataflow triggers, the Pipeline Finisher executor, and a state notification webhook.
For example, say you have a JDBC Query Consumer origin that performs a full query to process all legacy data in some database tables.
You configure the origin to generate an event when it completes processing all available data, and you connect the event stream to the Pipeline Finisher executor. When the Pipeline Finisher executor receives the event from the JDBC Query Consumer, it transitions the pipeline to a Finished state. For more information on using the Pipeline Finisher executor with dataflow triggers, see Case Study: Stop the Pipeline.
To use this state change to start a second pipeline, configure a webhook that triggers when the pipeline state changes to Finished:
- In the pipeline properties, click the Notifications tab and click Add to add a webhook.
- For the URL, enter the command to start the pipeline.
The command looks something like this:
<http|https>://<system ip>:<http port>/rest/v1/pipeline/<pipeline id>/start
For example: http://localhost:18630/rest/v1/pipeline/MyPipelinefbb6894c-08ec-421e-9e3c-4e6bbb2f5baf/start
- Using simple or bulk edit mode, click the Add icon to add a request header, and add the following header: X-Requested-By
And set the value to: sdc.
You can use the default method and authentication type, and keep or delete the payload, since they aren't used by the REST API.
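For reference, a hedged sketch of the same request issued manually with curl, assuming the default admin credentials, the example pipeline ID above, and that the start endpoint accepts POST - the webhook sends this request for you, so this is only useful for testing the endpoint:
curl -X POST -u admin:admin -H "X-Requested-By: sdc" http://localhost:18630/rest/v1/pipeline/MyPipelinefbb6894c-08ec-421e-9e3c-4e6bbb2f5baf/start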
1.10. Notifications
You can configure a pipeline to send an email or webhook when the pipeline changes to specified states.
For example, you might send an email when someone manually stops the pipeline, causing it to transition to a Stopped state. Or you might send a Slack or text message when the pipeline changes to a Start_Error or Run_Error state.
You can send notifications when the pipeline changes to the following states:
- Running
- Start_Error
- Run_Error
- Stopped
- Finished
- Disconnected
- Connecting
You can specify multiple states to trigger notifications, but you cannot configure the pipeline to send different notifications based on different pipeline state changes at this time. For example, if you configure notifications for the Running and Finished states, the pipeline sends notifications when changing to both states.
However, when configuring a webhook, you can use webhook parameters in the payload to indicate the state change that triggered the notification. For more information about webhooks, see Webhooks.
To send email notification, Data Collector must be configured to send email. To enable sending email, configure the email alert Data Collector properties. For more information, see Configuring Data Collector in the Data Collector documentation.
For more information about pipeline states, see Understanding Pipeline States.
1.11. Pipeline Statistics
A Control Hub job defines the pipeline to run and the Data Collectors that run the pipeline. When you start a job, Control Hub remotely runs the pipeline on the group of Data Collectors. To monitor the job statistics and metrics within Control Hub, you must configure the pipeline to write statistics to Control Hub or to another system.
When a job runs on a single Data Collector, you can simply configure the pipeline to write the statistics directly to Control Hub. By default, pipelines are configured to write statistics directly to Control Hub.
When a job runs on multiple Data Collectors, a remote pipeline instance runs on each of the Data Collectors. To view aggregated statistics for the job within Control Hub, you must configure the pipeline to write the statistics to one of the following systems:
- SDC RPC
- Kafka cluster
- Amazon Kinesis Streams
- MapR Streams
When you start a job that includes a pipeline configured to write to Kafka, Kinesis, MapR Streams, or SDC RPC, Control Hub automatically generates and runs a system pipeline for the job. The system pipeline reads the statistics written by each running pipeline instance to Kafka, Kinesis, MapR Streams, or SDC RPC. Then, the system pipeline aggregates and sends the statistics to Control Hub.
Important: For a production environment, use a Kafka cluster, Amazon Kinesis Streams, or MapR Streams to aggregate statistics. Using SDC RPC to aggregate statistics is not highly available and might cause the loss of some data. It should be used for development purposes only.
When a Control Hub job includes a pipeline that is configured to write statistics, you can view the statistics and metrics when you monitor the job in Control Hub.
When a Control Hub job includes a pipeline that is configured to discard statistics, Control Hub cannot display statistics and metrics for the job. Instead, Control Hub displays the following warning message when you monitor the job:
Aggregated metrics for the job are not available as individual pipeline metrics are discarded.
1.11.1. Pipeline Execution Mode
Pipelines can run in standalone, cluster, or edge execution mode. Some pipeline execution modes do not support all statistics aggregator options.
The following table lists each pipeline execution mode and the statistics aggregator options that it supports:
| Pipeline Execution Mode | Supported Statistics Aggregator Options |
| --- | --- |
| Standalone | All options are supported. |
1.11.2. Write Statistics Directly to Control Hub
When you write statistics directly to Control Hub, Control Hub does not generate a system pipeline for the job. Instead, the Data Collector directly sends the statistics to Control Hub.
Write statistics directly to Control Hub when the job runs on a single Data Collector. If the job runs on multiple Data Collectors, Control Hub can display the pipeline statistics for each individual Data Collector. However, Control Hub cannot display an aggregated view of the statistics across all running pipeline instances.
1.11.3. Write Statistics to SDC RPC
When you write statistics to SDC RPC, Data Collector effectively adds an SDC RPC destination to the pipeline that you are configuring. Control Hub automatically generates and runs a system pipeline for the job. The system pipeline is a pipeline with a Dev SDC RPC with Buffering origin that reads the statistics passed from the SDC RPC destination, and then aggregates and sends the statistics to Control Hub.
Write statistics to SDC RPC when the job runs on multiple Data Collectors.
When you configure a pipeline to write statistics to an SDC RPC destination, you specify the following information:
- SDC RPC connection - The host and port number of the Data Collector machine where Control Hub starts the system pipeline. The host must be a Data Collector machine registered with Control Hub that can run a pipeline for the job. A Data Collector can run the pipeline when it has all labels associated with the job.
For example, if you associate the job with the WestCoast label, then the host specified in the RPC connection must be a machine with a registered Data Collector that also has the WestCoast label.
- SDC RPC ID - A user-defined identifier that allows SDC RPC stages to recognize each other. To avoid mixing statistics from different jobs, use a unique ID for each job.
You can optionally enable encryption to pass data securely and define retry and timeout properties.
Important: For a production environment, use a Kafka cluster, Amazon Kinesis Streams, or MapR Streams to aggregate statistics. Using SDC RPC to aggregate statistics is not highly available and might cause the loss of some data. It should be used for development purposes only.
For more information about SDC RPC pipelines, see SDC RPC Pipeline Overview.
Best Practices for SDC RPC
Consider the following best practices when you configure a pipeline to write statistics to an SDC RPC destination:
- To avoid mixing statistics from different jobs, use a unique SDC RPC ID for each job.
- Monitor the disk space where the Dev SDC RPC with Buffering origin in the system pipeline temporarily buffers the records to disk before passing the records to the next stage in the pipeline.
The Dev SDC RPC with Buffering origin in the system pipeline temporarily buffers the statistics to a queue on disk. If the system pipeline slows, the temporary location on disk might become full. The temporary statistics are written to the location specified in the java.io.tmpdir system property, to a file with the following name:
sdc-fragments<file ID>.queueFile
1.11.4. Write Statistics to Kafka
When you write statistics to a Kafka cluster, Data Collector effectively adds a Kafka Producer destination to the pipeline that you are configuring. Control Hub automatically generates and runs a system pipeline for the job. The system pipeline reads the statistics from Kafka, and then aggregates and sends the statistics to Control Hub.
Write statistics to a Kafka cluster when the job runs on multiple Data Collectors.
When you write statistics to a Kafka cluster, you define connection information and the topic to write to.
You also configure the partition strategy. The pipeline passes data to partitions in the Kafka topic based on the partition strategy that you choose. You can add additional Kafka configuration properties as needed.
Note: At this time, you cannot configure the pipeline to connect securely through SSL/TLS or Kerberos when using Kafka.
Partition Strategy
The partition strategy determines how to write statistics to Kafka partitions. You can use a partition strategy to balance the work load or to write data semantically.
The pipeline can use one of the following partition strategies:
Round-Robin
Writes statistics to a different partition using a cyclical order. Use for load balancing.
Random
Writes statistics to a different partition using a random order. Use for load balancing.
Expression
Writes statistics to a partition based on the results of the partition expression. Use to perform semantic partitioning.
When you configure the partition expression, define the expression to evaluate to the partition where you want statistics written.
Default
Writes statistics using the default partition strategy that Kafka provides.
Best Practices for a Kafka Cluster
Consider the following best practices when you configure a pipeline to write statistics to a Kafka cluster:
- To avoid mixing statistics from different jobs, use a unique topic name for each job.
- Consider the Kafka retention policy.
Each running pipeline instance writes statistics to Kafka, and then the system pipeline consumes the statistics from Kafka. If the system pipeline unexpectedly shuts down, Kafka retains the statistics for the amount of time determined by the Kafka retention policy. If the system pipeline is down for longer than Kafka retains data, the statistics are lost.
1.11.5. Write Statistics to Kinesis Streams
When you write statistics to Amazon Kinesis Streams, Data Collector effectively adds a Kinesis Producer destination to the pipeline that you are configuring. Control Hub automatically generates and runs a system pipeline for the job. The system pipeline reads the statistics from Kinesis Streams, and then aggregates and sends the statistics to Control Hub.
Write statistics to Kinesis Streams when the job runs on multiple Data Collectors.
When you write statistics to Kinesis Streams, you define connection information and the stream to write to.
You also configure the partition strategy. The pipeline passes data to partitions in Kinesis shards based on the partition strategy that you choose. You can add additional Kinesis configuration properties as needed.
AWS Credentials
When the pipeline writes aggregated statistics to Amazon Kinesis Streams, it must pass credentials to Amazon Web Services.
Use one of the following methods to pass AWS credentials:
IAM roles
When Data Collector runs on an Amazon EC2 instance, you can use the AWS Management Console to configure an IAM role for the EC2 instance. Data Collector uses the IAM instance profile credentials to automatically connect to AWS.
When you use IAM roles, you do not need to specify the Access Key ID and Secret Access Key properties when you configure statistics for the pipeline.
For more information about assigning an IAM role to an EC2 instance, see the Amazon EC2 documentation.
AWS access key pairs
When Data Collector does not run on an Amazon EC2 instance or when the EC2 instance doesn’t have an IAM role, you must specify the Access Key ID and Secret Access Key properties when you configure statistics for the pipeline.
Best Practices for Kinesis Streams
Consider the following best practices when you configure a pipeline to write statistics to Amazon Kinesis Streams:
- To avoid mixing statistics from different jobs, use a unique stream name for each job.
- Consider the Kinesis Streams retention policy.
Each running pipeline instance writes statistics to Kinesis Streams, and then the system pipeline reads the statistics from Kinesis Streams. If the system pipeline unexpectedly shuts down, Kinesis Streams retains the statistics for the amount of time determined by the Kinesis Streams retention policy. If the system pipeline is down for longer than Kinesis Streams retains data, the statistics are lost.
1.11.6. Write Statistics to MapR Streams
When you write statistics to MapR Streams, Data Collector effectively adds a MapR Streams Producer destination to the pipeline that you are configuring. Control Hub automatically generates and runs a system pipeline for the job. The system pipeline reads the statistics from MapR Streams, and then aggregates and sends the statistics to Control Hub.
Write statistics to MapR Streams when the job runs on multiple Data Collectors.
When you write statistics to MapR Streams, you define the topic to write to. You also configure the partition strategy. The pipeline passes data to partitions in the MapR Streams topic based on the partition strategy that you choose. You can add additional MapR Streams configuration properties as needed.
Before you can write statistics to MapR Streams, you must perform additional steps to enable Data Collector to process MapR data. For more information, see MapR Prerequisites in the Data Collector documentation.
Partition Strategy
The partition strategy determines how to write statistics to MapR Streams partitions. You can use a partition strategy to balance the work load or to write data semantically.
The pipeline can use one of the following partition strategies:
Round-Robin
Writes each record to a different partition using a cyclical order. Use for load balancing.
Random
Writes each record to a different partition using a random order. Use for load balancing.
Expression
Writes each record to a partition based on the results of the partition expression. Use to perform semantic partitioning.
When you configure the partition expression, define the expression to evaluate to the partition where you want each record written. The expression must return a numeric value.
Default
Writes each record using the default partition strategy that MapR Streams provides.
Best Practices for MapR Streams
Consider the following best practices when you configure a pipeline to write statistics to MapR Streams:
- To avoid mixing statistics from different jobs, use a unique topic name for each job.
- Consider the MapR Streams retention policy.
Each running pipeline instance writes statistics to MapR Streams, and then the system pipeline consumes the statistics from MapR Streams. If the system pipeline unexpectedly shuts down, MapR Streams retains the statistics for the amount of time determined by the MapR Streams retention policy. If the system pipeline is down for longer than MapR Streams retains data, the statistics are lost.
1.11.7. Configuring a Pipeline to Write Statistics
You can configure a pipeline to write statistics.
- Open the pipeline.
- On the Statistics tab, select one of the following options for the statistics aggregator:
- Discard - Discard the pipeline statistics. Control Hub cannot display statistics and metrics for the job.
- Write to Control Hub Directly - Write the pipeline statistics directly to Control Hub. Use when the job runs on a single Data Collector.
- Write to SDC RPC - Write the pipeline statistics to an SDC RPC destination. Use when the job runs on multiple Data Collectors and for development purposes only.
- Write to Kafka - Write the pipeline statistics to a Kafka cluster. Use when the job runs on multiple Data Collectors.
- Write to Kinesis - Write the pipeline statistics to Amazon Kinesis Streams. Use when the job runs on multiple Data Collectors.
- Write to MapR Streams - Write the pipeline statistics to MapR Streams. Use when the job runs on multiple Data Collectors.
- To write statistics to an SDC RPC destination, on the Stats Aggregator - Write to SDC RPC tab, configure the following properties:
SDC RPC Properties |
Description |
SDC RPC Connection |
Host and port where the system pipeline runs. The host must be a machine with a registered Data Collector that runs a pipeline instance for the job. Use the following format: <host>:<port>. |
Retries per Batch |
Number of times the SDC RPC destination tries to write a batch to the Dev SDC RPC with Buffering origin in the system pipeline. When the SDC RPC destination cannot write the batch within the configured number of retries, it fails the batch. Default is 3. |
Back off Period |
Milliseconds to wait before retrying writing a batch to the Dev SDC RPC with Buffering origin in the system pipeline. The value that you enter increases exponentially after each retry. For example, if you set the back off period to 10, the SDC RPC destination attempts the first retry after waiting 10 milliseconds, attempts the second retry after waiting 100 milliseconds, and attempts the third retry after waiting 1,000 milliseconds. Set to 0 to retry immediately. Default is 0. |
SDC RPC ID |
User-defined ID to allow the SDC RPC destination to pass statistics to the system pipeline. To avoid mixing statistics from different jobs, use a unique ID for each job. You cannot define an expression that evaluates to the ID. |
Connection Timeout (ms) |
Milliseconds to establish a connection to the system pipeline. The SDC RPC destination retries the connection based on the Retries Per Batch property. Default is 5000 milliseconds. |
TLS Enabled |
Enables the secure transfer of data using TLS. |
Truststore File |
Truststore file for TLS. Required if the keystore file contains a self-signed certificate. Must be stored in the Data Collector resources directory, $SDC_RESOURCES, on each Data Collector machine that runs a pipeline instance for the job. |
Truststore Password |
Password for the truststore file. Tip: To secure sensitive information such as usernames and passwords, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation. |
Read Timeout (ms) |
Milliseconds to wait for the Dev SDC RPC with Buffering origin in the system pipeline to read data from a batch. The SDC RPC destination retries the write based on the Retries Per Batch property. Default is 2000 milliseconds. |
Use Compression |
Enables the SDC RPC destination to use compression to pass data to the Dev SDC RPC with Buffering origin in the system pipeline. Enabled by default. |
Verify Host in Server Certificate |
Verifies the host in the keystore file on the Data Collector machine that runs the system pipeline. |
- To write statistics to Kafka, on the Stats Aggregator - Write to Kafka tab, configure the following properties:
Kafka Properties |
Description |
Broker URI |
Connection string for the Kafka broker. Use the following format: <host>:<port>. To ensure a connection, enter a comma-separated list of additional broker URIs. |
Runtime Topic Resolution |
Do not use at this time. |
Topic |
Topic to use. To avoid mixing statistics from different jobs, use a unique topic name for each job. You cannot define an expression that evaluates to the topic name. |
Partition Strategy |
Strategy to use to write to partitions:
|
Partition Expression |
Expression to use when using the expression partition strategy. Define the expression to evaluate to the partition where you want statistics written. Partition numbers start with 0. Optionally, click Ctrl + Space Bar for help with creating the expression. |
Kafka Configuration |
Additional Kafka properties to use. Using simple or bulk edit mode, click the Add icon and define the Kafka property name and value. Use the property names and values as expected by Kafka. Do not use the broker.list property. |
ZooKeeper URI |
Connection string for the ZooKeeper of the Kafka cluster. Use the following format: <host>:<port>. To use a ZooKeeper quorum, enter a comma-separated list. To use a ZooKeeper chroot path, add the path at the end of the list as follows: <host>:<port>, <host2>:<port2>, .../<chroot_path> |
- To write statistics to Amazon Kinesis Streams, on the Stats Aggregator - Write to Kinesis tab, configure the following properties:
Kinesis Properties |
Description |
Access Key ID |
AWS access key ID. Required when not using IAM roles with IAM instance profile credentials. |
Secret Access Key |
AWS secret access key. Required when not using IAM roles with IAM instance profile credentials. |
Region |
Amazon Web Services region that hosts the Kinesis cluster. |
Endpoint |
Endpoint to connect to when you select Other for the region. Enter the endpoint name. |
Stream Name |
Kinesis stream name. To avoid mixing statistics from different jobs, use a unique stream name for each job. You cannot define an expression that evaluates to the stream name. |
Partitioning Strategy |
Strategy to write data to Kinesis shards:
|
Partition Expression |
Expression to generate the partition key used to pass data to different shards. Use for the expression partition strategy. |
Kinesis Producer Configuration |
Additional Kinesis properties. When you add a configuration property, enter the exact property name and the value. The pipeline does not validate the property names or values. |
- To write statistics to MapR Streams, on the Stats Aggregator - Write to MapR Streams tab, configure the following properties:
MapR Streams Properties |
Description |
Runtime Topic Resolution |
Do not use at this time. |
Topic |
Topic to use. To avoid mixing statistics from different jobs, use a unique topic name for each job. You cannot define an expression that evaluates to the topic name. |
Partition Strategy |
Strategy to use to write to partitions:
|
Partition Expression |
Expression to use when using the expression partition strategy. Define the expression to evaluate to the partition where you want statistics written. Partition numbers start with 0. Optionally, click Ctrl + Space Bar for help with creating the expression. |
MapR Streams Configuration |
Additional configuration properties to use. Using simple or bulk edit mode, click the Add icon and define the MapR Streams property name and value. Use the property names and values as expected by MapR Streams. You can use MapR Streams properties and the set of Kafka properties supported by MapR Streams. |
1.12. SSL/TLS Configuration
Some stages allow you to use SSL/TLS to connect to the external system.
When you enable TLS, you can generally configure properties on the TLS tab of the stage. The available properties depend on the stage that you are configuring. The TLS tab can include the following properties:
- Keystore properties
- Truststore properties
- TLS protocols
- Cipher suites
You can enable SSL/TLS type properties in the following stages and locations:
- HTTP Client origin, processor, and destination
- HTTP Server origin
- HTTP to Kafka origin
- Kafka Consumer origin and Kafka Producer destination, Kafka version 0.9.0.0 or later
- Kafka Multitopic Consumer origin
- MongoDB origin and destination, and the MongoDB Oplog origin - These stages require configuring the SDC_JAVA_OPTS environment variable. For more information, see "Enabling SSL/TLS" in the stage documentation.
- MQTT Subscriber origin and MQTT Publisher destination
- OPC UA Client origin
- Salesforce origin, lookup, and destination, and the Einstein Analytics destination
- SDC RPC origin and destination
- SDC RPC to Kafka origin
- Spark executor
- TCP Server origin
- UDP to Kafka origin, Kafka version 0.9.0.0 or later
- WebSocket Client origin and destination
- WebSocket Server origin
- Pipeline error handling, when writing error records to another pipeline
1.12.1. Keystore and Truststore Configuration
When SSL/TLS is enabled in a stage, you can also enable the use of a keystore and a truststore.
Though similar in format, keystores and truststores serve different purposes: a keystore contains a private key and public certificates that are used to prove the identity of the client when an SSL/TLS server requests it. In contrast, a truststore generally contains certificates from trusted certificate authorities that an SSL/TLS client uses to verify the identity of an SSL/TLS server.
Important: Before enabling SSL/TLS in a stage, store the keystore or truststore file in the Data Collector resources directory, $SDC_RESOURCES.
When you configure a keystore or truststore, you can configure the following properties:
keystore/truststore type
You can use the following types of keystores and truststores:
- Java Keystore File (JKS)
- PKCS-12 (p12 file)
file and location
When specifying the file and location of the keystore or truststore file, you can either use an absolute path to the file or a path relative to the Data Collector resources directory.
password
A password is optional for keystore and truststore files, but highly recommended.
algorithm
Data Collector uses the SunX509 key exchange algorithm by default. You can use any algorithm compatible with your keystore/truststore file that is supported by your JVM.
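For example, assuming you want to test SSL/TLS with a self-signed certificate, you might generate a JKS keystore with the JDK keytool utility, export its certificate, and import it into a truststore, storing both files in $SDC_RESOURCES. This is only a sketch; the file names, alias, distinguished name, and passwords are illustrative:
keytool -genkeypair -alias sdc-test -keyalg RSA -keysize 2048 -validity 365 -dname "CN=sdc-test" -keystore $SDC_RESOURCES/keystore.jks -storepass changeit
keytool -exportcert -alias sdc-test -keystore $SDC_RESOURCES/keystore.jks -storepass changeit -file sdc-test.crt
keytool -importcert -alias sdc-test -file sdc-test.crt -keystore $SDC_RESOURCES/truststore.jks -storepass changeit -noprompt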
1.12.2. Transport Protocols
When SSL/TLS is enabled in a stage, you can configure the transport protocol to use.
Data Collector uses TLSv1.2 by default. You can specify one or more other protocols, but versions prior to TLSv1.2 are not as secure.
1.12.3. Cipher Suites
When SSL/TLS is enabled in a stage, you can configure the cipher suites to use to perform the SSL/TLS handshake.
By default, Data Collector can use any of the following cipher suites:
Supported Cipher Suite |
Java Secure Socket Extension (JSSE) Name |
ECDHE-ECDSA-AES256-GCM-SHA384 |
TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384 |
ECDHE-RSA-AES256-GCM-SHA384 |
TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 |
ECDHE-ECDSA-AES128-GCM-SHA256 |
TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256 |
ECDHE-RSA-AES128-GCM-SHA256 |
TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 |
ECDHE-ECDSA-AES256-SHA384 |
TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384 |
ECDHE-RSA-AES256-SHA384 |
TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384 |
ECDHE-ECDSA-AES128-SHA256 |
TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256 |
ECDHE-RSA-AES128-SHA256 |
TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256 |
1.13. Implicit and Explicit Validation
Data Collector performs two types of validation:
Implicit validation
Implicit validation occurs by default as the Data Collector UI saves your changes. Implicit validation lists missing or incomplete configuration, such as an unconnected stage or a required property that has not been configured.
Errors found by implicit validation display in the Issues list. An error icon displays on stages with undefined required properties and on the canvas for pipeline issues.
Explicit validation
Explicit validation occurs when you click the Validate icon, request data preview, or start the pipeline. Explicit validation becomes available when implicit validation passes.
Explicit validation is a semantic validation that checks all configured values for validity and verifies whether the pipeline can run as configured.
For example, while implicit validation verifies that you entered a value for a URI, explicit validation tests the validity of the URI by connecting to the system.
Errors found by explicit validation display in a list accessible from the validation error message.
1.14. Expression Configuration
Use the expression language to configure expressions and conditions in processors, such as the Expression Evaluator or Stream Selector. Some destination properties also allow the expression language, such as the directory template for the Hadoop FS destination.
You can use the expression language to define any stage or pipeline property that represents a numeric or string value. You can also use field path expressions to select the fields to use in some processors.
Use expression completion to determine where you can use an expression and the expression elements that you can use in that location.
You can use the following elements in an expression:
- Constants
- Datetime variables
- Field names
- Functions
- Literals
- Operators
- Runtime parameters
- Runtime properties
- Runtime resources
1.14.1. Basic Syntax
Precede all expressions with a dollar sign and enclose them with curly brackets, as follows: ${<expression>}.
For example, to add 2 + 2, use the following syntax: ${2 + 2}.
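Expressions can also call functions. For example, the following sketch (assuming the record contains a string field named /name) returns the uppercase value of that field:
${str:toUpper(record:value('/name'))}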
1.14.2. Using Field Names in Expressions
When a pipeline is valid for preview, expression completion provides available field names in a list. When a list is not available, use the appropriate format for the field name.
When you use a field name in an expression, use the following syntax:
${record:value("/<field name>")}
Note: You can use single or double quotation marks to surround a field name.
For example, the following expressions both concatenate the values from the DATE field with values from the TIME field:
${record:value('/DATE')} ${record:value('/TIME')}
${record:value("/DATE")} ${record:value("/TIME")}
Field Names with Special Characters
You can use quotation marks and the backslash character to handle special characters in field names.
Expression completion provides the correct syntax for field names with special characters. But when you need to enter the field names manually, be sure to use the following guidelines:
Use quotation marks around field names with special characters
When a field name includes special characters, surround the field name with single or double quotation marks as follows:
/"<field w/specialcharacter>"
Some examples:
/"Stream$ets"
/'city&state'
/"product names"
When using multiple sets of quotation marks, alternate between types as you go
Throughout the expression language, when using quotation marks, you can use single or double quotation marks. But make sure to alternate between the types when nesting quotation marks.
For example:
${record:value('/"Stream$ets"'}
${record:value("/'city&state'"}
Use a backslash as an escape character
To use a quotation mark or backslash in a field name, use a backslash ( \ ).
Add additional backslashes as necessary to escape quotation marks.
For example, to use a field named "ID's" as a required field, you would use a single backslash:
/ID\'s
To use the same field in an expression, you might need additional backslashes as follows:
${record:value('/ID\\'s')}
1.14.3. Referencing Field Names and Field Paths
When a pipeline is valid for preview, you can generally select fields from a list. When a list is not available or when you are defining a new field name, you need to use the appropriate format for the field name.
To reference a field, you specify the path of the field. A field path describes a data element in a record using a syntax similar to files in directories. The complexity of a field path differs based on the type of data in the record:
Simple maps or JSON objects
With simple maps or JSON objects, the fields are one level removed from the root. Reference the field as follows:
/<field name>
So, to reference a CITY field in a simple JSON object, enter /CITY. A simple expression that calls the field might look like this:
${record:value('/CITY')}
Complex maps or JSON objects
To reference a field in a complex map or JSON object, include the path to the field, as follows:
/<path to field>/<field name>
For example, the following field path describes an employeeName field several levels deep in a JSON object: /region/division/group/employeeName. An expression that calls the field might look like this:
${record:value("/region/division/group/employeeName")}
Arrays or lists
To reference a field in an array or list, include the index and path to the field, as follows:
[<index value>]/<path to field>/<field name>
For example, the following field path describes the same employeeName field in the third region index in an array: [2]/east/HR/employeeName.
An expression that calls the field might look like this:
${record:value('[2]/east/HR/employeeName')}
Delimited records can be structured as lists. For more information, see Delimited Data Root Field Type.
Text
To reference text when a record is a line of text, use the following field name:
/text
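An expression that calls the field might look like this:
${record:value('/text')}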
Wildcard Use for Arrays and Maps
In some processors, you can use the asterisk wildcard (*) to represent the indices of an array or the keys of a map. Use a wildcard to help define the field paths for maps and arrays.
You can use the asterisk wildcard as follows:
[*]
Matches all values for the specified index in an array. For example, the following field path represents the social security number of every employee in every division:
/Division[*]/Employee[*]/SSN
/*
Matches all values for the specified keys in a map. For example, the following field path represents all employee information in the first division:
/Division[0]/Employee[*]/*
1.14.4. Field Path Expressions
You can use field path expressions in certain processors to determine the set of fields that you want the processor to use.
For example, you want to use the Field Remover processor to remove all fields that start with the same prefix. Instead of manually entering each field name, you can use a field path expression to specify the fields to remove.
Supported Stages
You can use field path expressions to specify the fields to use in the following processors:
- Field Hasher processor
- Field Masker processor
- Field Remover processor
- Field Replacer processor
- Field Type Converter processor
- Value Replacer processor (deprecated)
Field Path Expression Syntax
When creating a field path expression, you can use a combination of standard expression language syntax with field path expression syntax. You can use the following components in field path expressions:
Root field and relative paths
As with specifying any field path, begin a field path expression with a slash ( / ) to indicate the location of the fields in relation to the root field. Then, continue defining the field path as appropriate.
For example, the following field path expression uses a wildcard to specify all fields in the record:
/*
Wildcard characters
You can use the asterisk character ( * ) and question mark character ( ? ) as wildcards, as follows:
- Use the asterisk wildcard to represent one or more characters.
For example, to perform an action on all fields in a Stores map field, you can use the following field path expression:
/Stores/*
- Use the question mark wildcard to represent exactly one character.
For example, the following expression includes all fields that have a two-character prefix followed by an underscore:
/??_*
Brackets for position predicates
You can specify a field based on its position in a list field. After the name of the list field, specify the position surrounded by brackets ( [ ] ). Note that position numbering starts with 0.
For example, the following expression calls the fourth item in a colors list field:
/colors[3]
Brackets for complex expressions
You can configure field path expressions that use functions, typically field functions, to define a specific subset of fields to return. When configuring complex expressions, surround the expression with brackets ( [ ] ), as follows:
/*[${<complex expression>}]
For example, the following expression returns all fields with an "info" field attribute set to any value:
/*[${f:attribute('info') == '*'}]
Field functions
Use field functions to determine the fields to use based on field-related information, such as f:type for the datatype of the field, f:value for the value of the field, or f:attribute for an attribute or attribute value of the field.
For example, you can use the Field Type Converter processor to convert all Integer fields with the following expression:
/*[${f:type() == 'INTEGER'}]
For more information about field functions, see Field Functions.
Other functions
You can use other functions, such as record, string, or time functions, as part of complex field path expressions.
For example, the following expression defines the subset of fields where the region attribute is set to the results of the storeId field:
/*[${f:attribute('storeId') == record:value('/storeId')}]
1.14.5. Expression Completion in Properties
Expression completion provides a list of data types, runtime parameters, fields, and functions that you can use. The list includes runtime parameters when defined, and available fields when the pipeline is valid for data preview.
When an element does not display in the list, it is not a valid element at the specified location.
Tips for Expression Completion
Use the following information and tips when you invoke expression completion:
- To invoke expression completion, place the cursor where you want to create an expression and click Ctrl + Space Bar.
A list of valid expression elements displays. Scroll to view the entire list.
You can invoke expression completion anywhere you can use an expression.
- Field names display at the top of the list when data preview is available. When defined, runtime parameters display in the list with a purple Parameters icon.
For example, a runtime parameter named DirectoryRoot displays in the list with the Parameters icon.
- To view more information about an element, click the element name.
- To add an element to an expression, double-click the element name or press Enter.
- You can filter the element list by typing the first few letters of the element name.
- To view the syntax of a function, after you add the function, click within the parentheses.
1.14.6. Data Type Coercion
When an expression requires it, the expression language attempts implicit data type conversion, also known as data type coercion. When coercion is not possible, Data Collector passes the error records to the stage for error handling.
For example, you have an Expression Evaluator stage configured to send error records to the pipeline for error handling, and the pipeline writes error records to a file. The Expression Evaluator includes an expression that treats string data as integers. When the field includes integer or valid numeric data, the expression language coerces the data type. If the field includes a date, that record is written to the error records file.
To avoid coercion errors, you can use the Field Type Converter earlier in the pipeline to convert data to the appropriate data type.
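For example, the following Expression Evaluator expression is a minimal sketch (the /zip field name is illustrative) that adds 1 to a string field holding numeric data. The expression language coerces the string value to an integer when possible; records where /zip holds non-numeric data, such as a date, are passed to error handling:
${record:value('/zip') + 1}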
1.15. Configuring a Pipeline
Configure a pipeline to define the stream of data. After you configure the pipeline, you can start the pipeline.
A pipeline can include the following stages:
- A single origin stage
- Multiple processor stages
- Multiple destination stages
- Multiple executor stages
- Multiple pipeline fragments
1. From the Pipeline Repository view, click the Add icon.
2. In the New Pipeline window, enter a pipeline title and optional description, and select where you want the pipeline to run:
- Data Collector - Select to design a standalone or cluster execution mode pipeline that runs on Data Collector.
- Data Collector Edge - Select to design an edge execution mode pipeline that runs on Data Collector Edge.
3. Specify how you want to develop the pipeline:
- Blank pipeline - Select to create a pipeline from scratch.
- Pipeline template - Select to use a template as a basis for the pipeline.
4. Click Save.
The pipeline canvas displays the pipeline title, the generated pipeline ID, and an error icon. The error icon indicates that you need to configure error handling for the pipeline. The Properties panel displays the pipeline properties.
5. In the Properties panel, on the General tab, configure the following properties:
Pipeline Property |
Description |
Title |
Title of the pipeline. Pipeline Designer uses the alphanumeric characters entered for the pipeline title as a prefix for the generated pipeline ID. For example, if you enter “My Pipeline *&%&^^ 123” as the pipeline title, then the pipeline ID has the following value: MyPipeline123tad9f592-5f02-4695-bb10-127b2e41561c. You can edit the pipeline title. However, because the pipeline ID is used to identify the pipeline, any changes to the pipeline title are not reflected in the pipeline ID. |
Description |
Optional description of the pipeline. |
Labels |
Optional labels to assign to the pipeline. Use labels to group similar pipelines. For example, you might want to group pipelines by database schema or by the test or production environment. You can use nested labels to create a hierarchy of pipeline groupings. Enter nested labels using the following format: <label1>/<label2>/<label3> For example, you might want to group pipelines in the test environment by the origin system. You add the labels Test/HDFS and Test/Elasticsearch to the appropriate pipelines. |
Execution Mode |
Execution mode of the pipeline:
Use to stream data from a Kafka or MapR cluster that uses Spark Streaming on YARN.
|
Data Collector Edge URL |
Used only for edge pipelines designed in Data Collector and then published directly to and managed on an SDC Edge that is not registered with Control Hub. You can leave the default value for edge pipelines designed in Pipeline Designer. |
Delivery Guarantee |
Determines how Data Collector handles data after an unexpected event causes the pipeline to stop running:
Default is At Least Once. |
Start Event |
Determines how the start event is handled. Select one of the following options:
Use in standalone pipelines only. For more information about pipeline events, see Pipeline Event Generation. |
Stop Event |
Determines how the stop event is handled. Select one of the following options:
Use in standalone pipelines only. For more information about pipeline events, see Pipeline Event Generation. |
Retry Pipeline on Error |
Retries the pipeline upon error. |
Retry Attempts |
Number of retries attempted. Use -1 to retry indefinitely. The wait time between retries starts at 15 seconds and doubles until reaching five minutes. |
Max Pipeline Memory |
Maximum amount of memory for the pipeline to use. Used only when the Data Collector monitor.memory configuration property is set to true. You can enter a numeric value or edit the default expression to use a percentage of the Data Collector Java heap size. Default is 65% of the Data Collector Java heap size: ${jvm:maxMemoryMB() * 0.65} |
On Memory Exceeded |
Action to take when the pipeline memory reaches the configured Max Pipeline Memory:
|
Rate Limit (records / sec) |
Maximum number of records that the pipeline can read in a second. Use 0 or no value to set no rate limit. Default is 0. |
Max Runners |
The maximum number of pipeline runners to use in a multithreaded pipeline. Use 0 for no limit. When set to 0, Data Collector generates up to the maximum number of threads or concurrency configured in the origin. You can use this property to help tune pipeline performance. For more information, see Tuning Threads and Runners. Default is 0. |
Create Failure Snapshot |
Automatically creates a snapshot if the pipeline fails because of data-related errors. Can be used to troubleshoot the pipeline. |
6. To define runtime parameters, on the Parameters tab, click the Add icon and define the name and the default value for each parameter. You can use simple or bulk edit mode to add the parameters.
For more information, see Using Runtime Parameters.
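For example, assuming you define a runtime parameter named DirectoryRoot with a default value such as /tmp/output (the name and value are illustrative), you can reference it in any stage or pipeline property that allows expressions:
${DirectoryRoot}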
7. To configure notifications based on changes in pipeline state, on the Notifications tab, configure the following properties:
Notifications Property |
Description |
Notify on Pipeline State Changes |
Sends notifications when the pipeline encounters the listed pipeline states. |
Email IDs |
Email addresses to receive notification when the pipeline state changes to one of the specified states. Using simple or bulk edit mode, click the Add icon to add additional addresses. |
Webhooks |
Webhook to send when the pipeline state changes to one of the specified states. Using simple or bulk edit mode, click the Add icon to add additional webhooks. |
Webhook URL |
URL to send the HTTP request. |
Headers |
Optional HTTP request headers. |
HTTP Method |
HTTP method. Use one of the following methods:
|
Payload |
Optional payload to use. Available for PUT, POST, and DELETE methods. Use any valid content type. You can use webhook parameters in the payload to include information about the triggering event, such as the pipeline name or state. Enclose webhook parameters in double curly brackets as follows: {{PIPELINE_STATE}}. An example payload follows this table. |
Content Type |
Optional content type of the payload. Configure this property when the content type is not declared in the request headers. |
Authentication Type |
Optional authentication type to include in the request. Use None, Basic, Digest, or Universal. Use Basic for Form authentication. |
User Name |
User name to include when using authentication. |
Password |
Password to include when using authentication. |
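For example, a minimal JSON payload for a POST webhook (the message text is illustrative) that reports the new pipeline state might look like this:
{"text": "Pipeline state changed to {{PIPELINE_STATE}}"}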
8. Click the Error Records tab and configure the following error handling options:
Error Records Property |
Description |
Error Records |
Determines how to handle records that cannot be processed as expected. Use one of the following options:
Write to File is not supported for cluster mode at this time.
|
Error Record Policy |
Determines the version of the record to use as a basis for an error record. For more information, see Error Records and Version. |
9. When writing error records to an SDC RPC pipeline, click the Error Records - Write to Another Pipeline tab and configure the following properties:
Write to Pipeline Property |
Description |
SDC RPC Connection |
Connection information for the destination pipeline to continue processing data. Use the following format: <host>:<port>. Use a single RPC connection for each destination pipeline. Using simple or bulk edit mode, add additional connections as needed. Use the port number when you configure the SDC RPC origin that receives the data. |
SDC RPC ID |
User-defined ID to allow the destination to pass data to an SDC RPC origin. Use this ID in all SDC RPC origins to process data from the destination. |
Retries Per Batch |
Number of times the destination tries to write a batch to the SDC RPC origin. When the destination cannot write the batch within the configured number of retries, it fails the batch. Default is 3. |
Back Off Period |
Milliseconds to wait before retrying writing a batch to the SDC RPC origin. The value that you enter increases exponentially after each retry, until it reaches the maximum wait time of 5 minutes. For example, if you set the back off period to 10, the destination attempts the first retry after waiting 10 milliseconds, attempts the second retry after waiting 100 milliseconds, and attempts the third retry after waiting 1,000 milliseconds. Set to 0 to retry immediately. Default is 0. |
Connection Timeout (ms) |
Milliseconds to establish a connection to the SDC RPC origin. The destination retries the connection based on the Retries Per Batch property. Default is 5000 milliseconds. |
Read Timeout (ms) |
Milliseconds to wait for the SDC RPC origin to read data from a batch. The destination retries the write based on the Retries Per Batch property. Default is 2000 milliseconds. |
Use Compression |
Enables the destination to use compression to pass data to the SDC RPC origin. Enabled by default. |
Use TLS |
Enables the use of TLS. |
Truststore File |
The path to the truststore file. Enter an absolute path to the file or a path relative to the Data Collector resources directory: $SDC_RESOURCES. For more information about environment variables, see Data Collector Environment Configuration in the Data Collector documentation. By default, no truststore is used. |
Truststore Type |
Type of truststore to use. Use one of the following types:
Default is Java Keystore File (JKS). |
Truststore Password |
Password to the truststore file. A password is optional, but recommended. Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation. |
Truststore Trust Algorithm |
The algorithm used to manage the truststore. Default is SunX509. |
Use Default Protocols |
Determines the transport layer security (TLS) protocol to use. The default protocol is TLSv1.2. To use a different protocol, clear this option. |
Transport Protocols |
The TLS protocols to use. To use a protocol other than the default TLSv1.2, click the Add icon and enter the protocol name. You can use simple or bulk edit mode to add protocols. Note: Older protocols are not as secure as TLSv1.2. |
Use Default Cipher Suites |
Determines the cipher suite to use when performing the SSL/TLS handshake. Data Collector provides a set of cipher suites that it can use by default. For a full list, see Cipher Suites. |
Cipher Suites |
Cipher suites to use. To use a cipher suite that is not a part of the default set, click the Add icon and enter the name of the cipher suite. You can use simple or bulk edit mode to add cipher suites. Enter the Java Secure Socket Extension (JSSE) name for the additional cipher suites that you want to use. |
10. When writing error records to Microsoft Azure Event Hub, click the Error Records - Write to Event Hub tab and configure the following properties:
Event Hub Property |
Description |
Namespace Name |
The name of the namespace that contains the event hub that you want to use. |
Event Hub Name |
The event hub name. |
Shared Access Policy Name |
The policy name associated with the namespace. To retrieve the policy name, when logged into the Azure portal, navigate to your namespace and event hub, and then click Shared Access Policies for a list of policies. When appropriate, you can use the default shared access key policy, RootManageSharedAccessKey. |
Connection String Key |
One of the connection string keys associated with the specified shared access policy. To retrieve a connection string key, after accessing the list of shared access policies, click the policy name, and then copy the Connection String - Primary Key value. The value typically begins with "Endpoint". |
11. When writing error records to Elasticsearch, click the Error Records - Write to Elasticsearch tab and configure the following properties:
Elasticsearch Property |
Description |
Cluster HTTP URI |
HTTP URI used to connect to the cluster. Use the following format: <host>:<port> |
Additional HTTP Params |
Additional HTTP parameters that you want to send as query string parameters to Elasticsearch. Enter the exact parameter name and value expected by Elasticsearch. |
Detect Additional Nodes in Cluster |
Detects additional nodes in the cluster based on the configured Cluster URI. Selecting this property is the equivalent to setting the client.transport.sniff Elasticsearch property to true. Use only when the Data Collector shares the same network as the Elasticsearch cluster. Do not use for Elastic Cloud or Docker clusters. |
Use Security |
Specifies whether security is enabled on the Elasticsearch cluster. |
Time Basis |
Time basis to use for writing to time-based indexes. Use one of the following expressions:
When the Index property does not include datetime variables, you can ignore this property. Default is ${time:now()}. |
Data Time Zone |
Time zone for the destination system. Used to resolve datetimes in time-based indexes. |
Index |
Index for the generated documents. Enter an index name or an expression that evaluates to the index name. For example, if you enter customer as the index, the destination writes the document within the customer index. If you use datetime variables in the expression, make sure to configure the time basis appropriately. For details about datetime variables, see Datetime Variables. See the example after this table. |
Mapping |
Mapping type for the generated documents. Enter the mapping type, an expression that evaluates to the mapping type, or a field that includes the mapping type. For example, if you enter user as the mapping type, the destination writes the document with a user mapping type. |
Document ID |
Expression that evaluates to the ID for the generated documents. When you do not specify an ID, Elasticsearch creates an ID for each document. By default, the destination allows Elasticsearch to create the ID. |
Parent ID |
Optional parent ID for the generated documents. Enter a parent ID or an expression that evaluates to the parent ID. Use to establish a parent-child relationship between documents in the same index. |
Routing |
Optional custom routing value for the generated documents. Enter a routing value or an expression that evaluates to the routing value. Elasticsearch routes a document to a particular shard in an index based on the routing value defined for the document. You can define a custom value for each document. If you don’t define a custom routing value, Elasticsearch uses the parent ID (if defined) or the document ID as the routing value. |
Data Charset |
Character encoding of the data to be processed. |
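For example, assuming you want to write error records to a daily time-based index (the index name pattern is illustrative), you might set the Index property to an expression that uses datetime variables and leave the Time Basis at the default ${time:now()}:
logs-${YYYY()}-${MM()}-${DD()}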
12. If you enabled security, configure the following security property:
Security Property |
Description |
Security Username/Password |
Elasticsearch username and password. Enter the username and password using the following syntax: <username>:<password> Tip: To secure sensitive information such as usernames and passwords, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation. |
SSL Truststore Path |
Location of the truststore file. Configuring this property is the equivalent to configuring the shield.ssl.truststore.path Elasticsearch property. Not necessary for Elastic Cloud clusters. |
SSL Truststore Password |
Password for the truststore file. Configuring this property is the equivalent to configuring the shield.ssl.truststore.password Elasticsearch property. Not necessary for Elastic Cloud clusters. |
13. When writing error records to file, click the Error Records - Write to File tab and configure the following properties:
Write to File Property |
Description |
Directory |
Local directory for error record files. |
File Prefix |
Prefix used for error record files. Use to differentiate error record files from other files in the directory. Uses the prefix sdc-${sdc:id()} by default. The prefix evaluates to sdc-<Data Collector ID>. This provides default differentiation in case several Data Collectors write to the same directory. The Data Collector ID is stored in the following file: $SDC_DATA/sdc.id. For more information about environment variables, see Data Collector Environment Configuration in the Data Collector documentation. |
File Wait Time (secs) |
Number of seconds Data Collector waits for error records. After that time, it creates a new error record file. You can enter a number of seconds or use the default expression to enter the time in minutes. |
Max File Size (MB) |
Maximum size for error files. Exceeding this size creates a new error file. Use 0 to avoid using this property. |
14. When writing error records to Google Cloud Storage, click the Error Records - Write to Google Cloud Storage tab and configure the following properties:
Google Cloud Storage Property |
Description |
Project ID |
Project ID to connect to. |
Bucket |
Bucket where error record objects are written. |
Credentials Provider |
Credentials provider to use to connect:
|
Credentials File Path (JSON) |
When using a Google Cloud service account credentials file, path to the file that the destination uses to connect to Google Cloud Storage. The credentials file must be a JSON file. Enter a path relative to the Data Collector resources directory, $SDC_RESOURCES, or enter an absolute path. |
Common Prefix |
Common prefix that determines where objects are written. |
Partition Prefix |
Optional partition prefix to specify the partition to use. Use a specific partition prefix or define an expression that evaluates to a partition prefix. When using datetime variables in the expression, be sure to configure the time basis for the stage. |
Data Time Zone |
Time zone for the destination system. Used to resolve datetimes in a time-based partition prefix. |
Time Basis |
Time basis to use for writing to a time-based bucket or partition prefix. Use one of the following expressions:
When the Partition Prefix property has no time component, you can ignore this property. Default is ${time:now()}. |
Object Name Prefix |
Defines a prefix for object names written by the destination. By default, object names start with "sdc" as follows: sdc-<UUID>. Not required for the whole file data format. |
15. When writing error records to Google Pub/Sub, click the Error Records - Write to Google Pub/Sub tab and configure the following properties:
Google Pub/Sub Property |
Description |
Topic ID |
Google Pub/Sub topic ID to write messages to. |
Project ID |
Google Pub/Sub project ID to connect to. |
Credentials Provider |
Credentials provider to use to connect to Google Pub/Sub:
|
Credentials File Path (JSON) |
When using a Google Cloud service account credentials file, path to the file that the destination uses to connect to Google Pub/Sub. The credentials file must be a JSON file. Enter a path relative to the Data Collector resources directory, $SDC_RESOURCES, or enter an absolute path. |
16. When writing error records to Kafka, click the Error Records - Write to Kafka tab and configure the following properties:
Write to Kafka Property |
Description |
Broker URI |
Connection string for the Kafka broker. Use the following format: <host>:<port>. To ensure a connection, enter a comma-separated list of additional broker URIs. |
Runtime Topic Resolution |
Evaluates an expression at runtime to determine the topic to use for each record. |
Topic Expression |
Expression used to determine where each record is written when using runtime topic resolution. Use an expression that evaluates to a topic name. |
Topic White List |
List of valid topic names to write to when using runtime topic resolution. Use to avoid writing to invalid topics. Records that resolve to invalid topic names are passed to the stage for error handling. Use an asterisk (*) to allow writing to any topic name. By default, all topic names are valid. |
Topic |
Topic to use. Not available when using runtime topic resolution. |
Partition Strategy |
Strategy to use to write to partitions:
|
Partition Expression |
Expression to use with the default or expression partition strategy. When using the default partition strategy, specify an expression that returns the partition key from the record. The expression must evaluate to a string value. When using the expression partition strategy, specify an expression that evaluates to the partition where you want each record written. Partition numbers start with 0. The expression must evaluate to a numeric value. Optionally, click Ctrl + Space Bar for help with creating the expression. |
One Message per Batch |
For each batch, writes the records to each partition as a single message. |
Kafka Configuration |
Additional Kafka properties to use. Using simple or bulk edit mode, click the Add icon and define the Kafka property name and value. Use the property names and values as expected by Kafka. Do not use the broker.list property. For information about enabling secure connections to Kafka, see Enabling Security. |
17. When writing error records to Kinesis, click the Error Records - Write to Kinesis tab and configure the following properties:
Kinesis Property |
Description |
Access Key ID |
AWS access key ID. Required when not using IAM roles with IAM instance profile credentials. |
Secret Access Key |
AWS secret access key. Required when not using IAM roles with IAM instance profile credentials. |
Region |
Amazon Web Services region that hosts the Kinesis cluster. |
Endpoint |
Endpoint to connect to when you select Other for the region. Enter the endpoint name. |
Stream Name |
Kinesis stream name. |
Partitioning Strategy |
Strategy to write data to Kinesis shards:
|
Partition Expression |
Expression to generate the partition key used to pass data to different shards. Use for the expression partition strategy. |
Kinesis Producer Configuration |
Additional Kinesis properties. When you add a configuration property, enter the exact property name and the value. The Kinesis Producer does not validate the property names or values. |
Preserve Record Order |
Select to preserve the order of records. Enabling this option can reduce pipeline performance. |
18. When writing error records to a MapR Streams cluster, click the Error Records - Write to MapR Streams tab and configure the following properties:
MapR Streams Producer Property |
Description |
Runtime Topic Resolution |
Evaluates an expression at runtime to determine the topic to use for each record. |
Topic |
Topic to use. Not available when using runtime topic resolution. |
Topic Expression |
Expression used to determine where each record is written when using runtime topic resolution. Use an expression that evaluates to a topic name. |
Topic White List |
List of valid topic names to write to when using runtime topic resolution. Use to avoid writing to invalid topics. Records that resolve to invalid topic names are passed to the stage for error handling. Use an asterisk (*) to allow writing to any topic name. By default, all topic names are valid. |
Partition Strategy |
Strategy to use to write to partitions:
|
Partition Expression |
Expression to use with the default or expression partition strategy. When using the default partition strategy, specify an expression that returns the partition key from the record. The expression must evaluate to a string value. When using the expression partition strategy, specify an expression that evaluates to the partition where you want each record written. Partition numbers start with 0. The expression must evaluate to a numeric value. Optionally, click Ctrl + Space Bar for help with creating the expression. |
One Message per Batch |
For each batch, writes the records to each partition as a single message. |
MapR Streams Configuration |
Additional configuration properties to use. Using simple or bulk edit mode, click the Add icon and define the MapR Streams property name and value. Use the property names and values as expected by MapR. You can use MapR Streams properties and the set of Kafka properties supported by MapR Streams. |
19. When writing error records to an MQTT broker, click the Error Records - Write to MQTT tab and configure the following properties:
MQTT Property |
Description |
Broker URL |
MQTT Broker URL. Enter in the following format: <tcp | ssl>://<hostname>:<port> Use ssl for secure connections to the broker. For example: tcp://localhost:1883 |
Client ID |
MQTT Client ID. The ID must be unique across all clients connecting to the same broker. You can define an expression that evaluates to the client ID. For example, you can enter the following expression to use the unique pipeline ID as the client ID: ${pipeline:id()} |
Topic |
Topic to publish to. Using simple or bulk edit mode, click the Add icon to publish to additional topics. |
Quality of Service |
Determines the quality of service level used to guarantee message delivery:
For more information, see the HiveMQ documentation on quality of service levels. |
Client Persistence Mechanism |
Determines the persistence mechanism that the destination uses to guarantee message delivery when the quality of service level is at least once or exactly once. Select one of the following options:
Not used when the quality of service level is at most once. For more information, see the HiveMQ documentation on client persistence. |
Client Persistence Data Directory |
Local directory on the Data Collector machine where the destination temporarily stores messages in a file when you configure file persistence. The user who starts Data Collector must have read and write access to this directory. |
Keep Alive Interval (secs) |
Maximum time in seconds to allow the connection to the MQTT broker to remain idle. After the destination publishes no messages for this amount of time, the connection is closed. The destination must reconnect to the MQTT broker. Default is 60 seconds. |
Use Credentials |
Enables entering MQTT credentials. Tip: To secure sensitive information such as usernames and passwords, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation. |
Username |
MQTT user name. |
Password |
MQTT password. |
Retain the Message |
Determines whether or not the MQTT broker retains the message last published by the destination when no MQTT client is subscribed to listen to the topic. When selected, the MQTT broker retains the last message published by the destination. Any messages published earlier are lost. When cleared, all messages published by the destination are lost. For more information about MQTT retained messages, see http://www.hivemq.com/blog/mqtt-essentials-part-8-retained-messages. |
Use TLS |
Enables the use of TLS. |
Truststore File |
The path to the truststore file. Enter an absolute path to the file or a path relative to the Data Collector resources directory: $SDC_RESOURCES. For more information about environment variables, see Data Collector Environment Configuration in the Data Collector documentation. By default, no truststore is used. |
Truststore Type |
Type of truststore to use. Use one of the following types:
Default is Java Keystore File (JKS). |
Truststore Password |
Password to the truststore file. A password is optional, but recommended. Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation. |
Truststore Trust Algorithm |
The algorithm used to manage the truststore. Default is SunX509. |
Use Default Protocols |
Determines the transport layer security (TLS) protocol to use. The default protocol is TLSv1.2. To use a different protocol, clear this option. |
Transport Protocols |
The TLS protocols to use. To use a protocol other than the default TLSv1.2, click the Add icon and enter the protocol name. You can use simple or bulk edit mode to add protocols. Note: Older protocols are not as secure as TLSv1.2. |
Use Default Cipher Suites |
Determines the cipher suite to use when performing the SSL/TLS handshake. Data Collector provides a set of cipher suites that it can use by default. For a full list, see Cipher Suites. |
Cipher Suites |
Cipher suites to use. To use a cipher suite that is not a part of the default set, click the Add icon and enter the name of the cipher suite. You can use simple or bulk edit mode to add cipher suites. Enter the Java Secure Socket Extension (JSSE) name for the additional cipher suites that you want to use. |
20. When using the cluster execution mode, click the Cluster tab and configure the following properties:
For Spark Streaming on Mesos, configure the following properties:
Mesos Cluster Property |
Description |
Dispatcher URL |
Master URL of the Mesos dispatcher. For example, mesos://dispatcher:7077. |
Checkpoint Configuration Directory |
Location of the HDFS configuration files that specify whether to write checkpoint metadata to HDFS or Amazon S3. Use a directory or symlink within the Data Collector resources directory. The directory should include the following files:
|
For Spark Streaming or MapReduce on YARN, configure the following properties.
Yarn Cluster Property |
Description |
Worker Count |
Number of workers used in a Cluster Yarn Streaming pipeline. Use to limit the number of workers spawned for processing. By default, one worker is spawned for every partition in the topic. Default is 0 for one worker for each partition. |
Worker Java Options |
Additional Java properties for the pipeline. Separate properties with a space. The following properties are set by default.
Changing the default properties is not recommended. You can add any valid Java property. |
Launcher Env Configuration |
Additional configuration properties for the cluster launcher. Using simple or bulk edit mode, click the Add icon and define the property name and value. |
Worker Memory (MB) |
Maximum amount of memory allocated to each Data Collector worker in the cluster. Default is 1024 MB. |
Extra Spark Configuration |
For Cluster Yarn Streaming pipelines, you can configure additional Spark configurations to pass to the spark-submit script. Enter the Spark configuration name and the value to use. The specified configurations are passed to the spark-submit script as follows: spark-submit --conf <key>=<value> For example, to limit the off-heap memory allocated to each executor, you can use the spark.yarn.executor.memoryOverhead configuration and set it to the number of MB that you want to use. Data Collector does not validate the property names or values. For details on additional Spark configurations that you can use, see the Spark documentation for the Spark version that you are using. |
21. Configure the pipeline to aggregate statistics on the Statistics tab.
For information about Control Hub aggregated statistics, see Pipeline Statistics.
22. If you are using the pipeline start or stop events, configure the related event consumer properties on the <event type> - <event consumer> tab.
For details on the Amazon S3 executor, see Configuring an Amazon S3 Executor.
For details on the Email executor, see Configuring an Email Executor.
For details on the HDFS File Metadata executor, see Configuring an HDFS File Metadata Executor.
For details on the Hive Query executor, see Configuring a Hive Query Executor.
For details on the JDBC Query executor, see Configuring a JDBC Query Executor.
For details on the MapReduce executor, see Configuring a MapReduce Executor.
For details on the Shell executor, see Configuring a Shell Executor.
For details on the Spark executor, see Configuring a Spark Executor.
For details on writing to another pipeline, see Configuring an SDC RPC Destination.
23. Use the Stage Library panel to add an origin stage. In the Properties panel, configure the stage properties.
Or, to use a pipeline fragment that includes an origin, use the Stage Library panel to add the fragment.
For configuration details about origin stages, see Origins.
For more information about pipeline fragments, see Pipeline Fragments.
24. Use the Stage Library panel to add the next stage that you want to use, connect the origin to the new stage, and configure the new stage.
For configuration details about processors, see Processors.
For configuration details about destinations, see Destinations.
For configuration details about executors, see Executors.
For more information about pipeline fragments, see Pipeline Fragments.
25. Add additional stages as necessary.
26. At any point, you can use the Preview icon to preview data to help configure the pipeline. For more information, see Data Preview Overview.
27. Optionally, you can create metric or data alerts to track details about a pipeline run and create threshold alerts. For more information, see Rules and Alerts.
When the pipeline is validated and complete, you can use the Publish Pipeline icon to publish the pipeline, then use the Create Job icon to create a job.