
Event Flow: PostgreSQL → Kafka → ClickHouse

CDC event flow for analytics. Canonical operational sources: ../../database/cdc/README.md and ../../database/cdc/RUNBOOK.md.


Main flow (steady state)

```mermaid
sequenceDiagram
  participant App as NestJS / Trigger
  participant PG as PostgreSQL tenant_*
  participant DEB as Debezium
  participant KF as Kafka
  participant ETL as clickhouse-etl
  participant CH as ClickHouse

  App->>PG: INSERT/UPDATE/DELETE
  PG->>PG: Stock/journal trigger (sync)
  PG->>DEB: WAL logical replication
  DEB->>KF: JSON CDC event
  KF->>ETL: poll batch
  ETL->>CH: INSERT ReplacingMergeTree + _riwayat
  Note over CH: Aggregate MVs populated async
```

Stages by component

1. PostgreSQL (source of truth)

  • Fact tables: transaksi, transaksi_detail, jurnal, stok, account dimensions (akun*), item master, kontak, etc. (the full list is in the consumer).
  • Publication: debezium_erp_pub, containing only the tables explicitly added to it for each tenant.
  • Replication slot: debezium_erp, role erp_debezium.

Principle: an OLTP change is valid only if its PG triggers succeed; CDC picks the change up only after commit.

2. Debezium Connect

  • Connector: database/connectors/debezium-postgres-erp.json
  • Topic format: erp.<schema_postgres>.<nama_tabel>, e.g. erp.tenant_sparepart.transaksi
  • Operations: c = create, u = update, d = delete, r = snapshot/read
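A minimal sketch of how a consumer could split the topic name and read the op code from one message. It assumes the standard Debezium JSON envelope (before, after, op, ts_ms), wrapped in payload when the JSON converter has schemas enabled. parse_topic and parse_event are hypothetical names, not functions taken from consumer_clickhouse.py.

```python
import json
from typing import Any, NamedTuple, Optional

# Debezium op codes listed above.
OP_NAMES = {"c": "create", "u": "update", "d": "delete", "r": "snapshot/read"}


class TopicRef(NamedTuple):
    prefix: str  # always "erp"
    schema: str  # PostgreSQL schema, e.g. tenant_sparepart
    table: str   # table name, e.g. transaksi


def parse_topic(topic: str) -> TopicRef:
    """Split erp.<schema_postgres>.<nama_tabel> into its three parts."""
    parts = topic.split(".")
    if len(parts) != 3 or parts[0] != "erp":
        raise ValueError(f"unexpected topic format: {topic!r}")
    return TopicRef(*parts)


def parse_event(raw: Optional[bytes]) -> Optional[dict[str, Any]]:
    """Extract op, row image and ts_ms from one Debezium JSON message.

    Returns None for tombstones (the null-value message Debezium can emit
    after a delete).
    """
    if raw is None:
        return None
    event = json.loads(raw)
    # With the JSON converter's schemas enabled, the envelope sits under "payload".
    payload = event.get("payload", event)
    op = payload["op"]
    if op not in OP_NAMES:
        raise ValueError(f"unknown op: {op!r}")
    # Deletes carry the old row in "before"; c/u/r carry the new row in "after".
    row = payload["before"] if op == "d" else payload["after"]
    return {"op": op, "row": row, "ts_ms": payload.get("ts_ms")}
```

With PostgreSQL's default REPLICA IDENTITY, the before image of a delete only carries the primary key columns. That is enough to locate the row, but not to rebuild its other values.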

3. Kafka

  • Broker: localhost:9092 (Docker container clickhouse-kafka-1 in the CDC compose setup)
  • Main consumer group: clickhouse-etl-group
  • Connect internal topics: my_connect_offsets, my_connect_configs, my_connect_statuses. Do not delete them except during a controlled connector reset.

See: ../kafka/kafka-topics.md.
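The settings above could be expressed as a consumer configuration along these lines. Only the broker address and group id come from this document. The dict uses librdkafka-style keys as accepted by confluent_kafka.Consumer. The manual-commit and offset-reset choices and the assert_deletable guard are illustrative assumptions.

```python
# Kafka Connect internal topics listed above; deleting them loses connector
# offsets, configs and statuses.
CONNECT_INTERNAL_TOPICS = frozenset(
    {"my_connect_offsets", "my_connect_configs", "my_connect_statuses"}
)

# librdkafka-style settings, the format confluent_kafka.Consumer accepts.
# Only the broker and group id come from this document; the rest are assumptions.
ETL_CONSUMER_CONFIG = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "clickhouse-etl-group",
    # Commit manually, only after the ClickHouse insert (or DLQ write) succeeds.
    "enable.auto.commit": False,
    # Applies only when the group has no committed offset yet.
    "auto.offset.reset": "earliest",
}


def assert_deletable(topic: str, controlled_reset: bool = False) -> None:
    """Hypothetical guard: refuse to delete a Connect internal topic outside a controlled reset."""
    if topic in CONNECT_INTERNAL_TOPICS and not controlled_reset:
        raise PermissionError(f"{topic} is a Kafka Connect internal topic")
```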

4. ETL consumer (consumer_clickhouse.py)

| Parameter | Default / notes |
|---|---|
| TENANT_SCHEMAS | Required: list of tenant_<kode> schemas |
| BATCH_SIZE | 3000 (tune per environment) |
| FLUSH_INTERVAL | 2 seconds |
| Target DB | erp_clickhouse; tenants are isolated via the tenant_kode column |
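Here is a sketch of how these parameters might drive the buffering. It assumes TENANT_SCHEMAS is a comma-separated list; the real format is defined in consumer_clickhouse.py. Batcher is a hypothetical helper. It flushes as soon as BATCH_SIZE rows have accumulated or FLUSH_INTERVAL seconds have passed since the last flush.

```python
import os
import time
from typing import Any, Callable, Mapping


def load_tenant_schemas(env: Mapping[str, str] = os.environ) -> list[str]:
    """Parse TENANT_SCHEMAS, assumed here to be comma-separated."""
    schemas = [s.strip() for s in env.get("TENANT_SCHEMAS", "").split(",") if s.strip()]
    if not schemas:
        raise RuntimeError("TENANT_SCHEMAS is required")
    bad = [s for s in schemas if not s.startswith("tenant_")]
    if bad:
        raise ValueError(f"expected tenant_<kode> schemas, got: {bad}")
    return schemas


class Batcher:
    """Buffer rows and flush on BATCH_SIZE rows or FLUSH_INTERVAL seconds."""

    def __init__(self, flush: Callable[[list[Any]], None], batch_size: int = 3000,
                 flush_interval: float = 2.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._flush = flush
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        self._clock = clock
        self._rows: list[Any] = []
        self._last_flush = clock()

    def add(self, row: Any) -> None:
        self._rows.append(row)
        self.maybe_flush()

    def maybe_flush(self) -> None:
        """Also call this on every (possibly empty) poll so idle topics flush on time."""
        full = len(self._rows) >= self._batch_size
        stale = self._clock() - self._last_flush >= self._flush_interval
        if self._rows and (full or stale):
            self.flush_now()

    def flush_now(self) -> None:
        if not self._rows:
            return
        self._flush(list(self._rows))
        # Clear only after a successful flush, so a failed batch stays buffered.
        self._rows.clear()
        self._last_flush = self._clock()
```

The clock is injectable so the time-based flush can be tested without actually sleeping.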

Mapping of operations to ClickHouse:

| PG op | op column | Main table | History table (_riwayat) |
|---|---|---|---|
| INSERT | c | new row | ✅ append |
| UPDATE | u | new version (ReplacingMergeTree) | ✅ all versions |
| DELETE | d | is_deleted=1 | ✅ trail |
| Snapshot | r | baseline | ✅ |

stok has no _riwayat table: it is a balance cache, and its mutations are audited via transaksi_detail.
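The mapping table could translate into a per-event function like the one below. This is a sketch under assumptions. The column names tenant_kode, versi and is_deleted come from this document. map_event is hypothetical, as are keeping the op column only in the history row and the source of versi.

```python
from typing import Any, Optional

# stok is a balance cache with no *_riwayat table; its mutations are audited
# through transaksi_detail instead.
NO_HISTORY_TABLES = frozenset({"stok"})
VALID_OPS = frozenset({"c", "u", "d", "r"})


def map_event(table: str, op: str, row: dict[str, Any], tenant_kode: str,
              versi: int) -> tuple[dict[str, Any], Optional[dict[str, Any]]]:
    """Return (main_row, history_row) for one CDC event.

    main_row goes to the ReplacingMergeTree(versi) table; history_row (None
    for stok) is appended to <table>_riwayat. Where versi comes from (for
    example the event's LSN or ts_ms) is an assumption left to the caller.
    """
    if op not in VALID_OPS:
        raise ValueError(f"unknown op: {op!r}")
    # A delete becomes a new version flagged is_deleted=1, not a physical delete.
    main = dict(row, tenant_kode=tenant_kode, versi=versi,
                is_deleted=1 if op == "d" else 0)
    if table in NO_HISTORY_TABLES:
        return main, None
    # Every op, including the r snapshot baseline, is appended to history;
    # keeping the op code only in the history row is an assumption.
    return main, dict(main, op=op)
```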

5. ClickHouse

  • State engine: ReplacingMergeTree(versi); queries use FINAL + is_deleted = 0
  • Audit engine: MergeTree on *_riwayat
  • MVs: sales dashboard, account balances; rebuilt via DROP + CREATE (the MVs are not altered in place)

See: ../clickhouse/clickhouse-patterns.md.
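ClickHouse merges ReplacingMergeTree parts in the background, so a plain SELECT can still see several versions of the same key. FINAL deduplicates at query time, and is_deleted = 0 then hides soft-deleted rows. The sketch below shows the query shape and emulates its semantics in memory, assuming the sorting key is (tenant_kode, id). The table name in STATE_QUERY and the final_view helper are illustrative.

```python
from typing import Any, Iterable

# Query shape described above; the table and the {tenant:String} parameter
# (ClickHouse query-parameter syntax) are illustrative.
STATE_QUERY = """
SELECT *
FROM erp_clickhouse.transaksi FINAL
WHERE tenant_kode = {tenant:String} AND is_deleted = 0
"""


def final_view(rows: Iterable[dict[str, Any]], key: str = "id") -> list[dict[str, Any]]:
    """In-memory emulation of `FINAL` + `is_deleted = 0`.

    Per (tenant_kode, key) only the row with the highest versi survives (ties
    go to the later row); rows whose surviving version is a delete are dropped.
    """
    latest: dict[tuple, dict[str, Any]] = {}
    for r in rows:
        k = (r["tenant_kode"], r[key])
        if k not in latest or r["versi"] >= latest[k]["versi"]:
            latest[k] = r
    return [r for r in latest.values() if r["is_deleted"] == 0]
```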

6. NestJS (read path)

  • ClickHouseService → dashboards/reports
  • Never writes transactions to CH; writes go only to PG

Alternative path: initial load (baseline)

For a new tenant or a rebuild, the sequence currently in use (rather than relying on CDC alone from scratch) is:

```mermaid
flowchart LR
  A[Reset tenant PG] --> B[ETL / rebuild PG]
  B --> C[Direct load PG → CH]
  C --> D[Validate count PG = CH]
  D --> E[Start CDC delta]
  E --> F[Consumer clickhouse-etl]
```

Procedure: ../../database/SOP_INITIAL_LOAD_DIRECT_CH.md, ../../database/cdc/RUNBOOK.md § Rebuild Full.
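The validation gate (step D) boils down to comparing per-table counts before the CDC delta is started. The minimal sketch below assumes the counts were already fetched. In PG that could be SELECT count(*) per table in the tenant schema. In ClickHouse it could be SELECT count() FROM <table> FINAL WHERE tenant_kode = <kode> AND is_deleted = 0. count_mismatches and ready_for_cdc are hypothetical helpers.

```python
from typing import Mapping


def count_mismatches(pg_counts: Mapping[str, int],
                     ch_counts: Mapping[str, int]) -> dict[str, tuple[int, int]]:
    """Return {table: (pg_count, ch_count)} for every table that differs.

    A table missing on one side counts as 0 there, so it is reported too.
    """
    tables = sorted(set(pg_counts) | set(ch_counts))
    return {
        t: (pg_counts.get(t, 0), ch_counts.get(t, 0))
        for t in tables
        if pg_counts.get(t, 0) != ch_counts.get(t, 0)
    }


def ready_for_cdc(pg_counts: Mapping[str, int], ch_counts: Mapping[str, int]) -> bool:
    """Gate for the "Start CDC delta" step: every table count must match."""
    return not count_mismatches(pg_counts, ch_counts)
```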


Retry & replay

| Pattern | Environment | Mechanism | Reference |
|---|---|---|---|
| Consumer error loop | Runtime | Log + sleep 5 s, retry poll | consumer_clickhouse.py |
| DLQ | Runtime | Insert into the tenant/global DLQ table when a batch fails | consumer_clickhouse.py |
| Kafka replay | DEV / UI | Reset the group offset to earliest (kafka_replay step) | migrasi-ui/main.py |
| Production replay job | PROD (planned) | replay_cdc via the control plane, consumer group clickhouse-replay-<job_id> | OPERATING_MODEL.md |
| Debezium re-snapshot | DEV | Delete the offset topic / reset the connector | RUNBOOK |
| Backend job retry | API | POST /ops/cdc/jobs/{id}/aksi, action retry | 01-CONTROL-PLANE-CDC.md |

Production guardrail: global resets (global_reset, kafka_replay) are blocked in the UI when it runs in production mode (OPERATING_MODEL.md).

Replay metadata: pusat.cdc_replay_checkpoint (pusat migration V001).
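The two runtime rows of the table (error loop and DLQ) and the replay group naming could look roughly like this. The 5-second sleep and the clickhouse-replay-<job_id> pattern come from the table. Everything else is an assumption: process_batch, run_loop, the logger name, and treating a batch as done once it is either inserted or parked in the DLQ.

```python
import logging
import time
from typing import Any, Callable, Optional, Sequence

log = logging.getLogger("cdc-etl")  # hypothetical logger name

RETRY_SLEEP_S = 5  # "Log + sleep 5 s, retry poll" from the table above


def replay_group_id(job_id: str) -> str:
    """Dedicated consumer group for a replay job: clickhouse-replay-<job_id>."""
    return f"clickhouse-replay-{job_id}"


def process_batch(batch: Sequence[Any],
                  insert: Callable[[Sequence[Any]], None],
                  to_dlq: Callable[[Sequence[Any], str], None]) -> str:
    """Insert a batch; if the insert fails, park the batch in the DLQ.

    Returns "inserted" or "dlq". If the DLQ write itself fails, the exception
    propagates to the outer loop and offsets should not be committed.
    """
    try:
        insert(batch)
        return "inserted"
    except Exception as exc:
        log.warning("batch insert failed (%r); routing %d rows to DLQ", exc, len(batch))
        to_dlq(batch, repr(exc))
        return "dlq"


def run_loop(poll_once: Callable[[], None],
             sleep: Callable[[float], None] = time.sleep,
             max_loops: Optional[int] = None) -> int:
    """Outer consumer loop: on error, log, sleep RETRY_SLEEP_S and poll again.

    max_loops exists only to make the sketch testable; None means run forever.
    Returns the number of errors seen.
    """
    errors = 0
    loops = 0
    while max_loops is None or loops < max_loops:
        loops += 1
        try:
            poll_once()
        except Exception:
            errors += 1
            log.exception("consumer loop error; retrying in %ss", RETRY_SLEEP_S)
            sleep(RETRY_SLEEP_S)
    return errors
```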


Realtime notifications (separate from CDC)

```mermaid
flowchart LR
  TRG[Trigger PG] -->|pg_notify| BE[NestJS listener]
  BE --> FCM[Firebase FCM]
  FCM --> FL[Flutter]
```

CDC does not replace FCM; the pg_notify → FCM path is the realtime UX channel. See 10-ARSITEKTUR-KLIEN-AUTH-REALTIME.md.


DDL changes & the event pipeline

When adding a column:

  1. ClickHouse migration (V1##__*.sql): the column is DEFAULT or Nullable
  2. PostgreSQL tenant/pusat migration
  3. Update the publication / include-list if the table is new
  4. Restart the consumer

When removing a column: PostgreSQL first, then ClickHouse.

See ../../AGENTS.md.
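The ordering rules can be captured as a small helper, together with a ClickHouse ADD COLUMN that satisfies step 1. One plausible reason the order flips has to do with when each system sees the column. Adding it in ClickHouse first means it already exists when PostgreSQL starts emitting it. Dropping it in PostgreSQL first means no CDC event still carries a column ClickHouse no longer has. The helper names and step labels are hypothetical.

```python
from typing import Optional


def ch_add_column_sql(table: str, column: str, ch_type: str,
                      default: Optional[str] = None) -> str:
    """ClickHouse ADD COLUMN that rows without the new column can still satisfy.

    Either a DEFAULT expression is given, or the type is made Nullable(...).
    """
    if default is not None:
        col = f"{column} {ch_type} DEFAULT {default}"
    elif ch_type.startswith("Nullable("):
        col = f"{column} {ch_type}"
    else:
        col = f"{column} Nullable({ch_type})"
    return f"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS {col}"


def migration_order(change: str, new_table: bool = False) -> list[str]:
    """Order of steps for a column change, following the rules above."""
    if change == "add_column":
        steps = ["clickhouse", "postgresql"]
        if new_table:
            steps.append("publication_include_list")  # step 3, new tables only
        return steps + ["restart_consumer"]
    if change == "drop_column":
        return ["postgresql", "clickhouse"]
    raise ValueError(f"unknown change type: {change!r}")
```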


Related documents