Production-Grade AI Orchestration Layer Architecture (Cloudflare + TS Backend)

This document designs an orchestration layer that reduces token spend on premium models (GPT/Claude) by combining Workers AI, AI Gateway, Vectorize, Queues, semantic caching, and multi-model routing.

1) Architecture Principles

  • Cheap-first inference: lightweight tasks (intent classification, context summarization, entity extraction) must go through Workers AI.
  • Premium-last reasoning: GPT/Claude is used only when confidence is low or the request needs complex reasoning.
  • Retrieve before generate: run retrieval + context compression before any request to a premium model.
  • Cache aggressively: exact cache + semantic cache + partial cache (summary/chunk).
  • Observe everything: all model calls go through AI Gateway for centralized metrics.

2) Architecture Diagram

flowchart LR
    A[Flutter App] --> B[Node.js API Backend]
    B --> C[Cloudflare Worker Orchestrator]

    C --> D[Rate Limiter + Auth Guard]
    D --> E[Intent & Complexity Classifier<br/>Workers AI]

    E --> F{Cache Hit?}
    F -- Exact hit --> G[(KV/R2 Exact Cache)]
    F -- Semantic hit --> H[(Vectorize Semantic Cache)]
    F -- Miss --> I[Retrieval Pipeline]

    I --> J[(Vectorize Knowledge Index)]
    I --> K[Context Compressor<br/>Workers AI Summarizer]

    K --> L{Routing Policy Engine}
    L -- light --> M[Workers AI Gen Model]
    L -- heavy --> N[AI Gateway -> GPT/Claude]

    M --> O[Postprocessor + Guardrails]
    N --> O

    O --> P[Response + Cache Write]
    P --> Q[Queues Async Jobs]
    Q --> R[(PostgreSQL Cost Ledger)]
    Q --> S[(Kafka Events)]
    S --> T[(ClickHouse Analytics)]

    C --> U[AI Gateway Logs/Trace]
    B --> V[Business DB PostgreSQL]

3) Folder Structure (Clean Architecture)

ai-orchestrator/
  apps/
    worker-orchestrator/
      src/
        entrypoints/
          http.ts
          queue.ts
        interfaces/
          controllers/
            chat-controller.ts
          presenters/
            response-presenter.ts
        application/
          use-cases/
            process-chat.usecase.ts
            embed-document.usecase.ts
          services/
            routing-policy.service.ts
            cache-policy.service.ts
        domain/
          entities/
            ai-request.ts
            ai-response.ts
            token-usage.ts
          value-objects/
            model-tier.ts
            complexity-score.ts
          repositories/
            semantic-cache.repository.ts
            usage-ledger.repository.ts
        infrastructure/
          cloudflare/
            ai-gateway.client.ts
            workers-ai.client.ts
            vectorize.repository.ts
            queues.publisher.ts
            kv-cache.repository.ts
          persistence/
            postgres-usage.repository.ts
            kafka-producer.ts
          observability/
            logger.ts
            metrics.ts
            tracing.ts
        config/
          env.ts
          wrangler-env.ts
        shared/
          errors/
          utils/
      wrangler.toml
      package.json
  packages/
    ai-contracts/
      src/
        dto/
        interfaces/
    lint-config/
    tsconfig/
  infra/
    terraform/
    dashboards/

4) Cloudflare Worker Setup

Worker components:

  • HTTP handler (fetch) for synchronous requests.
  • Queue consumer (queue) for async work (embedding, cache refresh, cost rollup).
  • Bindings: AI, VECTORIZE, KV, R2, QUEUE, optional D1/Hyperdrive, and the AI_GATEWAY endpoint.

Example wrangler.toml

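name = "ai-orchestrator"
# A Worker has a single entry module, so this file must export both the
# fetch and the queue handler (for example by re-exporting src/entrypoints/queue.ts).
main = "src/entrypoints/http.ts"
compatibility_date = "2026-05-23"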

[vars]
ENV = "production"
AI_GATEWAY_BASE_URL = "https://gateway.ai.cloudflare.com/v1/<account>/<gateway>"
PREMIUM_ROUTER_TIMEOUT_MS = "25000"
CHEAP_ROUTER_TIMEOUT_MS = "8000"
SEMANTIC_THRESHOLD = "0.90"

# Workers AI binding (the Ai instance passed to WorkersAiClient)
[ai]
binding = "AI"

[[kv_namespaces]]
binding = "EXACT_CACHE_KV"
id = "xxxxxxxx"

[[vectorize]]
binding = "SEMANTIC_CACHE_INDEX"
index_name = "semantic-cache-v1"

[[vectorize]]
binding = "KNOWLEDGE_INDEX"
index_name = "knowledge-rag-v1"

[[queues.producers]]
binding = "AI_JOBS_QUEUE"
queue = "ai-jobs"

[[queues.consumers]]
queue = "ai-jobs"
max_batch_size = 50
max_batch_timeout = 5
max_retries = 8

[observability.logs]
enabled = true
head_sampling_rate = 1

5) AI Gateway Integration

All model requests (including premium providers) go through AI Gateway to get:

  • consistent token and latency tracking across providers,
  • policy enforcement (headers, timeout, retries),
  • a centralized audit trail.

Gateway Client (TypeScript)

export interface GatewayChatRequest {
  model: string;
  messages: Array<{ role: "system" | "user" | "assistant"; content: string }>;
  temperature?: number;
  max_tokens?: number;
  metadata?: Record<string, string>;
}

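export class AIGatewayClient {
  // baseUrl is assumed to already include the provider segment of the gateway URL
  // (for example ".../<gateway>/openai"), so that "/chat/completions" resolves correctly.
  constructor(private readonly baseUrl: string, private readonly apiKey: string) {}

  // Sends an OpenAI-style chat completion request through the gateway.
  // Returns the raw Response; callers still have to check `ok` and parse the JSON body.
  async chat(req: GatewayChatRequest): Promise<Response> {
    return fetch(`${this.baseUrl}/chat/completions`, {
      method: "POST",
      headers: {
        "content-type": "application/json",
        authorization: `Bearer ${this.apiKey}`,
      },
      body: JSON.stringify(req),
    });
  }
}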

6) Workers AI Integration (Cheap Tasks)

Use Workers AI for:

  • intent classification,
  • prompt compression,
  • key-fact extraction,
  • lightweight generation.

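export class WorkersAiClient {
  constructor(private readonly ai: Ai) {}

  // Asks the cheap model for an intent label and a complexity score in [0, 1].
  // The raw model output still has to be parsed and validated by the caller.
  classifyIntent(text: string) {
    return this.ai.run("@cf/meta/llama-3.1-8b-instruct", {
      prompt: `Classify the intent and complexity (0-1): ${text}`,
    });
  }

  // Compresses retrieved context; max_tokens caps the length of the summary.
  summarizeContext(raw: string, maxTokens = 700) {
    return this.ai.run("@cf/meta/llama-3.1-8b-instruct", {
      prompt: `Summarize the following context in at most ${maxTokens} tokens:\n${raw}`,
      max_tokens: maxTokens,
    });
  }
}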

7) Semantic Cache Design

Cache layers

  1. L1 Exact Cache (KV): key = hash(normalized_prompt + tenant + policy_version).
  2. L2 Semantic Cache (Vectorize): stores the question embedding + answer metadata.
  3. L3 Partial Cache: stores frequently used retrieval summaries/chunks.
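
As a minimal sketch of the L1 key, assuming SHA-256 via the Web Crypto API and a hypothetical normalization rule (Unicode NFKC, trim, lowercase, collapsed whitespace). The function names and the "exact:" prefix are illustrative and not part of the original design:

```typescript
// Hypothetical normalization: the design only says "normalized_prompt".
function normalizePrompt(prompt: string): string {
  return prompt.normalize("NFKC").trim().toLowerCase().replace(/\s+/g, " ");
}

// JSON-encode the parts so field boundaries stay unambiguous
// ("ab" + "c" and "a" + "bc" must not produce the same hash input).
function exactCacheKeyInput(prompt: string, tenantKode: string, policyVersion: string): string {
  return JSON.stringify([tenantKode, policyVersion, normalizePrompt(prompt)]);
}

// SHA-256 hex digest of the key input, prefixed so keys are easy to spot in KV.
async function exactCacheKey(prompt: string, tenantKode: string, policyVersion: string): Promise<string> {
  const bytes = new TextEncoder().encode(exactCacheKeyInput(prompt, tenantKode, policyVersion));
  const digest = await crypto.subtle.digest("SHA-256", bytes);
  const hex = Array.from(new Uint8Array(digest), (b) => b.toString(16).padStart(2, "0")).join("");
  return `exact:${hex}`;
}
```

Including policy_version in the key means a policy change naturally stops old entries from being served, without an explicit purge.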

Semantic cache metadata

  • tenant_kode
  • intent
  • complexity_score
  • model_tier
  • prompt_fingerprint
  • answer_ref (R2 key)
  • ttl_expired_at
  • quality_score

Flow

  • exact hit → return.
  • exact miss → embed the query → semantic topK lookup.
  • similarity >= threshold and policy matches → return the cached answer.
  • on a miss → continue to RAG + inference, then write to every cache layer.
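
The semantic-hit decision in this flow can be sketched as a pure function. This is a sketch under assumptions: `score` is taken to be a similarity in [0, 1], `policy_version` is assumed to be stored next to the listed metadata, and the names `SemanticMatch`, `LookupContext`, and `pickSemanticHit` are hypothetical:

```typescript
// Shape of one semantic-cache match; field names follow the metadata list above.
interface SemanticMatch {
  score: number; // assumed: similarity in [0, 1] returned by the vector index
  metadata: {
    tenant_kode: string;
    policy_version: string; // assumption: stored alongside the listed metadata
    answer_ref: string;
    ttl_expired_at: string; // ISO-8601 timestamp
  };
}

interface LookupContext {
  tenantKode: string;
  policyVersion: string;
  threshold: number; // e.g. SEMANTIC_THRESHOLD = 0.90
  now: Date;
}

// Returns the best usable match, or undefined when the lookup counts as a miss.
function pickSemanticHit(matches: SemanticMatch[], ctx: LookupContext): SemanticMatch | undefined {
  return matches
    .filter((m) => m.score >= ctx.threshold)
    .filter((m) => m.metadata.tenant_kode === ctx.tenantKode)
    .filter((m) => m.metadata.policy_version === ctx.policyVersion)
    .filter((m) => new Date(m.metadata.ttl_expired_at).getTime() > ctx.now.getTime())
    .sort((a, b) => b.score - a.score)[0];
}
```

Checking tenant and expiry after the similarity search is a defensive second layer; the vector query itself should already be filtered by tenant_kode.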

8) Embedding Pipeline

Document sources:

  • PostgreSQL (operational knowledge base),
  • Kafka topics (event enrichment),
  • batch files (manual SOPs).

Stages:

  1. An ingestion job pushes the payload to the Queue.
  2. The consumer splits the document into chunks (window + overlap).
  3. Generate embeddings (Workers AI embedding model).
  4. Upsert into Vectorize (KNOWLEDGE_INDEX).
  5. Store the doc manifest + version in PostgreSQL.

Recommended chunk schema: doc_id, chunk_id, tenant_kode, judul (title), isi_chunk (chunk content), source_type, source_updated_at, embedding_version.
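
The window + overlap split can be sketched as follows. Sizing by characters is an assumption made for brevity (a token-based splitter would follow the same pattern), and `chunkText` and the `chunkNN` id format are hypothetical:

```typescript
interface TextChunk {
  chunk_id: string;
  isi_chunk: string;
}

// Splits text into windows of `windowSize` characters, each overlapping the
// previous window by `overlap` characters.
function chunkText(text: string, windowSize: number, overlap: number): TextChunk[] {
  if (windowSize <= 0 || overlap < 0 || overlap >= windowSize) {
    throw new RangeError("require windowSize > 0 and 0 <= overlap < windowSize");
  }
  const step = windowSize - overlap;
  const chunks: TextChunk[] = [];
  for (let start = 0; start < text.length; start += step) {
    chunks.push({
      chunk_id: `chunk${String(chunks.length).padStart(2, "0")}`,
      isi_chunk: text.slice(start, start + windowSize),
    });
    if (start + windowSize >= text.length) break; // this window reached the end
  }
  return chunks;
}
```

For example, `chunkText("abcdefghij", 4, 1)` yields the three chunks "abcd", "defg", and "ghij", where each chunk repeats the last character of the previous one.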


9) RAG Retrieval Flow

  1. Normalize the user query.
  2. Intent + complexity scoring (Workers AI).
  3. Light query expansion (optional, cheap model).
  4. Top-K vector search in KNOWLEDGE_INDEX.
  5. Light re-ranking (Workers AI).
  6. Context compression (Workers AI summarizer).
  7. Send the final context to the policy router.
  8. Select the cheap or premium model.
  9. Store citation/source references.
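
One way to connect the retrieval results to the compression and citation steps is to pack the highest-scoring chunks into a token budget while recording where each came from. The sketch below is illustrative only: `packContext` is a hypothetical helper, and the 4-characters-per-token estimate is a rough assumption, not a tokenizer:

```typescript
interface RetrievedChunk {
  doc_id: string;
  chunk_id: string;
  score: number;
  text: string;
}

interface PackedContext {
  context: string;
  citations: string[]; // "doc_id#chunk_id" references kept for the citation step
}

// Rough token estimate (about 4 characters per token); an assumption, not a tokenizer.
const estimateTokens = (s: string): number => Math.ceil(s.length / 4);

// Greedily adds chunks in descending score order, skipping any chunk that
// would push the total past the budget.
function packContext(chunks: RetrievedChunk[], maxTokens: number): PackedContext {
  const ranked = [...chunks].sort((a, b) => b.score - a.score);
  const parts: string[] = [];
  const citations: string[] = [];
  let used = 0;
  for (const c of ranked) {
    const cost = estimateTokens(c.text);
    if (used + cost > maxTokens) continue;
    parts.push(`[${citations.length + 1}] ${c.text}`);
    citations.push(`${c.doc_id}#${c.chunk_id}`);
    used += cost;
  }
  return { context: parts.join("\n"), citations };
}
```

The numbered prefixes let the model refer to a source as [1] or [2], and the returned citations array maps those numbers back to the stored chunks.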

10) Queue Async Pipeline

Use a Queue for non-blocking tasks:

  • write-behind cache,
  • embedding refresh,
  • cost aggregation,
  • provider fallback retries,
  • audit/event delivery to Kafka.

Example Consumer

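// AiJob, Env, embedDocument, writeUsageLedger, and sendToDlq are assumed to be defined elsewhere.
export default {
  async queue(batch: MessageBatch<AiJob>, env: Env): Promise<void> {
    for (const msg of batch.messages) {
      try {
        switch (msg.body.type) {
          case "EMBED_DOC":
            await embedDocument(msg.body.payload, env);
            break;
          case "WRITE_USAGE":
            await writeUsageLedger(msg.body.payload, env);
            break;
          default:
            throw new Error(`Unknown job type: ${msg.body.type}`);
        }
        // Ack per message so one failure does not redeliver the whole batch.
        msg.ack();
      } catch (err) {
        console.error("ai-job failed", { attempts: msg.attempts, err });
        if (msg.attempts >= 5) {
          // Give up after 5 delivery attempts and park the job for investigation.
          await sendToDlq(msg.body, env);
          msg.ack();
        } else {
          // Exponential backoff between attempts, capped at 5 minutes.
          msg.retry({ delaySeconds: Math.min(2 ** msg.attempts, 300) });
        }
      }
    }
  },
};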

11) Retry Strategy

  • Sync path (HTTP): at most 1 retry for idempotent provider errors (429/503).
  • Async path (Queue): exponential backoff + jitter; max retries 5-8.
  • Circuit breaker per provider/model (opens when the error rate exceeds a threshold).
  • Fallback chain: 1) premium A, 2) premium B, 3) a cheap safe-answer template if everything fails.
  • A dead-letter queue is mandatory for investigation.
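
The first two rules can be sketched as small pure helpers. This assumes the "full jitter" variant of exponential backoff (delay drawn uniformly between 0 and the exponential ceiling); the base of 500 ms, the 60 s cap, and both function names are hypothetical choices:

```typescript
// Exponential backoff with full jitter: the delay is uniform in
// [0, min(capMs, baseMs * 2^attempt)). `random` is injectable for testing.
function backoffDelayMs(
  attempt: number,
  baseMs: number = 500,
  capMs: number = 60_000,
  random: () => number = Math.random,
): number {
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.floor(random() * ceiling);
}

// Sync path: only 429 and 503 are treated as retryable, and only once.
function shouldRetrySync(status: number, retriesSoFar: number): boolean {
  return retriesSoFar < 1 && (status === 429 || status === 503);
}
```

Jitter spreads retries from many workers over time, so a provider that just recovered is not hit by a synchronized wave of requests.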

12) Multi-Model Routing

Routing inputs:

  • complexity score,
  • remaining tenant budget,
  • latency SLO,
  • sensitivity policy.

Routing outputs:

  • cheap: Workers AI.
  • balanced: a mid-tier model via the gateway.
  • premium: GPT/Claude.

Example policy:

  • Complexity < 0.45 and a simple factual query => cheap.
  • 0.45–0.75 => balanced with compressed context.
  • > 0.75 or multi-step reasoning required => premium.
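
A minimal sketch of the example policy as code. Two points are assumptions not stated in the policy: a low-complexity query that is not simple factual falls to the balanced tier, and an exhausted budget downgrades premium to balanced. `RoutingInput` and `decideTier` are hypothetical names:

```typescript
type ModelTier = "cheap" | "balanced" | "premium";

interface RoutingInput {
  complexityScore: number; // 0..1 from the classifier
  needsMultiStepReasoning: boolean;
  isSimpleFactual: boolean;
  budgetRemainingUsd: number; // hypothetical budget signal
}

function decideTier(input: RoutingInput): ModelTier {
  let tier: ModelTier;
  if (input.needsMultiStepReasoning || input.complexityScore > 0.75) {
    tier = "premium";
  } else if (input.complexityScore < 0.45 && input.isSimpleFactual) {
    tier = "cheap";
  } else {
    // Covers 0.45 to 0.75, and (by assumption) low scores that are not simple factual.
    tier = "balanced";
  }
  // Assumption: an exhausted tenant budget downgrades premium to balanced.
  if (tier === "premium" && input.budgetRemainingUsd <= 0) tier = "balanced";
  return tier;
}
```

Keeping the decision a pure function makes it straightforward to snapshot-test routing decisions whenever thresholds change.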


13) Logging & Observability

Structured JSON logs are mandatory, with these fields: request_id, tenant_kode, user_id, model, provider, latency_ms, prompt_tokens, completion_tokens, cache_status, route_tier.

Observability stack:

  • AI Gateway analytics for model metrics,
  • Worker logs + traces,
  • Kafka stream into ClickHouse for cost/performance dashboards,
  • alerting on p95 latency, cache-miss spikes, and per-tenant cost spikes.


14) Cost Tracking

Build a per-request usage ledger:

  • dimensions: tenant, user, feature, model, provider.
  • metrics: prompt tokens, completion tokens, cache status (rolled up into a cache hit ratio), USD estimate.

Flow:

  1. The Worker publishes a WRITE_USAGE event to the queue.
  2. The consumer writes to PostgreSQL (pusat.ai_usage_ledger).
  3. CDC (Debezium) -> Kafka -> ClickHouse for near-real-time analytics.
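
The USD estimate in the ledger can be derived from token counts and a per-model price table. The sketch below uses made-up model names and prices purely for illustration (real values must come from the providers' price lists), and it assumes cache hits are recorded at zero cost, ignoring the embedding call a semantic lookup makes:

```typescript
interface ModelPrice {
  inputPerMTokUsd: number; // USD per 1M prompt tokens
  outputPerMTokUsd: number; // USD per 1M completion tokens
}

// Hypothetical price table; the names and numbers are placeholders.
const PRICE_TABLE: Record<string, ModelPrice> = {
  "example-premium": { inputPerMTokUsd: 3, outputPerMTokUsd: 15 },
  "example-cheap": { inputPerMTokUsd: 0.1, outputPerMTokUsd: 0.2 },
};

type CacheStatus = "exact_hit" | "semantic_hit" | "miss";

function estimateUsd(
  model: string,
  promptTokens: number,
  completionTokens: number,
  cacheStatus: CacheStatus,
): number {
  // Assumption: a cache hit did not call a generation model, so it costs 0 here.
  if (cacheStatus !== "miss") return 0;
  const price = PRICE_TABLE[model];
  if (!price) throw new Error(`no price configured for model ${model}`);
  return (promptTokens * price.inputPerMTokUsd + completionTokens * price.outputPerMTokUsd) / 1_000_000;
}
```

With these placeholder prices, 1,000 prompt tokens and 500 completion tokens on "example-premium" come to (1000 × 3 + 500 × 15) / 1,000,000 = 0.0105 USD.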


15) Rate Limiting

Layered limits:

  • per-IP (edge worker),
  • per-user,
  • per-tenant,
  • per-feature endpoint.

Strategy:

  • token bucket or sliding window.
  • dynamic throttling when a tenant's budget is nearly exhausted.
  • return headers: x-ratelimit-limit, x-ratelimit-remaining, retry-after.
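
A token bucket can be sketched in a few lines. This version keeps state in memory and takes the clock as a parameter so it is easy to test; since Worker isolates do not share memory, a real deployment would need shared state (for example a Durable Object), which is outside this sketch. The class and helper names are hypothetical:

```typescript
class TokenBucket {
  private readonly capacity: number;
  private readonly refillPerSec: number;
  private tokens: number;
  private lastRefillMs: number;

  constructor(capacity: number, refillPerSec: number, nowMs: number) {
    this.capacity = capacity;
    this.refillPerSec = refillPerSec;
    this.tokens = capacity; // start full so short bursts are allowed
    this.lastRefillMs = nowMs;
  }

  // Refills based on elapsed time, then consumes one token if available.
  tryConsume(nowMs: number): boolean {
    const elapsedSec = Math.max(0, nowMs - this.lastRefillMs) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSec * this.refillPerSec);
    this.lastRefillMs = nowMs;
    if (this.tokens < 1) return false;
    this.tokens -= 1;
    return true;
  }

  remaining(): number {
    return Math.floor(this.tokens);
  }

  // Whole seconds until at least one token is available again.
  retryAfterSec(): number {
    return this.tokens >= 1 ? 0 : Math.ceil((1 - this.tokens) / this.refillPerSec);
  }
}

// Builds the three response headers named in the strategy above.
function rateLimitHeaders(bucket: TokenBucket, limit: number): Record<string, string> {
  return {
    "x-ratelimit-limit": String(limit),
    "x-ratelimit-remaining": String(bucket.remaining()),
    "retry-after": String(bucket.retryAfterSec()),
  };
}
```

Capacity controls the allowed burst size and refillPerSec the sustained rate, so per-IP, per-user, and per-tenant limits can reuse the same class with different parameters.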


16) Error Handling

Error taxonomy:

  • ValidationError (400),
  • AuthError (401/403),
  • RateLimitError (429),
  • ProviderTimeoutError (504),
  • ProviderUnavailableError (503),
  • PolicyDeniedError (422).

Every error response contains:

  • request_id,
  • error_code,
  • retryable boolean.
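
The taxonomy maps naturally onto a lookup table. In this sketch the retryable flags are assumptions (the taxonomy lists only status codes), AuthError defaults to 401, and `toErrorResponse` is a hypothetical helper:

```typescript
type ErrorCode =
  | "ValidationError"
  | "AuthError"
  | "RateLimitError"
  | "ProviderTimeoutError"
  | "ProviderUnavailableError"
  | "PolicyDeniedError";

// Status codes follow the taxonomy above; the retryable flags are assumptions.
const ERROR_TABLE: Record<ErrorCode, { status: number; retryable: boolean }> = {
  ValidationError: { status: 400, retryable: false },
  AuthError: { status: 401, retryable: false }, // 403 when authenticated but not allowed
  RateLimitError: { status: 429, retryable: true },
  ProviderTimeoutError: { status: 504, retryable: true },
  ProviderUnavailableError: { status: 503, retryable: true },
  PolicyDeniedError: { status: 422, retryable: false },
};

interface ErrorBody {
  request_id: string;
  error_code: ErrorCode;
  retryable: boolean;
  message: string;
}

function toErrorResponse(
  requestId: string,
  code: ErrorCode,
  message: string,
): { status: number; body: ErrorBody } {
  const { status, retryable } = ERROR_TABLE[code];
  return { status, body: { request_id: requestId, error_code: code, retryable, message } };
}
```

Clients can then branch on `retryable` alone instead of hard-coding which status codes are safe to retry.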


17) Security Best Practices

  • Store API keys in a secrets manager (wrangler secret).
  • HMAC-sign internal webhooks/events.
  • Redact PII before logging and before caching.
  • Encrypt data at rest (R2/KV/DB defaults + policy).
  • Prompt injection defense: sanitize instructions, isolate tool context.
  • Strict tenant isolation in cache keys + a tenant_kode vector metadata filter.
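
As an illustration of the PII redaction step, here is a deliberately simple regex-based sketch. The two patterns (email addresses and phone-like digit runs) and the placeholder tokens are assumptions; pattern matching like this misses many PII forms and is only a starting point:

```typescript
// Matches common email shapes such as "name@example.com".
const EMAIL_RE = /[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}/g;

// Matches phone-like runs: optional "+", then at least 9 digits/separators,
// starting and ending with a digit.
const PHONE_RE = /\+?\d[\d\s().-]{7,}\d/g;

// Replaces matches with fixed placeholders before the text is logged or cached.
function redactPii(text: string): string {
  return text.replace(EMAIL_RE, "[EMAIL]").replace(PHONE_RE, "[PHONE]");
}
```

Applying the same function on both the logging path and the cache-write path keeps the two from drifting apart.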

18) Example API Endpoints

  • POST /v1/ai/chat -> sync orchestrated response.
  • POST /v1/ai/embed -> enqueue embedding job.
  • GET /v1/ai/usage -> per-tenant usage summary.
  • POST /v1/ai/cache/invalidate -> invalidate semantic cache by tag.
  • GET /v1/ai/health -> dependency and model routing health.

19) TypeScript Interfaces

export type ModelTier = "cheap" | "balanced" | "premium";

export interface AiOrchestrationRequest {
  tenantKode: string;
  userId: string;
  sessionId: string;
  prompt: string;
  feature: "chat" | "copilot" | "summary";
  maxLatencyMs?: number;
}

export interface RoutingDecision {
  tier: ModelTier;
  model: string;
  reason: string;
  complexityScore: number;
}

export interface CacheLookupResult {
  status: "exact_hit" | "semantic_hit" | "miss";
  answer?: string;
  similarity?: number;
}

export interface UsageEvent {
  requestId: string;
  tenantKode: string;
  model: string;
  provider: string;
  promptTokens: number;
  completionTokens: number;
  estimatedUsd: number;
  cacheStatus: "exact_hit" | "semantic_hit" | "miss";
  createdAt: string;
}

20) Example Environment Variables

ENV=production
AI_GATEWAY_BASE_URL=https://gateway.ai.cloudflare.com/v1/<account>/<gateway>
AI_GATEWAY_API_KEY=***
WORKERS_AI_MODEL_CHEAP=@cf/meta/llama-3.1-8b-instruct
WORKERS_AI_EMBED_MODEL=@cf/baai/bge-base-en-v1.5
PREMIUM_MODEL_PRIMARY=gpt-5
PREMIUM_MODEL_SECONDARY=claude-sonnet-4
SEMANTIC_CACHE_THRESHOLD=0.90
CACHE_TTL_SECONDS=86400
QUEUE_MAX_RETRIES=8
KAFKA_BROKERS=broker1:9092,broker2:9092
PG_URL=postgres://...
CLICKHOUSE_URL=https://...

21) Example Vectorize Schema (Metadata Contract)

Conceptual metadata record for the index:

{
  "id": "tenant_abcd:doc123:chunk07",
  "values": [0.123, 0.456],
  "metadata": {
    "tenant_kode": "abcd",
    "doc_id": "doc123",
    "chunk_id": "chunk07",
    "source_type": "sop",
    "updated_at": "2026-05-23T10:00:00Z",
    "tags": ["refund", "stok"]
  }
}
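
A small sketch of how such a record and its matching tenant filter could be built. The id format copies the example above; `buildVectorRecord` and `tenantFilter` are hypothetical helpers, and the types are local stand-ins rather than the Vectorize SDK types:

```typescript
interface ChunkMetadata {
  tenant_kode: string;
  doc_id: string;
  chunk_id: string;
  source_type: string;
  updated_at: string; // ISO-8601 timestamp
  tags: string[];
}

interface VectorRecord {
  id: string;
  values: number[];
  metadata: ChunkMetadata;
}

// Builds a record whose id embeds tenant, document, and chunk, as in the example above.
function buildVectorRecord(meta: ChunkMetadata, values: number[]): VectorRecord {
  return {
    id: `tenant_${meta.tenant_kode}:${meta.doc_id}:${meta.chunk_id}`,
    values,
    metadata: meta,
  };
}

// Metadata filter to attach to every query so a tenant only sees its own vectors.
function tenantFilter(tenantKode: string): { tenant_kode: string } {
  return { tenant_kode: tenantKode };
}
```

In a Worker, the filter would typically be passed along with the query, for example `env.KNOWLEDGE_INDEX.query(queryVector, { topK: 5, filter: tenantFilter("abcd"), returnMetadata: "all" })`. Filtering on tenant_kode is expected to require a metadata index on that property; check the Vectorize documentation for the index version in use.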

22) Example End-to-End Request Flow

  1. Flutter sends the prompt to the Node.js backend.
  2. The backend validates auth + tenant, then forwards to the Worker orchestrator.
  3. The Worker checks the rate limit + exact cache.
  4. On a miss, it queries the semantic cache via Vectorize.
  5. On a miss, it runs RAG retrieval and summarization (Workers AI).
  6. The routing engine decides cheap/premium.
  7. The model is called via AI Gateway (premium) or Workers AI directly (cheap).
  8. The response is stored in the cache and a usage event is sent to the Queue.
  9. The queue consumer writes the ledger to PostgreSQL + publishes analytics to Kafka.
  10. Kafka sinks into ClickHouse for the cost and performance dashboards.

23) Deployment Strategy

  • Separate environments: dev, staging, prod, each with its own index/cache namespace.
  • Blue/green Worker deploys using separate environment routes.
  • Canary policy: 5% of traffic to the new routing policy version.
  • Version prompt templates and routing policies (policy_version).
  • Fast rollback: revert the route or flip a feature flag in the config store.
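
The 5% canary can be made sticky by hashing a stable key into 100 buckets, so a given tenant always sees the same policy version. The sketch assumes bucketing by tenantKode and uses a 32-bit FNV-1a style hash over UTF-16 code units, chosen only because it is tiny and deterministic; all names are hypothetical:

```typescript
// 32-bit FNV-1a style hash over UTF-16 code units (not bytes); adequate for bucketing.
function fnv1a32(input: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0; // keep the value unsigned 32-bit
  }
  return hash;
}

// True when the key falls into the first `percent` of 100 buckets.
function inCanary(key: string, percent: number): boolean {
  return fnv1a32(key) % 100 < percent;
}

// Picks the policy_version to use for a tenant.
function selectPolicyVersion(
  tenantKode: string,
  stableVersion: string,
  canaryVersion: string,
  percent: number = 5,
): string {
  return inCanary(tenantKode, percent) ? canaryVersion : stableVersion;
}
```

Because policy_version is part of the exact-cache key, canary and stable traffic keep separate cache entries, and setting percent to 0 acts as an immediate rollback.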

24) CI/CD Recommendation

Minimum pipeline:

  1. lint + typecheck + unit tests,
  2. integration tests (mock AI Gateway + Vectorize),
  3. policy regression tests (routing decision snapshots),
  4. security scan for secrets/leaks,
  5. deploy to staging,
  6. smoke test,
  7. manual approval,
  8. deploy to prod + post-deploy SLO check.

Suggested tooling:

  • GitHub Actions / GitLab CI,
  • Wrangler deploy per environment,
  • OpenTelemetry export to an observability platform.


25) Scalability Considerations

  • Stateless Workers -> automatic horizontal scaling at the edge.
  • Split queues by workload (ai-jobs-embed, ai-jobs-usage, ai-jobs-retry).
  • Cap context size via the summarizer to keep latency stable.
  • Adaptive cache TTL based on query popularity.
  • Apply backpressure toward premium providers when errors/latency rise.
  • Shard ClickHouse analytics by tenant/time for fast cost queries.

26) Example Main Orchestrator Pseudocode

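// Pseudocode: exactCache, semanticCache, workersAi, ragPipeline, router, aiGateway,
// cacheWriter, and queue are assumed to be injected collaborators.
async function processChat(req: AiOrchestrationRequest, env: Env) {
  await enforceRateLimit(req);

  // L1: exact cache lookup.
  const exact = await exactCache.get(req);
  if (exact) return exact;

  // L2: semantic cache lookup (see CacheLookupResult in section 19).
  const semantic = await semanticCache.lookup(req);
  if (semantic.status === "semantic_hit") return semantic.answer;

  // Cheap tier: classify, retrieve, and compress before any gateway call.
  const complexity = await workersAi.classifyIntent(req.prompt);
  const ragContext = await ragPipeline.buildContext(req, complexity);
  const compressed = await workersAi.summarizeContext(ragContext);
  const route = router.decide({ complexity, budget: await budgetOf(req.tenantKode) });

  // Only the cheap tier stays on Workers AI; balanced and premium go through AI Gateway.
  const answer = route.tier === "cheap"
    ? await workersAi.generate(buildCheapRequest(req, compressed))
    : await aiGateway.chat(buildPremiumRequest(req, compressed));

  // Cache write and usage event; both could be moved off the response path via Queues.
  await cacheWriter.writeAll(req, answer, route);
  await queue.publishUsage(req, route, answer.usage);

  return answer;
}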

This document can serve as a baseline for a modular, clean-architecture implementation without locking you into a single premium provider.