In snowflake, Streams are a powerful feature designed for tracking changes (like inserts, updates, and deletes) to a table. They allow you to capture data changes in realtime and use this data for various purposes, such as incremental loading, change data capture (CDC), or auditing.

Here’s an overview of Streams in Snowflake and how to use them:

  1. What is a Stream in Snowflake?

A Stream in Snowflake is an object that tracks changes (DML operations like INSERT, UPDATE, and DELETE) made to a table. It captures these changes in a special table that is internally managed by Snowflake, and these changes can then be queried.

A Stream has two important components:

  • Change tracking:It captures DML changes, including the type of change (insert, update, delete).
  • Metadata:Information such as the row’s previous state for updates, and how many rows have changed since the last time the stream was queried.
  1. How to Create a Stream

You can create a stream on a table using the CREATE STREAM command.

Basic Syntax:

CREATE OR REPLACE STREAM <stream_name>

ON TABLE <table_name>

[ SHOW_INITIAL_ROWS = TRUE | FALSE ];

  • SHOW_INITIAL_ROWS:
    • If set to TRUE, it will include existing rows in the stream at the time of creation.
    • If set to FALSE, only changes after the stream is created will be captured.

Example:

CREATE OR REPLACE STREAM my_stream

ON TABLE my_table

SHOW_INITIAL_ROWS = TRUE;

  1. Understanding the Stream Table

Once you create a stream, Snowflake internally tracks changes in a pseudo-table. This is not a normal table; it tracks changes and allows querying changes.

You can query the stream like a table, and it will return the changes made to the source table:

SELECT * FROM my_stream;

The results will include metadata about the changes:

  • METADATA$ACTION: The type of action (INSERT, DELETE, UPDATE).
  • METADATA$ROW_ID: Unique identifier for the changed row.
  • METADATA$IS_DELETED: Marks if the row was deleted.
  1. Querying Streams for Changes

To fetch the changes, simply query the stream. Here’s an example of how to query the stream to get recent changes:

SELECT

METADATA$ACTION,

METADATA$ROW_ID,

*

FROM my_stream

WHERE METADATA$ACTION IN (‘INSERT’, ‘UPDATE’);

This will return the actions (INSERT, UPDATE) and the data for the rows that have been modified since the last time the stream was queried.

  1. Consuming the Stream Data

After querying the stream, you will want to consume the data by processing it. This typically involves:

  • Processing the changes(e.g., copying the changes to another table or applying transformations).
  • Marking the stream as consumed to indicate that the changes have been processed.

Once you query the stream, Snowflake tracks that you’ve consumed the changes, and it will not return those changes again unless new changes are made.

  1. Commit to the Stream

Once you’ve processed the changes, you can “commit” the stream so that it knows the changes have been fully handled and it can clear out those changes. This can be done using the DROP statement on the stream, or you can just let Snowflake handle it automatically.

Example:

— After processing the changes, drop the stream

DROP STREAM my_stream;

Alternatively, if you want to keep the stream for ongoing tracking of changes, you do not need to drop it immediately.

  1. Use Cases for Streams

Streams are commonly used for:

  • Change Data Capture (CDC):Tracking inserts, updates, and deletes to keep data in sync between tables.
  • Data Synchronization: Replicating changes between Snowflake tables or external systems.
  • Incremental Loading: Loading only changed data (instead of full table scans) for downstream processing or ETL.

Example: Using Streams for ETL

  1. Create a Stream on the Source Table:

CREATE OR REPLACE STREAM my_stream

ON TABLE my_table

SHOW_INITIAL_ROWS = TRUE;

  1. Query the Stream for New Changes:

SELECT * FROM my_stream;

  1. Process the Changes (e.g., insert into another table):

INSERT INTO my_target_table

SELECT * FROM my_stream WHERE METADATA$ACTION = ‘INSERT’;

  1. Clear the Stream after Processing (optional):

DROP STREAM my_stream;

  1. Important Considerations
  • Streams are not persistent tables:They track changes but do not store actual data like regular tables.
  • **Snowflake’s Stream is eventually consistent: Some real-time latency is expected, especially when processing high-frequency changes.
  • Streams can be queried multiple times:After processing data, you can query a stream multiple times for subsequent changes.
  • Delete and Update Changes:For UPDATE operations, the stream records both the old and new values of the modified row.

Conclusion

Streams are an excellent tool in Snowflake for tracking changes in tables and implementing incremental processing. They work best in scenarios like CDC or ETL processes where you need to track only the changes instead of processing the entire table. By using streams, you can efficiently handle real-time data processing, avoid unnecessary table scans, and optimize data pipelines.

Recommended Posts

Start typing and press Enter to search