Kafka Topics
Topic and consumer conventions for the CDC pipeline. Implementation: ../../database/etl/clickhouse/consumer_clickhouse.py.
Topic naming pattern
erp.<schema_postgres>.<nama_tabel>
| Component | Value | Example |
|---|---|---|
| Prefix | erp | Fixed |
| Schema | tenant_<kode> | tenant_sparepart |
| Table | PG table name | transaksi |
Examples of active topics:
- erp.tenant_sparepart.transaksi
- erp.tenant_sparepart.transaksi_detail
- erp.tenant_sparepart.jurnal
- erp.tenant_sparepart.stok
- erp.tenant_leontech.transaksi
- … (one topic per published schema+table combination)
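The naming pattern above can be sketched as a pair of helpers that build and parse topic names. This is a minimal illustration, not the code in consumer_clickhouse.py; the names `build_topic`, `parse_topic`, and `TopicParts` are hypothetical.

```python
from typing import NamedTuple

TOPIC_PREFIX = "erp"  # fixed prefix, per the naming table


class TopicParts(NamedTuple):
    schema: str  # e.g. "tenant_sparepart"
    table: str   # e.g. "transaksi_detail"


def build_topic(schema: str, table: str) -> str:
    """Join prefix, schema, and table into a topic name."""
    return f"{TOPIC_PREFIX}.{schema}.{table}"


def parse_topic(topic: str) -> TopicParts:
    """Split a topic name into schema and table; reject any other shape."""
    parts = topic.split(".")
    if len(parts) != 3 or parts[0] != TOPIC_PREFIX:
        raise ValueError(f"not a CDC topic: {topic!r}")
    return TopicParts(schema=parts[1], table=parts[2])
```

Parsing strictly on three dot-separated parts means internal topics such as my_connect_offsets are rejected instead of being silently misread.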
Tables subscribed by the consumer
The TABEL_CDC list in the consumer is a subset of the tables Debezium publishes and must stay in sync with the connector's table.include.list:
- Facts & accounting: transaksi, transaksi_detail, jurnal, stok, akun, akun_klas, akun_subklas
- Master & supporting data: barang, barang_*, kontak, kontak_*, lokasi, satuan, harga, fifo_*, termin, transaksi_bayar, transaksi_draft*, …
Important: if a table is added in PG without updating the connector, TENANT_SCHEMAS, and the CH migration, the consumer either does not consume it or crashes on INSERT.
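A drift check between the consumer's table list and the connector can be sketched as below. It assumes table.include.list holds literal `schema.table` entries separated by commas (Debezium also accepts regular expressions there, which this sketch does not evaluate) and that the consumer list contains concrete table names with no wildcards. The function names are hypothetical.

```python
def expected_entries(tenant_schemas, tables):
    """All schema.table pairs the consumer expects to receive."""
    return {f"{schema}.{table}" for schema in tenant_schemas for table in tables}


def sync_gaps(tenant_schemas, consumer_tables, include_list):
    """Return entries the consumer expects but the connector does not publish.

    include_list is the raw comma-separated table.include.list value,
    treated here as literal names (an assumption of this sketch).
    """
    published = {entry.strip() for entry in include_list.split(",") if entry.strip()}
    return sorted(expected_entries(tenant_schemas, consumer_tables) - published)
```

An empty result means every consumer table has a matching connector entry; any returned name is a table for which no topic will be produced.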
Kafka Connect internal topics
| Topic | Purpose |
|---|---|
| my_connect_offsets | Connector offsets (compacted) |
| my_connect_configs | Connector configs |
| my_connect_statuses | Task status |
Do not delete these topics during a limited reset; connector resets use the tombstone/REST procedure in the RUNBOOK.
Consumer groups
| Group | Used by | Behavior |
|---|---|---|
| clickhouse-etl-group | clickhouse-etl PM2 | Steady-state CDC → CH |
| clickhouse-replay-<job_id> | Production replay job (planned) | Replay isolated per job |
Env: KAFKA_BROKER=localhost:9092, TENANT_SCHEMAS=tenant_sparepart,tenant_leontech.
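As a rough sketch of how these settings fit together, the helpers below read the two environment variables and pick a consumer group name. The function names and the empty default for TENANT_SCHEMAS are assumptions for illustration; the actual consumer may parse its environment differently.

```python
import os


def load_settings(env=None):
    """Read the broker address and tenant schema list from the environment."""
    env = os.environ if env is None else env
    broker = env.get("KAFKA_BROKER", "localhost:9092")
    raw_schemas = env.get("TENANT_SCHEMAS", "")
    schemas = [s.strip() for s in raw_schemas.split(",") if s.strip()]
    return broker, schemas


def group_id(job_id=None):
    """Steady-state group by default; a per-job group when replaying."""
    if job_id is None:
        return "clickhouse-etl-group"
    return f"clickhouse-replay-{job_id}"
```

Giving each replay job its own group keeps its committed offsets separate from those of clickhouse-etl-group, so a replay does not move the steady-state consumer's position.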
Message format (Debezium)
JSON with a before / after payload, schema metadata, and op (c/u/d/r, meaning create/update/delete/read).
The consumer maps each message to CH columns: versi, is_deleted, op, tenant_kode (derived from the schema name).
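The mapping from a Debezium envelope to a CH row might look roughly like this. Two points are assumptions, not taken from the consumer: versi is filled from the PostgreSQL source.lsn field, and tenant_kode is the schema name with the tenant_ prefix removed. The function name `to_ch_row` is hypothetical.

```python
import json

VALID_OPS = {"c", "u", "d", "r"}  # create, update, delete, read (snapshot)


def to_ch_row(raw_value):
    """Convert one Debezium message value into a dict row.

    Returns None for tombstones (null message values).
    """
    if raw_value is None:
        return None
    message = json.loads(raw_value)
    # With schemas enabled the change event sits under "payload".
    payload = message.get("payload", message)
    op = payload["op"]
    if op not in VALID_OPS:
        raise ValueError(f"unsupported op: {op!r}")
    is_deleted = op == "d"
    # Deletes carry the row image in "before"; everything else in "after".
    image = payload["before"] if is_deleted else payload["after"]
    source = payload["source"]
    row = dict(image or {})
    row["versi"] = source["lsn"]  # ASSUMPTION: WAL position used as version
    row["is_deleted"] = int(is_deleted)
    row["op"] = op
    # ASSUMPTION: tenant code is the schema name minus the "tenant_" prefix
    row["tenant_kode"] = source["schema"].removeprefix("tenant_")
    return row
```

`str.removeprefix` requires Python 3.9 or later.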
DEV operations: reset & replay
| Operation | Tool | Mode |
|---|---|---|
| List lag per topic | migrasi-ui | DEV/ops |
| Delete topics with prefix erp. | migrasi-ui global_reset | DEV only |
| Reset consumer offset | kafka_replay → earliest | DEV; CH data may be duplicated unless purged first |
| Debezium connector reset | migrasi-ui / REST | DEV |
Production: see OPERATING_MODEL.md (tenant-scoped jobs, no global reset).
Adding a tenant to Kafka
- Create the PG schema and run the tenant migration
- ALTER PUBLICATION debezium_erp_pub ADD TABLE tenant_<kode>.transaksi, ...
- PUT the updated table.include.list to Debezium
- pm2 restart clickhouse-etl with TENANT_SCHEMAS updated
See ../../database/cdc/RUNBOOK.md.
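The strings needed for the steps above can be generated with a few small helpers, sketched here. They only build text; running the SQL and sending the config to Kafka Connect (typically via its `PUT /connectors/<name>/config` REST endpoint, where the connector name is not stated in this document) is left to the RUNBOOK procedure. The function names are hypothetical, and inputs are assumed to be trusted identifiers that need no quoting.

```python
def alter_publication_sql(kode, tables, publication="debezium_erp_pub"):
    """Build the ALTER PUBLICATION statement for a new tenant's tables."""
    schema = f"tenant_{kode}"
    targets = ", ".join(f"{schema}.{table}" for table in tables)
    return f"ALTER PUBLICATION {publication} ADD TABLE {targets};"


def append_unique(current, new_entries):
    """Append entries to a comma-separated setting, skipping duplicates."""
    entries = [e.strip() for e in current.split(",") if e.strip()]
    for entry in new_entries:
        if entry not in entries:
            entries.append(entry)
    return ",".join(entries)


def extend_include_list(current, kode, tables):
    """New table.include.list value with the tenant's tables added."""
    return append_unique(current, [f"tenant_{kode}.{t}" for t in tables])


def extend_tenant_schemas(current, kode):
    """New TENANT_SCHEMAS value with the tenant's schema added."""
    return append_unique(current, [f"tenant_{kode}"])
```

Because `append_unique` skips entries that already exist, re-running the helpers for the same tenant leaves both settings unchanged.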
Diagram
flowchart LR
PG[(PG WAL)] --> DEB[Debezium]
DEB --> T1[erp.tenant_x.transaksi]
DEB --> T2[erp.tenant_x.jurnal]
T1 --> CG[clickhouse-etl-group]
T2 --> CG
CG --> CH[(erp_clickhouse)]