Change Data Capture với Debezium: Đồng bộ dữ liệu real-time cho Microservices
Posted on: 4/18/2026 12:10:44 PM
Table of contents
- 1. Change Data Capture là gì và tại sao quan trọng?
- 2. Các phương pháp CDC: từ thô sơ đến chuyên nghiệp
- 3. Debezium: Kiến trúc và cách hoạt động
- 4. Debezium Connectors: Hỗ trợ đa database
- 5. Hands-on: Setup Debezium với PostgreSQL
- 6. CDC Patterns nâng cao trong Production
- 7. Debezium Server: CDC không cần Kafka
- 8. Production Best Practices
- 9. Tích hợp CDC trong ứng dụng .NET
- 10. So sánh các CDC Tools năm 2026
- 11. Pitfalls thường gặp và cách xử lý
- 12. Kết luận
- Tham khảo
1. Change Data Capture là gì và tại sao quan trọng?
Change Data Capture (CDC) là kỹ thuật phát hiện và ghi nhận mọi thay đổi (INSERT, UPDATE, DELETE) xảy ra trong database, rồi truyền tải chúng dưới dạng event stream đến các hệ thống downstream. Thay vì các hệ thống phải liên tục polling database để kiểm tra "có gì mới không?", CDC đẩy thay đổi ra ngoài ngay khi chúng xảy ra — biến database thành một nguồn phát sự kiện real-time.
Trong kiến trúc microservices hiện đại, CDC giải quyết một bài toán cốt lõi: làm thế nào để đồng bộ dữ liệu giữa các service mà không cần tight coupling? Khi Service A ghi dữ liệu vào PostgreSQL, Service B (search), Service C (analytics), Service D (cache) đều cần biết về thay đổi đó — nhưng Service A không nên phải gọi API đến từng service. CDC giải quyết điều này bằng cách đọc transaction log của database và phát event tự động.
💡 Tại sao không dùng Dual Write?
Nhiều team chọn cách "ghi vào DB rồi publish event" (dual write). Nhưng pattern này không đảm bảo atomicity — nếu DB commit thành công nhưng message broker fail, dữ liệu sẽ không nhất quán. CDC loại bỏ hoàn toàn vấn đề này vì chỉ đọc từ transaction log đã committed.
Các use case chính của CDC
CDC không chỉ dùng cho data sync đơn thuần. Dưới đây là các scenario phổ biến trong production:
- Event-Driven Architecture: Biến mọi thay đổi trong database thành domain event, giúp các microservices phản ứng real-time mà không cần gọi API lẫn nhau
- Cache Invalidation: Tự động invalidate hoặc update Redis/Memcached khi dữ liệu gốc thay đổi — giải quyết bài toán cache consistency triệt để
- Search Index Sync: Đồng bộ dữ liệu từ OLTP database sang Elasticsearch/OpenSearch gần như real-time, không cần batch reindex
- Data Warehouse / Lakehouse: Stream data từ operational DB sang ClickHouse, Apache Iceberg, hoặc BigQuery để phân tích
- Audit Trail: Ghi nhận lịch sử thay đổi đầy đủ — ai thay đổi gì, lúc nào, giá trị cũ/mới
- Database Migration: Zero-downtime migration giữa database engines bằng cách CDC từ source sang target, rồi cutover khi đã sync xong
2. Các phương pháp CDC: từ thô sơ đến chuyên nghiệp
Có 4 phương pháp CDC chính, mỗi cách có trade-off riêng về hiệu năng, độ chính xác, và complexity:
graph LR
A["Timestamp-based
Polling"] --> B["Trigger-based
CDC"]
B --> C["Query-based
CDC"]
C --> D["Log-based
CDC"]
style A fill:#f8f9fa,stroke:#e94560,color:#2c3e50
style B fill:#f8f9fa,stroke:#e94560,color:#2c3e50
style C fill:#f8f9fa,stroke:#e94560,color:#2c3e50
style D fill:#e94560,stroke:#fff,color:#fff
Các phương pháp CDC xếp theo mức độ tinh vi — Log-based là giải pháp production-grade
| Phương pháp | Cách hoạt động | Ưu điểm | Nhược điểm |
|---|---|---|---|
| Timestamp-based | Query records có updated_at > last check |
Đơn giản, không cần tool đặc biệt | Bỏ sót DELETE, phụ thuộc vào column timestamp, gây load cho DB |
| Trigger-based | DB trigger ghi thay đổi vào shadow table | Bắt được mọi loại thay đổi | Overhead lớn lên write path, khó maintain, không portable |
| Query-based | Polling định kỳ với diff detection | Hoạt động với mọi DB | Latency cao (phút), gây load khi table lớn, bỏ sót intermediate changes |
| Log-based | Đọc transaction log (WAL/binlog/oplog) | Zero impact lên DB, capture mọi thay đổi, thứ tự chính xác, latency thấp | Phức tạp hơn, phụ thuộc DB-specific log format |
✅ Best Practice
Cho mọi production workload, luôn chọn log-based CDC. Các phương pháp khác chỉ phù hợp cho development/prototyping hoặc những database không hỗ trợ logical replication. Log-based CDC là phương pháp duy nhất đảm bảo: zero overhead, capture DELETE, giữ đúng thứ tự transaction, và latency dưới giây.
3. Debezium: Kiến trúc và cách hoạt động
Debezium là nền tảng CDC mã nguồn mở phổ biến nhất hiện nay, được phát triển bởi Red Hat và cộng đồng. Debezium đọc transaction log của database và chuyển đổi mỗi row-level change thành một structured event, rồi publish lên Kafka (hoặc các messaging system khác qua Debezium Server).
Kiến trúc tổng thể
graph TB
subgraph Sources["Source Databases"]
PG["PostgreSQL
WAL"]
MY["MySQL
Binlog"]
MG["MongoDB
Oplog"]
SS["SQL Server
CT/CDC"]
end
subgraph KC["Kafka Connect Cluster"]
DC1["Debezium
Connector 1"]
DC2["Debezium
Connector 2"]
DC3["Debezium
Connector N"]
end
subgraph Kafka["Apache Kafka"]
T1["Topic: db.schema.users"]
T2["Topic: db.schema.orders"]
T3["Topic: db.schema.products"]
SR["Schema Registry"]
end
subgraph Consumers["Downstream Consumers"]
ES["Elasticsearch"]
RD["Redis Cache"]
CH["ClickHouse"]
MS["Microservices"]
end
PG --> DC1
MY --> DC2
MG --> DC3
SS --> DC1
DC1 --> T1
DC2 --> T2
DC3 --> T3
DC1 --> SR
DC2 --> SR
T1 --> ES
T2 --> RD
T1 --> CH
T3 --> MS
style PG fill:#336791,stroke:#fff,color:#fff
style MY fill:#4479A1,stroke:#fff,color:#fff
style MG fill:#4DB33D,stroke:#fff,color:#fff
style SS fill:#CC2927,stroke:#fff,color:#fff
style DC1 fill:#e94560,stroke:#fff,color:#fff
style DC2 fill:#e94560,stroke:#fff,color:#fff
style DC3 fill:#e94560,stroke:#fff,color:#fff
style T1 fill:#f8f9fa,stroke:#e94560,color:#2c3e50
style T2 fill:#f8f9fa,stroke:#e94560,color:#2c3e50
style T3 fill:#f8f9fa,stroke:#e94560,color:#2c3e50
style SR fill:#f8f9fa,stroke:#e0e0e0,color:#2c3e50
style ES fill:#2c3e50,stroke:#fff,color:#fff
style RD fill:#DC382D,stroke:#fff,color:#fff
style CH fill:#FFCC01,stroke:#2c3e50,color:#2c3e50
style MS fill:#2c3e50,stroke:#fff,color:#fff
Kiến trúc end-to-end: Debezium đọc transaction log → publish event lên Kafka → consumers xử lý downstream
Ba thành phần chính trong kiến trúc Debezium:
- Debezium Connector: Chạy như một Kafka Connect plugin, mỗi connector giám sát một database instance. Connector đọc transaction log, parse thành change event, và produce lên Kafka topic
- Apache Kafka: Đóng vai trò message broker trung tâm. Mỗi table trong source database tương ứng với một Kafka topic. Kafka đảm bảo durability, ordering, và cho phép nhiều consumer đọc cùng stream
- Schema Registry: Lưu trữ schema của change event (Avro/JSON Schema/Protobuf), giúp consumers deserialize đúng format và hỗ trợ schema evolution
Cấu trúc một Change Event
Mỗi event Debezium phát ra có cấu trúc chuẩn gồm key (primary key của record) và value (chi tiết thay đổi):
{
"schema": { ... },
"payload": {
"before": {
"id": 1001,
"name": "Nguyễn Văn A",
"email": "a@example.com",
"status": "active"
},
"after": {
"id": 1001,
"name": "Nguyễn Văn A",
"email": "newemail@example.com",
"status": "active"
},
"source": {
"version": "2.7.0",
"connector": "postgresql",
"name": "myapp",
"ts_ms": 1713436800000,
"db": "mydb",
"schema": "public",
"table": "users",
"lsn": 123456789,
"txId": 5678
},
"op": "u",
"ts_ms": 1713436800123,
"transaction": {
"id": "5678",
"total_order": 3,
"data_collection_order": 1
}
}
}Trường op cho biết loại thao tác: "c" = create (INSERT), "u" = update, "d" = delete, "r" = read (snapshot). Trường before và after chứa trạng thái row trước và sau thay đổi — đặc biệt hữu ích cho audit trail và diff detection.
4. Debezium Connectors: Hỗ trợ đa database
Debezium hỗ trợ hầu hết các database phổ biến, mỗi connector tận dụng cơ chế replication riêng của từng engine:
| Database | Cơ chế CDC | Đặc điểm nổi bật |
|---|---|---|
| PostgreSQL | Logical Replication (pgoutput / wal2json) | Hỗ trợ tốt nhất, có thể dùng publication để filter table. Cần set wal_level=logical |
| MySQL/MariaDB | Binlog (ROW format) | Đọc binary log với GTID tracking. Cần binlog_format=ROW và binlog_row_image=FULL |
| SQL Server | Change Tracking / CDC tables | Dùng built-in CDC feature của SQL Server. Cần enable CDC trên database và từng table cần capture |
| MongoDB | Oplog / Change Streams | Sử dụng Change Streams API (MongoDB 3.6+). Hỗ trợ resume token để không mất event |
| Oracle | LogMiner / XStream | Đọc redo log qua LogMiner API. Yêu cầu supplemental logging enable |
| Cassandra | Commit Log | Community connector, đọc commit log segments. Phù hợp cho CDC từ wide-column store |
⚠️ Lưu ý với SQL Server
SQL Server CDC yêu cầu SQL Server Agent đang chạy và database đã bật CDC (sys.sp_cdc_enable_db). Nếu dùng Azure SQL Database, cần tier Standard trở lên. Debezium sẽ đọc từ các change table (cdc.*) thay vì trực tiếp transaction log.
5. Hands-on: Setup Debezium với PostgreSQL
Dưới đây là hướng dẫn thiết lập CDC pipeline hoàn chỉnh với Docker Compose — từ PostgreSQL source đến Kafka consumer:
Docker Compose cho môi trường CDC
version: '3.8'
services:
postgres:
image: postgres:16
environment:
POSTGRES_DB: myapp
POSTGRES_USER: appuser
POSTGRES_PASSWORD: secret
command:
- "postgres"
- "-c"
- "wal_level=logical"
- "-c"
- "max_replication_slots=4"
- "-c"
- "max_wal_senders=4"
ports:
- "5432:5432"
kafka:
image: bitnami/kafka:3.7
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
ports:
- "9092:9092"
connect:
image: debezium/connect:2.7
depends_on:
- kafka
- postgres
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: cdc-connect-cluster
CONFIG_STORAGE_TOPIC: _connect-configs
OFFSET_STORAGE_TOPIC: _connect-offsets
STATUS_STORAGE_TOPIC: _connect-status
ports:
- "8083:8083"Đăng ký Debezium Connector
Sau khi các container đã chạy, đăng ký connector qua REST API:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "myapp-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "appuser",
"database.password": "secret",
"database.dbname": "myapp",
"topic.prefix": "myapp",
"schema.include.list": "public",
"table.include.list": "public.users,public.orders",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.name": "debezium_pub",
"snapshot.mode": "initial",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false"
}
}'Các config quan trọng:
topic.prefix: Prefix cho tên Kafka topic — event từ tablepublic.userssẽ vào topicmyapp.public.userssnapshot.mode: initial: Lần đầu chạy, Debezium snapshot toàn bộ data hiện có rồi mới chuyển sang đọc WALplugin.name: pgoutput: Dùng logical decoding plugin built-in của PostgreSQL (không cần cài thêm extension)ExtractNewRecordState: Transform giúp flatten event — thay vì before/after envelope, consumer nhận trực tiếp record state mới
6. CDC Patterns nâng cao trong Production
Transactional Outbox Pattern
Outbox Pattern là một trong những ứng dụng quan trọng nhất của CDC. Thay vì publish event trực tiếp lên message broker (dual write), service ghi event vào một outbox table trong cùng transaction với business data. Debezium đọc outbox table và publish event lên Kafka.
sequenceDiagram
participant App as Application
participant DB as PostgreSQL
participant DBZ as Debezium
participant K as Kafka
participant C as Consumer
App->>DB: BEGIN TRANSACTION
App->>DB: INSERT INTO orders (...)
App->>DB: INSERT INTO outbox (aggregate_type, payload)
App->>DB: COMMIT
DBZ->>DB: Read WAL (outbox table changes)
DBZ->>K: Publish OrderCreated event
K->>C: Consume event
C->>C: Update search index / Send email / Update cache
Outbox Pattern: event được ghi atomically cùng business data, Debezium relay lên Kafka
Debezium có sẵn Outbox Event Router SMT (Single Message Transform) để xử lý pattern này:
-- Outbox table schema
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Ví dụ: khi tạo order
BEGIN;
INSERT INTO orders (id, customer_id, total, status)
VALUES ('ord-001', 'cust-123', 599000, 'pending');
INSERT INTO outbox (aggregate_type, aggregate_id, type, payload)
VALUES (
'Order', 'ord-001', 'OrderCreated',
'{"orderId":"ord-001","customerId":"cust-123","total":599000}'::jsonb
);
COMMIT;Config connector cho Outbox:
{
"transforms": "outbox",
"transforms.outbox.type":
"io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.route.topic.regex": "(.*)Order(.*)",
"transforms.outbox.route.topic.replacement": "order-events"
}✅ Tại sao Outbox Pattern tốt hơn Dual Write?
Atomicity: Business data và event nằm trong cùng transaction — không thể có trường hợp data committed mà event bị mất. Idempotency: Debezium track offset trong WAL, nên nếu restart sẽ replay từ điểm dừng, không duplicate. Ordering: Event giữ đúng thứ tự transaction commit trong WAL.
CQRS + Materialized View với CDC
CDC là cầu nối tự nhiên cho kiến trúc CQRS (Command Query Responsibility Segregation). Write side ghi vào OLTP database (PostgreSQL, SQL Server), CDC stream thay đổi sang read-optimized store (Elasticsearch, Redis, ClickHouse):
graph LR
subgraph Write["Write Side"]
API["API
.NET Core"] --> PG["PostgreSQL
OLTP"]
end
subgraph CDC["CDC Pipeline"]
PG --> DBZ["Debezium"]
DBZ --> KF["Kafka"]
end
subgraph Read["Read Side (Materialized Views)"]
KF --> ES["Elasticsearch
Full-text Search"]
KF --> RD["Redis
Hot Data Cache"]
KF --> CK["ClickHouse
Analytics"]
end
style API fill:#512BD4,stroke:#fff,color:#fff
style PG fill:#336791,stroke:#fff,color:#fff
style DBZ fill:#e94560,stroke:#fff,color:#fff
style KF fill:#2c3e50,stroke:#fff,color:#fff
style ES fill:#f8f9fa,stroke:#e94560,color:#2c3e50
style RD fill:#DC382D,stroke:#fff,color:#fff
style CK fill:#FFCC01,stroke:#2c3e50,color:#2c3e50
CQRS với CDC: write vào PostgreSQL, CDC tự động đồng bộ sang read stores
CDC-based Cache Invalidation
Một trong những bài toán khó nhất trong distributed systems là cache invalidation. Với CDC, bạn không cần thêm logic invalidation vào application code — Debezium tự động phát event khi data thay đổi, consumer lắng nghe và update/invalidate cache:
// .NET Consumer xử lý CDC event để invalidate Redis cache
public class CdcCacheInvalidator : BackgroundService
{
private readonly IConsumer<string, string> _consumer;
private readonly IConnectionMultiplexer _redis;
protected override async Task ExecuteAsync(
CancellationToken stoppingToken)
{
_consumer.Subscribe("myapp.public.products");
while (!stoppingToken.IsCancellationRequested)
{
var result = _consumer.Consume(stoppingToken);
var change = JsonSerializer
.Deserialize<DebeziumEvent>(result.Message.Value);
var db = _redis.GetDatabase();
var productId = change.After?.Id ?? change.Before?.Id;
switch (change.Op)
{
case "c":
case "u":
// Upsert vào cache với TTL
await db.StringSetAsync(
$"product:{productId}",
JsonSerializer.Serialize(change.After),
TimeSpan.FromMinutes(30));
break;
case "d":
// Xóa khỏi cache
await db.KeyDeleteAsync($"product:{productId}");
break;
}
}
}
}7. Debezium Server: CDC không cần Kafka
Không phải tổ chức nào cũng muốn vận hành Kafka cluster. Debezium Server là một standalone application chạy Debezium connector mà không cần Kafka Connect — thay vào đó, event được gửi trực tiếp đến các sink như Redis Streams, Amazon Kinesis, Google Pub/Sub, Azure Event Hubs, hoặc HTTP webhook.
graph LR
DB["Source DB"] --> DS["Debezium
Server"]
DS --> RS["Redis Streams"]
DS --> KN["Amazon Kinesis"]
DS --> PB["Google Pub/Sub"]
DS --> EH["Azure Event Hubs"]
DS --> HT["HTTP Webhook"]
style DB fill:#336791,stroke:#fff,color:#fff
style DS fill:#e94560,stroke:#fff,color:#fff
style RS fill:#DC382D,stroke:#fff,color:#fff
style KN fill:#FF9900,stroke:#fff,color:#fff
style PB fill:#4285F4,stroke:#fff,color:#fff
style EH fill:#0078D4,stroke:#fff,color:#fff
style HT fill:#f8f9fa,stroke:#e94560,color:#2c3e50
Debezium Server: lightweight deployment, sink trực tiếp đến cloud services
# application.properties cho Debezium Server debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector debezium.source.database.hostname=localhost debezium.source.database.port=5432 debezium.source.database.user=appuser debezium.source.database.password=secret debezium.source.database.dbname=myapp debezium.source.topic.prefix=myapp debezium.source.plugin.name=pgoutput debezium.source.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore debezium.source.offset.storage.file.filename=/data/offsets.dat # Sink: Redis Streams debezium.sink.type=redis debezium.sink.redis.address=localhost:6379 # Hoặc Sink: HTTP Webhook # debezium.sink.type=http # debezium.sink.http.url=https://api.example.com/webhooks/cdc
💡 Khi nào dùng Debezium Server thay vì Kafka Connect?
Chọn Debezium Server khi: team nhỏ không muốn operate Kafka, đã dùng cloud managed messaging (Kinesis, Event Hubs), hoặc chỉ cần CDC cho 1-2 database đơn giản. Chọn Kafka Connect khi: cần nhiều connector, yêu cầu exactly-once semantics, cần replay event từ Kafka topic, hoặc đã có Kafka infrastructure.
8. Production Best Practices
Snapshot Strategy
Khi Debezium connector khởi động lần đầu, nó cần snapshot dữ liệu hiện có. Có nhiều chế độ snapshot:
| Mode | Mô tả | Use case |
|---|---|---|
initial |
Snapshot toàn bộ rồi chuyển sang streaming | Lần đầu setup, cần toàn bộ dữ liệu hiện có |
initial_only |
Chỉ snapshot, không stream sau đó | One-time data migration |
when_needed |
Snapshot lại nếu offset bị mất | Fault-tolerant production setup |
no_data |
Chỉ capture schema, bỏ qua data hiện có | Chỉ quan tâm thay đổi từ thời điểm setup |
recovery |
Re-snapshot khi phát hiện gap trong WAL | Recovery sau sự cố WAL bị truncate |
Monitoring và Observability
CDC pipeline là thành phần critical — nếu Debezium dừng hoặc lag, downstream systems sẽ có stale data. Các metric cần giám sát:
# Prometheus scrape config cho Debezium metrics
- job_name: 'debezium'
metrics_path: '/metrics'
static_configs:
- targets: ['connect:8083']
# Alert rules quan trọng
groups:
- name: debezium_alerts
rules:
- alert: DebeziumHighLag
expr: debezium_metrics_MilliSecondsBehindSource > 30000
for: 5m
labels:
severity: warning
annotations:
summary: "Debezium lag > 30s trên {{ $labels.context }}"
- alert: DebeziumConnectorDown
expr: debezium_metrics_Connected == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Debezium connector mất kết nối DB"Schema Evolution
Database schema thay đổi liên tục (ALTER TABLE, ADD COLUMN...). Debezium xử lý điều này qua Schema History Topic — lưu trữ mọi DDL change để connector restart có thể rebuild schema. Kết hợp với Schema Registry, consumers có thể handle schema evolution gracefully:
- Adding columns: Event mới sẽ có field mới, consumers cũ bỏ qua field không biết (forward compatibility)
- Removing columns: Consumers mới handle missing field bằng default value (backward compatibility)
- Renaming columns: Cần đặc biệt cẩn thận — thường phải coordinate deploy giữa schema change và consumer update
⚠️ WAL Retention quan trọng
Nếu Debezium connector offline quá lâu, WAL segments có thể bị PostgreSQL recycle. Khi connector restart, nó sẽ không tìm thấy offset cũ → phải re-snapshot. Đặt wal_keep_size đủ lớn (hoặc dùng replication slot) và monitor pg_replication_slots regularly. Trên production, nên set max_slot_wal_keep_size để tránh disk full do WAL tích lũy.
9. Tích hợp CDC trong ứng dụng .NET
Trong hệ sinh thái .NET, có nhiều cách consume CDC events. Dưới đây là pattern sử dụng Confluent.Kafka NuGet package kết hợp với hosted service:
// Program.cs - Đăng ký CDC consumer service
builder.Services.AddSingleton<IConsumer<string, string>>(sp =>
{
var config = new ConsumerConfig
{
BootstrapServers = "kafka:9092",
GroupId = "order-projection-service",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false, // Manual commit cho exactly-once
EnablePartitionEof = true
};
return new ConsumerBuilder<string, string>(config).Build();
});
builder.Services.AddHostedService<OrderProjectionService>();
// OrderProjectionService.cs
public class OrderProjectionService : BackgroundService
{
private readonly IConsumer<string, string> _consumer;
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<OrderProjectionService> _logger;
protected override async Task ExecuteAsync(
CancellationToken ct)
{
_consumer.Subscribe(new[] {
"myapp.public.orders",
"myapp.public.order_items"
});
while (!ct.IsCancellationRequested)
{
try
{
var result = _consumer.Consume(ct);
if (result.IsPartitionEOF) continue;
var evt = JsonSerializer
.Deserialize<CdcEvent>(result.Message.Value);
using var scope = _scopeFactory.CreateScope();
var handler = scope.ServiceProvider
.GetRequiredService<ICdcEventHandler>();
await handler.HandleAsync(evt);
// Commit offset sau khi xử lý thành công
_consumer.Commit(result);
}
catch (ConsumeException ex)
{
_logger.LogError(ex,
"CDC consume error on {Topic}",
ex.ConsumerRecord?.Topic);
// Implement retry / dead letter logic
}
}
}
}
// ICdcEventHandler implementation
public class OrderCdcHandler : ICdcEventHandler
{
private readonly ElasticClient _elastic;
public async Task HandleAsync(CdcEvent evt)
{
if (evt.Source.Table == "orders")
{
switch (evt.Op)
{
case "c":
case "u":
await _elastic.IndexDocumentAsync(
MapToSearchModel(evt.After));
break;
case "d":
await _elastic.DeleteAsync<OrderSearch>(
evt.Before.Id);
break;
}
}
}
}10. So sánh các CDC Tools năm 2026
| Tool | Loại | Databases hỗ trợ | Sink | Giá |
|---|---|---|---|---|
| Debezium | Open source | PostgreSQL, MySQL, SQL Server, MongoDB, Oracle, Cassandra, DB2 | Kafka, Redis, HTTP, Kinesis, Pub/Sub, Event Hubs | Miễn phí |
| AWS DMS | Managed | Hầu hết RDBMS + MongoDB + S3 | RDS, Redshift, S3, Kinesis, OpenSearch | Pay-per-use (instance hours) |
| Fivetran | SaaS | 200+ connectors | Snowflake, BigQuery, Redshift, Databricks | Credits-based ($$) |
| Airbyte | Open source + Cloud | 300+ connectors | Warehouse, lakehouse | Free (self-hosted) / Credits (cloud) |
| RisingWave | Open source | PostgreSQL, MySQL via CDC | Stream processing + materialized views | Miễn phí (self-hosted) |
💡 Khi nào chọn gì?
Debezium — khi cần full control, self-hosted, event-driven architecture, hoặc đã có Kafka. AWS DMS — khi migration hoặc replication giữa các AWS services. Fivetran/Airbyte — khi focus data warehouse/lakehouse, cần nhiều SaaS connectors. RisingWave — khi cần stream processing trực tiếp trên CDC stream.
11. Pitfalls thường gặp và cách xử lý
1. Replication Slot bị "stuck"
Triệu chứng: Disk usage tăng liên tục vì PostgreSQL giữ WAL cho slot không active.
Nguyên nhân: Connector crash hoặc consumer lag quá lớn.
Giải pháp: Monitor pg_replication_slots, set max_slot_wal_keep_size, alert khi pg_wal_lsn_diff() vượt ngưỡng.
2. Snapshot quá chậm với table lớn
Triệu chứng: Initial snapshot chạy hàng giờ với table hàng trăm triệu row.
Giải pháp: Dùng snapshot.mode=no_data rồi tự backfill bằng batch job riêng. Hoặc dùng snapshot.fetch.size và snapshot.max.threads để tune.
3. Event ordering không đúng
Triệu chứng: Consumer nhận UPDATE trước INSERT.
Nguyên nhân: Kafka topic có nhiều partition, events cho cùng entity rơi vào partition khác nhau.
Giải pháp: Đảm bảo message key là primary key — Kafka guarantee ordering trong cùng partition cho cùng key.
4. Schema change gây lỗi connector
Triệu chứng: Connector crash sau ALTER TABLE.
Giải pháp: Bật schema.history.internal topic, dùng schema.history.internal.store.only.captured.tables.ddl=true để giảm noise. Test schema changes trên staging trước.
12. Kết luận
Change Data Capture với Debezium đã trở thành building block không thể thiếu trong kiến trúc microservices hiện đại. Thay vì buộc application phải "biết" về mọi downstream system, CDC cho phép database trở thành nguồn sự kiện duy nhất — đơn giản, đáng tin cậy, và dễ mở rộng.
Những điểm then chốt cần nhớ:
- Log-based CDC là lựa chọn duy nhất cho production — zero overhead, capture mọi thay đổi, giữ đúng thứ tự
- Outbox Pattern + Debezium giải quyết bài toán atomic event publishing mà dual write không làm được
- Debezium Server mở ra lựa chọn CDC không cần Kafka cho team nhỏ hoặc cloud-native workload
- Monitoring là bắt buộc — CDC lag = stale data downstream, cần alert trên MilliSecondsBehindSource và WAL accumulation
- Kết hợp CDC với CQRS, cache invalidation, search sync tạo nên kiến trúc event-driven hoàn chỉnh mà không cần sửa application code
CDC không phải là giải pháp cho mọi bài toán data integration. Nhưng khi bạn cần real-time data sync với đảm bảo consistency, Debezium + Kafka (hoặc Debezium Server) là stack đáng đầu tư nhất ở thời điểm 2026.
Tham khảo
- Debezium Official Documentation — Features
- Conduktor — Implementing CDC with Debezium
- Factor House — Complete Guide to Kafka CDC (2026)
- Redpanda — CDC Approaches, Architectures, and Best Practices
- Hevo Data — Change Data Capture in 2026
- RisingWave — Data Integration for Streaming: Tools, Patterns (2026)
DynamoDB Single-Table Design: Nghệ thuật thiết kế NoSQL cho hệ thống quy mô lớn
Speculation Rules API — Điều hướng web nhanh như chớp với Prefetch và Prerender
Disclaimer: The opinions expressed in this blog are solely my own and do not reflect the views or opinions of my employer or any affiliated organizations. The content provided is for informational and educational purposes only and should not be taken as professional advice. While I strive to provide accurate and up-to-date information, I make no warranties or guarantees about the completeness, reliability, or accuracy of the content. Readers are encouraged to verify the information and seek independent advice as needed. I disclaim any liability for decisions or actions taken based on the content of this blog.