Production-Grade AI Orchestration Layer Architecture (Cloudflare + TS Backend)
This document designs an orchestration layer that cuts premium-model (GPT/Claude) token spend by combining Workers AI, AI Gateway, Vectorize, Queues, semantic caching, and multi-model routing.
1) Architecture Principles
- Cheap-first inference: lightweight tasks (intent classification, context summarization, entity extraction) must go through Workers AI.
- Premium-last reasoning: GPT/Claude are used only when classifier confidence is low or the task needs complex reasoning.
- Retrieve before generate: run retrieval and context compression before any premium-model request.
- Cache aggressively: exact cache + semantic cache + partial cache (summaries/chunks).
- Observe everything: every model call goes through AI Gateway for centralized metrics.
2) Architecture Diagram
```mermaid
flowchart LR
  A[Flutter App] --> B[Node.js API Backend]
  B --> C[Cloudflare Worker Orchestrator]
  C --> D[Rate Limiter + Auth Guard]
  D --> E[Intent & Complexity Classifier<br/>Workers AI]
  E --> F{Cache Hit?}
  F -- Exact hit --> G[(KV/R2 Exact Cache)]
  F -- Semantic hit --> H[(Vectorize Semantic Cache)]
  F -- Miss --> I[Retrieval Pipeline]
  I --> J[(Vectorize Knowledge Index)]
  I --> K[Context Compressor<br/>Workers AI Summarizer]
  K --> L{Routing Policy Engine}
  L -- light --> M[Workers AI Gen Model]
  L -- heavy --> N[AI Gateway -> GPT/Claude]
  M --> O[Postprocessor + Guardrails]
  N --> O
  O --> P[Response + Cache Write]
  P --> Q[Queues Async Jobs]
  Q --> R[(PostgreSQL Cost Ledger)]
  Q --> S[(Kafka Events)]
  S --> T[(ClickHouse Analytics)]
  C --> U[AI Gateway Logs/Trace]
  B --> V[Business DB PostgreSQL]
```
3) Folder Structure (Clean Architecture)
```text
ai-orchestrator/
  apps/
    worker-orchestrator/
      src/
        entrypoints/
          http.ts
          queue.ts
        interfaces/
          controllers/
            chat-controller.ts
          presenters/
            response-presenter.ts
        application/
          use-cases/
            process-chat.usecase.ts
            embed-document.usecase.ts
          services/
            routing-policy.service.ts
            cache-policy.service.ts
        domain/
          entities/
            ai-request.ts
            ai-response.ts
            token-usage.ts
          value-objects/
            model-tier.ts
            complexity-score.ts
          repositories/
            semantic-cache.repository.ts
            usage-ledger.repository.ts
        infrastructure/
          cloudflare/
            ai-gateway.client.ts
            workers-ai.client.ts
            vectorize.repository.ts
            queues.publisher.ts
            kv-cache.repository.ts
          persistence/
            postgres-usage.repository.ts
            kafka-producer.ts
          observability/
            logger.ts
            metrics.ts
            tracing.ts
        config/
          env.ts
          wrangler-env.ts
        shared/
          errors/
          utils/
      wrangler.toml
      package.json
  packages/
    ai-contracts/
      src/
        dto/
        interfaces/
    lint-config/
    tsconfig/
  infra/
    terraform/
    dashboards/
```
4) Cloudflare Worker Setup
Worker components:
- HTTP handler (`fetch`) for synchronous requests.
- Queue consumer (`queue`) for async work (embedding, cache refresh, cost rollup).
- Bindings: AI, VECTORIZE, KV, R2, QUEUE, optionally D1/Hyperdrive, plus the AI Gateway endpoint (configured as a variable rather than a binding).
Example wrangler.toml
```toml
name = "ai-orchestrator"
# The entry module must export both the fetch and the queue handler.
main = "src/entrypoints/http.ts"
compatibility_date = "2026-05-23"

[vars]
ENV = "production"
AI_GATEWAY_BASE_URL = "https://gateway.ai.cloudflare.com/v1/<account>/<gateway>"
PREMIUM_ROUTER_TIMEOUT_MS = "25000"
CHEAP_ROUTER_TIMEOUT_MS = "8000"
SEMANTIC_CACHE_THRESHOLD = "0.90"

# Workers AI binding used by the cheap tier.
[ai]
binding = "AI"

[[kv_namespaces]]
binding = "EXACT_CACHE_KV"
id = "xxxxxxxx"

[[vectorize]]
binding = "SEMANTIC_CACHE_INDEX"
index_name = "semantic-cache-v1"

[[vectorize]]
binding = "KNOWLEDGE_INDEX"
index_name = "knowledge-rag-v1"

[[queues.producers]]
binding = "AI_JOBS_QUEUE"
queue = "ai-jobs"

[[queues.consumers]]
queue = "ai-jobs"
max_batch_size = 50
max_batch_timeout = 5
max_retries = 8
# Messages that exhaust max_retries land here for investigation.
dead_letter_queue = "ai-jobs-dlq"

[observability.logs]
enabled = true
head_sampling_rate = 1
```
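Because `main` points at a single module, that module has to export both the `fetch` and the `queue` handler. The sketch below shows that shape. It assumes a hypothetical `Env` holding only the variables used here, minimal local queue types so it compiles without `@cloudflare/workers-types`, and placeholder handler bodies. In the real Worker, the queue logic would live in `queue.ts` and be re-exported from the entry module.

```typescript
// Hypothetical subset of the Worker environment; real bindings (AI, KV,
// Vectorize, Queue) would be typed with @cloudflare/workers-types.
export interface Env {
  ENV: string;
  SEMANTIC_CACHE_THRESHOLD: string;
}

// Minimal queue shapes so this sketch compiles without Workers types.
interface QueueMessage<T> { body: T; ack(): void; retry(): void }
interface Batch<T> { messages: readonly QueueMessage<T>[] }

function json(data: unknown, status = 200): Response {
  return new Response(JSON.stringify(data), {
    status,
    headers: { "content-type": "application/json" },
  });
}

const worker = {
  // Synchronous path: only the health route is wired up in this sketch.
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);
    if (request.method === "GET" && url.pathname === "/v1/ai/health") {
      return json({ status: "ok", env: env.ENV });
    }
    return json({ error_code: "NOT_FOUND" }, 404);
  },

  // Async path: placeholder processing that acknowledges every message.
  async queue(batch: Batch<unknown>, _env: Env): Promise<void> {
    for (const msg of batch.messages) {
      msg.ack();
    }
  },
};

export default worker;
```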
5) AI Gateway Integration
All model requests, including those to premium providers, go through AI Gateway. This gives:
- consistent token and latency tracking across providers,
- policy enforcement (headers, timeouts, retries),
- a centralized audit trail.
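Gateway Client (TypeScript)
```ts
export interface GatewayChatRequest {
  model: string;
  messages: Array<{ role: "system" | "user" | "assistant"; content: string }>;
  temperature?: number;
  max_tokens?: number;
  metadata?: Record<string, string>;
}

export class AIGatewayClient {
  // baseUrl is assumed to already include the provider segment of the gateway URL
  // (for example .../v1/<account>/<gateway>/openai), so "/chat/completions"
  // resolves to that provider's OpenAI-compatible endpoint.
  constructor(
    private readonly baseUrl: string,
    private readonly apiKey: string,
    private readonly timeoutMs = 25_000, // mirrors PREMIUM_ROUTER_TIMEOUT_MS
  ) {}

  async chat(req: GatewayChatRequest): Promise<Response> {
    const res = await fetch(`${this.baseUrl}/chat/completions`, {
      method: "POST",
      headers: {
        "content-type": "application/json",
        authorization: `Bearer ${this.apiKey}`,
      },
      body: JSON.stringify(req),
      // Abort slow provider calls instead of holding the Worker request open.
      signal: AbortSignal.timeout(this.timeoutMs),
    });
    if (!res.ok) {
      // Surface the status so callers can map 429/503 to retryable errors.
      throw new Error(`AI Gateway error ${res.status}: ${await res.text()}`);
    }
    return res;
  }
}
```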
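The client returns the raw `Response`. The caller still has to extract the answer text and the token counts, which the usage ledger needs later. The sketch below assumes the provider returns the OpenAI chat-completions body shape (`choices[0].message.content`, `usage.prompt_tokens`, `usage.completion_tokens`). `ParsedChat` and `parseChatCompletion` are hypothetical names.

```typescript
// Subset of an OpenAI-style chat completion body that this sketch relies on.
interface ChatCompletionBody {
  model?: string;
  choices?: Array<{ message?: { content?: string | null } }>;
  usage?: { prompt_tokens?: number; completion_tokens?: number };
}

export interface ParsedChat {
  text: string;
  model: string;
  promptTokens: number;
  completionTokens: number;
}

// Extract the answer text and token usage. Missing counts default to 0 so the
// usage ledger never receives NaN.
export function parseChatCompletion(body: ChatCompletionBody, requestedModel: string): ParsedChat {
  return {
    text: body.choices?.[0]?.message?.content ?? "",
    model: body.model ?? requestedModel,
    promptTokens: body.usage?.prompt_tokens ?? 0,
    completionTokens: body.usage?.completion_tokens ?? 0,
  };
}
```

Typical use: `const parsed = parseChatCompletion(await res.json(), req.model);`.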
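6) Workers AI Integration (Cheap Tasks)
Use Workers AI for:
- intent classification,
- prompt compression,
- key-fact extraction,
- lightweight generation.
```ts
// `Ai` is the Workers AI binding type from @cloudflare/workers-types.
const CHEAP_MODEL = "@cf/meta/llama-3.1-8b-instruct";
export class WorkersAiClient {
  constructor(private readonly ai: Ai) {}
  // Ask for a JSON verdict so the caller can parse a numeric complexity score.
  classifyIntent(text: string) {
    return this.ai.run(CHEAP_MODEL, {
      prompt: `Classify the intent and complexity (0-1) of this text. Reply only with JSON {"intent": string, "complexity": number}:\n${text}`,
      max_tokens: 128,
    });
  }

  // Cap the summary both in the instruction and via max_tokens.
  summarizeContext(raw: string, maxTokens = 700) {
    return this.ai.run(CHEAP_MODEL, {
      prompt: `Summarize the following context in at most ${maxTokens} tokens:\n${raw}`,
      max_tokens: maxTokens,
    });
  }

  // Lightweight generation for requests routed to the cheap tier.
  generate(messages: Array<{ role: "system" | "user" | "assistant"; content: string }>) {
    return this.ai.run(CHEAP_MODEL, { messages, max_tokens: 1024 });
  }
}
```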
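Small models do not always return clean JSON, so the classifier output should be parsed defensively before it reaches the router. This sketch assumes the prompt asks for `{"intent": ..., "complexity": ...}` and that the text-generation result exposes its text as `response`, which is the usual shape for Workers AI text models. `parseClassification` and the fallback value are hypothetical. An unparseable verdict is treated as low confidence, which the principles above route upward.

```typescript
export interface Classification {
  intent: string;
  complexity: number; // clamped to [0, 1]
  parsed: boolean;    // false when the fallback was used
}

// Pull the first {...} object out of the model text and validate its fields.
export function parseClassification(
  output: { response?: string },
  fallbackComplexity = 1, // unparseable => low confidence => route upward
): Classification {
  const raw = output.response ?? "";
  const match = raw.match(/\{[\s\S]*\}/);
  if (match) {
    try {
      const obj = JSON.parse(match[0]) as { intent?: unknown; complexity?: unknown };
      const c = typeof obj.complexity === "number" ? obj.complexity : Number.NaN;
      if (Number.isFinite(c)) {
        return {
          intent: typeof obj.intent === "string" ? obj.intent : "unknown",
          complexity: Math.min(1, Math.max(0, c)),
          parsed: true,
        };
      }
    } catch {
      // Invalid JSON: fall through to the fallback below.
    }
  }
  return { intent: "unknown", complexity: fallbackComplexity, parsed: false };
}
```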
7) Semantic Cache Design
Cache layers
- L1 Exact Cache (KV): key = hash(normalized_prompt + tenant + policy_version).
- L2 Semantic Cache (Vectorize): stores the question embedding plus answer metadata.
- L3 Partial Cache: stores frequently used retrieval summaries/chunks.
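The L1 key formula can be sketched as follows. The sketch assumes SHA-256 over the normalized prompt via the Web Crypto API, which is available in Workers and in recent Node.js versions. The normalization rules and the `exact:` key prefix are illustrative choices, not something the design fixes. Keeping the tenant and policy version in clear text in the key prefix also makes tenant isolation and bulk invalidation easier.

```typescript
// Normalize so trivially different prompts share a key: Unicode NFKC,
// lowercase, trimmed, whitespace collapsed.
export function normalizePrompt(prompt: string): string {
  return prompt.normalize("NFKC").toLowerCase().trim().replace(/\s+/g, " ");
}

// Hex-encode a SHA-256 digest computed with Web Crypto.
async function sha256Hex(input: string): Promise<string> {
  const digest = await crypto.subtle.digest("SHA-256", new TextEncoder().encode(input));
  return Array.from(new Uint8Array(digest), (b) => b.toString(16).padStart(2, "0")).join("");
}

// key = hash(normalized_prompt + tenant + policy_version), with tenant and
// policy version also kept as a readable prefix.
export async function exactCacheKey(prompt: string, tenantKode: string, policyVersion: string): Promise<string> {
  const hash = await sha256Hex(`${tenantKode}\u0000${policyVersion}\u0000${normalizePrompt(prompt)}`);
  return `exact:${tenantKode}:${policyVersion}:${hash}`;
}
```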
Semantic cache metadata
- tenant_kode
- intent
- complexity_score
- model_tier
- prompt_fingerprint
- answer_ref (R2 key)
- ttl_expired_at
- quality_score
Flow
- Exact hit → return.
- Exact miss → embed the query → topK semantic search.
- Similarity >= threshold and matching policy → return the cached answer.
- Otherwise → continue with RAG + inference, then write to every cache layer.
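The semantic-hit rule can be expressed as a pure function over the top-K matches. The sketch assumes each match carries a similarity `score` (higher means closer) and the metadata fields listed above. "Policy matches" is interpreted here, as an assumption, as four conditions: same tenant, entry not expired, a cached tier at least as strong as the one now required, and a minimum `quality_score`. `pickSemanticHit` is a hypothetical name.

```typescript
export type ModelTier = "cheap" | "balanced" | "premium";

export interface SemanticMatch {
  score: number; // similarity from the vector query (higher = closer)
  metadata: {
    tenant_kode: string;
    model_tier: ModelTier;
    answer_ref: string;     // R2 key of the cached answer
    ttl_expired_at: string; // ISO-8601 timestamp
    quality_score: number;
  };
}

const TIER_RANK: Record<ModelTier, number> = { cheap: 0, balanced: 1, premium: 2 };

// Return the R2 key of the best acceptable match, or null on a semantic miss.
export function pickSemanticHit(
  matches: SemanticMatch[],
  opts: { tenantKode: string; threshold: number; requiredTier: ModelTier; minQuality: number; now: Date },
): string | null {
  const acceptable = matches.filter((m) =>
    m.score >= opts.threshold &&
    m.metadata.tenant_kode === opts.tenantKode && // never serve another tenant's answer
    Date.parse(m.metadata.ttl_expired_at) > opts.now.getTime() &&
    TIER_RANK[m.metadata.model_tier] >= TIER_RANK[opts.requiredTier] &&
    m.metadata.quality_score >= opts.minQuality,
  );
  acceptable.sort((a, b) => b.score - a.score);
  return acceptable[0]?.metadata.answer_ref ?? null;
}
```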
8) Embedding Pipeline
Document sources:
- PostgreSQL (operational knowledge base),
- Kafka topics (event enrichment),
- batch files (manual SOPs).
Stages:
1. An ingestion job pushes the payload to the Queue.
2. The consumer splits each document into chunks (sliding window + overlap).
3. Generate embeddings (Workers AI embedding model).
4. Upsert into Vectorize (KNOWLEDGE_INDEX).
5. Store the document manifest + version in PostgreSQL.
Recommended chunk schema:
- doc_id, chunk_id, tenant_kode, judul (title), isi_chunk (chunk content), source_type, source_updated_at, embedding_version.
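Stage 2 (window + overlap) can be sketched as a character-based sliding window. A real pipeline would more likely count tokens, and the 1,000/200 defaults are illustrative assumptions. Chunk ids use the same `chunk07`-style format as the Vectorize record example, and `chunkDocument` is a hypothetical name.

```typescript
export interface Chunk {
  doc_id: string;
  chunk_id: string;
  tenant_kode: string;
  isi_chunk: string;
}

// Split text into windows of `size` characters. Consecutive windows share
// `overlap` characters, so text cut at a boundary appears in both chunks.
export function chunkDocument(
  tenantKode: string,
  docId: string,
  text: string,
  size = 1000,
  overlap = 200,
): Chunk[] {
  if (overlap >= size) throw new Error("overlap must be smaller than size");
  const step = size - overlap;
  const chunks: Chunk[] = [];
  for (let start = 0; start < text.length; start += step) {
    chunks.push({
      doc_id: docId,
      chunk_id: `chunk${String(chunks.length + 1).padStart(2, "0")}`,
      tenant_kode: tenantKode,
      isi_chunk: text.slice(start, start + size),
    });
    if (start + size >= text.length) break; // this window already reached the end
  }
  return chunks;
}
```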
9) RAG Retrieval Flow
- Normalize the user query.
- Score intent + complexity (Workers AI).
- Light query expansion (optional, cheap model).
- Vector search topK in `KNOWLEDGE_INDEX`.
- Light re-ranking (Workers AI).
- Context compression (Workers AI summarizer).
- Send the final context to the policy router.
- Choose the cheap or premium model.
- Store citations/source references.
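The final steps (keep the best chunks, stay within a context budget, record citations) can be sketched as below. The sketch assumes re-ranked chunks arrive with a relevance `score`. It approximates tokens as characters / 4, a rough heuristic rather than a real tokenizer. `packContext` is a hypothetical helper.

```typescript
export interface RankedChunk {
  id: string;    // e.g. "tenant_abcd:doc123:chunk07"
  text: string;
  score: number; // re-rank relevance, higher is better
}

// Rough token estimate; swap in the target model's tokenizer if available.
const approxTokens = (s: string): number => Math.ceil(s.length / 4);

// Greedily add the highest-scoring chunks that still fit the token budget and
// return the numbered context together with the ids used as citations.
export function packContext(chunks: RankedChunk[], budgetTokens: number): { context: string; citations: string[] } {
  const sorted = [...chunks].sort((a, b) => b.score - a.score);
  const parts: string[] = [];
  const citations: string[] = [];
  let used = 0;
  for (const c of sorted) {
    const cost = approxTokens(c.text);
    if (used + cost > budgetTokens) continue; // skip chunks that do not fit
    parts.push(`[${citations.length + 1}] ${c.text}`);
    citations.push(c.id);
    used += cost;
  }
  return { context: parts.join("\n\n"), citations };
}
```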
10) Queue Async Pipeline
Use Queues for non-blocking tasks:
- write-behind cache,
- embedding refresh,
- cost aggregation,
- provider fallback retries,
- audit/event delivery to Kafka.
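Example Consumer
```ts
// AiJob, Env, embedDocument, writeUsageLedger and sendToDlq are application-level
// definitions (hypothetical here); MessageBatch comes from @cloudflare/workers-types.
const MAX_ATTEMPTS = 5; // at or below max_retries in wrangler.toml, so the consumer decides when to dead-letter

export default {
  async queue(batch: MessageBatch<AiJob>, env: Env): Promise<void> {
    for (const msg of batch.messages) {
      try {
        switch (msg.body.type) {
          case "EMBED_DOC":
            await embedDocument(msg.body.payload, env);
            break;
          case "WRITE_USAGE":
            await writeUsageLedger(msg.body.payload, env);
            break;
          default:
            // An unknown job type will never succeed on retry: dead-letter it immediately.
            await sendToDlq(msg.body, env);
            msg.ack();
            continue;
        }
        msg.ack();
      } catch (err) {
        console.error(JSON.stringify({ job: msg.body.type, attempts: msg.attempts, error: String(err) }));
        if (msg.attempts >= MAX_ATTEMPTS) {
          await sendToDlq(msg.body, env);
          msg.ack();
        } else {
          // Exponential backoff with full jitter: up to 2^attempts seconds.
          msg.retry({ delaySeconds: Math.ceil(Math.random() * 2 ** msg.attempts) });
        }
      }
    }
  },
};
```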
11) Retry Strategy
- Sync path (HTTP): at most 1 retry, only for retryable provider errors (429/503) on idempotent requests.
- Async path (Queue): exponential backoff + jitter, with 5-8 retries at most.
- Circuit breaker per provider/model, which opens when the error rate exceeds a threshold.
- Fallback chain: 1) premium A, 2) premium B, 3) a cheap safe-answer template if everything fails.
- A dead-letter queue is mandatory for investigation.
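Two of these rules are small enough to sketch directly. The first is exponential backoff with full jitter, where the delay is a random value between 0 and the capped exponential bound. The second is a per-provider circuit breaker that opens when the error rate over a rolling window of recent calls crosses a threshold. The window size, threshold, and cooldown are illustrative assumptions, and the injectable `random`/`now` parameters exist only to make the sketch testable.

```typescript
// Full-jitter backoff: random delay in [0, min(capMs, baseMs * 2^attempt)).
export function backoffDelayMs(attempt: number, baseMs = 500, capMs = 60_000, random: () => number = Math.random): number {
  const bound = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.floor(random() * bound);
}

// Circuit breaker over the last `windowSize` outcomes of one provider/model.
export class CircuitBreaker {
  private outcomes: boolean[] = []; // true = failure
  private openedAt: number | null = null;

  constructor(
    private readonly windowSize = 20,
    private readonly errorRateThreshold = 0.5,
    private readonly cooldownMs = 30_000,
    private readonly now: () => number = Date.now,
  ) {}

  // Closed: allow. Open: reject until the cooldown has passed, then allow trial calls.
  canRequest(): boolean {
    if (this.openedAt === null) return true;
    return this.now() - this.openedAt >= this.cooldownMs;
  }

  record(success: boolean): void {
    if (this.openedAt !== null && success) {
      // A trial call succeeded: close the breaker and start a fresh window.
      this.openedAt = null;
      this.outcomes = [];
      return;
    }
    this.outcomes.push(!success);
    if (this.outcomes.length > this.windowSize) this.outcomes.shift();
    const failures = this.outcomes.filter(Boolean).length;
    if (this.outcomes.length >= this.windowSize && failures / this.outcomes.length > this.errorRateThreshold) {
      this.openedAt = this.now(); // (re)open and restart the cooldown
    }
  }
}
```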
12) Multi-Model Routing
Routing inputs:
- complexity score,
- the tenant's remaining budget,
- latency SLO,
- sensitivity policy.
Routing outputs:
- cheap: Workers AI.
- balanced: a mid-tier model via AI Gateway.
- premium: GPT/Claude.
Example policy:
- Complexity < 0.45 and a simple factual request => cheap.
- 0.45–0.75 => balanced, with compressed context.
- > 0.75, or multi-step reasoning required => premium.
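The example policy maps onto a small pure function. This sketch makes several assumptions. The classifier is taken to provide hypothetical `factualSimple` and `needsMultiStep` flags. An exhausted tenant budget forces the cheap tier, one possible reading of "remaining budget". Low-complexity requests that are not simple factual ones fall into balanced. The sensitivity policy is not modeled. Model names per tier are injected, for example from `WORKERS_AI_MODEL_CHEAP` and `PREMIUM_MODEL_PRIMARY`.

```typescript
export type ModelTier = "cheap" | "balanced" | "premium";

export interface RoutingInput {
  complexityScore: number;  // 0-1 from the classifier
  factualSimple: boolean;   // hypothetical classifier flag
  needsMultiStep: boolean;  // hypothetical classifier flag
  budgetRemainingUsd: number;
}

export interface RoutingDecision {
  tier: ModelTier;
  model: string;
  reason: string;
  complexityScore: number;
}

export function decideRoute(input: RoutingInput, models: Record<ModelTier, string>): RoutingDecision {
  const pick = (tier: ModelTier, reason: string): RoutingDecision =>
    ({ tier, model: models[tier], reason, complexityScore: input.complexityScore });

  if (input.budgetRemainingUsd <= 0) return pick("cheap", "tenant budget exhausted");
  if (input.complexityScore > 0.75 || input.needsMultiStep) return pick("premium", "high complexity or multi-step");
  if (input.complexityScore < 0.45 && input.factualSimple) return pick("cheap", "simple factual request");
  return pick("balanced", "medium complexity, compressed context");
}
```

Keeping the router pure also makes the "routing decisions snapshot" regression test from the CI/CD section straightforward.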
13) Logging & Observability
Structured JSON logs are mandatory and must include at least:
- request_id, tenant_kode, user_id, model, provider, latency_ms, prompt_tokens, completion_tokens, cache_status, route_tier.
Observability stack:
- AI Gateway analytics for model metrics,
- Worker logs + traces,
- a Kafka stream into ClickHouse for cost/performance dashboards,
- alerting on p95 latency, cache-miss spikes, and per-tenant cost spikes.
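A minimal sketch of the required log line follows. It assumes one JSON object per line written with `console.log`. The `AiLogRecord` type simply mirrors the mandatory field list, `logAiCall` is a hypothetical name, and the injectable `sink` parameter is only there to make the sketch testable.

```typescript
export interface AiLogRecord {
  request_id: string;
  tenant_kode: string;
  user_id: string;
  model: string;
  provider: string;
  latency_ms: number;
  prompt_tokens: number;
  completion_tokens: number;
  cache_status: "exact_hit" | "semantic_hit" | "miss";
  route_tier: "cheap" | "balanced" | "premium";
}

// Emit one JSON object per line. Common fields (level, ts) are added here so
// every call site produces the same shape.
export function logAiCall(record: AiLogRecord, sink: (line: string) => void = console.log): void {
  sink(JSON.stringify({ level: "info", ts: new Date().toISOString(), ...record }));
}
```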
14) Cost Tracking
Create a usage ledger entry per request:
- dimensions: tenant, user, feature, model, provider.
- metrics: prompt tokens, completion tokens, cache status (aggregated into a cache hit ratio), estimated USD.
Flow:
1. The Worker publishes a WRITE_USAGE event to the queue.
2. The consumer writes it to PostgreSQL (`pusat.ai_usage_ledger`).
3. CDC (Debezium) -> Kafka -> ClickHouse for near-real-time analytics.
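The USD estimate stored in the ledger is typically token counts multiplied by a per-model price. In this sketch the model names and per-million-token prices are placeholders, *not* real provider pricing, and actual prices should be loaded from configuration. Cache hits are recorded with zero model cost, which ignores the small embedding cost of a semantic lookup.

```typescript
// Placeholder prices in USD per 1M tokens; NOT real provider pricing.
export const PRICE_PER_MILLION: Record<string, { input: number; output: number }> = {
  "premium-model": { input: 5, output: 15 },
  "cheap-model": { input: 0.1, output: 0.2 },
};

export function estimateUsd(
  model: string,
  promptTokens: number,
  completionTokens: number,
  cacheStatus: "exact_hit" | "semantic_hit" | "miss",
): number {
  if (cacheStatus !== "miss") return 0; // served from cache: no model call
  const price = PRICE_PER_MILLION[model];
  if (!price) throw new Error(`No price configured for model ${model}`);
  return (promptTokens * price.input + completionTokens * price.output) / 1_000_000;
}
```

For example, 2,000 prompt tokens and 500 completion tokens at these placeholder rates give (2000 × 5 + 500 × 15) / 1,000,000 = 0.0175 USD.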
15) Rate Limiting
Layered limits:
- per IP (edge Worker),
- per user,
- per tenant,
- per feature endpoint.
Strategy:
- token bucket or sliding-window counters.
- dynamic throttling when a tenant's budget is nearly exhausted.
- return the headers x-ratelimit-limit, x-ratelimit-remaining, and retry-after.
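A per-key token bucket is the simplest of these strategies to sketch. Here the state lives in an in-memory `Map` purely for illustration. In a Worker that state is per isolate and is not shared across locations, so a production limiter would keep it in a shared store (for example a Durable Object), which this sketch does not show. `TokenBucketLimiter` is a hypothetical name, and capacity and refill rate are caller-supplied assumptions.

```typescript
interface Bucket { tokens: number; updatedMs: number }

export class TokenBucketLimiter {
  private readonly buckets = new Map<string, Bucket>();

  constructor(
    private readonly capacity: number,     // burst size
    private readonly refillPerSec: number, // sustained rate
    private readonly now: () => number = Date.now,
  ) {}

  // Try to consume one token for `key` (e.g. "tenant:abcd" or "ip:1.2.3.4").
  take(key: string): { allowed: boolean; headers: Record<string, string> } {
    const t = this.now();
    const b = this.buckets.get(key) ?? { tokens: this.capacity, updatedMs: t };
    // Refill in proportion to elapsed time, never above capacity.
    b.tokens = Math.min(this.capacity, b.tokens + ((t - b.updatedMs) / 1000) * this.refillPerSec);
    b.updatedMs = t;
    const allowed = b.tokens >= 1;
    if (allowed) b.tokens -= 1;
    this.buckets.set(key, b);

    const headers: Record<string, string> = {
      "x-ratelimit-limit": String(this.capacity),
      "x-ratelimit-remaining": String(Math.floor(b.tokens)),
    };
    if (!allowed) {
      // Seconds until one whole token is available again.
      headers["retry-after"] = String(Math.ceil((1 - b.tokens) / this.refillPerSec));
    }
    return { allowed, headers };
  }
}
```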
16) Error Handling
Error taxonomy:
- ValidationError (400),
- AuthError (401/403),
- RateLimitError (429),
- ProviderTimeoutError (504),
- ProviderUnavailableError (503),
- PolicyDeniedError (422).
Every error response contains:
- request_id,
- error_code,
- a boolean retryable flag.
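The taxonomy and the response contract fit naturally into one base class. The sketch assumes upper-snake-case error codes and sets `retryable` only for 429, 503 and 504. Anything unexpected is mapped to a non-retryable 500. `OrchestratorError` and `toErrorResponse` are hypothetical names.

```typescript
// Base class: each subclass fixes its HTTP status, error code and retryability.
export class OrchestratorError extends Error {
  constructor(
    message: string,
    readonly status: number,
    readonly errorCode: string,
    readonly retryable: boolean,
  ) {
    super(message);
    Object.setPrototypeOf(this, new.target.prototype); // keep instanceof working on ES5 targets
    this.name = new.target.name;
  }
}

export class ValidationError extends OrchestratorError {
  constructor(msg: string) { super(msg, 400, "VALIDATION_ERROR", false); }
}
export class AuthError extends OrchestratorError {
  // 401 = not authenticated, 403 = authenticated but not allowed.
  constructor(msg: string, status: 401 | 403 = 401) { super(msg, status, "AUTH_ERROR", false); }
}
export class RateLimitError extends OrchestratorError {
  constructor(msg: string) { super(msg, 429, "RATE_LIMITED", true); }
}
export class ProviderTimeoutError extends OrchestratorError {
  constructor(msg: string) { super(msg, 504, "PROVIDER_TIMEOUT", true); }
}
export class ProviderUnavailableError extends OrchestratorError {
  constructor(msg: string) { super(msg, 503, "PROVIDER_UNAVAILABLE", true); }
}
export class PolicyDeniedError extends OrchestratorError {
  constructor(msg: string) { super(msg, 422, "POLICY_DENIED", false); }
}

// Every error response carries request_id, error_code and retryable.
export function toErrorResponse(err: unknown, requestId: string): Response {
  const e = err instanceof OrchestratorError
    ? err
    : new OrchestratorError("Internal error", 500, "INTERNAL_ERROR", false);
  return new Response(
    JSON.stringify({ request_id: requestId, error_code: e.errorCode, retryable: e.retryable }),
    { status: e.status, headers: { "content-type": "application/json" } },
  );
}
```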
17) Security Best Practices
- Store API keys in a secrets manager (`wrangler secret`), never in `[vars]`.
- HMAC-sign internal webhooks/events.
- Redact PII before logging and before caching.
- Encrypt data at rest (R2/KV/DB defaults + policy).
- Prompt-injection defense: sanitize instructions and isolate tool context.
- Strict tenant isolation through the cache key and a `tenant_kode` vector metadata filter.
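PII redaction before logging and caching can start as an ordered set of regex rules applied to any text that leaves the request path. The patterns below are deliberately simple assumptions: emails, Indonesian-style mobile numbers (`+62 8…` / `08…`), and 13-19 digit runs that cover card numbers and 16-digit NIK identifiers. Real deployments usually need broader, locale-specific rules or a dedicated detection service. `redactPii` is a hypothetical name.

```typescript
// Ordered rules: phone numbers are replaced before the generic long-digit rule,
// so "+62..." numbers get the more specific label.
const PII_RULES: Array<[RegExp, string]> = [
  [/[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}/g, "[EMAIL]"],
  [/(?:\+62|\b0)8\d{7,11}\b/g, "[PHONE]"],
  [/\b(?:\d[ -]?){12,18}\d\b/g, "[ID_NUMBER]"], // 13-19 digits: cards, 16-digit NIK
];

export function redactPii(text: string): string {
  return PII_RULES.reduce((acc, [pattern, label]) => acc.replace(pattern, label), text);
}
```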
18) Example API Endpoints
- `POST /v1/ai/chat` -> synchronous orchestrated response.
- `POST /v1/ai/embed` -> enqueue an embedding job.
- `GET /v1/ai/usage` -> tenant usage summary.
- `POST /v1/ai/cache/invalidate` -> invalidate the semantic cache by tag.
- `GET /v1/ai/health` -> dependency and model-routing health.
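Before `POST /v1/ai/chat` reaches the orchestrator, the body should be validated into the `AiOrchestrationRequest` shape from the interfaces section. Below is a hand-rolled sketch; a schema library such as Zod would be the more common choice. The 8,000-character prompt limit is an assumed value, and `parseChatBody` is a hypothetical name. Failures map to a 400 `ValidationError`.

```typescript
export interface AiOrchestrationRequest {
  tenantKode: string;
  userId: string;
  sessionId: string;
  prompt: string;
  feature: "chat" | "copilot" | "summary";
  maxLatencyMs?: number;
}

const FEATURES = ["chat", "copilot", "summary"] as const;
const MAX_PROMPT_CHARS = 8000; // assumed limit

// Returns either the validated request or the list of field errors.
export function parseChatBody(body: unknown):
  { ok: true; value: AiOrchestrationRequest } | { ok: false; errors: string[] } {
  const errors: string[] = [];
  const b = (typeof body === "object" && body !== null ? body : {}) as Record<string, unknown>;
  const str = (k: string): string | undefined =>
    typeof b[k] === "string" && (b[k] as string).trim() !== "" ? (b[k] as string) : undefined;

  for (const k of ["tenantKode", "userId", "sessionId", "prompt"]) {
    if (!str(k)) errors.push(`${k} is required`);
  }
  const prompt = str("prompt");
  if (prompt && prompt.length > MAX_PROMPT_CHARS) errors.push("prompt is too long");
  const feature = b.feature;
  if (!FEATURES.includes(feature as (typeof FEATURES)[number])) errors.push("feature is invalid");
  const maxLatencyMs = b.maxLatencyMs;
  if (maxLatencyMs !== undefined && (typeof maxLatencyMs !== "number" || maxLatencyMs <= 0)) {
    errors.push("maxLatencyMs must be a positive number");
  }
  if (errors.length > 0) return { ok: false, errors };

  return {
    ok: true,
    value: {
      tenantKode: str("tenantKode")!,
      userId: str("userId")!,
      sessionId: str("sessionId")!,
      prompt: prompt!,
      feature: feature as AiOrchestrationRequest["feature"],
      maxLatencyMs: maxLatencyMs as number | undefined,
    },
  };
}
```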
19) TypeScript Interfaces
```ts
export type ModelTier = "cheap" | "balanced" | "premium";

export interface AiOrchestrationRequest {
  tenantKode: string;
  userId: string;
  sessionId: string;
  prompt: string;
  feature: "chat" | "copilot" | "summary";
  maxLatencyMs?: number;
}

export interface RoutingDecision {
  tier: ModelTier;
  model: string;
  reason: string;
  complexityScore: number;
}

export interface CacheLookupResult {
  status: "exact_hit" | "semantic_hit" | "miss";
  answer?: string;
  similarity?: number;
}

export interface UsageEvent {
  requestId: string;
  tenantKode: string;
  model: string;
  provider: string;
  promptTokens: number;
  completionTokens: number;
  estimatedUsd: number;
  cacheStatus: "exact_hit" | "semantic_hit" | "miss";
  createdAt: string;
}
```
20) Example Environment Variables
```
ENV=production
AI_GATEWAY_BASE_URL=https://gateway.ai.cloudflare.com/v1/<account>/<gateway>
AI_GATEWAY_API_KEY=***
WORKERS_AI_MODEL_CHEAP=@cf/meta/llama-3.1-8b-instruct
WORKERS_AI_EMBED_MODEL=@cf/baai/bge-base-en-v1.5
PREMIUM_MODEL_PRIMARY=gpt-5
PREMIUM_MODEL_SECONDARY=claude-sonnet-4
SEMANTIC_CACHE_THRESHOLD=0.90
CACHE_TTL_SECONDS=86400
QUEUE_MAX_RETRIES=8
KAFKA_BROKERS=broker1:9092,broker2:9092
PG_URL=postgres://...
CLICKHOUSE_URL=https://...
```
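Worker variables arrive as strings, so `config/env.ts` would typically parse and range-check them once, failing fast on misconfiguration. The sketch below covers a few of the variables above; the `AppConfig` shape is an assumption. The bounds reflect two platform limits: Workers KV rejects an `expirationTtl` below 60 seconds, and Queues caps `max_retries` at 100.

```typescript
export interface AppConfig {
  env: "dev" | "staging" | "production";
  semanticCacheThreshold: number;
  cacheTtlSeconds: number;
  queueMaxRetries: number;
  premiumModels: string[]; // primary first, then fallbacks
}

function num(raw: string | undefined, name: string, min: number, max: number): number {
  const v = Number(raw);
  if (raw === undefined || raw.trim() === "" || !Number.isFinite(v) || v < min || v > max) {
    throw new Error(`${name} must be a number in [${min}, ${max}], got "${raw}"`);
  }
  return v;
}

// Fail fast instead of routing with NaN thresholds or invalid TTLs.
export function loadConfig(vars: Record<string, string | undefined>): AppConfig {
  const env = vars.ENV;
  if (env !== "dev" && env !== "staging" && env !== "production") {
    throw new Error(`ENV must be dev, staging or production, got "${env}"`);
  }
  return {
    env,
    semanticCacheThreshold: num(vars.SEMANTIC_CACHE_THRESHOLD, "SEMANTIC_CACHE_THRESHOLD", 0, 1),
    cacheTtlSeconds: num(vars.CACHE_TTL_SECONDS, "CACHE_TTL_SECONDS", 60, 31_536_000),
    queueMaxRetries: num(vars.QUEUE_MAX_RETRIES, "QUEUE_MAX_RETRIES", 0, 100),
    premiumModels: [vars.PREMIUM_MODEL_PRIMARY, vars.PREMIUM_MODEL_SECONDARY]
      .filter((m): m is string => typeof m === "string" && m !== ""),
  };
}
```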
21) Example Vectorize Schema (Metadata Contract)
Conceptual metadata record for the index. Here `values` is truncated; a real vector must have exactly as many dimensions as the index (768 for `@cf/baai/bge-base-en-v1.5`).
```json
{
  "id": "tenant_abcd:doc123:chunk07",
  "values": [0.123, 0.456],
  "metadata": {
    "tenant_kode": "abcd",
    "doc_id": "doc123",
    "chunk_id": "chunk07",
    "source_type": "sop",
    "updated_at": "2026-05-23T10:00:00Z",
    "tags": ["refund", "stok"]
  }
}
```
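A helper that builds records following this contract keeps the id format and the `tenant_kode` field consistent between ingestion and querying. The dimension check assumes the 768-dimensional output of `@cf/baai/bge-base-en-v1.5`. The filter object assumes Vectorize metadata filtering with an `$eq` operator on `tenant_kode`, which requires a metadata index on that property. `buildVectorRecord` and `tenantFilter` are hypothetical names.

```typescript
export interface ChunkMetadata {
  tenant_kode: string;
  doc_id: string;
  chunk_id: string;
  source_type: string;
  updated_at: string;
  tags: string[];
}

export interface VectorRecord {
  id: string;
  values: number[];
  metadata: ChunkMetadata;
}

const EMBED_DIMENSIONS = 768; // assumes @cf/baai/bge-base-en-v1.5

// id = tenant_<kode>:<doc_id>:<chunk_id>, as in the record example above.
export function buildVectorRecord(values: number[], meta: ChunkMetadata): VectorRecord {
  if (values.length !== EMBED_DIMENSIONS) {
    throw new Error(`expected ${EMBED_DIMENSIONS} dimensions, got ${values.length}`);
  }
  return { id: `tenant_${meta.tenant_kode}:${meta.doc_id}:${meta.chunk_id}`, values, metadata: meta };
}

// Every query must carry this filter so one tenant never retrieves another's chunks.
export function tenantFilter(tenantKode: string) {
  return { tenant_kode: { $eq: tenantKode } };
}
```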
22) Example End-to-End Request Flow
- Flutter sends the prompt to the Node.js backend.
- The backend validates auth + tenant, then forwards the request to the Worker orchestrator.
- The Worker checks the rate limit and the exact cache.
- On a miss, it queries the semantic cache via Vectorize.
- On another miss, it runs RAG retrieval and summarization (Workers AI).
- The routing engine chooses cheap or premium.
- The model is called via AI Gateway (premium) or directly via Workers AI (cheap).
- The response is written to the cache and a usage event is sent to the Queue.
- The queue consumer writes the ledger to PostgreSQL and publishes analytics to Kafka.
- Kafka sinks into ClickHouse for cost and performance dashboards.
23) Deployment Strategy
- Separate `dev`, `staging`, and `prod` environments, each with its own index/cache namespaces.
- Blue/green Worker deploys using separate environment routes.
- Canary policy: send 5% of traffic to the new routing-policy version.
- Version prompt templates and the routing policy (`policy_version`).
- Fast rollback: revert the route or flip a feature flag in the config store.
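The 5% canary needs a deterministic assignment, so that a given tenant or user stays on the same policy version across requests. The sketch uses a 32-bit FNV-1a hash of a stable key. That hash is an assumed choice; any stable hash works. The version labels are placeholders.

```typescript
// 32-bit FNV-1a over UTF-16 code units: fast and deterministic.
function fnv1a32(input: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash >>> 0;
}

// Map a stable key to a bucket in [0, 100); the lowest `percent` buckets get the canary.
export function pickPolicyVersion(stableKey: string, stable: string, canary: string, percent = 5): string {
  return fnv1a32(stableKey) % 100 < percent ? canary : stable;
}
```

Hashing the tenant (rather than the request) keeps a whole tenant on one policy version, which makes cost comparisons between versions easier to read.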
24) CI/CD Recommendation
Minimum pipeline:
1. lint + typecheck + unit tests,
2. integration tests (mocked AI Gateway + Vectorize),
3. policy regression tests (routing-decision snapshots),
4. secret/leak security scan,
5. deploy to staging,
6. smoke test,
7. manual approval,
8. deploy to prod + post-deploy SLO check.
Suggested tooling:
- GitHub Actions / GitLab CI,
- Wrangler deploys per environment,
- OpenTelemetry export to an observability platform.
25) Scalability Considerations
- Stateless Workers -> automatic horizontal scaling at the edge.
- Split queues by workload (`ai-jobs-embed`, `ai-jobs-usage`, `ai-jobs-retry`).
- Bound the context size with the summarizer to keep latency stable.
- Use adaptive cache TTLs based on query popularity.
- Apply backpressure to premium providers when errors or latency rise.
- Shard ClickHouse analytics by tenant/time for fast cost queries.
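Adaptive TTL can be as simple as scaling a base TTL by how often a cache entry has been hit, then clamping to bounds. The sketch assumes a logarithmic scale and a hit counter tracked elsewhere (for example in the cache metadata). The base of 86,400 s mirrors `CACHE_TTL_SECONDS`, the 60 s floor matches the minimum `expirationTtl` Workers KV accepts, and the 7-day cap is an assumption.

```typescript
// factor = log2(2 + hits) / 2, so 0 hits => 0.5x, 2 hits => 1x, 6 hits => 1.5x,
// 14 hits => 2x; the result is clamped to [minSec, maxSec].
export function adaptiveTtlSeconds(
  hitCount: number,
  baseSec = 86_400,
  minSec = 60,
  maxSec = 7 * 86_400,
): number {
  const factor = Math.log2(2 + Math.max(0, hitCount)) / 2;
  return Math.round(Math.min(maxSec, Math.max(minSec, baseSec * factor)));
}
```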
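26) Example Main Orchestrator Pseudocode
```ts
// exactCache, semanticCache, workersAi, ragPipeline, router, aiGateway, cacheWriter, queue, budgetOf,
// parseComplexity and the build* helpers are hypothetical services returning normalized answers with `usage`.
async function processChat(req: AiOrchestrationRequest, env: Env) {
  await enforceRateLimit(req);
  // L1: exact cache (tenant-scoped key).
  const exact = await exactCache.get(req);
  if (exact) return exact;
  // L2: semantic cache; CacheLookupResult reports its outcome via `status`.
  const semantic = await semanticCache.lookup(req);
  if (semantic.status === "semantic_hit" && semantic.answer) return semantic.answer;

  // Cheap classification, then parse the model's text output into a 0-1 score.
  const classification = await workersAi.classifyIntent(req.prompt);
  const complexityScore = parseComplexity(classification);
  const ragContext = await ragPipeline.buildContext(req, complexityScore);
  const compressed = await workersAi.summarizeContext(ragContext);

  const route: RoutingDecision = router.decide({
    complexityScore,
    budget: await budgetOf(req.tenantKode),
  });

  // Only the cheap tier stays on Workers AI; balanced and premium go through AI Gateway.
  const answer = route.tier === "cheap"
    ? await workersAi.generate(buildCheapRequest(req, compressed))
    : await aiGateway.chat(buildGatewayRequest(req, compressed, route.model));

  await cacheWriter.writeAll(req, answer, route);
  await queue.publishUsage(req, route, answer.usage);
  return answer;
}
```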
This document can serve as a baseline for a modular, clean-architecture implementation that does not lock you into any single premium provider.