πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Time Series Databases

⭐ Premium

Advertisement

Time Series Databases

Time series data is everywhere – metrics, IoT sensors, financial prices. Specialized databases handle the unique patterns of temporal data efficiently.

Time Series Data Patterns

Time Series Database ArchitectureData SourcesMetricsIoT SensorsLogsIngestionHigh WriteThroughput1M+ pts/secTime-OrderedAppend-OnlyCompressionPartitioningQuery EngineRange QueriesAggregationsDownsamplingRetentionAuto CleanupTTL PoliciesArchivingKey: Write-Optimized | Time-Partitioned | Compression | Retention Policies

InfluxDB

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import pandas as pd
from datetime import datetime, timedelta

class InfluxDBManager:
    def __init__(self, url, token, org, bucket):
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()
        self.org = org
        self.bucket = bucket
    
    def write_point(self, measurement, tags, fields, timestamp=None):
        """Write a single data point"""
        point = Point(measurement)
        
        for tag_key, tag_value in tags.items():
            point = point.tag(tag_key, tag_value)
        
        for field_key, field_value in fields.items():
            point = point.field(field_key, field_value)
        
        if timestamp:
            point = point.time(timestamp)
        
        self.write_api.write(self.bucket, self.org, point)
    
    def write_batch(self, measurement, data_list):
        """Write multiple points efficiently"""
        points = []
        for data in data_list:
            point = Point(measurement)
            for tag_key, tag_value in data.get('tags', {}).items():
                point = point.tag(tag_key, tag_value)
            for field_key, field_value in data['fields'].items():
                point = point.field(field_key, field_value)
            if 'timestamp' in data:
                point = point.time(data['timestamp'])
            points.append(point)
        
        self.write_api.write(self.bucket, self.org, points)
    
    def query(self, flux_query):
        """Execute a Flux query"""
        tables = self.query_api.query(flux_query, org=self.org)
        
        results = []
        for table in tables:
            for record in table.records:
                results.append({
                    'time': record.get_time(),
                    'measurement': record.get_measurement(),
                    'field': record.get_field(),
                    'value': record.get_value(),
                    **{k: v for k, v in record.values.items() 
                       if k not in ['time', '_measurement', '_field', '_value']}
                })
        
        return pd.DataFrame(results)
    
    def get_cpu_metrics(self, host, hours=24):
        """Query CPU metrics for a specific host"""
        query = f'''
        from(bucket: "{self.bucket}")
            |> range(start: -{hours}h)
            |> filter(fn: (r) => r._measurement == "cpu")
            |> filter(fn: (r) => r.host == "{host}")
            |> filter(fn: (r) => r._field == "usage_percent")
            |> aggregateWindow(every: 1m, fn: mean)
        '''
        return self.query(query)
    
    def downsample(self, measurement, from_interval, to_interval):
        """Downsample data to reduce storage"""
        task_name = f"downsample_{measurement}_{from_interval}_{to_interval}"
        
        flux = f'''
        option task = {{name: "{task_name}", every: {to_interval}}}

        from(bucket: "{self.bucket}")
            |> range(start: -{from_interval})
            |> filter(fn: (r) => r._measurement == "{measurement}")
            |> aggregateWindow(every: {to_interval}, fn: mean, createEmpty: false)
            |> to(bucket: "{self.bucket}", org: "{self.org}")
        '''
        
        return flux

# Usage
manager = InfluxDBManager(
    url="http://localhost:8086",
    token="my-token",
    org="my-org",
    bucket="metrics"
)

# Write metrics
manager.write_point(
    measurement="cpu",
    tags={"host": "server-01", "region": "us-east"},
    fields={"usage_percent": 75.3, "cores": 8}
)

# Query data
df = manager.get_cpu_metrics("server-01", hours=24)

TimescaleDB

import psycopg2
from psycopg2 import sql
import pandas as pd
from datetime import datetime

class TimescaleDBManager:
    def __init__(self, dbname, user, password, host, port):
        self.conn = psycopg2.connect(
            dbname=dbname, user=user, password=password,
            host=host, port=port
        )
        self.conn.autocommit = True
    
    def create_hypertable(self, table_name, time_column='time'):
        """Convert a table to a TimescaleDB hypertable"""
        with self.conn.cursor() as cur:
            cur.execute(f"""
                SELECT create_hypertable('{table_name}', '{time_column}',
                    if_not_exists => TRUE);
            """)
    
    def create_continuous_aggregate(self, view_name, source_table, 
                                    time_bucket='1 hour'):
        """Create a continuous aggregate for efficient querying"""
        query = f"""
        CREATE MATERIALIZED VIEW {view_name}
        WITH (timescaledb.continuous) AS
        SELECT
            time_bucket('{time_bucket}', time) AS bucket,
            device_id,
            AVG(temperature) AS avg_temp,
            MAX(temperature) AS max_temp,
            MIN(temperature) AS min_temp,
            COUNT(*) AS sample_count
        FROM {source_table}
        GROUP BY bucket, device_id;
        """
        
        with self.conn.cursor() as cur:
            cur.execute(query)
    
    def add_retention_policy(self, table_name, interval='30 days'):
        """Automatically drop old data"""
        query = f"""
        SELECT add_retention_policy('{table_name}', INTERVAL '{interval}');
        """
        
        with self.conn.cursor() as cur:
            cur.execute(query)
    
    def add_compression(self, table_name, compress_after='7 days'):
        """Compress old data to save space"""
        query = f"""
        ALTER TABLE {table_name} SET (
            timescaledb.compress,
            timescaledb.compress_segmentby = 'device_id',
            timescaledb.compress_orderby = 'time DESC'
        );
        
        SELECT add_compression_policy('{table_name}', 
            INTERVAL '{compress_after}');
        """
        
        with self.conn.cursor() as cur:
            cur.execute(query)
    
    def query_timeseries(self, device_id, start_time, end_time, 
                         bucket='5 minutes'):
        """Query time-bucketed data"""
        query = f"""
        SELECT
            time_bucket('{bucket}', time) AS bucket,
            AVG(temperature) AS avg_temp,
            MAX(humidity) AS max_humidity,
            COUNT(*) AS samples
        FROM sensor_data
        WHERE device_id = %s
          AND time BETWEEN %s AND %s
        GROUP BY bucket
        ORDER BY bucket;
        """
        
        return pd.read_sql(query, self.conn, params=(device_id, start_time, end_time))
    
    def detect_anomalies(self, table_name, z_threshold=3.0):
        """Detect anomalous values using z-score"""
        query = f"""
        WITH stats AS (
            SELECT
                AVG(temperature) AS mean_temp,
                STDDEV(temperature) AS std_temp
            FROM {table_name}
            WHERE time > NOW() - INTERVAL '24 hours'
        )
        SELECT time, temperature, device_id,
            (temperature - mean_temp) / std_temp AS z_score
        FROM {table_name}, stats
        WHERE ABS((temperature - mean_temp) / std_temp) > {z_threshold}
        ORDER BY time DESC;
        """
        
        return pd.read_sql(query, self.conn)

# Usage
db = TimescaleDBManager(
    dbname="metrics", user="admin", password="secret",
    host="localhost", port=5432
)

db.create_hypertable("sensor_data", "time")
db.add_retention_policy("sensor_data", "90 days")
db.add_compression("sensor_data", "7 days")
db.create_continuous_aggregate("hourly_stats", "sensor_data", "1 hour")

Prometheus

import requests
import json
from datetime import datetime, timedelta

class PrometheusClient:
    def __init__(self, base_url):
        self.base_url = base_url.rstrip('/')
    
    def query_instant(self, promql):
        """Execute an instant query"""
        response = requests.get(
            f"{self.base_url}/api/v1/query",
            params={"query": promql}
        )
        return response.json()
    
    def query_range(self, promql, start, end, step="15s"):
        """Execute a range query"""
        response = requests.get(
            f"{self.base_url}/api/v1/query_range",
            params={
                "query": promql,
                "start": start,
                "end": end,
                "step": step
            }
        )
        return response.json()
    
    def get_cpu_usage(self, instance, hours=1):
        """Get CPU usage for an instance"""
        promql = f'100 - (avg by(instance) (irate(node_cpu_seconds_total{{instance="{instance}",mode="idle"}}[5m])) * 100)'
        
        end = datetime.now()
        start = end - timedelta(hours=hours)
        
        result = self.query_range(promql, start.isoformat(), end.isoformat())
        
        if result['status'] == 'success':
            return result['data']['result']
        return []
    
    def get_memory_usage(self, instance):
        """Get memory usage percentage"""
        promql = f'(1 - node_memory_MemAvailable_bytes{{instance="{instance}"}} / node_memory_MemTotal_bytes{{instance="{instance}"}}) * 100'
        
        result = self.query_instant(promql)
        return result
    
    def get_disk_usage(self, instance, mountpoint='/'):
        """Get disk usage percentage"""
        promql = f'(1 - node_filesystem_avail_bytes{{instance="{instance}",mountpoint="{mountpoint}"}} / node_filesystem_size_bytes{{instance="{instance}",mountpoint="{mountpoint}"}}) * 100'
        
        result = self.query_instant(promql)
        return result

# Alerting rules
ALERT_RULES = """
groups:
  - name: ml_system_alerts
    rules:
      - alert: HighPredictionLatency
        expr: histogram_quantile(0.99, rate(ml_prediction_duration_seconds_bucket[5m])) > 0.1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High prediction latency"
          
      - alert: ModelAccuracyDrop
        expr: ml_model_accuracy < 0.85
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "Model accuracy below threshold"
          
      - alert: DataDriftDetected
        expr: ml_data_drift_score > 0.3
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Data drift detected"
"""

Query Patterns

# Time series specific queries

# Resampling
def resample_timeseries(df, rule='5T', method='mean'):
    """Resample time series to different frequency"""
    df.index = pd.to_datetime(df.index)
    return df.resample(rule).agg(method)

# Rolling statistics
def rolling_features(df, windows=[7, 14, 30]):
    """Compute rolling statistics"""
    features = pd.DataFrame(index=df.index)
    
    for window in windows:
        features[f'rolling_mean_{window}'] = df['value'].rolling(window).mean()
        features[f'rolling_std_{window}'] = df['value'].rolling(window).std()
        features[f'rolling_min_{window}'] = df['value'].rolling(window).min()
        features[f'rolling_max_{window}'] = df['value'].rolling(window).max()
    
    return features

# Lag features
def lag_features(df, lags=[1, 7, 14, 30]):
    """Create lag features"""
    features = pd.DataFrame(index=df.index)
    
    for lag in lags:
        features[f'lag_{lag}'] = df['value'].shift(lag)
    
    return features

# Exponential moving average
def ewm_features(df, spans=[7, 14, 30]):
    """Exponential weighted moving average"""
    features = pd.DataFrame(index=df.index)
    
    for span in spans:
        features[f'ewm_{span}'] = df['value'].ewm(span=span).mean()
    
    return features

Best Practices

  1. Use hypertables in TimescaleDB for automatic partitioning
  2. Downsample aggressively for older data to save storage
  3. Set retention policies to automatically clean up old data
  4. Create continuous aggregates for common query patterns
  5. Monitor query performance and add indexes as needed

Advertisement