To create a new pipe with Snowpipe (Snowflake's continuous data ingestion service), follow these steps:

  1. Prerequisites
  • Storage Integration: Set up a storage integration for your cloud provider (Amazon S3, Azure Blob Storage, or Google Cloud Storage) so Snowflake can securely access your cloud storage.
  • Stage: Create an external stage pointing to your cloud storage (if not already created).
  • Target Table: Ensure the destination table exists in Snowflake.
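A minimal sketch of the prerequisite objects for the Amazon S3 case. Creating a storage integration typically requires the ACCOUNTADMIN role (or the CREATE INTEGRATION privilege). The IAM role ARN, the CSV options, and the table columns below are hypothetical placeholders; substitute your own values.

```sql
-- Storage integration: lets Snowflake assume an IAM role to read the bucket.
-- The role ARN is a hypothetical placeholder.
CREATE OR REPLACE STORAGE INTEGRATION my_storage_integration
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<account_id>:role/<role_name>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://your-bucket/path/');

-- Shows STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID,
-- which belong in the IAM role's trust policy.
DESC INTEGRATION my_storage_integration;

-- Named file format used by the stage (CSV options are assumptions).
CREATE OR REPLACE FILE FORMAT my_file_format
  TYPE = CSV
  FIELD_DELIMITER = ','
  SKIP_HEADER = 1;

-- Target table (hypothetical columns; match them to your files).
CREATE TABLE IF NOT EXISTS my_target_table (
  id INTEGER,
  name STRING,
  created_at TIMESTAMP_NTZ
);
```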
  2. Create a Stage (if needed)
  • Define a stage that points to your cloud storage location. Example for Amazon S3:

```sql
CREATE OR REPLACE STAGE my_s3_stage
  URL = 's3://your-bucket/path/'
  STORAGE_INTEGRATION = my_storage_integration
  FILE_FORMAT = (FORMAT_NAME = 'my_file_format');  -- a named file format (e.g., CSV, JSON)
```
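Before building a pipe on top of the stage, it can help to confirm that Snowflake can actually reach the bucket. A quick check, assuming the stage name from the example above:

```sql
-- Lists the files Snowflake can see under the stage URL.
LIST @my_s3_stage;

-- Shows the stage's URL, storage integration, and file format settings.
DESC STAGE my_s3_stage;
```

An access error from LIST at this point usually points to the storage integration or the IAM trust policy rather than to the pipe.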

  3. Create a Pipe

A pipe wraps a COPY INTO statement that loads data from the stage into the target table. Example:

```sql
CREATE OR REPLACE PIPE my_pipe
  AUTO_INGEST = TRUE  -- Enable auto-ingest (driven by cloud event notifications)
AS
COPY INTO my_target_table
  FROM @my_s3_stage
  FILE_FORMAT = (TYPE = 'CSV')  -- Takes precedence over the stage's file format
  PATTERN = '.*[.]csv';         -- Optional: load only files ending in .csv
```
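Because a pipe simply wraps a COPY INTO statement, you can dry-run the same statement by hand before creating the pipe. This sketch reuses the names from the pipe example; VALIDATION_MODE is not available inside a pipe definition, but it works in a standalone COPY INTO.

```sql
-- Dry run: reports parse errors in matching files without loading any rows.
COPY INTO my_target_table
  FROM @my_s3_stage
  FILE_FORMAT = (TYPE = 'CSV')
  PATTERN = '.*[.]csv'
  VALIDATION_MODE = RETURN_ERRORS;
```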

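  4. Configure Auto-Ingest (Optional)

For automated ingestion when new files arrive:

  • AWS: Configure an S3 event notification (for object-created events) on your bucket that targets the SQS queue Snowflake provides for the pipe.
  • Azure/GCP: Configure event notifications (e.g., Azure Event Grid or Google Cloud Pub/Sub) and connect them to Snowflake through a notification integration.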

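On AWS, link the event notification to the pipe by looking up the ARN of the pipe's SQS queue and using it as the destination of the S3 event notification:

```sql
SHOW PIPES LIKE 'my_pipe';  -- copy the value in the notification_channel column (an SQS ARN)
```

If your bucket already publishes events to an SNS topic, you can instead specify AWS_SNS_TOPIC = 'arn:aws:sns:…' in the CREATE PIPE statement.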
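On Azure, the notification integration wraps the Storage Queue that Event Grid writes to, and the pipe references it through its INTEGRATION parameter. A sketch with hypothetical names: the queue URI and tenant ID are placeholders, and my_azure_stage is assumed to be a stage pointing at Azure Blob Storage rather than S3.

```sql
-- Notification integration for an Azure Storage Queue fed by Event Grid.
-- The queue URI and tenant ID are placeholders.
CREATE NOTIFICATION INTEGRATION my_notification_int
  ENABLED = TRUE
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = AZURE_STORAGE_QUEUE
  AZURE_STORAGE_QUEUE_PRIMARY_URI = 'https://<storage_account>.queue.core.windows.net/<queue_name>'
  AZURE_TENANT_ID = '<tenant_id>';

-- Shows AZURE_CONSENT_URL, used to grant Snowflake access to the queue.
DESC NOTIFICATION INTEGRATION my_notification_int;

-- Auto-ingest pipe bound to the integration (name given as an uppercase string).
CREATE OR REPLACE PIPE my_azure_pipe
  AUTO_INGEST = TRUE
  INTEGRATION = 'MY_NOTIFICATION_INT'
AS
COPY INTO my_target_table
  FROM @my_azure_stage
  FILE_FORMAT = (TYPE = 'CSV');
```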

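Pipes created without auto-ingest are normally triggered by calling the Snowpipe REST API (the insertFiles endpoint). To queue files that were staged within the last 7 days but have not been loaded yet, run:

```sql
ALTER PIPE my_pipe REFRESH;
```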
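REFRESH also accepts optional PREFIX and MODIFIED_AFTER filters, which helps when only part of the stage needs backfilling. A sketch; the prefix and timestamp below are hypothetical values to replace with your own recent time:

```sql
-- Queue only files under a sub-path that were modified after the given time.
-- REFRESH still only considers files staged within the last 7 days.
ALTER PIPE my_pipe REFRESH
  PREFIX = 'daily/'
  MODIFIED_AFTER = '2024-06-01T00:00:00-07:00';
```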

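  5. Permissions

Grant the necessary privileges to the role that creates and runs the pipe:

```sql
GRANT USAGE ON DATABASE my_db TO ROLE my_role;
GRANT USAGE ON SCHEMA my_schema TO ROLE my_role;
GRANT CREATE PIPE ON SCHEMA my_schema TO ROLE my_role;
GRANT USAGE ON STAGE my_s3_stage TO ROLE my_role;           -- external stage read by the pipe
GRANT USAGE ON FILE FORMAT my_file_format TO ROLE my_role;
GRANT INSERT, SELECT ON TABLE my_target_table TO ROLE my_role; -- COPY INTO needs INSERT
GRANT OPERATE, MONITOR ON PIPE my_pipe TO ROLE my_role;
```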
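To confirm the grants took effect, and to see what OPERATE permits in practice, a short sketch using the same object names and assuming my_role is granted to your user:

```sql
-- List privileges held by the role and privileges granted on the pipe.
SHOW GRANTS TO ROLE my_role;
SHOW GRANTS ON PIPE my_pipe;

-- OPERATE allows pausing and resuming the pipe.
USE ROLE my_role;
ALTER PIPE my_pipe SET PIPE_EXECUTION_PAUSED = TRUE;
ALTER PIPE my_pipe SET PIPE_EXECUTION_PAUSED = FALSE;
```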

  6. Monitor the Pipe

Check the pipe status and load history:

```sql
-- Check pipe status (returns a JSON string)
SELECT SYSTEM$PIPE_STATUS('my_pipe');

-- View copy history for the target table over the last hour
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'my_target_table',
  START_TIME => DATEADD(HOUR, -1, CURRENT_TIMESTAMP())));

-- Check errors (if any) in files processed by the pipe over the last 24 hours
SELECT *
FROM TABLE(VALIDATE_PIPE_LOAD(
  PIPE_NAME => 'my_pipe',
  START_TIME => DATEADD(HOUR, -24, CURRENT_TIMESTAMP())));
```
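SYSTEM$PIPE_STATUS returns a JSON string, so individual fields can be extracted with PARSE_JSON. A sketch that pulls two commonly checked fields and then lists files from the last 24 hours that reported errors, using the same table and pipe names as above:

```sql
-- Extract fields from the pipe status JSON (computed once in a CTE).
WITH s AS (
  SELECT PARSE_JSON(SYSTEM$PIPE_STATUS('my_pipe')) AS st
)
SELECT
  st:executionState::STRING   AS execution_state,
  st:pendingFileCount::NUMBER AS pending_files
FROM s;

-- Files loaded into the table in the last 24 hours that reported errors.
SELECT file_name, status, error_count, first_error_message, last_load_time
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'my_target_table',
  START_TIME => DATEADD(HOUR, -24, CURRENT_TIMESTAMP())))
WHERE error_count > 0
ORDER BY last_load_time DESC;
```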
