PostgreSQL Không Phải "Chỉ Là Web App Database"
PostgreSQL hiện đại (v14+) có đủ để làm trung tâm data pipeline production: JSONB cho schemaless ingestion, Partitioning theo thời gian, Logical Replication cho CDC native, Materialized Views cho aggregation nặng, và LISTEN/NOTIFY cho event streaming nhẹ. Với quy mô dưới 1TB và dưới 50K events/giây, đây là stack cost-effective nhất hiện có — không cần Kafka, Spark, hay Snowflake.
ETL vs ELT: Chọn Đúng Pattern
| Tiêu chí | ETL (Python-first) | ELT (SQL-first) |
|---|
| Data size | Dưới 100MB/batch | Hàng GB+ |
| Transformation | Logic Python phức tạp | SQL + dbt |
| Performance | Memory bottleneck | Push computation to DB |
| Tool | Pandas, Polars | dbt, SQLMesh |
Khuyến nghị thực tế: Dùng ELT cho analytical workloads — PostgreSQL xử lý JOIN và aggregation tốt hơn Pandas cho large datasets. Dùng ETL khi business logic quá phức tạp để diễn đạt bằng SQL.
Data Sources (API / Webhook / CSV)
↓
[Python Ingestion Layer — httpx, asyncpg, pandas]
↓
[PostgreSQL Staging — raw_events JSONB, PARTITION BY time]
↓ SQL transforms / dbt
[Fact Tables + Materialized Views]
↓
[Serving — Metabase / Grafana / Application APIs]
Schema Design Quan Trọng Nhất
Staging table nhận raw data không validate schema chặt:
CREATE TABLE raw_events (
id BIGSERIAL PRIMARY KEY,
source TEXT NOT NULL,
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
received_at TIMESTAMPTZ DEFAULT NOW(),
processed BOOLEAN DEFAULT FALSE,
batch_id UUID
) PARTITION BY RANGE (received_at);
CREATE INDEX idx_unprocessed ON raw_events (received_at)
WHERE processed = FALSE;
CREATE INDEX idx_payload_gin ON raw_events USING GIN (payload);
Python Stack: 3 Thư Viện Cốt Lõi
SQLAlchemy 2.0 với QueuePool và pool_pre_ping=True. Dùng COPY thay INSERT cho bulk loads — nhanh hơn 10-100x:
engine = create_engine(
"postgresql+psycopg2://user:pass@localhost/db",
pool_size=10, max_overflow=20,
pool_pre_ping=True,
)
def fetch_unprocessed(limit=1000):
with engine.connect() as conn:
return conn.execute(text("""
SELECT id, payload FROM raw_events
WHERE processed = FALSE
LIMIT :limit
FOR UPDATE SKIP LOCKED
"""), {"limit": limit}).fetchall()
Pandas / Polars: pd.read_sql() đọc thẳng từ PostgreSQL. Polars + ConnectorX nhanh hơn Pandas 5-20x cho datasets lớn. Ghi về PostgreSQL dùng to_sql(..., method='multi', chunksize=5000).
Concurrency: 3 Pattern Phải Biết
Pattern 1 — ThreadPoolExecutor cho I/O-bound tasks (API calls, DB queries). max_workers=8 là sweet spot cho hầu hết pipeline.
Pattern 2 — asyncpg + asyncio cho high concurrency (500+ concurrent operations):
pool = await asyncpg.create_pool("postgresql://...", min_size=5, max_size=20)
async def fetch_events(batch_size=500):
async with pool.acquire() as conn:
async with conn.transaction():
return await conn.fetch("""
SELECT id, event_type, payload FROM raw_events
WHERE processed = FALSE LIMIT $1
FOR UPDATE SKIP LOCKED
""", batch_size)
Pattern 3 — FOR UPDATE SKIP LOCKED: khi Worker 1 lock 100 rows, Worker 2 tự động SKIP qua — không deadlock, không duplicate processing. Đây là pattern quan trọng nhất khi chạy nhiều workers song song.
Orchestration: Prefect 3 vs Airflow
| Tiêu chí | Apache Airflow | Prefect 3 |
|---|
| Setup | Cao (Celery/K8s) | Thấp (pip install) |
| Code style | DAG definitions phức tạp | Python functions tự nhiên |
| Dynamic flows | Khó, cần workarounds | Native support |
| Phù hợp | Enterprise, 100+ DAGs | Startup, 1-50 flows |
Prefect: @flow + @task decorators, retry tự động, UI built-in. Đủ dùng cho 95% startup pipeline.
Query Optimization: 5 Index Strategies
CREATE INDEX idx_unprocessed ON raw_events (received_at)
WHERE processed = FALSE;
CREATE INDEX idx_user_time ON fact_user_events (user_id, event_at DESC);
CREATE INDEX idx_payload ON raw_events USING GIN (payload);
CREATE INDEX idx_user_id_str ON raw_events ((payload->>'user_id'));
CREATE INDEX idx_time_brin ON raw_events USING BRIN (received_at);
Dùng REFRESH MATERIALIZED VIEW CONCURRENTLY cho heavy aggregations — không lock đọc khi refresh.
Luôn debug với EXPLAIN (ANALYZE, BUFFERS). Chú ý: Seq Scan = thiếu index; Rows estimate sai xa = cần ANALYZE; cache hit ratio mục tiêu trên 95%.
Dead Letter Queue — Không Để Mất Records
Records lỗi không crash toàn batch — đẩy vào DLQ table với error_msg, attempt_count, last_failed_at. Alert khi DLQ tăng đột biến. Cho phép manual review và replay.
Khi Nào Dùng PostgreSQL (và Không Nên)
| Tình huống | Quyết định | Alternative |
|---|
| Dưới 500GB, 1-50K events/s, team nhỏ | ✅ Lý tưởng | — |
| Cần ACID qua toàn pipeline | ✅ Lý tưởng | — |
| Trên 1TB, analytics nặng | ⚠️ Xem xét thêm | DuckDB, ClickHouse |
| Trên 100K events/giây | ❌ Không phù hợp | Kafka + Flink |
| Pure time-series | ⚠️ Cần extension | TimescaleDB, InfluxDB |
Production Checklist
PostgreSQL + Python là stack thực dụng nhất cho startup đến mid-size. Khi data vượt 1TB hoặc cần Kafka-scale thì mới bổ sung thêm — đừng over-engineer từ đầu.
Muốn tích hợp AI vào data pipeline? Xem hướng dẫn xây dựng AI agents.