Change Data Capture với Debezium: Đồng bộ dữ liệu real-time cho Microservices

Posted on: 4/18/2026 12:10:44 PM

< 1sĐộ trễ trung bình log-based CDC
200+Connectors hỗ trợ bởi cộng đồng
0%Overhead lên source DB (log-based)
100K+GitHub stars hệ sinh thái Debezium

1. Change Data Capture là gì và tại sao quan trọng?

Change Data Capture (CDC) là kỹ thuật phát hiện và ghi nhận mọi thay đổi (INSERT, UPDATE, DELETE) xảy ra trong database, rồi truyền tải chúng dưới dạng event stream đến các hệ thống downstream. Thay vì các hệ thống phải liên tục polling database để kiểm tra "có gì mới không?", CDC đẩy thay đổi ra ngoài ngay khi chúng xảy ra — biến database thành một nguồn phát sự kiện real-time.

Trong kiến trúc microservices hiện đại, CDC giải quyết một bài toán cốt lõi: làm thế nào để đồng bộ dữ liệu giữa các service mà không cần tight coupling? Khi Service A ghi dữ liệu vào PostgreSQL, Service B (search), Service C (analytics), Service D (cache) đều cần biết về thay đổi đó — nhưng Service A không nên phải gọi API đến từng service. CDC giải quyết điều này bằng cách đọc transaction log của database và phát event tự động.

💡 Tại sao không dùng Dual Write?

Nhiều team chọn cách "ghi vào DB rồi publish event" (dual write). Nhưng pattern này không đảm bảo atomicity — nếu DB commit thành công nhưng message broker fail, dữ liệu sẽ không nhất quán. CDC loại bỏ hoàn toàn vấn đề này vì chỉ đọc từ transaction log đã committed.

Các use case chính của CDC

CDC không chỉ dùng cho data sync đơn thuần. Dưới đây là các scenario phổ biến trong production:

  • Event-Driven Architecture: Biến mọi thay đổi trong database thành domain event, giúp các microservices phản ứng real-time mà không cần gọi API lẫn nhau
  • Cache Invalidation: Tự động invalidate hoặc update Redis/Memcached khi dữ liệu gốc thay đổi — giải quyết bài toán cache consistency triệt để
  • Search Index Sync: Đồng bộ dữ liệu từ OLTP database sang Elasticsearch/OpenSearch gần như real-time, không cần batch reindex
  • Data Warehouse / Lakehouse: Stream data từ operational DB sang ClickHouse, Apache Iceberg, hoặc BigQuery để phân tích
  • Audit Trail: Ghi nhận lịch sử thay đổi đầy đủ — ai thay đổi gì, lúc nào, giá trị cũ/mới
  • Database Migration: Zero-downtime migration giữa database engines bằng cách CDC từ source sang target, rồi cutover khi đã sync xong

2. Các phương pháp CDC: từ thô sơ đến chuyên nghiệp

Có 4 phương pháp CDC chính, mỗi cách có trade-off riêng về hiệu năng, độ chính xác, và complexity:

graph LR
    A["Timestamp-based
Polling"] --> B["Trigger-based
CDC"] B --> C["Query-based
CDC"] C --> D["Log-based
CDC"] style A fill:#f8f9fa,stroke:#e94560,color:#2c3e50 style B fill:#f8f9fa,stroke:#e94560,color:#2c3e50 style C fill:#f8f9fa,stroke:#e94560,color:#2c3e50 style D fill:#e94560,stroke:#fff,color:#fff

Các phương pháp CDC xếp theo mức độ tinh vi — Log-based là giải pháp production-grade

Phương phápCách hoạt độngƯu điểmNhược điểm
Timestamp-based Query records có updated_at > last check Đơn giản, không cần tool đặc biệt Bỏ sót DELETE, phụ thuộc vào column timestamp, gây load cho DB
Trigger-based DB trigger ghi thay đổi vào shadow table Bắt được mọi loại thay đổi Overhead lớn lên write path, khó maintain, không portable
Query-based Polling định kỳ với diff detection Hoạt động với mọi DB Latency cao (phút), gây load khi table lớn, bỏ sót intermediate changes
Log-based Đọc transaction log (WAL/binlog/oplog) Zero impact lên DB, capture mọi thay đổi, thứ tự chính xác, latency thấp Phức tạp hơn, phụ thuộc DB-specific log format

✅ Best Practice

Cho mọi production workload, luôn chọn log-based CDC. Các phương pháp khác chỉ phù hợp cho development/prototyping hoặc những database không hỗ trợ logical replication. Log-based CDC là phương pháp duy nhất đảm bảo: zero overhead, capture DELETE, giữ đúng thứ tự transaction, và latency dưới giây.

3. Debezium: Kiến trúc và cách hoạt động

Debezium là nền tảng CDC mã nguồn mở phổ biến nhất hiện nay, được phát triển bởi Red Hat và cộng đồng. Debezium đọc transaction log của database và chuyển đổi mỗi row-level change thành một structured event, rồi publish lên Kafka (hoặc các messaging system khác qua Debezium Server).

Kiến trúc tổng thể

graph TB
    subgraph Sources["Source Databases"]
        PG["PostgreSQL
WAL"] MY["MySQL
Binlog"] MG["MongoDB
Oplog"] SS["SQL Server
CT/CDC"] end subgraph KC["Kafka Connect Cluster"] DC1["Debezium
Connector 1"] DC2["Debezium
Connector 2"] DC3["Debezium
Connector N"] end subgraph Kafka["Apache Kafka"] T1["Topic: db.schema.users"] T2["Topic: db.schema.orders"] T3["Topic: db.schema.products"] SR["Schema Registry"] end subgraph Consumers["Downstream Consumers"] ES["Elasticsearch"] RD["Redis Cache"] CH["ClickHouse"] MS["Microservices"] end PG --> DC1 MY --> DC2 MG --> DC3 SS --> DC1 DC1 --> T1 DC2 --> T2 DC3 --> T3 DC1 --> SR DC2 --> SR T1 --> ES T2 --> RD T1 --> CH T3 --> MS style PG fill:#336791,stroke:#fff,color:#fff style MY fill:#4479A1,stroke:#fff,color:#fff style MG fill:#4DB33D,stroke:#fff,color:#fff style SS fill:#CC2927,stroke:#fff,color:#fff style DC1 fill:#e94560,stroke:#fff,color:#fff style DC2 fill:#e94560,stroke:#fff,color:#fff style DC3 fill:#e94560,stroke:#fff,color:#fff style T1 fill:#f8f9fa,stroke:#e94560,color:#2c3e50 style T2 fill:#f8f9fa,stroke:#e94560,color:#2c3e50 style T3 fill:#f8f9fa,stroke:#e94560,color:#2c3e50 style SR fill:#f8f9fa,stroke:#e0e0e0,color:#2c3e50 style ES fill:#2c3e50,stroke:#fff,color:#fff style RD fill:#DC382D,stroke:#fff,color:#fff style CH fill:#FFCC01,stroke:#2c3e50,color:#2c3e50 style MS fill:#2c3e50,stroke:#fff,color:#fff

Kiến trúc end-to-end: Debezium đọc transaction log → publish event lên Kafka → consumers xử lý downstream

Ba thành phần chính trong kiến trúc Debezium:

  • Debezium Connector: Chạy như một Kafka Connect plugin, mỗi connector giám sát một database instance. Connector đọc transaction log, parse thành change event, và produce lên Kafka topic
  • Apache Kafka: Đóng vai trò message broker trung tâm. Mỗi table trong source database tương ứng với một Kafka topic. Kafka đảm bảo durability, ordering, và cho phép nhiều consumer đọc cùng stream
  • Schema Registry: Lưu trữ schema của change event (Avro/JSON Schema/Protobuf), giúp consumers deserialize đúng format và hỗ trợ schema evolution

Cấu trúc một Change Event

Mỗi event Debezium phát ra có cấu trúc chuẩn gồm key (primary key của record) và value (chi tiết thay đổi):

{
  "schema": { ... },
  "payload": {
    "before": {
      "id": 1001,
      "name": "Nguyễn Văn A",
      "email": "a@example.com",
      "status": "active"
    },
    "after": {
      "id": 1001,
      "name": "Nguyễn Văn A",
      "email": "newemail@example.com",
      "status": "active"
    },
    "source": {
      "version": "2.7.0",
      "connector": "postgresql",
      "name": "myapp",
      "ts_ms": 1713436800000,
      "db": "mydb",
      "schema": "public",
      "table": "users",
      "lsn": 123456789,
      "txId": 5678
    },
    "op": "u",
    "ts_ms": 1713436800123,
    "transaction": {
      "id": "5678",
      "total_order": 3,
      "data_collection_order": 1
    }
  }
}

Trường op cho biết loại thao tác: "c" = create (INSERT), "u" = update, "d" = delete, "r" = read (snapshot). Trường beforeafter chứa trạng thái row trước và sau thay đổi — đặc biệt hữu ích cho audit trail và diff detection.

4. Debezium Connectors: Hỗ trợ đa database

Debezium hỗ trợ hầu hết các database phổ biến, mỗi connector tận dụng cơ chế replication riêng của từng engine:

DatabaseCơ chế CDCĐặc điểm nổi bật
PostgreSQL Logical Replication (pgoutput / wal2json) Hỗ trợ tốt nhất, có thể dùng publication để filter table. Cần set wal_level=logical
MySQL/MariaDB Binlog (ROW format) Đọc binary log với GTID tracking. Cần binlog_format=ROWbinlog_row_image=FULL
SQL Server Change Tracking / CDC tables Dùng built-in CDC feature của SQL Server. Cần enable CDC trên database và từng table cần capture
MongoDB Oplog / Change Streams Sử dụng Change Streams API (MongoDB 3.6+). Hỗ trợ resume token để không mất event
Oracle LogMiner / XStream Đọc redo log qua LogMiner API. Yêu cầu supplemental logging enable
Cassandra Commit Log Community connector, đọc commit log segments. Phù hợp cho CDC từ wide-column store

⚠️ Lưu ý với SQL Server

SQL Server CDC yêu cầu SQL Server Agent đang chạy và database đã bật CDC (sys.sp_cdc_enable_db). Nếu dùng Azure SQL Database, cần tier Standard trở lên. Debezium sẽ đọc từ các change table (cdc.*) thay vì trực tiếp transaction log.

5. Hands-on: Setup Debezium với PostgreSQL

Dưới đây là hướng dẫn thiết lập CDC pipeline hoàn chỉnh với Docker Compose — từ PostgreSQL source đến Kafka consumer:

Docker Compose cho môi trường CDC

version: '3.8'
services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: myapp
      POSTGRES_USER: appuser
      POSTGRES_PASSWORD: secret
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
      - "-c"
      - "max_replication_slots=4"
      - "-c"
      - "max_wal_senders=4"
    ports:
      - "5432:5432"

  kafka:
    image: bitnami/kafka:3.7
    environment:
      KAFKA_CFG_NODE_ID: 0
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
    ports:
      - "9092:9092"

  connect:
    image: debezium/connect:2.7
    depends_on:
      - kafka
      - postgres
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: cdc-connect-cluster
      CONFIG_STORAGE_TOPIC: _connect-configs
      OFFSET_STORAGE_TOPIC: _connect-offsets
      STATUS_STORAGE_TOPIC: _connect-status
    ports:
      - "8083:8083"

Đăng ký Debezium Connector

Sau khi các container đã chạy, đăng ký connector qua REST API:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "myapp-postgres-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "appuser",
      "database.password": "secret",
      "database.dbname": "myapp",
      "topic.prefix": "myapp",
      "schema.include.list": "public",
      "table.include.list": "public.users,public.orders",
      "plugin.name": "pgoutput",
      "slot.name": "debezium_slot",
      "publication.name": "debezium_pub",
      "snapshot.mode": "initial",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": "false",
      "transforms.unwrap.delete.handling.mode": "rewrite",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false"
    }
  }'

Các config quan trọng:

  • topic.prefix: Prefix cho tên Kafka topic — event từ table public.users sẽ vào topic myapp.public.users
  • snapshot.mode: initial: Lần đầu chạy, Debezium snapshot toàn bộ data hiện có rồi mới chuyển sang đọc WAL
  • plugin.name: pgoutput: Dùng logical decoding plugin built-in của PostgreSQL (không cần cài thêm extension)
  • ExtractNewRecordState: Transform giúp flatten event — thay vì before/after envelope, consumer nhận trực tiếp record state mới

6. CDC Patterns nâng cao trong Production

Transactional Outbox Pattern

Outbox Pattern là một trong những ứng dụng quan trọng nhất của CDC. Thay vì publish event trực tiếp lên message broker (dual write), service ghi event vào một outbox table trong cùng transaction với business data. Debezium đọc outbox table và publish event lên Kafka.

sequenceDiagram
    participant App as Application
    participant DB as PostgreSQL
    participant DBZ as Debezium
    participant K as Kafka
    participant C as Consumer

    App->>DB: BEGIN TRANSACTION
    App->>DB: INSERT INTO orders (...)
    App->>DB: INSERT INTO outbox (aggregate_type, payload)
    App->>DB: COMMIT

    DBZ->>DB: Read WAL (outbox table changes)
    DBZ->>K: Publish OrderCreated event
    K->>C: Consume event
    C->>C: Update search index / Send email / Update cache

Outbox Pattern: event được ghi atomically cùng business data, Debezium relay lên Kafka

Debezium có sẵn Outbox Event Router SMT (Single Message Transform) để xử lý pattern này:

-- Outbox table schema
CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(255) NOT NULL,
    aggregate_id VARCHAR(255) NOT NULL,
    type VARCHAR(255) NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Ví dụ: khi tạo order
BEGIN;
  INSERT INTO orders (id, customer_id, total, status)
    VALUES ('ord-001', 'cust-123', 599000, 'pending');

  INSERT INTO outbox (aggregate_type, aggregate_id, type, payload)
    VALUES (
      'Order', 'ord-001', 'OrderCreated',
      '{"orderId":"ord-001","customerId":"cust-123","total":599000}'::jsonb
    );
COMMIT;

Config connector cho Outbox:

{
  "transforms": "outbox",
  "transforms.outbox.type":
    "io.debezium.transforms.outbox.EventRouter",
  "transforms.outbox.table.field.event.id": "id",
  "transforms.outbox.table.field.event.key": "aggregate_id",
  "transforms.outbox.table.field.event.type": "type",
  "transforms.outbox.table.field.event.payload": "payload",
  "transforms.outbox.route.by.field": "aggregate_type",
  "transforms.outbox.route.topic.regex": "(.*)Order(.*)",
  "transforms.outbox.route.topic.replacement": "order-events"
}

✅ Tại sao Outbox Pattern tốt hơn Dual Write?

Atomicity: Business data và event nằm trong cùng transaction — không thể có trường hợp data committed mà event bị mất. Idempotency: Debezium track offset trong WAL, nên nếu restart sẽ replay từ điểm dừng, không duplicate. Ordering: Event giữ đúng thứ tự transaction commit trong WAL.

CQRS + Materialized View với CDC

CDC là cầu nối tự nhiên cho kiến trúc CQRS (Command Query Responsibility Segregation). Write side ghi vào OLTP database (PostgreSQL, SQL Server), CDC stream thay đổi sang read-optimized store (Elasticsearch, Redis, ClickHouse):

graph LR
    subgraph Write["Write Side"]
        API["API
.NET Core"] --> PG["PostgreSQL
OLTP"] end subgraph CDC["CDC Pipeline"] PG --> DBZ["Debezium"] DBZ --> KF["Kafka"] end subgraph Read["Read Side (Materialized Views)"] KF --> ES["Elasticsearch
Full-text Search"] KF --> RD["Redis
Hot Data Cache"] KF --> CK["ClickHouse
Analytics"] end style API fill:#512BD4,stroke:#fff,color:#fff style PG fill:#336791,stroke:#fff,color:#fff style DBZ fill:#e94560,stroke:#fff,color:#fff style KF fill:#2c3e50,stroke:#fff,color:#fff style ES fill:#f8f9fa,stroke:#e94560,color:#2c3e50 style RD fill:#DC382D,stroke:#fff,color:#fff style CK fill:#FFCC01,stroke:#2c3e50,color:#2c3e50

CQRS với CDC: write vào PostgreSQL, CDC tự động đồng bộ sang read stores

CDC-based Cache Invalidation

Một trong những bài toán khó nhất trong distributed systems là cache invalidation. Với CDC, bạn không cần thêm logic invalidation vào application code — Debezium tự động phát event khi data thay đổi, consumer lắng nghe và update/invalidate cache:

// .NET Consumer xử lý CDC event để invalidate Redis cache
public class CdcCacheInvalidator : BackgroundService
{
    private readonly IConsumer<string, string> _consumer;
    private readonly IConnectionMultiplexer _redis;

    protected override async Task ExecuteAsync(
        CancellationToken stoppingToken)
    {
        _consumer.Subscribe("myapp.public.products");

        while (!stoppingToken.IsCancellationRequested)
        {
            var result = _consumer.Consume(stoppingToken);
            var change = JsonSerializer
                .Deserialize<DebeziumEvent>(result.Message.Value);

            var db = _redis.GetDatabase();
            var productId = change.After?.Id ?? change.Before?.Id;

            switch (change.Op)
            {
                case "c":
                case "u":
                    // Upsert vào cache với TTL
                    await db.StringSetAsync(
                        $"product:{productId}",
                        JsonSerializer.Serialize(change.After),
                        TimeSpan.FromMinutes(30));
                    break;

                case "d":
                    // Xóa khỏi cache
                    await db.KeyDeleteAsync($"product:{productId}");
                    break;
            }
        }
    }
}

7. Debezium Server: CDC không cần Kafka

Không phải tổ chức nào cũng muốn vận hành Kafka cluster. Debezium Server là một standalone application chạy Debezium connector mà không cần Kafka Connect — thay vào đó, event được gửi trực tiếp đến các sink như Redis Streams, Amazon Kinesis, Google Pub/Sub, Azure Event Hubs, hoặc HTTP webhook.

graph LR
    DB["Source DB"] --> DS["Debezium
Server"] DS --> RS["Redis Streams"] DS --> KN["Amazon Kinesis"] DS --> PB["Google Pub/Sub"] DS --> EH["Azure Event Hubs"] DS --> HT["HTTP Webhook"] style DB fill:#336791,stroke:#fff,color:#fff style DS fill:#e94560,stroke:#fff,color:#fff style RS fill:#DC382D,stroke:#fff,color:#fff style KN fill:#FF9900,stroke:#fff,color:#fff style PB fill:#4285F4,stroke:#fff,color:#fff style EH fill:#0078D4,stroke:#fff,color:#fff style HT fill:#f8f9fa,stroke:#e94560,color:#2c3e50

Debezium Server: lightweight deployment, sink trực tiếp đến cloud services

# application.properties cho Debezium Server
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.database.hostname=localhost
debezium.source.database.port=5432
debezium.source.database.user=appuser
debezium.source.database.password=secret
debezium.source.database.dbname=myapp
debezium.source.topic.prefix=myapp
debezium.source.plugin.name=pgoutput
debezium.source.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
debezium.source.offset.storage.file.filename=/data/offsets.dat

# Sink: Redis Streams
debezium.sink.type=redis
debezium.sink.redis.address=localhost:6379

# Hoặc Sink: HTTP Webhook
# debezium.sink.type=http
# debezium.sink.http.url=https://api.example.com/webhooks/cdc

💡 Khi nào dùng Debezium Server thay vì Kafka Connect?

Chọn Debezium Server khi: team nhỏ không muốn operate Kafka, đã dùng cloud managed messaging (Kinesis, Event Hubs), hoặc chỉ cần CDC cho 1-2 database đơn giản. Chọn Kafka Connect khi: cần nhiều connector, yêu cầu exactly-once semantics, cần replay event từ Kafka topic, hoặc đã có Kafka infrastructure.

8. Production Best Practices

Snapshot Strategy

Khi Debezium connector khởi động lần đầu, nó cần snapshot dữ liệu hiện có. Có nhiều chế độ snapshot:

ModeMô tảUse case
initial Snapshot toàn bộ rồi chuyển sang streaming Lần đầu setup, cần toàn bộ dữ liệu hiện có
initial_only Chỉ snapshot, không stream sau đó One-time data migration
when_needed Snapshot lại nếu offset bị mất Fault-tolerant production setup
no_data Chỉ capture schema, bỏ qua data hiện có Chỉ quan tâm thay đổi từ thời điểm setup
recovery Re-snapshot khi phát hiện gap trong WAL Recovery sau sự cố WAL bị truncate

Monitoring và Observability

CDC pipeline là thành phần critical — nếu Debezium dừng hoặc lag, downstream systems sẽ có stale data. Các metric cần giám sát:

LagMilliSecondsBehindSource — khoảng cách giữa event time và process time
QueueQueueTotalCapacity vs QueueRemainingCapacity — buffer overflow risk
ErrorsNumberOfErroneousEvents — parse/transform failures
ThroughputTotalNumberOfEventsSeen — events per second processed
# Prometheus scrape config cho Debezium metrics
- job_name: 'debezium'
  metrics_path: '/metrics'
  static_configs:
    - targets: ['connect:8083']

# Alert rules quan trọng
groups:
  - name: debezium_alerts
    rules:
      - alert: DebeziumHighLag
        expr: debezium_metrics_MilliSecondsBehindSource > 30000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Debezium lag > 30s trên {{ $labels.context }}"

      - alert: DebeziumConnectorDown
        expr: debezium_metrics_Connected == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Debezium connector mất kết nối DB"

Schema Evolution

Database schema thay đổi liên tục (ALTER TABLE, ADD COLUMN...). Debezium xử lý điều này qua Schema History Topic — lưu trữ mọi DDL change để connector restart có thể rebuild schema. Kết hợp với Schema Registry, consumers có thể handle schema evolution gracefully:

  • Adding columns: Event mới sẽ có field mới, consumers cũ bỏ qua field không biết (forward compatibility)
  • Removing columns: Consumers mới handle missing field bằng default value (backward compatibility)
  • Renaming columns: Cần đặc biệt cẩn thận — thường phải coordinate deploy giữa schema change và consumer update

⚠️ WAL Retention quan trọng

Nếu Debezium connector offline quá lâu, WAL segments có thể bị PostgreSQL recycle. Khi connector restart, nó sẽ không tìm thấy offset cũ → phải re-snapshot. Đặt wal_keep_size đủ lớn (hoặc dùng replication slot) và monitor pg_replication_slots regularly. Trên production, nên set max_slot_wal_keep_size để tránh disk full do WAL tích lũy.

9. Tích hợp CDC trong ứng dụng .NET

Trong hệ sinh thái .NET, có nhiều cách consume CDC events. Dưới đây là pattern sử dụng Confluent.Kafka NuGet package kết hợp với hosted service:

// Program.cs - Đăng ký CDC consumer service
builder.Services.AddSingleton<IConsumer<string, string>>(sp =>
{
    var config = new ConsumerConfig
    {
        BootstrapServers = "kafka:9092",
        GroupId = "order-projection-service",
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnableAutoCommit = false, // Manual commit cho exactly-once
        EnablePartitionEof = true
    };
    return new ConsumerBuilder<string, string>(config).Build();
});

builder.Services.AddHostedService<OrderProjectionService>();

// OrderProjectionService.cs
public class OrderProjectionService : BackgroundService
{
    private readonly IConsumer<string, string> _consumer;
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<OrderProjectionService> _logger;

    protected override async Task ExecuteAsync(
        CancellationToken ct)
    {
        _consumer.Subscribe(new[] {
            "myapp.public.orders",
            "myapp.public.order_items"
        });

        while (!ct.IsCancellationRequested)
        {
            try
            {
                var result = _consumer.Consume(ct);
                if (result.IsPartitionEOF) continue;

                var evt = JsonSerializer
                    .Deserialize<CdcEvent>(result.Message.Value);

                using var scope = _scopeFactory.CreateScope();
                var handler = scope.ServiceProvider
                    .GetRequiredService<ICdcEventHandler>();

                await handler.HandleAsync(evt);

                // Commit offset sau khi xử lý thành công
                _consumer.Commit(result);
            }
            catch (ConsumeException ex)
            {
                _logger.LogError(ex,
                    "CDC consume error on {Topic}",
                    ex.ConsumerRecord?.Topic);
                // Implement retry / dead letter logic
            }
        }
    }
}

// ICdcEventHandler implementation
public class OrderCdcHandler : ICdcEventHandler
{
    private readonly ElasticClient _elastic;

    public async Task HandleAsync(CdcEvent evt)
    {
        if (evt.Source.Table == "orders")
        {
            switch (evt.Op)
            {
                case "c":
                case "u":
                    await _elastic.IndexDocumentAsync(
                        MapToSearchModel(evt.After));
                    break;
                case "d":
                    await _elastic.DeleteAsync<OrderSearch>(
                        evt.Before.Id);
                    break;
            }
        }
    }
}

10. So sánh các CDC Tools năm 2026

ToolLoạiDatabases hỗ trợSinkGiá
Debezium Open source PostgreSQL, MySQL, SQL Server, MongoDB, Oracle, Cassandra, DB2 Kafka, Redis, HTTP, Kinesis, Pub/Sub, Event Hubs Miễn phí
AWS DMS Managed Hầu hết RDBMS + MongoDB + S3 RDS, Redshift, S3, Kinesis, OpenSearch Pay-per-use (instance hours)
Fivetran SaaS 200+ connectors Snowflake, BigQuery, Redshift, Databricks Credits-based ($$)
Airbyte Open source + Cloud 300+ connectors Warehouse, lakehouse Free (self-hosted) / Credits (cloud)
RisingWave Open source PostgreSQL, MySQL via CDC Stream processing + materialized views Miễn phí (self-hosted)

💡 Khi nào chọn gì?

Debezium — khi cần full control, self-hosted, event-driven architecture, hoặc đã có Kafka. AWS DMS — khi migration hoặc replication giữa các AWS services. Fivetran/Airbyte — khi focus data warehouse/lakehouse, cần nhiều SaaS connectors. RisingWave — khi cần stream processing trực tiếp trên CDC stream.

11. Pitfalls thường gặp và cách xử lý

1. Replication Slot bị "stuck"

Triệu chứng: Disk usage tăng liên tục vì PostgreSQL giữ WAL cho slot không active.
Nguyên nhân: Connector crash hoặc consumer lag quá lớn.
Giải pháp: Monitor pg_replication_slots, set max_slot_wal_keep_size, alert khi pg_wal_lsn_diff() vượt ngưỡng.

2. Snapshot quá chậm với table lớn

Triệu chứng: Initial snapshot chạy hàng giờ với table hàng trăm triệu row.
Giải pháp: Dùng snapshot.mode=no_data rồi tự backfill bằng batch job riêng. Hoặc dùng snapshot.fetch.sizesnapshot.max.threads để tune.

3. Event ordering không đúng

Triệu chứng: Consumer nhận UPDATE trước INSERT.
Nguyên nhân: Kafka topic có nhiều partition, events cho cùng entity rơi vào partition khác nhau.
Giải pháp: Đảm bảo message key là primary key — Kafka guarantee ordering trong cùng partition cho cùng key.

4. Schema change gây lỗi connector

Triệu chứng: Connector crash sau ALTER TABLE.
Giải pháp: Bật schema.history.internal topic, dùng schema.history.internal.store.only.captured.tables.ddl=true để giảm noise. Test schema changes trên staging trước.

12. Kết luận

Change Data Capture với Debezium đã trở thành building block không thể thiếu trong kiến trúc microservices hiện đại. Thay vì buộc application phải "biết" về mọi downstream system, CDC cho phép database trở thành nguồn sự kiện duy nhất — đơn giản, đáng tin cậy, và dễ mở rộng.

Những điểm then chốt cần nhớ:

  • Log-based CDC là lựa chọn duy nhất cho production — zero overhead, capture mọi thay đổi, giữ đúng thứ tự
  • Outbox Pattern + Debezium giải quyết bài toán atomic event publishing mà dual write không làm được
  • Debezium Server mở ra lựa chọn CDC không cần Kafka cho team nhỏ hoặc cloud-native workload
  • Monitoring là bắt buộc — CDC lag = stale data downstream, cần alert trên MilliSecondsBehindSource và WAL accumulation
  • Kết hợp CDC với CQRS, cache invalidation, search sync tạo nên kiến trúc event-driven hoàn chỉnh mà không cần sửa application code

CDC không phải là giải pháp cho mọi bài toán data integration. Nhưng khi bạn cần real-time data sync với đảm bảo consistency, Debezium + Kafka (hoặc Debezium Server) là stack đáng đầu tư nhất ở thời điểm 2026.

Tham khảo