Apache Kafka cho Xử Lý Dữ Liệu Thời Gian Thực: Đánh Giá Sau 6 Tháng Chạy Production

Programming tutorial - IT technology blog
Programming tutorial - IT technology blog

Sáu Tháng Với Kafka: Những Điều Tôi Ước Ai Đó Đã Nói Trước

Lần đầu tiên nhóm tôi đề xuất thay thế pipeline ETL batch bằng Apache Kafka, tôi thực sự hoài nghi. Chúng tôi đang xử lý khoảng 2 triệu sự kiện mỗi ngày — đủ để cảm nhận nỗi đau từ những độ trễ hàng giờ, nhưng chưa đến mức chi phí vận hành của một nền tảng streaming phân tán cảm thấy xứng đáng.

Sáu tháng sau, sự hoài nghi đó đã biến mất. Kafka không chỉ giải quyết vấn đề độ trễ của chúng tôi. Nó thay đổi hoàn toàn cách chúng tôi nghĩ về luồng dữ liệu. Đây là hành trình thực tế đã diễn ra như thế nào.

Khởi Động Nhanh: Chạy Kafka Trong 5 Phút

Trước hết, hãy chạy Kafka trên máy local để bạn có thứ gì đó thực tế để thử nghiệm. Docker Compose là con đường nhanh nhất.

# docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
docker-compose up -d

# Tạo topic đầu tiên của bạn
docker exec -it <kafka-container> kafka-topics \
  --bootstrap-server localhost:9092 \
  --create --topic user-events \
  --partitions 3 \
  --replication-factor 1

# Kiểm tra topic đã tồn tại chưa
docker exec -it <kafka-container> kafka-topics \
  --bootstrap-server localhost:9092 \
  --list

Bây giờ gửi tin nhắn đầu tiên và nhận nó:

# Producer (terminal 1)
docker exec -it <kafka-container> kafka-console-producer \
  --bootstrap-server localhost:9092 \
  --topic user-events

# Consumer (terminal 2)
docker exec -it <kafka-container> kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic user-events \
  --from-beginning

Gõ gì đó ở terminal 1 và quan sát nó xuất hiện ở terminal 2 trong vài mili giây. Đó chính là vòng lặp cốt lõi của mọi thứ Kafka làm, ở quy mô khổng lồ.

Tìm Hiểu Sâu: Kafka Thực Sự Hoạt Động Như Thế Nào

Hầu hết các hướng dẫn đều bỏ qua hoàn toàn phần nội tại — đó là lý do tại sao Kafka trên production hay gây bất ngờ cho mọi người. Đây là mô hình tư duy bạn cần trước khi triển khai bất cứ thứ gì nghiêm túc.

Topics, Partitions và Offsets

Một topic đơn giản chỉ là một log sự kiện có tên. Các sự kiện được ghi thêm vào theo thứ tự và lưu trữ bền vững trên đĩa. Khác với message queue, Kafka không xóa message sau khi tiêu thụ. Đó chính là điều làm Kafka trở thành một log, không chỉ là một broker.

Các topic được chia thành partition. Mỗi partition là một chuỗi bản ghi có thứ tự, bất biến. Với 3 partition và 3 consumer trong một group, mỗi consumer sở hữu đúng một partition. Nhiều partition đồng nghĩa với nhiều tính song song hơn — đó là cách Kafka mở rộng theo chiều ngang.

Mỗi message trong một partition có một offset — một số nguyên tuần tự. Consumer tự theo dõi offset của mình, nhờ đó họ có thể phát lại, bỏ qua hoặc xử lý lại sự kiện bất cứ lúc nào. Khi còn quen với các hệ thống mà message đã tiêu thụ là biến mất mãi mãi, lần đầu dùng tính năng này tôi cảm giác như có siêu năng lực.

Producer và Consumer Trong Python

Thư viện confluent-kafka là thứ tôi dùng trong production. Nó bọc thư viện C chính thức và benchmark nhanh hơn khoảng 3–5× so với kafka-python cho các workload thông lượng cao — sự khác biệt thể hiện rõ ràng khi vượt ngưỡng ~50k message/giây. Nếu bạn mới bắt đầu với Python cho hệ thống, nắm vững các khái niệm về subprocess và I/O sẽ giúp ích nhiều khi debug consumer lag.

pip install confluent-kafka
# producer.py
from confluent_kafka import Producer
import json
import time

producer = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_report(err, msg):
    if err:
        print(f'Gửi thất bại: {err}')
    else:
        print(f'Đã gửi tới {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}')

for i in range(100):
    event = {
        'user_id': f'user_{i % 10}',
        'action': 'page_view',
        'timestamp': time.time()
    }
    producer.produce(
        'user-events',
        key=event['user_id'],  # Cùng key luôn đi vào cùng partition
        value=json.dumps(event).encode('utf-8'),
        callback=delivery_report
    )
    producer.poll(0)

producer.flush()
# consumer.py
from confluent_kafka import Consumer
import json

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'analytics-service',
    'auto.offset.reset': 'earliest'
})

consumer.subscribe(['user-events'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f'Lỗi consumer: {msg.error()}')
            continue

        event = json.loads(msg.value().decode('utf-8'))
        print(f"Đang xử lý: user={event['user_id']}, action={event['action']}")
finally:
    consumer.close()

Một điều tôi đã học được theo cách đau đớn: luôn đặt consumer group ID có ý nghĩa. Nếu hai service vô tình dùng chung group ID, chúng sẽ chia nhau các partition và mỗi service sẽ bỏ lỡ một nửa sự kiện. Mất dữ liệu âm thầm là kiểu lỗi tệ nhất — không có error, không có cảnh báo, chỉ là những con số thiếu trên dashboard ba ngày sau.

Sử Dụng Nâng Cao: Xây Dựng Pipeline Thực Tế

Một producer và consumer đơn lẻ chỉ là demo. Pipeline thực tế có nhiều giai đoạn — và đó là lúc thiết kế của Kafka bắt đầu thể hiện giá trị.

Kết Nối Các Topic Cho Xử Lý Đa Giai Đoạn

Mỗi topic trở thành một giai đoạn trong pipeline của bạn:

  • raw-events — tất cả dữ liệu đến từ ứng dụng
  • validated-events — sau khi xác thực schema và loại trùng lặp
  • enriched-events — sau khi kết hợp với dữ liệu hồ sơ người dùng
  • analytics-aggregates — các chỉ số đã được tính toán cuối cùng

Mỗi microservice tiêu thụ từ một topic và sản xuất sang topic tiếp theo. Nếu bạn đang xây dựng các service backend đi kèm, REST API với FastAPI là lựa chọn tốt để expose dữ liệu đã được xử lý. Mở rộng độc lập, ranh giới lỗi rõ ràng, và khả năng phát lại bất kỳ giai đoạn nào mà không ảnh hưởng các giai đoạn khác.

Consumer Groups Để Mở Rộng Theo Chiều Ngang

# Mở rộng bằng cách chạy nhiều instance với cùng group.id
# Kafka tự động cân bằng lại partition giữa các consumer đang hoạt động

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'analytics-service',  # Cùng group = cân bằng tải
    'auto.offset.reset': 'latest',
    'enable.auto.commit': False  # Commit thủ công để đảm bảo xử lý đúng một lần
})

consumer.subscribe(['validated-events'])

while True:
    msg = consumer.poll(1.0)
    if msg and not msg.error():
        process_event(json.loads(msg.value()))
        consumer.commit(asynchronous=False)  # Commit sau khi xử lý thành công

Kafka Streams Cho Xử Lý Có Trạng Thái

Các phép tổng hợp như “đếm sự kiện theo người dùng trong 5 phút qua” yêu cầu xử lý stream có trạng thái. Trên JVM, Kafka Streams là lựa chọn tự nhiên. Với Python, Faust xử lý điều này khá gọn gàng:

pip install faust
import faust

app = faust.App('analytics', broker='kafka://localhost:9092')

events_topic = app.topic('validated-events')

# Bảng theo cửa sổ thời gian: đếm sự kiện theo user trong cửa sổ 5 phút
user_event_counts = app.Table(
    'user-event-counts',
    default=int,
).tumbling(300.0)  # Cửa sổ tumbling 5 phút

@app.agent(events_topic)
async def process(events):
    async for event in events.group_by(lambda e: e['user_id']):
        user_event_counts[event['user_id']] += 1
        print(f"User {event['user_id']}: {user_event_counts[event['user_id']].current()} sự kiện")

Mẹo Thực Tế Từ Môi Trường Production

Chạy được Kafka chỉ là phần dễ. Duy trì nó hoạt động ổn định dưới tải nặng mà không bị gọi lúc 3 giờ sáng — đó mới là phần thú vị. Những mẹo dưới đây là những gì tôi học được theo cách tốn kém nhất.

Số Lượng Partition Là Quyết Định Một Chiều

Bạn có thể tăng partition sau này, nhưng không thể giảm. Tệ hơn, việc tăng partition phá vỡ thứ tự dựa trên key — các message có cùng key giờ có thể rơi vào các partition khác nhau. Hãy suy nghĩ kỹ về số lượng partition ngay từ đầu. Một quy tắc ngón tay cái có thể áp dụng: nhắm tới 2–3× số consumer peak dự kiến. Với một topic lưu lượng trung bình xử lý ~10k sự kiện/giây, 50 partition thường là hơn đủ.

Theo Dõi Lag, Không Phải Throughput

Throughput cho bạn biết bạn đang xử lý nhanh đến đâu. Lag cho bạn biết bạn đang chậm trễ bao nhiêu. Một consumer có thể hiển thị throughput tốt và vẫn đang tụt lại phía sau nếu producer nhanh hơn. Hãy thiết lập cảnh báo lag — tôi dùng Burrow, hoặc kết hợp với hệ thống log tập trung như Grafana Loki để có cái nhìn toàn cảnh hơn. CLI kafka-consumer-groups tích hợp sẵn cũng dùng được khi cần:

kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --describe \
  --group analytics-service

Nếu cột LAG đang tăng, bạn cần thêm consumer. Bạn có thể thêm tối đa một consumer mỗi partition trước khi chạm trần.

Đặt Thời Gian Lưu Trữ Dựa Trên Cửa Sổ Phát Lại Của Bạn

Thời gian lưu trữ mặc định của Kafka là 7 ngày. Hãy tự hỏi: trong một sự cố nghiêm trọng, bạn có thể cần phát lại bao xa? Với luồng chính của chúng tôi, 3 ngày là đủ. Các topic quan trọng được giữ 30 ngày, cấu hình ở cấp độ topic:

kafka-configs \
  --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name user-events \
  --alter \
  --add-config retention.ms=2592000000  # 30 ngày tính bằng mili giây

Dùng Schema Registry Trên Production

JSON thuần túy ổn trong môi trường phát triển. Trên production, schema drift cuối cùng sẽ làm hỏng pipeline của bạn — một trường được đổi tên ở upstream sẽ âm thầm phá vỡ các consumer ở downstream. Confluent Schema Registry với Avro hoặc Protobuf thực thi tính tương thích ở cấp độ broker. Cần thiết lập thêm, đúng. Nhưng tôi đã chứng kiến nó bắt được những thay đổi phá vỡ tương thích có thể gây mất dữ liệu ít nhất ba lần riêng biệt.

Đừng Sợ Rebalancing, Nhưng Hãy Xử Lý Nó Đúng Cách

Khi một consumer tham gia hoặc rời khỏi group, Kafka kích hoạt rebalance. Quá trình xử lý tạm dừng vài giây trên toàn bộ group. Nếu bạn đang commit offset thủ công, hãy commit trong callback on_revoke trước khi các partition được phân công lại — bỏ qua bước này và bạn sẽ xử lý lại cùng các sự kiện sau mỗi lần rebalance:

from confluent_kafka import Consumer, TopicPartition

def on_revoke(consumer, partitions):
    # Commit trước khi partition bị thu hồi
    consumer.commit(asynchronous=False)

consumer.subscribe(['user-events'], on_revoke=on_revoke)

Kafka Có Phù Hợp Với Trường Hợp Của Bạn Không?

Kafka tỏa sáng với event streaming thông lượng cao, log bền vững và tách rời các service ở quy mô lớn. Với task queue đơn giản, đây là lựa chọn quá mức cần thiết — Redis hoặc RabbitMQ sẽ phục vụ bạn tốt hơn với một phần nhỏ độ phức tạp vận hành. Kafka cũng đòi hỏi công việc ops thực sự: trạng thái phân tán, quyết định về replication factor, chính sách lưu trữ và giám sát consumer lag đều cần được chú ý.

Sau sáu tháng: nếu bạn đang xử lý hàng chục nghìn sự kiện mỗi giây qua nhiều service, Kafka xứng đáng với từng chút effort bỏ ra để học. Nếu bạn đang ở mức vài trăm sự kiện mỗi phút, hãy bắt đầu đơn giản hơn và chuyển sang khi nỗi đau trở nên không thể chối từ. Kafka vẫn sẽ ở đó — và bạn sẽ trân trọng nó hơn khi thực sự cần đến.

Share: