Lewati ke isi

Kafka Topics

Konvensi topik dan consumer untuk pipeline CDC. Implementasi: ../../database/etl/clickhouse/consumer_clickhouse.py.


Pola penamaan topik

erp.<schema_postgres>.<nama_tabel>
Komponen Nilai Contoh
Prefix erp Tetap
Schema tenant_<kode> tenant_sparepart
Tabel nama tabel PG transaksi

Contoh topik aktif:

  • erp.tenant_sparepart.transaksi
  • erp.tenant_sparepart.transaksi_detail
  • erp.tenant_sparepart.jurnal
  • erp.tenant_sparepart.stok
  • erp.tenant_leontech.transaksi
  • … (satu topik per kombinasi schema+tabel yang dipublish)

Tabel yang di-subscribe consumer

Daftar TABEL_CDC di consumer (subset dipublish Debezium — harus sinkron dengan table.include.list connector):

Fakta & akuntansi: transaksi, transaksi_detail, jurnal, stok, akun, akun_klas, akun_subklas

Master & pendukung: barang, barang_*, kontak, kontak_*, lokasi, satuan, harga, fifo_*, termin, transaksi_bayar, transaksi_draft*, …

Penting: Menambah tabel di PG tanpa update connector + TENANT_SCHEMAS + migrasi CH → consumer tidak konsumsi atau crash saat INSERT.


Topik internal Kafka Connect

Topik Fungsi
my_connect_offsets Offset connector (compacted)
my_connect_configs Config connector
my_connect_statuses Status task

Jangan dihapus pada reset terbatas; reset connector memakai prosedur tombstone/REST di RUNBOOK.


Consumer groups

Group Pemakai Perilaku
clickhouse-etl-group clickhouse-etl PM2 Steady-state CDC → CH
clickhouse-replay-<job_id> Job replay produksi (rencana) Isolasi replay per job

Env: KAFKA_BROKER=localhost:9092, TENANT_SCHEMAS=tenant_sparepart,tenant_leontech.


Format pesan (Debezium)

JSON dengan payload before / after, metadata schema, op (create/update/delete/read).

Consumer memetakan ke kolom CH: versi, is_deleted, op, tenant_kode (diderivasi dari nama schema).


Operasi DEV: reset & replay

Operasi Tool Mode
List lag per topik migrasi-ui DEV/ops
Hapus topik prefix erp. migrasi-ui global_reset DEV only
Reset consumer offset kafka_replay → earliest DEV — data CH bisa duplikat tanpa purge
Debezium connector reset migrasi-ui / REST DEV

Produksi: OPERATING_MODEL.md — tenant-scoped job, tanpa global reset.


Menambah tenant ke Kafka

  1. Buat schema PG + migrasi tenant
  2. ALTER PUBLICATION debezium_erp_pub ADD TABLE tenant_<kode>.transaksi, ...
  3. PUT table.include.list Debezium
  4. pm2 restart clickhouse-etl dengan TENANT_SCHEMAS diperbarui

Rujuk ../../database/cdc/RUNBOOK.md.


Diagram

flowchart LR
  PG[(PG WAL)] --> DEB[Debezium]
  DEB --> T1[erp.tenant_x.transaksi]
  DEB --> T2[erp.tenant_x.jurnal]
  T1 --> CG[clickhouse-etl-group]
  T2 --> CG
  CG --> CH[(erp_clickhouse)]

Dokumen terkait