Cái bẫy của giao tiếp đồng bộ
Trong những ngày đầu xây dựng microservices, REST API là “chiếc búa” tôi dùng cho mọi vấn đề. Nếu Order Service cần Inventory Service, nó sẽ gửi một yêu cầu POST và chờ đợi. Điều này hoạt động ổn với vài trăm người dùng. Tuy nhiên, khi chúng tôi đạt mức 5.000 request đồng thời, “ngôi nhà bài” đã sụp đổ. Một truy vấn cơ sở dữ liệu chậm trong một service hạ nguồn đã gây ra hiện tượng timeout 15 giây, làm tê liệt toàn bộ quy trình thanh toán. Mọi thứ bị ràng buộc quá chặt chẽ (tightly coupled).
Tôi nhanh chóng nhận ra rằng các tác vụ như gửi email chào mừng hoặc tạo hóa đơn PDF không cần phản hồi ngay lập tức. Khách hàng chỉ cần biết đơn hàng của họ đã được tiếp nhận. Bằng cách chuyển các tác vụ này sang mô hình hướng sự kiện (event-driven), chúng tôi đã loại bỏ sự phụ thuộc lẫn nhau giữa các service và cắt giảm hơn 80% thời gian phản hồi của API.
Tại sao Pub/Sub tiêu chuẩn không vượt qua bài kiểm tra độ tin cậy
Nhiều nhà phát triển bắt đầu với Redis Pub/Sub vì tốc độ của nó. Nó thực sự nhanh. Nhưng nó cũng là kiểu ‘gửi và quên’ (fire and forget). Nếu consumer service của bạn khởi động lại trong 10 giây để triển khai trong khi một tin nhắn được gửi đi, dữ liệu đó sẽ biến mất mãi mãi. Không có lịch sử và không có mạng lưới an toàn.
Apache Kafka là một lựa chọn thay thế mạnh mẽ, nhưng quản trị một cụm (cluster) Kafka là một công việc toàn thời gian. Đối với 90% các dự án, nó là quá mức cần thiết. Redis Streams lấp đầy khoảng trống này một cách hoàn hảo. Nó cung cấp cho bạn một nhật ký chỉ thêm (append-only log) bền bỉ và sức mạnh của ‘Consumer Groups’. Điều này có nghĩa là nhiều worker có thể chia sẻ khối lượng công việc, và nếu một worker gặp sự cố, worker khác có thể tiếp tục chính xác từ nơi worker cũ dừng lại—tất cả đều sử dụng chính instance Redis mà bạn đang chạy.
Thiết lập môi trường
Bạn sẽ cần Redis 5.0 trở lên. Tôi sử dụng Docker để tránh làm lộn xộn máy cục bộ với các phiên bản khác nhau.
# Khởi chạy Redis 7
docker run --name redis-streams -p 6379:6379 -d redis:7-alpine
Tiếp theo, hãy cài đặt thư viện Python. Tôi khuyên bạn nên sử dụng môi trường ảo (virtual environment) để giữ các dependency được tách biệt.
pip install redis
Xây dựng Pipeline
Thiết kế một hệ thống tin cậy không chỉ là di chuyển dữ liệu; đó còn là quản lý offset và xử lý lỗi một cách khéo léo. Hãy cùng xem xét cách triển khai.
1. Producer: Điều phối sự kiện
Hãy coi producer như một nhân viên nhập liệu. Thay vì ghi đè dữ liệu, nó sẽ thêm một bản ghi mới vào cuối stream bằng lệnh XADD.
import redis
import time
# decode_responses=True tự động chuyển đổi bytes sang strings
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def create_order(order_id, amount):
event = {
"order_id": order_id,
"amount": amount,
"status": "đã tạo"
}
# Dấu '*' tạo ra một ID duy nhất dựa trên timestamp (ví dụ: 1678912345-0)
msg_id = r.xadd("order_stream", event)
print(f"[Order Service] Đã tạo đơn hàng {order_id}. Stream ID: {msg_id}")
if __name__ == "__main__":
for i in range(1, 6):
create_order(f"ORD-100{i}", 25.50 * i)
time.sleep(0.5)
2. Khởi tạo Consumer Group
Trước khi các worker can bắt đầu, bạn phải định nghĩa một group. Điều này cho phép Redis theo dõi những tin nhắn nào đã được xác nhận (acknowledged). Sử dụng ký hiệu $ để yêu cầu group bỏ qua lịch sử cũ và chỉ bắt đầu với các tin nhắn mới.
# Chạy lệnh này trong terminal để khởi tạo group
redis-cli XGROUP CREATE order_stream inventory_group $ MKSTREAM
3. Consumer: Xử lý mạnh mẽ
Consumer sử dụng XREADGROUP. Điều kỳ diệu thực sự nằm ở XACK. Nếu code của bạn gặp lỗi trước khi xác nhận, tin nhắn sẽ nằm lại trong ‘Pending Entries List’ (PEL), chờ đợi để được thử lại thay vì biến mất.
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
GROUP = "inventory_group"
WORKER_ID = "worker_a"
def process_inventory():
print(f"[{WORKER_ID}] Đang chờ đơn hàng...")
while True:
# '>' nghĩa là: lấy các tin nhắn chưa từng được gửi tới bất kỳ worker nào khác
response = r.xreadgroup(GROUP, WORKER_ID, {"order_stream": ">"}, count=1, block=2000)
if not response:
continue
for stream, messages in response:
for msg_id, data in messages:
try:
print(f"[{WORKER_ID}] Đang trừ kho cho {data['order_id']}...")
# Giả sử việc cập nhật cơ sở dữ liệu diễn ra tại đây
# Thông báo cho Redis rằng chúng ta đã hoàn thành
r.xack("order_stream", GROUP, msg_id)
except Exception as e:
print(f"Lỗi xử lý: {e}")
if __name__ == "__main__":
process_inventory()
Kiểm tra sức khỏe vận hành
Đừng bao giờ chỉ tin vào log. Trong môi trường production, bạn cần biết liệu các consumer của mình có đang bị tụt lại phía sau hay không. Redis cung cấp lệnh XINFO để kiểm tra chi tiết bên trong.
Theo dõi độ trễ (Lag)
Chạy lệnh XINFO GROUPS order_stream. Hãy tìm trường ‘lag’. Nếu con số đó tăng đều đặn, đó là tín hiệu rõ ràng cho thấy bạn cần triển khai thêm nhiều worker instance để xử lý lưu lượng truy cập.
Giải cứu các tin nhắn bị bỏ rơi
Điều gì xảy ra nếu một worker bị ngừng hoạt động giữa chừng? Tin nhắn sẽ nằm trong Pending Entries List (PEL). Bạn có thể tìm thấy chúng bằng lệnh XPENDING. Một mô hình phổ biến là sử dụng một script ‘dead-letter’ kết hợp với XCLAIM để lấy các tin nhắn chờ cũ này và giao chúng cho một worker đang hoạt động tốt.
Việc chuyển đổi từ các chuỗi REST mong manh sang các stream dựa trên nhật ký (log-based streams) sẽ thay đổi hoàn toàn kiến trúc của bạn. Các service của bạn giờ đây có thể gặp sự cố, khởi động lại và mở rộng quy mô một cách độc lập mà không làm mất bất kỳ đơn hàng nào. Bạn không còn chỉ viết các script đơn thuần nữa; bạn đang xây dựng một hệ thống kiên cố được thiết kế để tồn tại trong thế giới thực.

