Stream Pipelines concepts
Processing stream data
The Data Fabric streaming architecture, consisting of Streaming Ingestion and Stream Pipelines, provides an end-to-end, continuous data flow between the event source and the pipelines that deliver the data. This enables source events to be processed and delivered to the target system in real time.
When data events are ingested into Data Fabric through the Streaming Ingestion method, Stream Pipelines process them immediately and continuously, without waiting for the events to be stored in Data Lake as data objects for durability. This approach minimizes the data journey and thus accelerates processing. Consequently, users can extract the insights they need from their data in real time.
A data event is a discrete unit of data that represents a change or update. A data event can take the form of a record, row, message, or any other data structure that conveys information. Data events are often used in real-time streaming systems to represent changes or updates to data sources, and they are processed by streaming pipelines to enable real-time data processing and delivery.
For more information on the Data Fabric ingestion method, see Sending data to Data Lake.
Stream Pipelines have these limitations:
- Currently, Stream Pipelines support only Newline-delimited JSON (NDJSON); payload records must be in this format. A minimal payload sketch follows this list.
- Events larger than 4.5 MB cannot be processed by Stream Pipelines. If an event exceeds 4.5 MB, it is placed in the Replay Queue as an error.
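The following snippet is a minimal sketch of a conforming payload, assuming Python and illustrative field names borrowed from the examples later in this topic (ID, Description, Price, VariationNumber). It builds an NDJSON payload and guards against the per-event size limit:

```python
import json

# Illustrative records; the field names follow the examples later in this topic.
records = [
    {"ID": 1, "Description": "Banana", "Price": 1.10, "VariationNumber": 9999},
    {"ID": 2, "Description": "Apple", "Price": 0.30, "VariationNumber": 9999},
]

# 4.5 MB per-event limit (assuming 1 MB = 1024 * 1024 bytes).
MAX_EVENT_BYTES = int(4.5 * 1024 * 1024)

lines = []
for record in records:
    line = json.dumps(record)
    # Oversized events are not processed; they end up in the Replay Queue as errors.
    if len(line.encode("utf-8")) > MAX_EVENT_BYTES:
        raise ValueError("event exceeds the 4.5 MB limit")
    lines.append(line)

# NDJSON: one JSON object per line, separated by newline characters.
ndjson_payload = "\n".join(lines)
print(ndjson_payload)
```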
Processing batched data
Stream Pipelines are designed to process data events in real time. However, they can also process data that arrives through batch ingestion methods, such as the Batch Ingestion API or ION, and data from Data Lake. Batched data is published from the data source in a batch format and must be stored in Data Lake before it can reach Stream Pipelines, which introduces a delay between the data event and its delivery to the destination.
Processing data objects from Data Lake
Through the Initial Load feature in Stream Pipelines, you can process historical data events from data objects in Data Lake. Initial Load enables you to fill historical data gaps in the destination tables for a selected time frame.
Depending on the volume of data and the number of record variations in Data Lake for the specified time frame, processing data objects with Initial Load can take a long time.
Stream Pipelines can process data from Data Lake, but their primary purpose is to process streamed data. To process large volumes of data from Data Lake, we suggest that you create an Extract-Transform-Load (ETL) solution based on, for example, the Compass Query Platform and Compass APIs.
Loading data to a destination
Data transfer to a destination through Stream Pipelines is based on Data Fabric concepts. This includes the adherence of data and tables to the schemas that are defined in the Data Catalog and the application of data versioning techniques.
Stream Pipelines are designed to handle data loading continuously as events come in. To optimize this process, data is delivered in small batches. Each batch accumulates data for up to 100 milliseconds or up to 1000 records, whichever limit is reached first.
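As an illustration of this flush policy, here is a simplified, pull-based sketch in Python. The function and constant names are illustrative, not part of the product, and a real pipeline would also flush on a timer when no new events arrive:

```python
import time

MAX_BATCH_RECORDS = 1000      # flush once 1000 records have accumulated...
MAX_BATCH_AGE_SECONDS = 0.1   # ...or once the batch is 100 ms old, whichever comes first

def batch_events(event_source):
    """Group incoming events into small delivery batches.

    A batch is emitted as soon as it holds 1000 records or its first
    record is 100 ms old, whichever condition is met first.
    """
    batch = []
    batch_started = None
    for event in event_source:
        if not batch:
            batch_started = time.monotonic()
        batch.append(event)
        full = len(batch) >= MAX_BATCH_RECORDS
        aged = time.monotonic() - batch_started >= MAX_BATCH_AGE_SECONDS
        if full or aged:
            yield batch
            batch = []
    if batch:
        yield batch  # deliver whatever is left when the source ends
```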
In Stream Pipelines, you can use one of these methods to load data to a destination:
- Upsert load method
Inserts new records into a table or updates existing records. Whether an existing record is updated depends on whether the processed event is a newer version of that record, which is determined by comparing the values of the record variation field. An event with a lower variation than the existing record does not update the record.
The Upsert load method requires each record in a table to be unique. To establish record uniqueness, you must define the Identifier Paths field within the Data Catalog's object properties as the table's primary or unique keys.
We recommend that you use the Upsert method to load data for most scenarios. This method allows tables to reflect the current state of your source data.
Note: The Upsert method is not supported by all destinations. See Destinations. When you upsert records and the batch window contains multiple events that are associated with the same record, only the event with the highest variation is retained for delivery. Older variations are excluded from the delivery. Excluded events are counted, and their quantity is displayed on the Excluded graph on the Overview tab in a pipeline.
- Insert load method
Inserts events as new records into a table. Unlike the Upsert load method, the Insert load method does not update existing records. Hence, a processed event's variation does not matter.
When you use the Insert method, we recommend that you remove primary and unique keys from the table columns. Otherwise, if a record with the same unique key or identifier already exists in the table, the Insert operation fails and the events are moved to the Replay Queue because of the load errors.
We recommend that you use the Insert method to maintain historical data in the target tables.
Example of a data load with the Upsert method
These conditions must be met to load data with the Upsert method:
- The stream pipeline is configured with the Upsert load method.
- The destination supports the Upsert method.
- The source data has identifier and variation columns.
- The destination table has primary or unique key columns that match the source identifier columns.
This table shows the destination table that contains these records:
| ID | Description | Price | VariationNumber |
|---|---|---|---|
| 1 | Banana | 1.10 | 9999 |
| 2 | Apple | 0.30 | 9999 |
This table shows events that are published by the source within a 100 ms time frame:
| ID | Description | Price | VariationNumber |
|---|---|---|---|
| 1 | Banana | 1.50 | 10001 |
| 2 | Apple | 0.50 | 10001 |
| 3 | Orange | 2.10 | 10001 |
| 4 | Watermelon | 3.99 | 10001 |
| 2 | Apple | 0.55 | 10002 |
The upsert optimization process excludes the event with ID 2 and variation number 10001 because the delivery batch contains a newer event for the same ID with the higher VariationNumber value of 10002.
This table shows records in the destination table after the successful event delivery:
| ID | Description | Price | VariationNumber |
|---|---|---|---|
| 1 | Banana | 1.50 | 10001 |
| 2 | Apple | 0.55 | 10002 |
| 3 | Orange | 2.10 | 10001 |
| 4 | Watermelon | 3.99 | 10001 |
The events with IDs 3 and 4 have been inserted as new records, and the events with IDs 1 and 2 have updated the existing records.
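The upsert behavior in this example can be reproduced with a short Python sketch. The function name and the in-memory dictionary standing in for the destination table are illustrative only; a real destination applies the same logic through its own upsert or merge mechanism:

```python
def deliver_upsert(table, events):
    """Apply one delivery batch to a destination table with Upsert semantics.

    `table` maps each record ID to its current row; `events` is the batch
    collected for one delivery window. Returns the number of excluded events.
    """
    # Upsert optimization: keep only the highest variation per ID in the batch.
    newest = {}
    excluded = 0
    for event in events:
        kept = newest.get(event["ID"])
        if kept is None or event["VariationNumber"] > kept["VariationNumber"]:
            if kept is not None:
                excluded += 1          # an older variation is dropped from the batch
            newest[event["ID"]] = event
        else:
            excluded += 1              # the incoming event is older than the kept one

    # Insert new rows, or update rows whose stored variation is lower.
    for record_id, event in newest.items():
        existing = table.get(record_id)
        if existing is None or event["VariationNumber"] > existing["VariationNumber"]:
            table[record_id] = event
    return excluded
```

Running this sketch against the two tables above reproduces the result: the Apple event with variation 10001 is the single excluded event, the events with IDs 3 and 4 are inserted, and the rows with IDs 1 and 2 are updated.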
Example of a data load with the Insert method
These conditions must be met to load data with the Insert method:
- The stream pipeline is configured with the Insert load method.
- The source data has identifier and variation columns.
- The destination table has no unique column constraints.
This table shows the destination table that contains these records:
| ID | Description | Price | VariationNumber |
|---|---|---|---|
| 1 | Banana | 1.10 | 9999 |
| 2 | Apple | 0.30 | 9999 |
This table shows events that are published by the source:
| ID | Description | Price | VariationNumber |
|---|---|---|---|
| 1 | Banana | 1.50 | 10001 |
| 2 | Apple | 0.50 | 10001 |
| 3 | Orange | 2.10 | 10001 |
| 4 | Watermelon | 3.99 | 10001 |
| 2 | Apple | 0.55 | 10002 |
This table shows records in the destination table after the successful event delivery:
| ID | Description | Price | VariationNumber |
|---|---|---|---|
| 1 | Banana | 1.10 | 9999 |
| 2 | Apple | 0.30 | 9999 |
| 1 | Banana | 1.50 | 10001 |
| 2 | Apple | 0.50 | 10001 |
| 3 | Orange | 2.10 | 10001 |
| 4 | Watermelon | 3.99 | 10001 |
| 2 | Apple | 0.55 | 10002 |
After the successful delivery, the destination table contains both the previously existing records and all newly inserted events.
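For comparison, the Insert method reduces to a plain append. Here is a minimal sketch under the same illustrative assumptions, with a list of rows standing in for the destination table:

```python
def deliver_insert(table_rows, events):
    """Apply one delivery batch with Insert semantics.

    Every event becomes a new row; existing rows are never updated and
    variation values are not compared.
    """
    # With unique-key constraints on the table, duplicate identifiers would
    # instead cause load errors and send the events to the Replay Queue.
    table_rows.extend(events)
```

Appending the five events from the example to the two existing rows yields the seven-row result table shown above.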