Streams & Tasks: Change Tracking & Scheduled Processing
Architecture Diagram 1: Streams Change Tracking Architecture
STREAMS CHANGE TRACKING ARCHITECTURE

SOURCE TABLE: sales_transactions

State at the stream offset (before the changes below):

| id | product | amount | updated_at |
|---|---|---|---|
| 1 | Laptop | 999 | 2024-01-15 |
| 2 | Phone | 699 | 2024-01-15 |
| 3 | Tablet | 499 | 2024-01-15 |

↓ Stream on sales_transactions

STREAM: sales_stream

Changes made to the source table after the offset:

| id | product | amount | updated_at | operation |
|---|---|---|---|---|
| 4 | Monitor | 349 | 2024-01-16 | INSERT |
| 1 | Laptop | 949 | 2024-01-16 | UPDATE (amount 999 to 949) |
| 2 | Phone | 699 | 2024-01-15 | DELETE |

Stream Metadata:
- METADATA$ACTION: INSERT or DELETE (an update appears as a DELETE row plus an INSERT row)
- METADATA$ISUPDATE: TRUE/FALSE (is this row part of an update?)
- METADATA$ROW_ID: Unique, immutable identifier of the changed row

↓ Query stream

CONSUMER: ETL Process

SELECT * FROM sales_stream;

Results (changed rows + change metadata):

| id | product | amount | updated_at | METADATA$ACTION | METADATA$ISUPDATE |
|---|---|---|---|---|---|
| 4 | Monitor | 349 | 2024-01-16 | INSERT | FALSE |
| 1 | Laptop | 999 | 2024-01-15 | DELETE | TRUE |
| 1 | Laptop | 949 | 2024-01-16 | INSERT | TRUE |
| 2 | Phone | 699 | 2024-01-15 | DELETE | FALSE |

Process and advance the offset:

-- Process changes
INSERT INTO sales_target SELECT id, product, amount, updated_at FROM sales_stream;

-- Stream offset advances when this DML statement commits
-- (a plain SELECT does not advance it)
-- Next query only returns new changes

STREAM TYPES:

1. STANDARD STREAM (Default)
- Tracks INSERT, UPDATE, DELETE operations
- Returns the changed rows with change metadata
- Shows both old and new values for updates (as a DELETE/INSERT pair)
- Offset advances when the stream is consumed in a committed DML statement

2. APPEND-ONLY STREAM
- Tracks INSERT operations only
- Lower overhead than standard streams
- Suitable for immutable data ingestion
- Cannot detect updates or deletes

3. INSERT-ONLY STREAM
- Tracks only new inserts
- Supported on external tables
- Records rows from newly added files
- Does not record deletes (removed files)
Architecture Diagram 2: Tasks Scheduling Architecture
TASKS SCHEDULING ARCHITECTURE

TASK DEFINITION:

CREATE TASK daily_etl_task
  WAREHOUSE = 'etl_wh'
  SCHEDULE = 'USING CRON 0 2 * * * America/New_York'
  ALLOW_OVERLAPPING_EXECUTION = FALSE
  ERROR_INTEGRATION = 's3_error_notifications'
  COMMENT = 'Daily ETL processing at 2 AM Eastern Time'
AS
BEGIN
  -- ETL Logic
  INSERT INTO target_table SELECT * FROM source_table;
END;

Task Configuration:
- Schedule: CRON expression (0 2 * * *)
- Warehouse: etl_wh (dedicated for task)
- Overlap: FALSE (prevent concurrent runs)
- Error Handling: Failure notifications sent through the s3_error_notifications integration
- Timeout: Default (3600 seconds)

↓

TASK EXECUTION STATES:

SCHEDULED → EXECUTING → SUCCEEDED
EXECUTING → FAILED/BLOCKED

- SCHEDULED: Waiting for next schedule
- FAILED/BLOCKED: Error logged
- SUCCEEDED: Complete, offset advanced

↓

CRON EXPRESSION SYNTAX:

Format: minute hour day-of-month month day-of-week

Examples:

| Expression | Meaning |
|---|---|
| 0 2 * * * | Daily at 2:00 AM |
| 0 */4 * * * | Every 4 hours |
| 30 1 * * 1-5 | Weekdays at 1:30 AM |
| 0 0 1 * * | Monthly on 1st at midnight |
| 0 9-17 * * 1-5 | Weekdays every hour 9AM-5PM |
| */5 * * * * | Every 5 minutes |

Special Characters:

| Character | Meaning |
|---|---|
| * | Any value |
| , | List separator |
| - | Range |
| / | Step value |
Architecture Diagram 3: Task Graphs & Dependencies
TASK GRAPHS & DEPENDENCIES

TASK GRAPH: ETL Pipeline

                     extract_raw (Start Task)
                            |
         +------------------+------------------+
         v                  v                  v
   extract_sales       extract_inv        extract_cust
    (Parallel)          (Parallel)         (Parallel)
         |                  |                  |
         +------------------+------------------+
                            v
                      validate_data
                 (Synchronization Point)
                            |
                  +---------+---------+
                  v                   v
              transform            load_raw
              (Parallel)          (Parallel)
                  |                   |
                  +---------+---------+
                            v
                   merge_fact (Final Step)

DEPENDENCY CONFIGURATION:

Task: extract_sales
  AFTER extract_raw
  -- Runs after extract_raw completes

Task: validate_data
  AFTER extract_sales, extract_inv, extract_cust
  -- Runs after ALL three tasks complete (synchronization)

Task: transform
  AFTER validate_data
  -- Runs after validation succeeds

Task: merge_fact
  AFTER transform, load_raw
  -- Waits for BOTH tasks to complete

TASK STATES IN GRAPH:

| Task | State |
|---|---|
| extract_raw | SUCCEEDED |
| extract_sales | RUNNING |
| extract_inv | SUCCEEDED |
| extract_cust | SCHEDULED (runs in parallel; it does not depend on extract_sales) |
| validate_data | BLOCKED (waiting for all extracts) |
| transform | BLOCKED |
| load_raw | BLOCKED |
| merge_fact | BLOCKED |

Graph Execution Progress: 25% (2/8 tasks completed)
A stream provides change data capture (CDC) by tracking INSERT, UPDATE, and DELETE operations on a source table. A stream stores an offset into the table's change history and returns each changed row with metadata columns (METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID), so consumers process only the data that changed since the last consumption.
A task is a scheduling unit that executes a SQL statement, a stored procedure call, or a Snowflake Scripting block on a defined schedule (CRON expression or interval). Tasks support dependencies (AFTER clause) for workflow orchestration, error notifications, and automatic retry.
Streams support three types: STANDARD (all DML), APPEND-ONLY (inserts only), and INSERT-ONLY (inserts only, on external tables). Choose STANDARD for complete CDC, APPEND-ONLY for immutable data in regular tables, and INSERT-ONLY for tracking new rows in external tables.
- Streams: Offset-based CDC with metadata for INSERT/UPDATE/DELETE tracking
- Tasks: CRON-based or interval-based scheduling with dependency graphs (DAGs)
- Transactional consumption: The offset advances only when the consuming transaction commits, so a failed run can be retried against the same changes
- Task states: SCHEDULED → EXECUTING → SUCCEEDED/FAILED/BLOCKED
- Error propagation: Parent failure blocks all child tasks in the graph
Detailed Explanation
Streams: Change Data Capture
Snowflake Streams provide change data capture (CDC) capabilities by tracking changes to tables over time. When a stream is created on a table, it tracks the INSERT, UPDATE, and DELETE operations performed on the source table from that point on. Rather than copying table data, a stream stores an offset into the table's change history and returns the net changes made since that offset, enabling downstream processes to consume and process only the data that has changed since the last consumption.
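The stream metadata includes the type of change (METADATA$ACTION, either INSERT or DELETE), a flag marking rows that belong to an update (METADATA$ISUPDATE), and a unique, immutable identifier for each row (METADATA$ROW_ID). A standard stream reports an update as a DELETE row paired with an INSERT row, both with METADATA$ISUPDATE set to TRUE. This metadata enables consumers to distinguish between inserts, updates, and deletes, and to apply appropriate processing logic for each change type.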
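As a rough sketch of how these columns combine, the query below labels each row of `sales_stream` (the stream used in the diagrams) with a logical change type. The `change_type` labels are illustrative names chosen for this example, not values Snowflake returns.

```sql
-- Label each stream row with a logical change type.
-- METADATA$ACTION only takes the values INSERT and DELETE; an update
-- shows up as a DELETE row plus an INSERT row, both flagged as ISUPDATE.
SELECT
    id,
    product,
    amount,
    CASE
        WHEN METADATA$ACTION = 'INSERT' AND NOT METADATA$ISUPDATE THEN 'new row'
        WHEN METADATA$ACTION = 'INSERT' AND METADATA$ISUPDATE     THEN 'update (new values)'
        WHEN METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE     THEN 'update (old values)'
        ELSE 'deleted row'
    END AS change_type   -- illustrative labels, not Snowflake values
FROM sales_stream
ORDER BY id, METADATA$ACTION;
```

Because this is a plain SELECT, running it does not consume the stream.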
Streams support different tracking modes: STANDARD (tracks all DML operations), APPEND-ONLY (tracks only inserts), and INSERT-ONLY (tracks only inserts on external tables). The choice depends on the source object, the use case requirements, and performance considerations. STANDARD streams provide complete change tracking but have higher overhead, while APPEND-ONLY and INSERT-ONLY streams are more efficient when updates and deletes do not need to be captured.
Stream Consumption Patterns
When a stream is queried, it returns the changed rows along with metadata indicating the type of change. For an UPDATE, a standard stream returns two rows, a DELETE row holding the old values and an INSERT row holding the new values, so consumers can identify what changed. The stream offset advances only when the stream is consumed by a DML statement in a transaction that commits; a plain SELECT leaves the offset unchanged. Once the offset has advanced, subsequent queries return only new changes.
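Consumption is transactional: the offset moves only if the consuming transaction commits. This property enables safe retries and fault tolerance. If a consumer fails before its transaction commits, the offset stays where it was and the same changes are returned on the next attempt, so they can be reprocessed without data duplication or inconsistency. Writing the consumer as an idempotent operation, such as a MERGE, adds a second layer of protection against duplicate processing.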
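The difference between reading and consuming a stream can be sketched as follows. The table `sales_changes_raw` is a hypothetical landing table assumed to have the listed columns; `sales_stream` is the stream from the diagrams.

```sql
-- Reading the stream with SELECT does not move the offset.
SELECT COUNT(*) AS pending_change_rows FROM sales_stream;

-- Consuming it with DML inside a transaction moves the offset at COMMIT.
BEGIN TRANSACTION;

INSERT INTO sales_changes_raw            -- hypothetical landing table
    (id, product, amount, updated_at, change_action, is_update)
SELECT id, product, amount, updated_at, METADATA$ACTION, METADATA$ISUPDATE
FROM sales_stream;

-- If anything above fails, issue ROLLBACK instead of COMMIT: the offset
-- stays where it was and the same change rows are returned next time.
COMMIT;
```

Wrapping the DML in an explicit transaction matters most when several statements need to read the same set of changes; a single autocommit DML statement also advances the offset when it succeeds.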
Tasks: Scheduled Processing
Snowflake Tasks provide a scheduling framework for executing a SQL statement, a stored procedure call, or a Snowflake Scripting block on a defined schedule. A schedule is either a CRON expression, for calendar-based patterns, or a fixed interval such as every 5 minutes. Tasks run on an assigned virtual warehouse; dedicating that warehouse to tasks keeps scheduled workloads on compute resources separate from interactive queries.
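A minimal sketch of an interval-based task that ties the two features together: it runs every 5 minutes but only does work when the stream has changes. The task name and the `sales_changes_raw` table are hypothetical; `etl_wh` and `sales_stream` come from the examples in this document.

```sql
-- Interval schedule plus a WHEN condition: the run is skipped
-- whenever sales_stream has no unconsumed change records.
CREATE OR REPLACE TASK consume_sales_stream_task   -- hypothetical name
    WAREHOUSE = etl_wh
    SCHEDULE = '5 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('sales_stream')
AS
    INSERT INTO sales_changes_raw                   -- hypothetical landing table
        (id, product, amount, updated_at, change_action, is_update)
    SELECT id, product, amount, updated_at, METADATA$ACTION, METADATA$ISUPDATE
    FROM sales_stream;

-- A newly created task is suspended; resume it to start the schedule.
ALTER TASK consume_sales_stream_task RESUME;
```

The WHEN condition keeps the warehouse from being resumed for runs that would have nothing to process.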
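Tasks move through several execution states: SCHEDULED (waiting for next run time), EXECUTING (currently running), SUCCEEDED (completed successfully), FAILED (encountered an error), and BLOCKED (waiting for dependency or resource). BLOCKED is used here as a descriptive label for a task that cannot start yet; the STATE column of TASK_HISTORY reports values such as SCHEDULED, EXECUTING, SUCCEEDED, FAILED, SKIPPED, and CANCELLED. These states enable monitoring and debugging of scheduled workflows.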
The ALLOW_OVERLAPPING_EXECUTION parameter controls whether a new run can start while a previous run is still executing. It is set on the root task and applies to the whole task graph. Setting it to FALSE (the default) ensures that a new execution doesn't start until the previous one completes, preventing resource contention and data inconsistencies.
Task Graphs and Dependencies
Task graphs enable complex workflow orchestration by defining dependencies between tasks. A task can specify one or more parent tasks using the AFTER clause, creating a directed acyclic graph (DAG) of task dependencies. Child tasks only execute after all parent tasks have completed successfully.
Task graphs support parallel execution of independent tasks, synchronization points where multiple branches converge, and error propagation across the graph. If a parent task fails, its child tasks do not run, which stops a failure from cascading into downstream tables and protects data consistency.
The graph execution engine manages task scheduling, dependency resolution, and state transitions. It provides visibility into graph execution progress, identifies bottlenecks, and handles error recovery. This automation eliminates the need for external orchestration tools for many common workflow patterns.
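Tasks are created in a suspended state, and child tasks need to be resumed before their root task. As a sketch, assuming the graph from Diagram 3 exists with `extract_raw` as its root, the tasks could be enabled and the dependency structure inspected like this:

```sql
-- Resume the tasks of the graph rooted at extract_raw in one call,
-- instead of running ALTER TASK ... RESUME on each task by hand.
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('extract_raw');

-- List the root task and its children with their predecessors and state.
SELECT name, predecessors, state
FROM TABLE(INFORMATION_SCHEMA.TASK_DEPENDENTS(
    TASK_NAME => 'extract_raw',
    RECURSIVE => TRUE
));
```

To change any task in a running graph, suspend the root task first, make the change, and then resume it.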
Error Handling and Monitoring
Tasks include several error handling capabilities. Failed tasks can send notifications through a notification integration, log errors to tables, or trigger alerting systems. The ERROR_INTEGRATION parameter names the notification integration that receives failure notifications; the integration publishes them to a cloud messaging service such as Amazon SNS.
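One way this might be configured on an existing task is sketched below. The integration name `task_error_notifications` is hypothetical and is assumed to be a notification integration that already exists; the failure threshold of 3 is an arbitrary illustrative value.

```sql
-- Suspend the task before changing its properties.
ALTER TASK etl_task SUSPEND;

-- Send failure notifications through an existing notification integration
-- (task_error_notifications is a hypothetical name).
ALTER TASK etl_task SET ERROR_INTEGRATION = task_error_notifications;

-- Suspend the task automatically after 3 consecutive failed runs.
ALTER TASK etl_task SET SUSPEND_TASK_AFTER_NUM_FAILURES = 3;

ALTER TASK etl_task RESUME;
```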
Task history is available through the TASK_HISTORY table function, providing detailed information about each execution including start time, end time, duration, error messages, and return values. This information enables monitoring, debugging, and performance analysis of scheduled workloads.
Key Concepts Table
| Stream Type | Tracks | Metadata | Use Case |
|---|---|---|---|
| STANDARD | INSERT, UPDATE, DELETE | Full change tracking | Complete CDC |
| APPEND-ONLY | INSERT only | Insert metadata | Immutable data |
| INSERT-ONLY | INSERT only (external tables) | Insert metadata | New files in external tables |

| Task State | Description | Next State |
|---|---|---|
| SCHEDULED | Waiting for execution time | EXECUTING |
| EXECUTING | Currently running | SUCCEEDED/FAILED |
| SUCCEEDED | Completed successfully | SCHEDULED |
| FAILED | Encountered error | SCHEDULED/BLOCKED |
| BLOCKED | Waiting for dependency | SCHEDULED |

| Dependency Type | Behavior | Use Case |
|---|---|---|
| AFTER (single) | Wait for one parent | Sequential processing |
| AFTER (multiple) | Wait for all parents | Synchronization point |
| Root task | No parents | Entry point for workflow |
Code Examples
-- Example 1: Create standard stream
CREATE OR REPLACE STREAM sales_stream
ON TABLE sales_transactions
SHOW_INITIAL_ROWS = TRUE
COMMENT = 'CDC stream for sales transactions';
-- Example 2: Create append-only stream
CREATE OR REPLACE STREAM immutable_log_stream
ON TABLE audit_log
APPEND_ONLY = TRUE
COMMENT = 'Append-only stream for audit logs';
-- Example 3: Query stream with change metadata
SELECT
*,
METADATA$ACTION as change_action,
METADATA$ISUPDATE as is_update,
METADATA$ROW_ID as row_id
FROM sales_stream;
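-- Example 4: Process stream with merge
-- A standard stream reports an update as a DELETE/INSERT pair, so the
-- DELETE half is filtered out to leave one source row per id.
MERGE INTO sales_target t
USING (
    SELECT id, product, amount, updated_at, METADATA$ACTION AS change_action
    FROM sales_stream
    WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE = TRUE)
) s
ON t.id = s.id
-- Row deleted in the source
WHEN MATCHED AND s.change_action = 'DELETE' THEN
    DELETE
-- Row updated in the source (the INSERT half carries the new values)
WHEN MATCHED AND s.change_action = 'INSERT' THEN
    UPDATE SET
        t.product = s.product,
        t.amount = s.amount,
        t.updated_at = s.updated_at
-- Row newly inserted in the source
WHEN NOT MATCHED AND s.change_action = 'INSERT' THEN
    INSERT (id, product, amount, updated_at)
    VALUES (s.id, s.product, s.amount, s.updated_at);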
-- Example 5: Create basic task
CREATE OR REPLACE TASK daily_cleanup_task
WAREHOUSE = 'maintenance_wh'
SCHEDULE = 'USING CRON 0 3 * * * America/New_York'
COMMENT = 'Daily cleanup of old data'
AS
DELETE FROM audit_log
WHERE created_at < DATEADD(day, -90, CURRENT_TIMESTAMP());
-- Example 6: Create task with error handling
CREATE OR REPLACE TASK etl_task
WAREHOUSE = 'etl_wh'
SCHEDULE = 'USING CRON 0 2 * * * America/New_York'
ERROR_INTEGRATION = 's3_error_notifications'
ALLOW_OVERLAPPING_EXECUTION = FALSE
AS
BEGIN
-- Log start
INSERT INTO task_log (task_name, start_time, status)
VALUES ('etl_task', CURRENT_TIMESTAMP(), 'RUNNING');
-- Execute ETL
INSERT INTO target_table
SELECT * FROM source_table
WHERE date = CURRENT_DATE();
-- Log success
INSERT INTO task_log (task_name, start_time, status, end_time)
VALUES ('etl_task', CURRENT_TIMESTAMP(), 'SUCCESS', CURRENT_TIMESTAMP());
EXCEPTION
WHEN OTHER THEN
-- Log failure
INSERT INTO task_log (task_name, start_time, status, error_message)
VALUES ('etl_task', CURRENT_TIMESTAMP(), 'FAILED', :SQLERRM);
RAISE;
END;
-- Example 7: Create task graph
CREATE OR REPLACE TASK root_task
WAREHOUSE = 'etl_wh'
SCHEDULE = 'USING CRON 0 2 * * * America/New_York'
AS
INSERT INTO raw_sales SELECT * FROM stage_sales;
CREATE OR REPLACE TASK child_task_1
WAREHOUSE = 'etl_wh'
AFTER root_task
AS
INSERT INTO clean_sales SELECT * FROM raw_sales WHERE amount > 0;
CREATE OR REPLACE TASK child_task_2
WAREHOUSE = 'etl_wh'
AFTER root_task
AS
INSERT INTO raw_inventory SELECT * FROM stage_inventory;
CREATE OR REPLACE TASK final_task
WAREHOUSE = 'etl_wh'
AFTER child_task_1, child_task_2
AS
INSERT INTO analytics_sales
SELECT s.*, i.stock
FROM clean_sales s
JOIN raw_inventory i ON s.product_id = i.product_id;
-- Example 8: Monitor task execution
-- TASK_HISTORY names these columns NAME and QUERY_START_TIME
SELECT
    name AS task_name,
    query_id,
    state,
    scheduled_time,
    query_start_time AS started_time,
    completed_time,
    error_code,
    error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START => DATEADD(day, -7, CURRENT_TIMESTAMP())
))
ORDER BY scheduled_time DESC;
-- Example 9: Alter task schedule
-- Suspend the task, change the schedule, then resume it
ALTER TASK daily_cleanup_task SUSPEND;
ALTER TASK daily_cleanup_task SET SCHEDULE = 'USING CRON 0 4 * * * America/New_York';
ALTER TASK daily_cleanup_task RESUME;
-- Example 10: Stream consumption pattern
CREATE OR REPLACE PROCEDURE process_stream_changes()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
DECLARE
    change_count INTEGER;
BEGIN
    -- Count available change rows (a SELECT does not advance the offset)
    SELECT COUNT(*) INTO :change_count FROM sales_stream;
    -- Process only if changes exist
    IF (change_count > 0) THEN
        -- One transaction: every statement sees the same change set,
        -- and the offset advances once, at COMMIT
        BEGIN TRANSACTION;
        -- Process new rows
        INSERT INTO sales_archive (id, product, amount, updated_at)
        SELECT id, product, amount, updated_at
        FROM sales_stream
        WHERE METADATA$ACTION = 'INSERT'
          AND METADATA$ISUPDATE = FALSE;
        -- Process updates (the INSERT half of each pair holds the new values)
        UPDATE sales_archive t
        SET
            product = s.product,
            amount = s.amount,
            updated_at = s.updated_at
        FROM sales_stream s
        WHERE t.id = s.id
          AND s.METADATA$ACTION = 'INSERT'
          AND s.METADATA$ISUPDATE = TRUE;
        -- Process deletes (skip the DELETE half of update pairs)
        DELETE FROM sales_archive
        USING sales_stream s
        WHERE sales_archive.id = s.id
          AND s.METADATA$ACTION = 'DELETE'
          AND s.METADATA$ISUPDATE = FALSE;
        COMMIT;
        RETURN 'Processed ' || change_count || ' change rows';
    ELSE
        RETURN 'No changes to process';
    END IF;
EXCEPTION
    WHEN OTHER THEN
        -- Leave the offset untouched so the same changes can be retried
        ROLLBACK;
        RAISE;
END;
$$;
Performance Metrics
| Metric | Target | Warning | Critical |
|---|---|---|---|
| Stream Consumption Latency | < 5 min | 5-15 min | > 15 min |
| Task Execution Time | < 30 min | 30-60 min | > 60 min |
| Task Failure Rate | < 1% | 1-5% | > 5% |
| Stream Offset Lag | < 100 changes | 100-1000 | > 1000 |
| Task Graph Completion | < 2 hours | 2-4 hours | > 4 hours |
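The task-related thresholds above can be checked against task history. The query below is a sketch that computes a per-task failure rate and longest runtime over the last 7 days; the column aliases are illustrative, and the 7-day window and row limit are assumptions to adjust as needed.

```sql
-- Per-task failure rate and longest runtime over the last 7 days.
SELECT
    name AS task_name,
    COUNT(*) AS finished_runs,
    COUNT_IF(state = 'FAILED') AS failed_runs,
    ROUND(100 * COUNT_IF(state = 'FAILED') / COUNT(*), 2) AS failure_rate_pct,
    MAX(DATEDIFF('minute', query_start_time, completed_time)) AS max_runtime_min
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START => DATEADD(day, -7, CURRENT_TIMESTAMP()),
    RESULT_LIMIT => 10000      -- the default limit is much smaller
))
WHERE state IN ('SUCCEEDED', 'FAILED')   -- ignore runs that have not finished
GROUP BY name
ORDER BY failure_rate_pct DESC;
```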
Best Practices
- Monitor stream offsets: Regularly check stream consumption. A stream that is not consumed for long enough can become stale, after which its changes can no longer be read. Set up alerts for excessive offset lag.
- Use appropriate stream types: Choose STANDARD for complete CDC, APPEND-ONLY for immutable data, and INSERT-ONLY for external tables.
- Implement idempotent processing: Design stream consumers to handle duplicate processing safely. Use MERGE operations for upserts.
- Dedicate warehouses for tasks: Create separate warehouses for task execution to isolate scheduled workloads from interactive queries.
- Use task graphs for workflows: Replace complex stored procedures with task graphs for better visibility, error handling, and maintainability.
- Set appropriate schedules: Use CRON expressions for complex schedules. Consider time zones and business hours when scheduling.
- Implement error handling: Configure ERROR_INTEGRATION for failure notifications. Log errors to tables for debugging.
- Monitor task history: Regularly review task execution history to identify patterns, bottlenecks, and failures.
- Use ALLOW_OVERLAPPING_EXECUTION carefully: Set to FALSE for tasks with state dependencies. Set to TRUE only when concurrent runs cannot interfere with each other.
- Clean up old streams: Remove streams that are no longer needed to reduce metadata overhead.
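The stream-related practices above can be sketched with a few statements. `sales_stream` is the stream used throughout this document; `old_sales_stream` is a hypothetical name for a stream that is no longer needed.

```sql
-- TRUE when the stream holds unconsumed change records.
SELECT SYSTEM$STREAM_HAS_DATA('sales_stream');

-- Number of pending change rows (a plain SELECT does not consume them).
SELECT COUNT(*) AS pending_change_rows FROM sales_stream;

-- The stale and stale_after columns in the output show whether the
-- stream has gone stale and when it is expected to.
SHOW STREAMS LIKE 'sales_stream';

-- Remove a stream that is no longer consumed (hypothetical name).
DROP STREAM IF EXISTS old_sales_stream;
```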
See Also
- PySpark Iceberg - Iceberg CDC patterns
- Delta Lake on Databricks - Delta Lake Change Data Feed
- Data Warehouse Concepts - Data warehouse design principles