Command Line and Shell Scripting for Data Engineers

Why the Command Line Matters

The command line is the data engineer's Swiss Army knife. Running pipelines, managing servers, processing log files, and automating backups all come down to shell commands and scripts.

┌──────────────────────────────────────────────────────────────┐
│              COMMAND LINE USE CASES                          │
├──────────────────────┬───────────────────────────────────────┤
│  ETL Automation      │  Cron jobs, shell scripts             │
│  Server Management   │  SSH, file transfers, monitoring      │
│  Data Processing     │  grep, awk, sed for quick transforms  │
│  Pipeline Control    │  Airflow CLI, spark-submit            │
│  Log Analysis        │  grep, awk, sort, uniq                │
│  Backup & Recovery   │  rsync, tar, compression              │
│  CI/CD               │  Build scripts, deployment            │
│  Docker & K8s        │  Container management                 │
└──────────────────────┴───────────────────────────────────────┘

Bash Fundamentals

Variables and Operations

#!/bin/bash

# Variables
NAME="data-pipeline"
DATE=$(date +%Y-%m-%d)
LOG_DIR="/var/log/pipelines"
S3_BUCKET="s3://my-data-lake"

# String operations
echo "Pipeline: $NAME"
echo "Date: $DATE"
echo "Uppercase: ${NAME^^}"                 # ^^ expansion needs Bash 4.0 or later
echo "Length: ${#NAME}"

# Arithmetic
COUNT=10
INCREMENT=$((COUNT + 1))
echo "Next: $INCREMENT"

# Conditional
if [ -d "$LOG_DIR" ]; then
    echo "Log directory exists"
else
    mkdir -p "$LOG_DIR"
fi

# Loops
for TABLE in users orders products; do
    echo "Processing table: $TABLE"
    psql -c "SELECT COUNT(*) FROM $TABLE;"
done

# While loop with file
while IFS= read -r line; do
    echo "Processing: $line"
done < tables.txt
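
Once a script grows, the loops and conditionals above usually end up inside functions. The following is a minimal sketch, assuming a hypothetical helper named `count_data_rows` (not part of the lesson's scripts) that counts the data rows of a CSV file while skipping the header line:

```shell
#!/bin/bash

# count_data_rows is a hypothetical helper for illustration.
# Prints the number of non-header lines in a CSV file.
count_data_rows() {
    local file="$1"
    if [ ! -f "$file" ]; then
        echo "File not found: $file" >&2
        return 1
    fi
    local total
    total=$(wc -l < "$file")
    # Guard against an empty file so the result never goes negative
    if [ "$total" -gt 0 ]; then
        echo $((total - 1))
    else
        echo 0
    fi
}

# Usage with a throwaway sample file
SAMPLE=$(mktemp)
printf 'id,name\n1,alice\n2,bob\n' > "$SAMPLE"
echo "Data rows: $(count_data_rows "$SAMPLE")"
rm -f "$SAMPLE"
```

Declaring variables with `local` keeps them from leaking into the rest of the script, and the non-zero return code lets callers write `count_data_rows "$f" || exit 1`.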

File Operations

# Directory operations
mkdir -p /data/raw /data/processed /data/archive

# File operations
touch data.csv
cp data.csv data_backup.csv
mkdir -p archived && mv data.csv archived/   # mv fails if the target directory is missing
rm -f old_file.csv

# Find files
find /data -name "*.csv" -mtime +7         # CSV files older than 7 days
find /data -size +1G                        # Files larger than 1GB
find /data -name "*.log" -mtime +30 -delete # Delete log files older than 30 days

# File information
ls -lh /data                               # List with sizes
du -sh /data/*                             # Directory sizes
wc -l data.csv                             # Count lines
head -5 data.csv                           # First 5 lines
tail -5 data.csv                           # Last 5 lines

# Permissions
chmod 755 script.sh                         # Make executable
chmod 600 secrets.env                       # Restrict access
chown -R user:group /data                   # Change ownership

# Symbolic links
ln -s /data/processed/latest.csv current.csv
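
The `find` and `mv` commands above are often combined to move aged files out of a landing directory. A rough sketch, assuming a hypothetical function `archive_old_csvs` and temporary demo paths that are not part of the lesson:

```shell
#!/bin/bash

# archive_old_csvs is a hypothetical helper: moves *.csv files last
# modified more than DAYS days ago from SRC into DEST.
archive_old_csvs() {
    local src="$1" dest="$2" days="$3"
    mkdir -p "$dest"
    # -print0 with read -d '' keeps file names with spaces intact
    find "$src" -maxdepth 1 -type f -name "*.csv" -mtime +"$days" -print0 |
        while IFS= read -r -d '' file; do
            mv "$file" "$dest/"
            echo "Archived: $(basename "$file")"
        done
}

# Demo in a temporary directory
WORK=$(mktemp -d)
mkdir -p "$WORK/raw"
touch "$WORK/raw/new.csv"
touch -t 202001010000 "$WORK/raw/old report.csv"   # backdate the mtime
archive_old_csvs "$WORK/raw" "$WORK/archive" 7
ls "$WORK/archive"
rm -rf "$WORK"
```

Only the backdated file should be moved; `new.csv` stays in place because its modification time is recent.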

Piping and Redirection

# Pipe output to next command
cat data.csv | grep "error" | wc -l

# Redirect output to file
echo "log message" >> pipeline.log          # Append
echo "new content" > output.txt             # Overwrite

# Redirect stderr
command 2> errors.log                       # stderr to file
command 2>&1                                # Merge stderr to stdout
command > output.log 2>&1                   # Both to file

# Suppress output
command > /dev/null 2>&1                    # Silent execution

# Here document
cat << EOF > config.yaml
database:
  host: localhost
  port: 5432
  name: production
EOF

# Here string
grep "error" <<< "$LOG_CONTENT"
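
To see the redirection operators in action, it can help to run them against a command that writes to both streams. The sketch below assumes a hypothetical `run_step` function standing in for a real pipeline command:

```shell
#!/bin/bash

# run_step is a hypothetical stand-in for a pipeline command that
# writes progress to stdout and problems to stderr.
run_step() {
    echo "loaded 3 rows"
    echo "warning: 1 row skipped" >&2
}

OUT_DIR=$(mktemp -d)

# Keep the two streams in separate files
run_step > "$OUT_DIR/stdout.log" 2> "$OUT_DIR/stderr.log"

# Order matters: redirect stdout first, then point stderr at it
run_step > "$OUT_DIR/combined.log" 2>&1

echo "stdout only:  $(cat "$OUT_DIR/stdout.log")"
echo "stderr only:  $(cat "$OUT_DIR/stderr.log")"
echo "combined lines: $(wc -l < "$OUT_DIR/combined.log")"
rm -rf "$OUT_DIR"
```

Writing `2>&1` before `> file` would send stderr to the terminal instead, because stderr is duplicated from wherever stdout points at that moment.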

grep — Pattern Searching

# Basic search
grep "error" application.log

# Case-insensitive
grep -i "error" application.log

# Recursive search
grep -r "TODO" /project/src/

# Count matches
grep -c "error" application.log

# Show line numbers
grep -n "error" application.log

# Invert match (lines NOT containing pattern)
grep -v "debug" application.log

# Regular expressions
grep -E "^[0-9]{4}-[0-9]{2}-[0-9]{2}" data.csv    # Date format
grep -E "^[A-Z][a-z]+" names.txt                   # Starts with uppercase
grep -E "\b[0-9]{3}-[0-9]{3}-[0-9]{4}\b" data.txt # Phone numbers

# Multiple patterns
grep -E "error|warning|critical" application.log

# Context lines
grep -B 2 -A 2 "exception" application.log          # 2 lines before/after

# Perl regex for complex patterns (-P is a GNU grep option)
grep -P "\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}" access.log  # IP addresses
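
The flags above can be combined into small reusable checks. This is a sketch on a made-up log; the log layout and the helper names `count_level` and `failed_jobs` are assumptions for illustration:

```shell
#!/bin/bash

# Sample log written to a temp file; the format is an assumption
LOG=$(mktemp)
cat > "$LOG" << 'EOF'
2024-01-15 10:00:01 INFO  job=orders started
2024-01-15 10:00:05 ERROR job=orders connection refused
2024-01-15 10:00:09 WARN  job=users slow query
2024-01-15 10:00:12 ERROR job=users timeout
EOF

# count_level LEVEL FILE: number of lines containing LEVEL as a whole word
count_level() {
    grep -cw "$1" "$2"
}

# failed_jobs FILE: unique job names that appear on ERROR lines
failed_jobs() {
    grep -w "ERROR" "$1" | grep -oE 'job=[a-z]+' | sort -u
}

echo "Errors: $(count_level ERROR "$LOG")"
failed_jobs "$LOG"
rm -f "$LOG"
```

Here `-w` avoids matching words that merely contain the level name, and `-o` prints only the matched part of each line rather than the whole line.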

awk — Text Processing

# Print specific columns
awk '{print $1, $3}' data.txt                    # Columns 1 and 3 (whitespace-delimited input)

# Custom delimiter
awk -F',' '{print $2, $4}' data.csv              # CSV with comma delimiter

# Conditional filtering
awk -F',' '$3 > 1000 {print $1, $3}' orders.csv  # Amount > 1000

# Calculate sum
awk -F',' '{sum += $3} END {print "Total:", sum}' orders.csv

# Pattern matching
awk '/error/ {print NR, $0}' application.log     # Lines containing "error"

# Built-in variables
awk -F',' '{print NR, NF, $0}' data.csv          # Line number, field count, full line

# Complex processing
awk -F',' '
    NR > 1 {  # Skip header
        total[$2] += $3
        count[$2]++
    }
    END {
        for (region in total) {
            printf "%s: $%.2f (avg: $%.2f)\n", region, total[region], total[region]/count[region]
        }
    }
' sales.csv

# Process log files
awk '
    /ERROR/ {errors++}
    /WARNING/ {warnings++}
    /INFO/ {infos++}
    END {
        print "Errors:", errors + 0      # + 0 prints 0 instead of an empty string
        print "Warnings:", warnings + 0
        print "Info:", infos + 0
    }
' application.log
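
To make the grouped aggregation concrete, here is a worked run on a few made-up rows. The column layout (order_id, region, amount) and the function name `region_totals` are assumptions for this sketch:

```shell
#!/bin/bash

# Sample data; the column layout (order_id,region,amount) is an assumption
SALES=$(mktemp)
cat > "$SALES" << 'EOF'
order_id,region,amount
1,east,100.00
2,west,250.50
3,east,50.00
4,west,49.50
EOF

# region_totals FILE: prints "region,total,average" per region.
# for-in order is unspecified in awk, so sort makes the output stable.
region_totals() {
    awk -F',' '
        NR > 1 {
            total[$2] += $3
            count[$2]++
        }
        END {
            for (region in total) {
                printf "%s,%.2f,%.2f\n", region, total[region], total[region] / count[region]
            }
        }
    ' "$1" | sort
}

region_totals "$SALES"
rm -f "$SALES"
```

For this sample the east region totals 150.00 (average 75.00) and the west region totals 300.00 (average 150.00).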

sed — Stream Editing

# Replace text
sed 's/error/warning/g' application.log          # Replace all occurrences
sed -i 's/error/warning/g' application.log       # In-place edit (GNU sed; BSD/macOS sed needs -i '')
sed -i.bak 's/error/warning/g' file.txt          # In-place with backup (works on GNU and BSD sed)

# Delete lines
sed '/^#/d' config.yaml                          # Remove comments
sed '/^$/d' data.csv                             # Remove empty lines
sed '1d' data.csv                                # Remove header

# Insert/append
sed '3a\New line after line 3' file.txt          # Append after line 3
sed '1i\Header line' file.txt                    # Insert before line 1

# Extract ranges
sed -n '10,20p' data.csv                         # Lines 10-20
sed -n '/START/,/END/p' file.txt                 # Between markers

# Multiple operations
sed -e 's/foo/bar/g' -e 's/baz/qux/g' file.txt

# Address by regex
sed '/^2024-01/d' data.csv                       # Delete lines starting with 2024-01
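
Several of these expressions are commonly chained into one cleanup pass. A small sketch, assuming a hypothetical filter named `clean_config` that reads from standard input:

```shell
#!/bin/bash

# clean_config is a hypothetical helper: reads text on stdin and drops
# comment lines, blank lines, and trailing spaces or tabs.
clean_config() {
    sed -e '/^[[:space:]]*#/d' \
        -e 's/[[:space:]]*$//' \
        -e '/^$/d'
}

printf '# settings\nhost: localhost   \n\n  # indented comment\nport: 5432\n' | clean_config
```

The expressions run in order for each line, so whitespace-only lines are first trimmed to empty and then removed by the final delete. The sample input reduces to the two lines `host: localhost` and `port: 5432`.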

Practical Data Processing

Log Analysis Script

#!/bin/bash
# Analyze web server access logs

LOG_FILE="${1:?Usage: $0 <access_log>}"   # Abort with a usage message if no file is given
REPORT_DIR="/var/reports/$(date +%Y-%m-%d)"

mkdir -p "$REPORT_DIR"

# Top 10 IP addresses
echo "Top 10 IP Addresses:" > "$REPORT_DIR/access_report.txt"
awk '{print $1}' "$LOG_FILE" | sort | uniq -c | sort -rn | head -10 >> "$REPORT_DIR/access_report.txt"

# Request statistics
echo -e "\nHTTP Status Codes:" >> "$REPORT_DIR/access_report.txt"
awk '{print $9}' "$LOG_FILE" | sort | uniq -c | sort -rn >> "$REPORT_DIR/access_report.txt"

# Top endpoints
echo -e "\nTop Endpoints:" >> "$REPORT_DIR/access_report.txt"
awk '{print $7}' "$LOG_FILE" | sort | uniq -c | sort -rn | head -10 >> "$REPORT_DIR/access_report.txt"

# Bandwidth by IP
echo -e "\nBandwidth by IP (MB):" >> "$REPORT_DIR/access_report.txt"
awk '{ip[$1] += $10} END {for (i in ip) printf "%s: %.2f MB\n", i, ip[i]/1048576}' "$LOG_FILE" | sort -t: -k2 -rn | head -10 >> "$REPORT_DIR/access_report.txt"

echo "Report generated: $REPORT_DIR/access_report.txt"
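
The field numbers in the script ($1, $7, $9, $10) assume the common "combined" access log layout. Before running it on real logs, it may be worth checking those positions on a small sample. The log lines and the helper names `show_fields` and `top_ip` below are made up for illustration:

```shell
#!/bin/bash

# Three made-up requests in the common "combined" access log layout
SAMPLE_LOG=$(mktemp)
cat > "$SAMPLE_LOG" << 'EOF'
10.0.0.1 - - [15/Jan/2024:10:00:01 +0000] "GET /api/orders HTTP/1.1" 200 1048576 "-" "curl/8.0"
10.0.0.2 - - [15/Jan/2024:10:00:02 +0000] "GET /api/users HTTP/1.1" 404 512 "-" "curl/8.0"
10.0.0.1 - - [15/Jan/2024:10:00:03 +0000] "POST /api/orders HTTP/1.1" 200 2097152 "-" "curl/8.0"
EOF

# show_fields FILE: prints the four fields the report relies on
show_fields() {
    awk '{print "ip=" $1, "path=" $7, "status=" $9, "bytes=" $10}' "$1"
}

# top_ip FILE: the client address with the most requests
top_ip() {
    awk '{print $1}' "$1" | sort | uniq -c | sort -rn | head -1 | awk '{print $2}'
}

show_fields "$SAMPLE_LOG"
echo "Busiest client: $(top_ip "$SAMPLE_LOG")"
rm -f "$SAMPLE_LOG"
```

If a server uses a custom log format, the field numbers shift and the report script needs the same adjustment.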

ETL Shell Script

#!/bin/bash
# Daily ETL pipeline

set -euo pipefail  # Exit on error, undefined vars, pipe failures

# Configuration
SOURCE_DB="postgresql://user:pass@source-host/production"
TARGET_DB="postgresql://user:pass@target-host/analytics"
S3_BUCKET="s3://data-lake"
DATE=$(date +%Y-%m-%d)
LOG_FILE="/var/log/etl/daily_etl_${DATE}.log"
mkdir -p "$(dirname "$LOG_FILE")"  # tee -a fails if the log directory is missing

# Logging function
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}

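# Error handling
error_exit() {
    # The default message covers calls without an argument;
    # a bare $1 would abort under set -u
    log "ERROR: ${1:-command failed}"
    log "Pipeline failed at $(date)"
    exit 1
}

# Single quotes delay expansion so $LINENO is read when the trap fires
trap 'error_exit "command failed near line $LINENO"' ERR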

log "Starting daily ETL pipeline"

# Step 1: Extract
log "Step 1: Extracting data from source"
psql "$SOURCE_DB" -c "\COPY orders TO '/tmp/orders_${DATE}.csv' WITH CSV HEADER" || error_exit "Extraction failed"

# Step 2: Validate
log "Step 2: Validating extracted data"
ROW_COUNT=$(wc -l < "/tmp/orders_${DATE}.csv")
if [ "$ROW_COUNT" -lt 2 ]; then
    error_exit "No data extracted"
fi
log "Extracted $((ROW_COUNT - 1)) rows"

# Step 3: Transform
log "Step 3: Transforming data"
python3 << EOF
import pandas as pd
df = pd.read_csv('/tmp/orders_${DATE}.csv')
df = df.drop_duplicates(subset=['order_id'])
df = df[df['amount'] > 0]
df['processed_at'] = '${DATE}'
df.to_csv('/tmp/orders_transformed_${DATE}.csv', index=False)
print(f"Transformed {len(df)} rows")
EOF

# Step 4: Load to S3
log "Step 4: Uploading to S3"
aws s3 cp "/tmp/orders_transformed_${DATE}.csv" "${S3_BUCKET}/bronze/orders/${DATE}/"

# Step 5: Load to warehouse
log "Step 5: Loading to warehouse"
psql "$TARGET_DB" -c "\COPY orders FROM '/tmp/orders_transformed_${DATE}.csv' WITH CSV HEADER" || error_exit "Load failed"

# Step 6: Cleanup
log "Step 6: Cleaning up temporary files"
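# Remove only this run's files so other runs' extracts are left alone
rm -f "/tmp/orders_${DATE}.csv" "/tmp/orders_transformed_${DATE}.csv"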

log "Pipeline completed successfully"
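
The validation step in the pipeline only checks the row count. A slightly stricter check could also confirm that every line has the same number of fields as the header. The following is a sketch under that assumption, using a hypothetical function `validate_csv`; it splits on plain commas, so quoted fields that contain commas are outside what it handles:

```shell
#!/bin/bash

# validate_csv is a hypothetical helper: succeeds only if FILE has a
# header, at least one data row, and the same field count on every line.
validate_csv() {
    local file="$1"
    [ -s "$file" ] || { echo "empty or missing: $file" >&2; return 1; }
    awk -F',' '
        NR == 1 { expected = NF; next }
        NF != expected {
            printf("line %d: expected %d fields, found %d\n", NR, expected, NF) > "/dev/stderr"
            bad = 1
        }
        END { if (bad || NR < 2) exit 1 }
    ' "$file"
}

# Demo with one well-formed and one malformed file
GOOD=$(mktemp)
BAD=$(mktemp)
printf 'order_id,amount\n1,10.5\n2,20\n' > "$GOOD"
printf 'order_id,amount\n1,10.5,extra\n' > "$BAD"
validate_csv "$GOOD" && echo "good file passes"
validate_csv "$BAD" 2>/dev/null || echo "bad file rejected"
rm -f "$GOOD" "$BAD"
```

In a pipeline like the one above, such a function could be called as `validate_csv "/tmp/orders_${DATE}.csv" || error_exit "Validation failed"`.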

Cron Jobs

Cron Syntax

┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12)
│ │ │ │ ┌───────────── day of week (0-6, Sun=0)
│ │ │ │ │
* * * * * command

Common Cron Patterns

# Edit crontab
crontab -e

# View crontab
crontab -l

# Common schedules (comments go on their own lines; crontab treats a
# trailing "#" as part of the command)
# Daily at 2 AM
0 2 * * * /path/to/daily_etl.sh
# Every 6 hours, on the hour
0 */6 * * * /path/to/sync.sh
# Sundays at 1:30 AM
30 1 * * 0 /path/to/weekly_report.sh
# First day of each month at midnight
0 0 1 * * /path/to/monthly_archive.sh

# Cron with logging
0 2 * * * /path/to/script.sh >> /var/log/script.log 2>&1

# Cron with environment
SHELL=/bin/bash
PATH=/usr/local/bin:/usr/bin:/bin
0 2 * * * /path/to/script.sh
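
Cron starts a job on schedule even if the previous run has not finished, which can cause two copies of an ETL script to write the same files. One possible guard is a lock directory. The sketch below assumes a hypothetical wrapper named `run_locked`; the exit code 75 is an arbitrary choice for "skipped":

```shell
#!/bin/bash

# run_locked is a hypothetical wrapper: runs a command only if no other
# run holds the lock. mkdir is atomic, so it doubles as the lock test.
run_locked() {
    local lock_dir="$1"
    shift
    if ! mkdir "$lock_dir" 2>/dev/null; then
        echo "Previous run still active, skipping" >&2
        return 75
    fi
    local status=0
    "$@" || status=$?
    rmdir "$lock_dir"
    return "$status"
}

# Demo with a harmless command; a scheduled script would wrap its
# real work the same way
DEMO_DIR=$(mktemp -d)
run_locked "$DEMO_DIR/etl.lock" echo "job ran"
rmdir "$DEMO_DIR"
```

A run that is killed before `rmdir` leaves the lock behind, so a production version would likely also need a cleanup trap or a staleness check.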

SSH and Remote Operations

# Basic SSH
ssh user@remote-host

# SSH with key
ssh -i ~/.ssh/key.pem user@remote-host

# Execute remote command
ssh user@remote-host "df -h"

# SSH tunnel (port forwarding)
ssh -L 5432:localhost:5432 user@remote-host

# SCP (secure copy)
scp file.csv user@remote-host:/data/
scp user@remote-host:/data/file.csv ./local/

# Rsync (efficient sync)
rsync -avz /local/data/ user@remote-host:/remote/data/
rsync -avz --delete /local/data/ user@remote-host:/remote/data/
rsync -avz --exclude="*.log" /local/data/ user@remote-host:/remote/data/

# SSH config (~/.ssh/config)
Host production
    HostName 192.168.1.100
    User deploy
    IdentityFile ~/.ssh/prod_key
    Port 22

# Use: ssh production

Docker for Data Engineers

# Pull and run containers
docker pull postgres:15
docker run -d --name postgres-dev \
    -e POSTGRES_PASSWORD=secret \
    -p 5432:5432 \
    -v pgdata:/var/lib/postgresql/data \
    postgres:15

# Docker Compose (V2 syntax; older installs use the hyphenated docker-compose)
docker compose up -d
docker compose down
docker compose logs -f

# Execute commands in containers
docker exec -it postgres-dev psql -U postgres

# Build custom image
docker build -t my-etl-tool .
docker run my-etl-tool

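# Cleanup (destructive: review what will be removed before confirming)
docker system prune -a                      # Stopped containers, unused networks, all unused images
docker volume prune                         # Unused volumes and the data in them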

Key Takeaways

  1. Bash is essential: master variables, loops, conditionals, and functions
  2. grep, awk, and sed are powerful: use them for quick data processing and log analysis
  3. Pipe everything: chain commands for complex transformations
  4. Use set -euo pipefail: production scripts should stop on errors, unset variables, and failed pipes
  5. Cron automates tasks: schedule ETL jobs and maintenance tasks
  6. SSH and rsync: manage remote servers and synchronize data
  7. Shell scripts for ETL: combine Python, SQL, and shell commands
  8. Docker simplifies environments: containerize your data tools

Practice Exercises

  1. Log parser: Write a shell script that parses a web server log file and generates a report of top 10 IPs, status codes, and bandwidth.

  2. ETL script: Create a shell script that extracts data from PostgreSQL, transforms with awk/sed, and loads to S3.

  3. Cron automation: Set up a cron job that runs a data quality check every hour and sends an email alert on failure.

  4. Remote sync: Write a script that uses rsync to synchronize data between local and remote servers, with logging and error handling.

  5. Docker pipeline: Create a Docker Compose setup with PostgreSQL, Airflow, and a Jupyter notebook for data exploration.
