1. Pipeline Concepts and Design
1.1. Designing the Data Flow
You can branch and merge streams in the pipeline.
1.1.1. Branching Streams
When you connect a stage to multiple stages, all data passes to all connected stages. You can configure required fields for a stage to discard records before they enter the stage, but by default all records are passed.
For example, in the following pipeline, all of the data read from the directory passes to both branches for different processing, but you can configure required fields for each branch to drop the records that it does not need.
To route data based on more complex conditions, use the Stream Selector processor.
Some stages generate events that pass to event streams. Event streams originate from an event-generating stage, such as an origin or destination, and pass from the stage through an event stream output, as follows:
For more information about the event framework and event streams, see Dataflow Triggers Overview.
1.1.2. Merging Streams
You can merge two or more streams into a single downstream stage in a pipeline. When merging data, Data Collector passes all streams to the same stage, but does not merge records.
For example, in the following pipeline, the Stream Selector stage sends records with null values to the Field Replacer stage:
The data from the Stream Selector default stream and all data from Field Replacer pass to Expression Evaluator for further processing, but in no particular order and with no record merging.
Important: Pipeline validation does not prevent duplicate data. To avoid writing duplicate data to destinations, configure the pipeline logic to remove duplicate data or to prevent the generation of duplicate data.
Note that you cannot merge event streams with data streams. Event records must stream from the event-generating stage to destinations or executors without merging with data streams. For more information about the event framework and event streams, see Dataflow Triggers Overview.
1.2. Dropping Unwanted Records
You can drop unwanted records at each stage by defining required fields or preconditions that a record must meet to enter the stage.
1.2.1. Required Fields
A required field is a field that must exist in a record for the record to enter the stage. When a record does not include all required fields, it is handled based on the error record handling configured for the pipeline. You can define required fields for any processor, executor, and destination stage.
Configure required fields as part of the overall pipeline logic or to minimize processing errors. For example, say a Field Hasher encodes social security data in the /SSN field. To ensure that all records include social security numbers, you can make /SSN a required field for the stage.
1.2.2. Preconditions
Preconditions are conditions that a record must satisfy to enter the stage for processing. Stages process all preconditions before passing a record to the stage or to error handling. When a record does not meet all configured preconditions, it is processed based on the error handling configured for the stage.
You can define preconditions for any processor, executor, and most destination stages. You can use most functions, pipeline constants, and runtime properties in preconditions.
Configure preconditions as part of the overall pipeline logic or to minimize processing errors. For example, you might use the following expression to exclude records that originate from outside the United States:
${record:value('/COUNTRY') == 'US'}
Error records include information about failed preconditions in the errorMessage record header attribute.
1.3. Error Record Handling
You can configure error record handling at a stage level and at a pipeline level. You can also specify the version of the record to use as the basis for the error record.
When an error occurs as a stage processes a record, Data Collector handles the record based on the stage configuration. One of the stage options is to pass the record to the pipeline for error handling. For this option, Data Collector processes the record based on the pipeline error record handling configuration.
When you configure a pipeline, be aware that stage error handling takes precedence over pipeline error handling. That is, a pipeline might be configured to write error records to file, but if a stage is configured to discard error records, those records are discarded. You might use this functionality to reduce the types of error records that are saved for review and reprocessing.
Note that records missing required fields do not enter the stage. They are passed directly to the pipeline for error handling.
1.3.1. Pipeline Error Record Handling
Pipeline error record handling determines how Data Collector processes error records that stages send to the pipeline for error handling. It also handles records deliberately dropped from the pipeline such as records without required fields.
The pipeline handles error records based on the Error Records property on the Error Records tab. When Data Collector encounters an unexpected error, it stops the pipeline and logs the error.
The pipeline provides the following error record handling options:
Discard
The pipeline discards the record. Data Collector includes the records in error record counts and metrics.
Write to Another Pipeline
The pipeline writes error records to an SDC RPC pipeline. Data Collector includes the records in error record counts and metrics.
When you write to another pipeline, Data Collector effectively creates an SDC RPC origin pipeline to pass the error records to another pipeline.
You need to create an SDC RPC destination pipeline to process the error records. The pipeline must include an SDC RPC origin configured to read error records from this pipeline.
For more information about SDC RPC pipelines, see SDC RPC Pipeline Overview.
Write to Azure Event Hub
The pipeline writes error records and related details to Microsoft Azure Event Hub. Data Collector includes the records in error record counts and metrics.
You define the configuration properties for the Azure Event Hub to use.
Write to Elasticsearch
The pipeline writes error records and related details to Elasticsearch. Data Collector includes the records in error record counts and metrics.
You define the configuration properties for the Elasticsearch cluster to use.
Write to File
The pipeline writes error records and related details to a local directory. Data Collector includes the records in error record counts and metrics.
You define the directory to use and the maximum file size. Error files are named based on the File Prefix pipeline property.
Write to file is not supported for cluster pipelines at this time.
Write to Google Cloud Storage
The pipeline writes error records and related details to Google Cloud Storage. Data Collector includes the records in error record counts and metrics.
You define the Google Cloud Storage configuration properties.
Write to Google Pub/Sub
The pipeline writes error records and related details to Google Pub/Sub. Data Collector includes the records in error record counts and metrics.
You define the Google Pub/Sub configuration properties.
Write to Kafka
The pipeline writes error records and related details to Kafka. Data Collector includes the records in error record counts and metrics.
You define the configuration properties for the Kafka cluster to use.
Write to Kinesis
The pipeline writes error records and related details to Amazon Kinesis Streams. Data Collector includes the records in error record counts and metrics.
You define the configuration properties for the Kinesis stream to use.
Write to MapR Streams
The pipeline writes error records and related details to MapR Streams. Data Collector includes the records in error record counts and metrics.
You define the configuration properties for the MapR Streams cluster to use.
Write to MQTT
The pipeline writes error records and related details to an MQTT broker. Data Collector includes the records in error record counts and metrics.
You define the configuration properties for the MQTT broker to use.
1.3.2. Stage Error Record Handling
Most stages include error record handling options. When an error occurs while processing a record, Data Collector processes the record based on the On Record Error property on the General tab of the stage.
Stages include the following error handling options:
Discard
The stage silently discards the record. Data Collector does not log information about the error or note the specific record that encountered an error. The discarded record is not included in error record counts or metrics.
Send to Error
The stage sends the record to the pipeline for error handling. The pipeline processes the record based on the pipeline error handling configuration.
Stop Pipeline
Data Collector stops the pipeline and logs information about the error. The error that stopped the pipeline displays as an error in the pipeline history.
Stop pipeline is not supported for cluster mode pipelines at this time.
1.3.3. Example
A Kafka Consumer origin stage reads JSON data with a maximum object length of 4096 characters and the stage encounters an object with 5000 characters. Based on the stage configuration, Data Collector either discards the record, stops the pipeline, or passes the record to the pipeline for error record handling.
When the stage is configured to send the record to the pipeline, one of the following occurs based on how you configure the pipeline error handling:
- When the pipeline discards error records, Data Collector discards the record without noting the action or the cause.
- When the pipeline writes error records to a destination, Data Collector writes the error record and additional error information to the destination. It also includes the error records in counts and metrics.
1.3.4. Error Records and Version
When Data Collector creates an error record, it preserves the data and attributes from the record that triggered the error, and then adds error related information as record header attributes. For a list of the error header attributes and other internal header attributes associated with a record, see Internal Attributes.
When you configure a pipeline, you can specify the version of the record that you want to use:
- The original record - The record as originally generated by the origin. Use this record when you want the original record without any additional pipeline processing.
- The current record - The record in the stage that generated the error. Depending on the type of error that occurred, this record can be unprocessed or partially processed by the error-generating stage.
Use this record when you want to preserve any processing that the pipeline completed before the record caused an error.
1.4. Record Header Attributes
Some stages create record header attributes for a particular purpose. For example, CDC-enabled origins include the CRUD operation type in the sdc.operation.type record header attribute. This enables CRUD-enabled destinations to determine the operation type to use when processing records. Similarly, the Hive Metadata processor generates record header attributes that some destinations can use as part of the Drift Synchronization Solution for Hive.
Other stages include processing-related information in record header attributes for general use. For example, event-generating stages include the event type in record header attributes in case you want to process the event based on that information. And several origins include information such as the originating file name, location, or partition for each record.
You can use certain processors to create or update record header attributes. For example, you can use an Expression Evaluator to create attributes for record-based writes.
The inclusion of attributes in record headers does not require using them in the pipeline. You can, for example, use the CDC-enabled Salesforce origin in a non-CDC pipeline and ignore the CDC record header attributes that are automatically generated.
When writing data to destination systems, record header attributes are preserved with the record only when using the Google Pub/Sub Publisher destination or when using another destination with the SDC Record data format. To preserve the information when using other data formats, use the Expression Evaluator to copy information from record header attributes to record fields.
1.4.1. Working with Header Attributes
You can use the Expression Evaluator or any scripting processor to create or update record header attributes. For example, the MongoDB destination requires a CRUD operation to be specified in a record header attribute. If the origin providing the data does not generate that information automatically, you can use the Expression Evaluator or scripting processor to set the attribute value.
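For example, here is a minimal sketch of an Expression Evaluator header attribute expression that marks every record as an insert, assuming that a constant INSERT operation is appropriate for the data (1 represents an INSERT record, as described in CRUD Operation Header Attribute):
Header Attribute: sdc.operation.type
Header Attribute Expression: 1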
Record header attributes are string values. You can use record:attribute functions in any expression to include attribute values in calculations.
Important: Record header attributes do not have field paths. When using an attribute in an expression, use just the attribute name, surrounded by quotation marks since attributes are strings, as follows:
${record:attribute('<attribute name>')}
For example, the following Expression Evaluator adds the file and offset record header attributes created by the Directory origin to the record:
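A minimal sketch of the field expressions such an Expression Evaluator might use, where the output field names /filepath and /offset are illustrative and the attribute names are the file and offset attributes created by the Directory origin:
/filepath: ${record:attribute('file')}
/offset: ${record:attribute('offset')}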
Internal Attributes
Data Collector generates and updates some read-only internal record header attributes as records move from stage to stage. These attributes can be viewed for debugging issues, but can only be updated by Data Collector.
The record:attribute function does not allow access to internal record header attributes. The following table describes the internal record header attributes and the functions that you can use to access the data in the pipeline:
| Internal Record Header Attribute | Description | Related Function |
| --- | --- | --- |
| stageCreator | The ID of the stage that created the record. | record:creator() |
| sourceId | Source of the record. Can include different information based on the origin type. | record:id() |
| stagesPath | List of stages that processed the record in order, by stage name. | record:path() |
| trackingId | The route the record has taken through the pipeline, starting with the sourceId, then listing the stages that processed the record. | n/a |
| previousTrackingId | The tracking ID of the record before it entered the current stage. | n/a |
| errorStage | The stage that generated the error. In error records only. | record:errorStage() |
| errorStageLabel | The user-defined name for a stage. In error records only. | record:errorStageLabel() |
| errorCode | The error code. In error records only. | record:errorCode() |
| errorMessage | The error message. In error records only. | record:errorMessage() |
| errorTimestamp | The time that the error occurred. In error records only. | record:errorTime() |
| errorStackTrace | The stack trace associated with the error. In error records only. | n/a |
1.4.2. Header Attribute-Generating Stages
The following table lists the stages that generate record header attributes to enable special processing:
| Stage | Description |
| --- | --- |
| CDC-enabled origins | Include the CRUD operation type in the sdc.operation.type header attribute and can include additional CRUD and CDC information in record header attributes. For more information, see CDC-Enabled Origins. |
| Origins that process Avro data and the Data Parser processor | Include the Avro schema in an avroSchema record header attribute. |
| Origins that process XML data | Can include namespaces in an xmlns record header attribute when you enable field XPaths. |
| Event-generating stages | Generate record header attributes for event records. For details about event record header attributes, see "Event Records" in the stage documentation. |
| Amazon S3 origin | Can be configured to include system-defined and user-defined object metadata in record header attributes. |
| Amazon SQS Consumer origin | Can be configured to include SQS message attributes in record header attributes. |
| Directory origin | Includes information about the originating file for the record in record header attributes. |
| File Tail origin | Includes information about the originating file for the record in record header attributes. Can be configured to use tag attributes for sets of files. |
| Google Pub/Sub Subscriber origin | When available, includes user-defined message attributes in record header attributes. |
| Hadoop FS origin | Includes information about the file and offset in record header attributes. |
| Hadoop FS Standalone origin | Includes information about the originating file for the record in record header attributes. |
| HTTP Client origin | Includes response header fields in record header attributes. |
| HTTP Server origin | Includes information about the requested URL and request header fields in record header attributes. |
| JDBC Multitable Consumer origin | Includes table and data type information in record header attributes. |
| JDBC Query Consumer origin | Can be configured to include table and data type information in record header attributes. |
| Kafka Consumer origin | Includes information about the origins of the record in record header attributes. |
| Kafka Multitopic Consumer origin | Includes information about the origins of the record in record header attributes. |
| MapR FS origin | Includes information about the file and offset in record header attributes. |
| MapR FS Standalone origin | Includes information about the originating file for the record in record header attributes. |
| MapR Multitopic Streams Consumer origin | Includes information about the origins of the record in record header attributes. |
| MapR Streams Consumer origin | Includes information about the origins of the record in record header attributes. |
| MQTT Subscriber origin | Includes information about the origins of the record in record header attributes. |
| RabbitMQ Consumer origin | Includes RabbitMQ attributes in record header attributes. |
| Salesforce origin | Includes Salesforce information about the origins of the record in record header attributes. |
| SFTP/FTP Client origin | Includes information about the originating file for the record in record header attributes. |
| Expression Evaluator processor | Can be configured to create or update record header attributes. |
| Groovy Evaluator processor | Can be configured to create or update record header attributes. |
| Hive Metadata processor | Generates record header attributes for the data record. These attributes can be used for record-based writes as part of the Drift Synchronization Solution for Hive. Can be configured to add custom header attributes to the metadata record. |
| HTTP Client processor | Includes response header fields in record header attributes. |
| JavaScript Evaluator processor | Can be configured to create or update record header attributes. |
| Jython Evaluator processor | Can be configured to create or update record header attributes. |
| Schema Generator processor | Generates schemas and writes them to a user-defined record header attribute. |
1.4.3. Record Header Attributes for Record-Based Writes
Destinations can use information in record header attributes to write data. Destinations that write Avro data can use Avro schemas in the record header. The Hadoop FS and MapR FS destinations can use record header attributes to determine the directory to write to and when to roll a file as part of the Drift Synchronization Solution for Hive. For more information, see Drift Synchronization Solution for Hive.
To use a record header attribute, configure the destination to use the header attribute and ensure that the records include the header attribute.
The Hive Metadata processor automatically generates record header attributes for Hadoop FS and MapR FS to use as part of the Drift Synchronization Solution for Hive. For all other destinations, you can use the Expression Evaluator or a scripting processor to add record header attributes.
You can use the following record header attributes in destinations:
targetDirectory attribute in the Azure Data Lake Store, Hadoop FS, Local FS, and MapR FS destinations
The targetDirectory record header attribute defines the directory where the record is written. If the directory does not exist, the destination creates the directory. The targetDirectory header attribute replaces the Directory Template property in the destination.
When you use targetDirectory to provide the directory, the time basis configured for the destination is used only for determining whether a record is late. Time basis is not used to determine the output directories to create or to write records to directories.
To use the targetDirectory header attribute, on the Output tab, select Directory in Header.
avroSchema attribute in destinations that write Avro data
The avroSchema header attribute defines the Avro schema for the record. When you use this header attribute, you cannot define an Avro schema to use in the destination.
To use the avroSchema header attribute, on the Data Format tab, select the Avro data format, and then for the Avro Schema Location property, select In Record Header.
roll attribute in the Azure Data Lake Store, Hadoop FS, Local FS, and MapR FS destinations
The roll attribute, when present in the record header, triggers a roll of the file.
You can define the name of the roll header attribute. When you use the Hive Metadata processor to generate the roll header attribute, use the default "roll" attribute name. When you use an Expression Evaluator, use the name of the roll attribute that you defined in the processor.
To use a roll header attribute, on the Output tab, select Use Roll Attribute and define the name of the attribute.
Generating Attributes for Record-Based Writes
You can use the Hive Metadata processor, the Expression Evaluator, or a scripting processor to generate record header attributes for record-based writes. The Hive Metadata processor automatically generates record header attributes for Hadoop FS and MapR FS to use as part of the Drift Synchronization Solution for Hive. For all other destinations, you can use the Expression Evaluator or a scripting processor to add record header attributes.
To use the Expression Evaluator or scripting processor, you must generate record header attributes as expected by the destination. Use the following guidelines to generate record header attributes:
Generating the target directory
When using the Expression Evaluator or scripting processor to generate the target directory, note the following details:
- The destination expects the directory in a header attribute named "targetDirectory".
- The destination uses the directory exactly as written in the targetDirectory header attribute. Unlike directory templates, the directory specified in the targetDirectory attribute should not include any components that require evaluation, such as runtime parameters, variables, or runtime properties.
- When you define the expression that evaluates to a directory, you can use any valid component, including expressions that evaluate data in the record.
For example, you want to write records to different directories based on the Data Collector that runs the pipeline, and the region and store ID where the transaction took place. You can set up a runtime resource named DIR that defines the base for the directory and define DIR for each Data Collector that runs the pipeline. Then, you can use the following expression in the Expression Evaluator to define the targetDirectory attribute:
${runtime:conf('DIR')}/transactions/${record:value('/region')}/${record:value('/storeID')}
Generating the Avro schema
When using the Expression Evaluator or a scripting processor to generate the Avro schema, note the following details:
- The destination expects the Avro schema in a header attribute named "avroSchema".
- Use the standard Avro schema format, for example:
  {"type":"record","name":"table_name","namespace":"database_name",
  "fields":[{"name":"int_val","type":["null","int"],"default":null},
  {"name":"str_val","type":["null","string"],"default":null}]}
- The database name and table name must be included in the Avro schema.
Tip: You might use an Avro schema generator to help generate the Avro schema.
Generating the roll attribute
When using the Expression Evaluator or scripting processor to generate the roll attribute, note the following details:
- Use any name for the attribute and specify the attribute name in the destination.
- Configure an expression that defines when to roll files.
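For example, a minimal sketch of a roll attribute expression, assuming a hypothetical /batch_complete field in the data and that the expression should evaluate to true when a file roll is wanted:
roll: ${record:value('/batch_complete') == true}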
To define these record header attributes in the Expression Evaluator, perform the following steps:
- On the Expressions tab of the Expression Evaluator, specify the Header Attribute name.
To generate a target directory, use targetDirectory.
To generate an Avro schema, use avroSchema.
You can use any name for a roll indicator header attribute.
- For the Header Attribute Expression, define the expression that evaluates to the information you want the destination to use.
For information about generating record header attributes with scripting processors, see the scripting processor documentation.
1.4.4. Viewing Attributes in Data Preview
You can use data preview to view the record header attributes associated with a record at any given point in the pipeline. To view record header attributes, enable the Show Record/Field Header data preview property.
For example, the following image shows a record generated by the Directory origin in data preview.
The "Record Header" list displays the set of read-only internal attributes in the record at this point of the pipeline. The header attributes under "values" are the attributes created by the Directory origin.
1.5. Field Attributes
Field attributes are attributes that provide additional information about each field that you can use in pipeline logic, as needed.
Some stages generate field attributes. For example, the Salesforce origin includes the original Salesforce data type in the salesforce.salesforceType attribute for every field.
You can create, modify, and evaluate field attributes in the pipeline. The Expression Evaluator enables creating and modifying field-level attributes. You can use field attribute functions to evaluate field attributes.
For example, you can use an Expression Evaluator to create a field attribute based on record data, then pass the record to a Stream Selector that routes data based on the value of the attribute.
Field attributes are automatically included in records written to destination systems only when you use the SDC RPC data format in the destination.
To include field attributes in the record data or to use field attributes in calculations, use the record:fieldAttribute and record:fieldAttributeOrDefault functions. For more information about field attribute functions, see Record Functions.
When using data preview, you can enable viewing field attributes and record header attributes to aid with pipeline development.
1.5.1. Working with Field Attributes
Like record header attributes, you can use the Expression Evaluator or any scripting processor to create or update field attributes. For example, when processing Avro data, origins place the precision and scale information for Decimal data into field attributes. If you want to increase those values before writing them to the destination system, you can use the Expression Evaluator or any scripting processor to set the attribute value.
Field attributes are string values. You can also use record:fieldAttribute functions in any expression to include attribute values in calculations.
Important: Field attributes do not have field paths. When using an attribute in an expression, use just the attribute name surrounded by quotation marks after specifying the path to the associated field, as follows:
${record:fieldAttribute(<path to the field>,'<attribute name>')}
For example, say you are processing Avro data as part of the Drift Synchronization Solution for Hive. All Decimal fields include automatically generated precision and scale field attributes. Before passing the data to the Hive Metadata processor, you want to create a new Price Decimal field based on the existing cost data.
The following Expression Evaluator creates the Price field and defines the precision and scale field attributes based on the existing Cost precision and scale.
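A minimal sketch of that configuration, where the /Price and /Cost field names follow the example and the layout of the expressions is illustrative:
/Price: ${record:value('/Cost')}
precision field attribute on /Price: ${record:fieldAttribute('/Cost', 'precision')}
scale field attribute on /Price: ${record:fieldAttribute('/Cost', 'scale')}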
1.5.2. Field Attribute-Generating Stages
The following table lists the stages that generate field attributes to enable special processing:
| Stage | Description |
| --- | --- |
| Origins that process Avro data | Include "precision" and "scale" field attributes for every Decimal field. |
| Origins that process XML data | Can include field XPath information, XML attributes, and namespace declarations in field attributes. |
| Salesforce origin | Includes data type information in field attributes. |
| Salesforce Lookup processor | Includes data type information in field attributes. |
| XML Parser processor | Can include field XPath information, XML attributes, and namespace declarations in field attributes. |
1.5.3. Viewing Field Attributes in Data Preview
As with record header attributes, you can use data preview to view field attributes at any point in the pipeline. To view field attributes, enable the Show Record/Field Header data preview property.
1.6. Processing Changed Data
Certain stages enable you to easily process change capture data (CDC) or transactional data in a pipeline.
CDC-enabled origins can read change capture data. Some exclusively read change capture data, others can be configured to read it. When reading changed data, they determine the CRUD operation associated with the data and include CRUD operations - such as insert, update, upsert, or delete - in the sdc.operation.type record header attribute.
CRUD-enabled processors and destinations can use the CRUD operation type in the sdc.operation.type header attribute when writing records, enabling the external system to perform the appropriate operation.
Using a CDC-enabled origin and CRUD-enabled stages in a pipeline allows you to easily write changed data from one system into another. You can also use a CDC-enabled origin to write to non-CRUD destinations, and non-CDC origins to write to CRUD-enabled stages. For information on how that works, see Use Cases.
1.6.1. CRUD Operation Header Attribute
CDC-enabled origins include the sdc.operation.type record header attribute in all records when reading changed data.
CRUD-enabled processors and destinations can use the CRUD operation type in the sdc.operation.type header attribute when writing records, enabling the external system to perform the appropriate operation.
The sdc.operation.type record header attribute uses the following integers to represent CRUD operations:
- 1 for INSERT records
- 2 for DELETE records
- 3 for UPDATE records
- 4 for UPSERT records
- 5 for unsupported operations or codes
- 6 for UNDELETE records
- 7 for REPLACE records
- 8 for MERGE records
Note: Some origins use only a subset of the operations, based on the operations supported by the origin system. Similarly, destinations recognize only the subset of the operations that the destination systems support. See the origin and destination documentation for details about supported operations.
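Because record header attributes are strings, you can evaluate the operation type in any expression with the record:attribute function. For example, a minimal sketch of a Stream Selector condition that routes only INSERT records (operation code 1) down a branch:
${record:attribute('sdc.operation.type') == '1'}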
Earlier Implementations
Some origins were enabled for CDC using different record header attributes in earlier releases, but they all now include the sdc.operation.type record header attribute. All earlier CRUD header attributes are retained for backward compatibility.
Similarly, CRUD-enabled destinations that were enabled to look for the CRUD operation type in other header attributes can now look for the sdc.operation.type record header attribute first and check the alternate attribute afterwards. The alternate header attribute functionality is retained for backward compatibility.
1.6.2. CDC-Enabled Origins
CDC-enabled origins provide the CRUD operation type in the sdc.operation.type record header attribute. Some origins provide alternate and additional header attributes.
The following origins provide CRUD record header attributes:
| CDC-Enabled Origin | CRUD Record Header Attributes |
| --- | --- |
| JDBC Query Consumer for Microsoft SQL Server | Includes the CRUD operation type in the sdc.operation.type record header attribute. For more information, see CRUD Record Header Attribute. |
| MapR DB CDC | Includes the CRUD operation type in the sdc.operation.type record header attribute. Includes additional CDC information in record header attributes. For more information, see CRUD Operation and CDC Header Attributes. |
| MongoDB Oplog | Includes the CRUD operation type in the sdc.operation.type record header attribute. Can include additional CDC information in record header attributes, such as the op and ns attributes. For more information, see Generated Records. |
| MySQL Binary Log | Includes the CRUD operation type in the sdc.operation.type record header attribute. Includes additional CDC information in record fields. For more information, see Generated Records. |
| Oracle CDC Client | Includes the CRUD operation type in both the sdc.operation.type record header attribute and an oracle.cdc-prefixed record header attribute. Includes additional CDC information in record header attributes with the oracle.cdc prefix, such as oracle.cdc.table. For more information, see CRUD Operation Header Attributes. |
| Salesforce | Includes the CRUD operation type in the sdc.operation.type record header attribute. For more information, see CRUD Operation Header Attribute. |
| SQL Server CDC Client | Includes the CRUD operation type in the sdc.operation.type record header attribute. Includes CDC information in header attributes named jdbc.<CDC column name>. For more information, see Record Header Attributes. |
| SQL Server Change Tracking | Includes the CRUD operation type in the sdc.operation.type record header attribute. Includes additional information from change tables in the jdbc.SYS_CHANGE header attributes. For more information, see Record Header Attributes. |
1.6.3. CRUD-Enabled Stages
The following stages recognize CRUD operations stored in record header attributes and can perform writes based on those values. Some stages also provide CRUD-related properties.
| CRUD-Enabled Stage | Supported Operations | Stage Processing |
| --- | --- | --- |
| JDBC Tee processor |  | Determines the operation to use based on either the CRUD operation in the sdc.operation.type record header attribute or the default operation configured for the stage. Includes a Change Log property that enables processing records based on the CDC-enabled origin that you use. For more information, see Define the CRUD Operation. |
| Elasticsearch destination |  | Determines the operation to use based on either the CRUD operation in the sdc.operation.type record header attribute or the default operation configured for the stage. For more information, see Define the CRUD Operation. |
| JDBC Producer destination |  | Determines the operation to use based on the CRUD operation in the sdc.operation.type record header attribute or the default operation configured for the stage. Includes a Change Log property that enables processing records based on the CDC-enabled origin that you use. For more information, see Define the CRUD Operation. |
| Kudu destination |  | Determines the operation to use based on the CRUD operation in the sdc.operation.type record header attribute or the default operation configured for the stage. For more information, see Define the CRUD Operation. |
| MapR DB JSON destination |  | Determines the operation to use based on the CRUD operation in the sdc.operation.type record header attribute or the default operation configured for the stage. For more information, see Writing to MapR DB JSON. |
| MongoDB destination |  | Determines the operation to use based on the CRUD operation in the sdc.operation.type record header attribute or the default operation configured for the stage. For more information, see Define the CRUD Operation. |
| Redis destination |  | Determines the operation to use based on the CRUD operation in the sdc.operation.type record header attribute or the default operation configured for the stage. For more information, see Define the CRUD Operation. |
| Salesforce destination |  | Determines the operation to use based on the CRUD operation in the sdc.operation.type record header attribute or the default operation configured for the stage. For more information, see Define the CRUD Operation. |
1.6.4. Processing the Record
Change logs can provide record data in different formats. The JDBC Tee processor and JDBC Producer can decode most change log formats to generate record data based on the origin change log. When using other CRUD-enabled destinations, you might need to add additional processing to the pipeline to alter the format of the record.
For example, Microsoft SQL CDC records created by the JDBC Query Consumer contain CDC fields in the record, in addition to record data. You might use a Field Remover to drop any unnecessary fields from the record.
In contrast, the MySQL binary logs read by the MySQL Binary Log origin provide new or updated data in a New Data map field and changed or deleted data in a Changed Data map field. You might want to use the Field Flattener processor to flatten the map field with the data that you need, and a Field Remover to remove any unnecessary fields.
For details on the format of generated records, see the documentation for the CDC-enabled origin.
1.6.5. Use Cases
You can use CDC-enabled origins and CRUD-enabled destinations in pipelines together or individually. Here are some typical use cases:
CDC-enabled origin with CRUD-enabled destinations
You can use a CDC-enabled origin and a CRUD-enabled destination to easily process changed records and write them to a destination system.
For example, say you want to write CDC data from Microsoft SQL Server to Kudu. To do this, you use the CDC-enabled JDBC Query Consumer origin to read data from a Microsoft SQL Server change capture table. The origin places the CRUD operation type in the sdc.operation.type header attribute, in this case: 1 for INSERT, 2 for DELETE, 3 for UPDATE.
You configure the pipeline to write to the CRUD-enabled Kudu destination. In the Kudu destination, you can specify a default operation for any record with no value set in the sdc.operation.type attribute, and you can configure error handling for invalid values. You set the default to INSERT and you configure the destination to use this default for invalid values. In the sdc.operation.type attribute, the Kudu destination supports 1 for INSERT, 2 for DELETE, 3 for UPDATE, and 4 for UPSERT.
When you run the pipeline, the JDBC Query Consumer origin determines the CRUD operation type for each record and writes it to the sdc.operation.type record header attribute. And the Kudu destination uses the operation in the sdc.operation.type attribute to inform the Kudu destination system how to process each record. Any record with an undeclared value in the sdc.operation.type attribute, such as a record created by the pipeline, is treated like an INSERT record. And any record with an invalid value uses the same default behavior.
CDC-enabled origin to non-CRUD destinations
If you need to write changed data to a destination system without a CRUD-enabled destination, you can use an Expression Evaluator or scripting processor to move the CRUD operation information from the sdc.operation.type header attribute to a field, so the information is retained in the record.
For example, say you want to read from Oracle LogMiner redo logs and write the records to Hive tables with all of the CDC information in record fields. To do this, you'd use the Oracle CDC Client origin to read the redo logs, then add an Expression Evaluator to pull the CRUD information from the sdc.operation.type header attribute into the record. Oracle CDC Client writes additional CDC information such as the table name and scn into oracle.cdc header attributes, so you can use expressions to pull that information into the record as well. Then you can use the Hadoop FS destination to write the enhanced records to Hive.
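A minimal sketch of the Expression Evaluator field expressions for that pipeline, where the output field names are illustrative and sdc.operation.type and oracle.cdc.table are the header attributes described above:
/operation: ${record:attribute('sdc.operation.type')}
/table: ${record:attribute('oracle.cdc.table')}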
Non-CDC origin to CRUD destinations
When reading data from a non-CDC origin, you can use the Expression Evaluator or scripting processors to define the sdc.operation.type header attribute.
For example, say you want to read from a transactional database table and keep a dimension table in sync with the changes. You'd use the JDBC Query Consumer to read the source table and a JDBC Lookup processor to check the dimension table for the primary key value of each record. Then, based on the output of the lookup processor, you know if there was a matching record in the table or not. Using an Expression Evaluator, you set the sdc.operation.type record header attribute - 3 to update the records that had a matching record, and 1 to insert new records.
When you pass the records to the JDBC Producer destination, the destination uses the operation in the sdc.operation.type header attribute to determine how to write the records to the dimension table.
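A minimal sketch of the header attribute expression in that Expression Evaluator, assuming the JDBC Lookup processor returns the matched primary key in a hypothetical /existing_key field that is null when no match is found:
Header Attribute: sdc.operation.type
Header Attribute Expression: ${record:value('/existing_key') == null ? 1 : 3}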
1.7. Control Character Removal
You can use several stages to remove control characters - such as escape or end-of-transmission characters - from data. Remove control characters to avoid creating invalid records.
When Data Collector removes control characters, it removes ASCII character codes 0-31 and 127 with the following exceptions:
- 9 - Tab
- 10 - Line feed
- 13 - Carriage return
You can use the Ignore Ctrl Characters property in the following stages to remove control characters:
- Amazon S3 origin
- Amazon SQS Consumer origin
- Azure IoT/Event Hub Consumer origin
- CoAP Server origin
- Directory origin
- File Tail origin
- Google Cloud Storage origin
- Google Pub/Sub Subscriber origin
- Hadoop FS origin
- Hadoop FS Standalone origin
- HTTP Client origin
- HTTP Server origin
- JMS Consumer origin
- Kafka Consumer origin
- Kafka Multitopic Consumer origin
- Kinesis Consumer origin
- MapR FS origin
- MapR FS Standalone origin
- MapR Multitopic Streams Consumer origin
- MapR Streams Consumer origin
- MQTT Subscriber origin
- RabbitMQ Consumer origin
- Redis Consumer origin
- SFTP/FTP Client origin
- TCP Server origin
- WebSocket Client origin
- WebSocket Server origin
- Data Parser processor
- HTTP Client processor
- JSON Parser processor
- Log Parser processor
- XML Parser processor
1.8. Development Stages
You can use several development stages to help develop and test pipelines.
Note: Do not use development stages in production pipelines.
You can use the following stages when developing or testing pipelines:
Dev Data Generator origin
Generates records with the configured number of fields with the selected data types. You can use a Map or List-Map root field type.
The origin can generate events to test event handling functionality. To generate event records, select the Produce Events property.
When generating events, the origin uses the configured fields as the body of the event record and adds event record header attributes. You can also specify the event type with the Event Name property. For example, to create a no-more-data event, enter "no-more-data" for the event name. For more information about the event framework and event record header attributes, see Dataflow Triggers Overview.
The origin can also generate multiple threads for testing for a multithreaded pipeline. To generate multiple threads, enter a number larger than 1 for the Number of Threads property. For more information about multithreaded pipelines, see Multithreaded Pipeline Overview.
The On Record Error property has no effect in this stage.
Dev Random Source origin
Generates records with the configured number of Long fields. You can define a delay between batches and a maximum number of records to generate.
The On Record Error property has no effect in this stage.
Dev Raw Data Source origin
Generates records based on user-supplied data. You can enter raw data, select the data format of the data, and then configure any format-related configuration options.
For example, you can enter a set of log data, select the log data format, and then define the log format and other log properties for the data.
In data preview, this stage displays the raw source data as well as the data generated by the stage.
Dev SDC RPC with Buffering origin
Receives records from an SDC RPC destination, temporarily buffering the records to disk before passing the records to the next stage in the pipeline. Use as the origin in an SDC RPC destination pipeline.
Note: After a deliberate or unexpected stop of the pipeline, buffered records are lost.
Dev Snapshot Replaying origin
Reads records from a downloaded snapshot file. The origin can start reading from the first set of records in the snapshot file. Or, you can configure the origin to start reading from a specific stage in the snapshot file.
Sensor Reader origin
Generates records with atmospheric data like that generated by BMxx80 atmospheric sensors. Records include the following fields: temperature_C, pressure_KPa, and humidity. For example:
{"temperature_C": "12.34", "pressure_KPa": "567.89", "humidity": "534.44"}
For use in edge pipelines only.
Dev Identity processor
Passes all records through to the next stage. Use as a placeholder in the pipeline. You can define required fields and preconditions, and configure stage error handling.
Dev Random Error processor
Generates error records so you can test pipeline error handling. You can configure the stage to discard records, define required fields and preconditions, and configure stage error handling.
Dev Record Creator processor
Generates two records for each record that enters the stage. You can define required fields and preconditions, and configure stage error handling.
To Event destination
Generates events for testing event handling functionality. To generate events, select the Produce Events property.
The destination generates an event record for each incoming record. It uses the incoming record as the body of an event record and adds event record header attributes. Note that any record header attributes in the incoming record might be lost or replaced.
For more information about the event framework and event record header attributes, see Dataflow Triggers Overview.
1.9. Understanding Pipeline States
A pipeline state is the current condition of the pipeline, such as "running" or "stopped". The pipeline state can display in the All Pipelines list. The state of a pipeline can also appear in the Data Collector log.
The following pipeline states often display in the All Pipelines list:
- EDITED - The pipeline has been created or modified, and has not run since the last modification.
- FINISHED - The pipeline has completed all expected processing and has stopped running.
- RUN_ERROR - The pipeline encountered an error while running and stopped.
- RUNNING - The pipeline is running.
- STOPPED - The pipeline was manually stopped.
- START_ERROR - The pipeline encountered an error while starting and failed to start.
- STOP_ERROR - The pipeline encountered an error while stopping.
The following pipeline states are transient and rarely display in the All Pipelines list. These states can display in the Data Collector log when the pipeline logging level is set to Debug:
- CONNECT_ERROR - When running a cluster-mode pipeline, Data Collector cannot connect to the underlying cluster manager, such as Mesos or YARN.
- CONNECTING - The pipeline is preparing to restart after a Data Collector restart.
- DISCONNECTED - The pipeline is disconnected from external systems, typically because Data Collector is restarting or shutting down.
- FINISHING - The pipeline is in the process of finishing all expected processing.
- RETRY - The pipeline is trying to run after encountering an error while running. This occurs only when the pipeline is configured for a retry upon error.
- RUNNING_ERROR - The pipeline encounters errors while running.
- STARTING - The pipeline is initializing, but hasn't started yet.
- STARTING_ERROR - The pipeline encounters errors while starting.
- STOPPING - The pipeline is in the process of stopping after a manual request to stop.
- STOPPING_ERROR - The pipeline encounters errors while stopping.
1.9.1. State Transition Examples
Here are some examples of how pipelines can move through states:
Starting a pipeline
When you successfully start a pipeline for the first time, a pipeline transitions through the following states:
(EDITED)... STARTING... RUNNING
When you start a pipeline for the first time but it cannot start, the pipeline transitions through the following states:
(EDITED)... STARTING... STARTING_ERROR... START_ERROR
Stopping or restarting Data Collector
When Data Collector shuts down, running pipelines transition through the following states:
(RUNNING)... DISCONNECTING... DISCONNECTED
When Data Collector restarts, any pipelines that were running transition through the following states:
DISCONNECTED... CONNECTING... STARTING... RUNNING
Retrying a pipeline
When a pipeline is configured to retry upon error, Data Collector performs the specified number of retries when the pipeline encounters errors while running.
When retrying upon error and successfully retrying, a pipeline transitions through the following states:
(RUNNING)... RUNNING_ERROR... RETRY... STARTING... RUNNING
When retrying upon error and encountering another error, a pipeline transitions through the following states:
(RUNNING)... RUNNING_ERROR... RETRY... STARTING... RUNNING... RUNNING_ERROR...
When performing a final retry and unable to return to a Running state, a pipeline transitions through the following states:
(RUNNING)... RUNNING_ERROR... RUN_ERROR
Stopping a pipeline
When you successfully stop a pipeline, a pipeline transitions through the following states:
(RUNNING)... STOPPING... STOPPED
When you stop a pipeline and the pipeline encounters errors, the pipeline transitions through the following states:
(RUNNING)... STOPPING... STOPPING_ERROR... STOP_ERROR