Ngừng Polling Cơ sở dữ liệu: Hướng dẫn thực hành CDC với Debezium và Kafka

Database tutorial - IT technology blog
Database tutorial - IT technology blog

Cơn đau đầu mang tên lệch dữ liệu (Data Desync)

Tôi không nhớ nổi mình đã bao nhiêu lần phải đồng bộ hóa một index Elasticsearch hoặc một Redis cache với cơ sở dữ liệu production chính. Hồi mới vào nghề, tôi thường dựa vào pattern “Dual Write”. Ứng dụng của tôi sẽ ghi vào cơ sở dữ liệu và ngay sau đó thử cập nhật cache. Nó hoạt động ổn trong một thế giới hoàn hảo, nhưng thực tế thì luôn rắc rối. Nếu lần ghi thứ hai thất bại do sự cố mạng, bạn sẽ gặp phải tình trạng “data drift” (lệch dữ liệu) — một sự sai khác âm thầm nhưng đau đớn và là một cơn ác mộng để truy vết.

Tôi cũng từng thử nghiệm với polling. Tôi thiết lập một cron job để lấy các bản ghi có updated_at > last_sync_time mỗi 60 giây. Dù đơn giản, nhưng cách tiếp cận này cực kỳ ngốn tài nguyên. Trên một instance AWS t3.medium khiêm tốn, các truy vấn polling thường xuyên có thể làm CPU tăng vọt 15-20% ngay cả khi không có dữ liệu nào thay đổi. Thêm vào đó, polling hoàn toàn mù tịt trước các thao tác xóa cứng (hard delete); nếu một hàng dữ liệu bị xóa mất, truy vấn của bạn sẽ không bao giờ tìm thấy nó để thông báo cho phần còn lại của hệ thống.

Hãy coi cơ sở dữ liệu là một luồng dữ liệu (Stream), không phải một chiếc hộp

Sai lầm mà hầu hết chúng ta mắc phải là coi cơ sở dữ liệu như một thùng chứa tĩnh. Chúng ta đợi dữ liệu nằm yên đó rồi mới tìm cách kéo nó ra. Nhưng thực tế, mọi cơ sở dữ liệu hiện đại đều đã và đang ghi lại các thay đổi của chúng vào một file log. MySQL sử dụng Binlog, còn PostgreSQL sử dụng Write-Ahead Log (WAL).

Change Data Capture (CDC) thay đổi hoàn toàn cách tiếp cận này. Thay vì hỏi cơ sở dữ liệu để lấy dữ liệu, chúng ta lắng nghe các log nội bộ này. Debezium đóng vai trò là công cụ thực hiện các tác vụ nặng nề ở đây. Đây là một công cụ mã nguồn mở kết nối vào Apache Kafka, chuyển đổi các byte dữ liệu thô của DB thành một luồng sự kiện JSON sạch sẽ và liên tục. Nó giống như sự khác biệt giữa việc kiểm tra hòm thư mỗi ngày một lần và việc có một camera giám sát trực tiếp ngay trước hiên nhà bạn.

Thiết lập hệ thống đường ống

Để xây dựng một pipeline CDC sẵn sàng cho môi trường production, bạn cần ba trụ cột: cơ sở dữ liệu nguồn, Apache Kafka đóng vai trò là xương sống truyền tin, và Debezium chạy bên trong Kafka Connect. Để phát triển tại local, Docker Compose là người bạn đồng hành tốt nhất. Nó giúp bạn khởi chạy toàn bộ hệ sinh thái chỉ trong vài giây.

version: '3.8'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:2.4
    ports:
     - 2181:2181

  kafka:
    image: quay.io/debezium/kafka:2.4
    ports:
     - 9092:9092
    depends_on:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181

  mysql:
    image: quay.io/debezium/example-mysql:2.4
    ports:
     - 3306:3306
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw

  connect:
    image: quay.io/debezium/connect:2.4
    ports:
     - 8083:8083
    depends_on:
     - kafka
     - mysql
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses

Khởi chạy hệ thống này bằng lệnh docker-compose up -d. Image MySQL cụ thể này đã được cấu hình sẵn tính năng binary logging, giúp bạn tiết kiệm hàng tá bước cấu hình thủ công.

Cấu hình Debezium Connector

Nếu bạn đang trỏ Debezium vào một instance MySQL production hiện có, bạn sẽ cần tinh chỉnh file my.cnf của mình. Debezium yêu cầu logging ở cấp độ ROW để hiểu chính xác điều gì đã thay đổi trong từng hàng. Nếu không có cấu hình này, nó chỉ thấy được câu lệnh SQL, vốn không đủ để tái cấu trúc trạng thái dữ liệu.

[mysqld]
server-id         = 223344
log_bin           = mysql-bin
binlog_format     = ROW
binlog_row_image  = FULL
expire_logs_days  = 7

Khi cơ sở dữ liệu đã sẵn sàng, chúng ta yêu cầu Debezium bắt đầu theo dõi. Chúng ta thực hiện việc này bằng cách gửi một cấu hình JSON đến Kafka Connect REST API. Tôi ưu tiên dùng curl cho việc này vì nó dễ dàng quản lý phiên bản trong một shell script.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "topic.prefix": "production_db",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schemahistory.inventory"
  }
}'

Hãy lưu ý đến topic.prefix. Chuỗi này sẽ là tiền tố của mọi tên topic trong Kafka. Ví dụ, một bảng có tên là customers sẽ truyền dữ liệu vào topic production_db.inventory.customers.

Quan sát luồng dữ liệu hoạt động

Sau khi connector hoạt động, Debezium sẽ thực hiện một bản “snapshot” dữ liệu hiện tại của bạn. Sau đó, nó chuyển sang chế độ streaming. Bạn có thể theo dõi quá trình này theo thời gian thực bằng cách tiêu thụ (consume) dữ liệu từ Kafka topic bằng console consumer có sẵn.

docker exec -it kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic production_db.inventory.customers \
    --from-beginning

Hãy thử chạy một truy vấn UPDATE trong MySQL. Gần như ngay lập tức, một đối tượng JSON sẽ xuất hiện trên terminal của bạn. Điểm hay của Debezium là cấu trúc payload của nó. Nó cung cấp một bản snapshot before (trước) và after (sau) của hàng dữ liệu. Điều này giúp việc xem chính xác những trường nào đã thay đổi trở nên cực kỳ dễ dàng mà không cần phải truy vấn lại cơ sở dữ liệu.

Bài học từ thực chiến

CDC rất mạnh mẽ, nhưng nó không phải là viên đạn bạc. Khi bạn mở rộng quy mô lên hàng trăm bảng, việc quản lý Kafka topic sẽ trở thành một công việc toàn thời gian. Bạn sẽ muốn tự động hóa việc tạo topic và thiết lập các chính sách lưu trữ (retention) nghiêm ngặt. Nếu không, dung lượng đĩa Kafka của bạn sẽ biến mất nhanh hơn bạn tưởng đấy.

Ngoài ra, hãy để mắt đến sức khỏe của connector. Tôi đã thấy nhiều connector bị lỗi do thay đổi schema đột ngột mà Debezium không lường trước được. Hãy luôn giám sát endpoint /status. Nếu trạng thái chuyển sang FAILED, quá trình đồng bộ sẽ dừng lại và độ trễ (lag) bắt đầu tích tụ. Trong một môi trường lưu lượng cao xử lý 10.000 sự kiện mỗi giây, chỉ cần vài phút gián đoạn cũng có thể dẫn đến một lượng dữ liệu tồn đọng khổng lồ.

Share: