Dynamic SQL & Metadata Queries
Difficulty: Hard | Companies: Google, Amazon, Meta, Netflix, Uber
Dynamic SQL with EXECUTE
-- Basic dynamic SQL
CREATE OR REPLACE PROCEDURE dynamic_select(
table_name TEXT,
where_clause TEXT
)
LANGUAGE plpgsql
AS $$
DECLARE
result RECORD;
query TEXT;
BEGIN
query := format('SELECT * FROM %I WHERE %s', table_name, where_clause);
RAISE NOTICE 'Executing: %', query;
FOR result IN EXECUTE query
LOOP
RAISE NOTICE 'Row: %', to_json(result);
END LOOP;
END;
$$;
-- Call procedure
CALL dynamic_select('employees', 'department_id = 5');
βΉοΈ
Key Insight: Always use format() with %I for identifiers and %L for literals to prevent SQL injection. Never concatenate user input directly into dynamic SQL.
Parameterized Dynamic SQL
-- Safe parameterized queries
CREATE OR REPLACE PROCEDURE safe_dynamic_query(
table_name TEXT,
column_name TEXT,
search_value TEXT
)
LANGUAGE plpgsql
AS $$
DECLARE
result RECORD;
query TEXT;
BEGIN
-- Use %I for identifiers, $1 for parameters
query := format(
'SELECT * FROM %I WHERE %I = $1',
table_name,
column_name
);
RAISE NOTICE 'Query: %', query;
FOR result IN EXECUTE query USING search_value
LOOP
RAISE NOTICE 'Found: %', to_json(result);
END LOOP;
END;
$$;
Metadata Queries
-- Query table metadata
SELECT
column_name,
data_type,
character_maximum_length,
is_nullable,
column_default
FROM information_schema.columns
WHERE table_name = 'employees'
AND table_schema = 'public'
ORDER BY ordinal_position;
-- Query all tables and their sizes
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size,
n_live_tup AS row_count
FROM pg_stat_user_tables
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;
Dynamic Pivot Generation
-- Generate pivot query dynamically
CREATE OR REPLACE PROCEDURE dynamic_pivot(
source_table TEXT,
pivot_column TEXT,
value_column TEXT,
aggregate_func TEXT DEFAULT 'SUM'
)
LANGUAGE plpgsql
AS $$
DECLARE
pivot_values TEXT;
query TEXT;
result RECORD;
BEGIN
-- Get distinct values for pivot columns
EXECUTE format(
'SELECT STRING_AGG(DISTINCT %I, '', '') FROM %I',
pivot_column,
source_table
) INTO pivot_values;
-- Build pivot query
query := format(
'SELECT * FROM (
SELECT %I, %I FROM %I
) src
PIVOT (
%s(%I) FOR %I IN (%s)
)',
CASE WHEN pivot_column != 'category' THEN 'category' ELSE 'id' END,
value_column,
source_table,
aggregate_func,
value_column,
pivot_column,
pivot_values
);
RAISE NOTICE 'Generated query: %', query;
EXECUTE query;
END;
$$;
BigQuery Dynamic SQL
-- BigQuery dynamic SQL
CREATE OR REPLACE PROCEDURE `project.dataset.dynamic_query`(
table_name STRING,
filter_column STRING,
filter_value STRING
)
BEGIN
DECLARE query STRING;
DECLARE result STRUCT<id INT64, name STRING>;
SET query = FORMAT(
'SELECT id, name FROM `%s` WHERE %s = @val',
table_name,
filter_column
);
EXECUTE IMMEDIATE query
INTO result
USING filter_value AS val;
SELECT result;
END;
Schema Comparison
-- Compare schemas between environments
WITH source_schema AS (
SELECT
table_name,
column_name,
data_type,
is_nullable
FROM information_schema.columns
WHERE table_schema = 'production'
),
target_schema AS (
SELECT
table_name,
column_name,
data_type,
is_nullable
FROM information_schema.columns
WHERE table_schema = 'staging'
)
SELECT
'Missing in staging' AS difference_type,
s.table_name,
s.column_name,
s.data_type
FROM source_schema s
LEFT JOIN target_schema t
ON s.table_name = t.table_name
AND s.column_name = t.column_name
WHERE t.column_name IS NULL
UNION ALL
SELECT
'Extra in staging' AS difference_type,
t.table_name,
t.column_name,
t.data_type
FROM target_schema t
LEFT JOIN source_schema s
ON t.table_name = s.table_name
AND t.column_name = s.column_name
WHERE s.column_name IS NULL;
Dynamic DDL Generation
-- Generate CREATE TABLE from existing table
CREATE OR REPLACE PROCEDURE clone_table_structure(
source_table TEXT,
target_table TEXT
)
LANGUAGE plpgsql
AS $$
DECLARE
column_def TEXT;
query TEXT;
BEGIN
-- Build column definitions
SELECT STRING_AGG(
format('%I %s%s',
column_name,
data_type,
CASE WHEN is_nullable = 'NO' THEN ' NOT NULL' ELSE '' END
),
E',\n '
)
INTO column_def
FROM information_schema.columns
WHERE table_name = source_table
AND table_schema = 'public'
ORDER BY ordinal_position;
-- Create table
query := format(
'CREATE TABLE IF NOT EXISTS %I (
%s
)',
target_table,
column_def
);
RAISE NOTICE 'Executing: %', query;
EXECUTE query;
END;
$$;
Dynamic Index Creation
-- Create indexes based on query patterns
CREATE OR REPLACE PROCEDURE create_missing_indexes()
LANGUAGE plpgsql
AS $$
DECLARE
index_record RECORD;
create_sql TEXT;
BEGIN
FOR index_record IN
SELECT
schemaname,
tablename,
attname AS column_name,
n_distinct,
correlation
FROM pg_stats
WHERE schemaname = 'public'
AND n_distinct > 100
AND correlation < 0.1
AND NOT EXISTS (
SELECT 1 FROM pg_indexes
WHERE tablename = pg_stats.tablename
AND indexdef LIKE '%' || attname || '%'
)
LOOP
create_sql := format(
'CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_%s_%s ON %I.%I (%I)',
index_record.tablename,
index_record.column_name,
index_record.schemaname,
index_record.tablename,
index_record.column_name
);
RAISE NOTICE 'Creating: %', create_sql;
EXECUTE create_sql;
END LOOP;
END;
$$;
Query Statistics Collection
-- Collect query performance statistics
CREATE TABLE query_performance_log (
id SERIAL PRIMARY KEY,
query_hash BIGINT,
query_text TEXT,
execution_time INTERVAL,
rows_returned INT,
calls BIGINT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Populate from pg_stat_statements
INSERT INTO query_performance_log (
query_hash, query_text, execution_time, rows_returned, calls
)
SELECT
queryid,
LEFT(query, 1000),
mean_time * INTERVAL '1 millisecond',
rows,
calls
FROM pg_stat_statements
WHERE dbid = (SELECT oid FROM pg_database WHERE datname = current_database());
Dynamic WHERE Clause Builder
-- Build dynamic WHERE clause
CREATE OR REPLACE FUNCTION build_where_clause(
filters JSONB
) RETURNS TEXT
LANGUAGE plpgsql
AS $$
DECLARE
clause TEXT := '';
key TEXT;
value TEXT;
BEGIN
FOR key, value IN SELECT * FROM jsonb_each(filters)
LOOP
IF clause != '' THEN
clause := clause || ' AND ';
END IF;
clause := clause || format('%I = %L', key, value);
END LOOP;
RETURN COALESCE(clause, '1=1');
END;
$$;
-- Use function
SELECT build_where_clause('{"status": "active", "department_id": "5"}'::jsonb);
Dynamic Batch Processing
-- Process tables dynamically
CREATE OR REPLACE PROCEDURE analyze_all_tables()
LANGUAGE plpgsql
AS $$
DECLARE
table_record RECORD;
BEGIN
FOR table_record IN
SELECT tablename
FROM pg_tables
WHERE schemaname = 'public'
LOOP
EXECUTE format('ANALYZE %I', table_record.tablename);
RAISE NOTICE 'Analyzed: %', table_record.tablename;
END LOOP;
END;
$$;
β οΈ
Security Warning: Always validate and sanitize inputs for dynamic SQL. Use format() with proper type specifiers (%I for identifiers, %L for literals) to prevent SQL injection attacks.
Follow-Up Questions
- How do you prevent SQL injection in dynamic SQL?
- When would you use dynamic SQL vs static SQL?
- How do you debug dynamic SQL queries?
- Explain the performance implications of dynamic SQL.
- How would you implement row-level security using dynamic SQL?
- What's the best approach for testing dynamic SQL procedures?