To create a new Snowpipe (a continuous data ingestion pipeline) in Snowflake, follow these steps:
- Prerequisites
- Storage Integration: Set up a storage integration for your cloud provider (Amazon S3, Azure Blob Storage, or Google Cloud Storage) so Snowflake can access your cloud storage without embedded credentials.
- Stage: Create an external stage pointing to your cloud storage (if not already created).
- Target Table: Ensure the destination table exists in Snowflake.
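As a rough sketch of these prerequisites on AWS, the statements below create the three objects the later steps refer to. The role ARN is a placeholder, and the file format options and table columns are assumptions made purely for illustration; adjust them to your data.

```sql
-- Storage integration for S3; the ARN below is a placeholder, not a real role.
CREATE STORAGE INTEGRATION my_storage_integration
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<account_id>:role/<role_name>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://your-bucket/path/');

-- Named file format referenced by the stage.
CREATE FILE FORMAT my_file_format
  TYPE = 'CSV'
  SKIP_HEADER = 1; -- assumption: each file has one header row

-- Hypothetical destination table; the columns are illustrative only.
CREATE TABLE my_target_table (
  id         NUMBER,
  event_time TIMESTAMP_NTZ,
  payload    VARCHAR
);
```

After creating the integration, DESC INTEGRATION my_storage_integration; shows the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values needed for the IAM role's trust policy.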
- Create a Stage (if needed)
Define a stage pointing to your cloud storage location. Example for AWS S3:
CREATE OR REPLACE STAGE my_s3_stage
  URL = 's3://your-bucket/path/'
  STORAGE_INTEGRATION = my_storage_integration
  FILE_FORMAT = (FORMAT_NAME = 'my_file_format'); -- a named file format (e.g., CSV or JSON)
- Create a Pipe
A pipe uses the COPY INTO command to load data from the stage into the target table. Example:
CREATE OR REPLACE PIPE my_pipe
  AUTO_INGEST = TRUE -- enable auto-ingest (driven by cloud event notifications)
AS
  COPY INTO my_target_table
  FROM @my_s3_stage
  FILE_FORMAT = (TYPE = 'CSV')
  PATTERN = '.*[.]csv'; -- optional: load only files whose names end in .csv
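Before wrapping a COPY statement in a pipe, it can help to dry-run it by hand. The sketch below assumes the same stage, table, and pattern as the example above; it lists the files the pattern matches and then validates them without loading any rows.

```sql
-- Show which staged files the pattern would pick up.
LIST @my_s3_stage PATTERN = '.*[.]csv';

-- Validate the files without loading them; returns any parse errors found.
COPY INTO my_target_table
FROM @my_s3_stage
FILE_FORMAT = (TYPE = 'CSV')
PATTERN = '.*[.]csv'
VALIDATION_MODE = RETURN_ERRORS;
```

VALIDATION_MODE is only for this manual check; it is not supported inside a pipe definition, so leave it out of the CREATE PIPE statement.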
- Configure Auto-Ingest (Optional)
For automated ingestion when new files arrive:
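- AWS: Snowflake creates and manages an SQS queue for the pipe. Configure an S3 event notification on your bucket that sends object-created events to that queue's ARN.
- Azure/GCP: Configure event notifications (e.g., Azure Event Grid or Google Cloud Pub/Sub), create a notification integration in Snowflake, and reference it with the INTEGRATION parameter when you create the pipe.
On AWS, the queue ARN is not something you set on the pipe. Read it from the notification_channel column and use it as the destination of the S3 event notification:
SHOW PIPES LIKE 'my_pipe'; -- notification_channel holds the SQS queue ARN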
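For the Azure case, the wiring might look like the sketch below. The integration name, pipe name, stage name, queue URI, and tenant ID are all placeholders assumed for illustration, and the stage is assumed to point at an Azure container.

```sql
-- Notification integration for an Azure storage queue (placeholder values).
CREATE NOTIFICATION INTEGRATION my_notification_int
  ENABLED = TRUE
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = AZURE_STORAGE_QUEUE
  AZURE_STORAGE_QUEUE_PRIMARY_URI = 'https://<storage_account>.queue.core.windows.net/<queue_name>'
  AZURE_TENANT_ID = '<tenant_id>';

-- The pipe references the integration by name at creation time.
CREATE OR REPLACE PIPE my_azure_pipe
  AUTO_INGEST = TRUE
  INTEGRATION = 'MY_NOTIFICATION_INT'
AS
  COPY INTO my_target_table
  FROM @my_azure_stage -- hypothetical stage on an Azure container
  FILE_FORMAT = (TYPE = 'CSV');
```

DESC NOTIFICATION INTEGRATION my_notification_int; returns the consent URL used to grant Snowflake access to the queue.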
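To queue files that are already in the stage (for example, files staged before the pipe was created), refresh the pipe. This only picks up files staged within the last 7 days; for ongoing ingestion without auto-ingest, call the Snowpipe REST API (insertFiles) instead: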
ALTER PIPE my_pipe REFRESH;
- Permissions
Grant necessary privileges to the role creating/running the pipe:
GRANT USAGE ON DATABASE my_db TO ROLE my_role;
GRANT USAGE ON SCHEMA my_schema TO ROLE my_role;
GRANT CREATE PIPE ON SCHEMA my_schema TO ROLE my_role;
GRANT OPERATE, MONITOR ON PIPE my_pipe TO ROLE my_role;
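GRANT INSERT, SELECT ON TABLE my_target_table TO ROLE my_role; -- the pipe owner needs INSERT to load rows
GRANT USAGE ON STAGE my_s3_stage TO ROLE my_role;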
- Monitor the Pipe
Check the pipe status and load history:
-- Check pipe status
SELECT SYSTEM$PIPE_STATUS('my_pipe');
-- View copy history for the last hour
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'my_target_table',
  START_TIME => DATEADD(HOUR, -1, CURRENT_TIMESTAMP())));
-- Check errors (if any); START_TIME must fall within the last 14 days
SELECT *
FROM TABLE(INFORMATION_SCHEMA.VALIDATE_PIPE_LOAD(
  PIPE_NAME => 'my_pipe',
  START_TIME => DATEADD(DAY, -1, CURRENT_TIMESTAMP())));
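SYSTEM$PIPE_STATUS returns a JSON string, so individual fields can be pulled out for dashboards or alerts. The query below is a minimal sketch; the fully qualified name my_db.my_schema.my_pipe is assumed from the names used in the grants above.

```sql
-- Parse the status JSON and extract two commonly checked fields.
SELECT
  status:executionState::STRING   AS execution_state,   -- e.g. RUNNING or PAUSED
  status:pendingFileCount::NUMBER AS pending_file_count -- files queued but not yet loaded
FROM (
  SELECT PARSE_JSON(SYSTEM$PIPE_STATUS('my_db.my_schema.my_pipe')) AS status
);
```

A pending file count that keeps growing while the state stays RUNNING is usually a cue to look at the copy history and VALIDATE_PIPE_LOAD output.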