Kafka cho Java Backend Engineer
Table of contents
Trước khi nhìn vào Kafka, hãy nhìn vào thứ nó thay thế. Hầu hết bug nghiêm trọng liên quan đến Kafka không đến từ việc không biết API, mà đến từ việc không hiểu tại sao từng cơ chế được thiết kế theo cách đó. Bài này giải thích Kafka từ vấn đề, không phải từ định nghĩa.
Tổng quan kiến trúc
Trước khi đi vào từng khái niệm, đây là bức tranh toàn cảnh:
Producer gửi message vào Topic, chọn Partition dựa trên partition key. Mỗi Consumer Group đọc toàn bộ topic độc lập. Trong một group, các Partition được chia đều cho từng consumer instance để xử lý song song.
Tại sao Kafka tồn tại?
Xét một hệ thống e-commerce điển hình. Khi user đặt hàng, cần xảy ra nhiều việc: gửi email xác nhận, trừ tồn kho, ghi nhận analytics, cộng điểm loyalty. Cách tự nhiên nhất là gọi tuần tự:
@Service
public class OrderService {
@Transactional
public Order placeOrder(CreateOrderRequest req) {
Order order = orderRepository.save(new Order(req));
emailService.sendConfirmation(order); // HTTP call tới Email Service
inventoryService.reserveStock(order); // HTTP call tới Inventory Service
analyticsService.track(order); // HTTP call tới Analytics Service
loyaltyService.addPoints(order); // HTTP call tới Loyalty Service
return order;
}
}
Cái gì vỡ: Bốn điểm failure độc lập, nhưng chúng không thực sự độc lập. Nếu emailService timeout sau 3 giây, toàn bộ request chờ. Nếu inventoryService down, bạn phải rollback transaction, nhưng emailService đã gửi email rồi. Nếu analyticsService trả về 500, order có nên fail không? Câu trả lời nào bạn chọn đều sai theo một cách nào đó.
Tệ hơn: khi traffic tăng gấp 10 lần trong Black Friday, cả bốn service bị hammered đồng thời. Một service chậm kéo tất cả còn lại chậm theo. Thêm service mới (ví dụ fraud detection) đòi hỏi sửa OrderService.
Kafka tách bạch điều này. OrderService publish một event và return ngay lập tức. Mỗi downstream service consume độc lập, ở tốc độ của riêng nó, và nếu nó down thì nó tự catch up khi restart mà không ảnh hưởng ai.
@Service
public class OrderService {
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
public Order placeOrder(CreateOrderRequest req) {
Order order = orderRepository.save(new Order(req));
kafkaTemplate.send("order-events", order.getId().toString(), new OrderEvent(order));
return order; // return ngay, không chờ downstream
}
}
Trade-off: Bạn mất synchronous confirmation. Bạn không thể ngay lập tức trả lời user “điểm loyalty của bạn đã được cộng.” Hệ thống trở thành eventually consistent. Nếu business logic yêu cầu tất cả các bước phải thành công trước khi trả về response, Kafka không phải công cụ đúng cho use case đó, bạn cần distributed transaction hoặc saga pattern.
Topic và Partition: Tại sao không phải một queue đơn giản?
Topic là một named stream của records, append-only. Partition là subdivision của topic. Records trong cùng một partition được đảm bảo thứ tự. Records across partitions thì không.
Tại sao partitions tồn tại: Trong một Consumer Group, mỗi partition chỉ được assign cho đúng một consumer. Nghĩa là nếu topic có 6 partitions, bạn có thể chạy tối đa 6 consumer instances xử lý song song. Đây là cơ chế scaling duy nhất của Kafka ở phía consumer.
Cái gì vỡ nếu không có partition: Topic có 1 partition thì chỉ có 1 consumer xử lý tại một thời điểm. 50,000 orders/ngày, mỗi order mất 200ms để xử lý = tối đa 432,000 orders/ngày throughput. Nghe ổn cho đến khi có flash sale và 200,000 orders đổ vào trong 2 giờ. Bạn không thể scale bằng cách thêm consumer instance vì chỉ có 1 partition.
Production example: Partition key quyết định record vào partition nào. Chọn key sai phá vỡ ordering guarantee.
@Service
public class OrderEventProducer {
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
public void publish(Order order) {
kafkaTemplate.send(
"order-events",
order.getCustomerId().toString(), // partition key
new OrderEvent(order)
);
}
}
Với customerId làm key, tất cả events của customer 42 đi vào cùng một partition, đảm bảo thứ tự xử lý per-customer. Nếu dùng orderId làm key thay vào đó, hai events của cùng customer (order placed, order cancelled) có thể đi vào partition khác nhau và được xử lý không theo thứ tự.
Trường hợp thực tế nơi điều này quan trọng: hệ thống reward cộng điểm khi order hoàn thành và trừ điểm khi order bị cancel. Nếu “cancel” được xử lý trước “complete” vì chúng ở partition khác nhau, bạn trừ điểm từ số dư chưa cộng, dẫn đến số dư âm hoặc lỗi.
Trade-off: Nhiều partitions hơn cho phép nhiều parallelism hơn, nhưng mỗi partition là một file trên disk của broker. 1000 partitions trên cluster nhỏ ngốn file descriptor, tốn memory cho metadata, và làm leader election chậm hơn khi có broker failure. Điểm bắt đầu thực tế: tính throughput cần thiết, chia cho throughput per-consumer, làm tròn lên số lẻ. Đừng set 200 partitions “để dễ scale sau.”
Consumer Group: Tại sao không phải mỗi consumer đọc riêng?
Consumer Group là tập hợp consumers chia nhau công việc consume một topic. Kafka assign mỗi partition cho đúng một consumer trong group.
Tại sao consumer group tồn tại: Nhiều service độc lập cần toàn bộ events, nhưng mỗi service lại muốn scale internally. Hai yêu cầu này mâu thuẫn nhau nếu không có cơ chế group.
Nếu Email Service, Inventory Service, và Analytics Service cùng join một consumer group, Kafka chia partitions giữa chúng. Email Service nhận partition 0, Inventory nhận partition 1, Analytics nhận partition 2. Mỗi service chỉ thấy 1/3 số events.
Topic: order-events (3 partitions)
Consumer Group "email-service":
email-instance-1 --> Partition 0
email-instance-2 --> Partition 1
email-instance-3 --> Partition 2
(mỗi instance xử lý 1/3 events, tổng = 100% events cho email)
Consumer Group "inventory-service":
inventory-instance-1 --> Partition 0, Partition 1
inventory-instance-2 --> Partition 2
(tổng = 100% events cho inventory)
Mỗi group đọc độc lập, có offset riêng, nhận toàn bộ events.
Bug thực tế khi dùng chung group:
// Wrong: ba service dùng chung groupId
@KafkaListener(topics = "order-events", groupId = "order-processors")
public void handleInEmailService(OrderEvent event) { ... }
@KafkaListener(topics = "order-events", groupId = "order-processors")
public void handleInInventoryService(OrderEvent event) { ... }
Kafka thấy hai consumer trong cùng group. Nó assign partition cho từng consumer. Mỗi event chỉ đến một trong hai. Email service gửi email cho một nửa orders, inventory service chỉ reserve stock cho nửa còn lại. Không có exception nào, chỉ có business logic âm thầm sai.
Production example đúng:
// EmailService application
@KafkaListener(topics = "order-events", groupId = "email-service")
public void sendConfirmation(OrderEvent event) {
emailClient.send(event.getCustomerEmail(), buildEmailTemplate(event));
}
// InventoryService application
@KafkaListener(topics = "order-events", groupId = "inventory-service")
public void reserveStock(OrderEvent event) {
event.getItems().forEach(item ->
inventoryRepository.decrementStock(item.getProductId(), item.getQuantity())
);
}
Trade-off: Mỗi consumer group giữ offset riêng. Một group bị stuck (bug khiến consumer không commit offset) không ảnh hưởng group khác, nhưng nó giữ Kafka phải retain messages lâu hơn, tốn disk. Monitor consumer lag trên từng group. Group nào có lag tăng không dừng là có vấn đề: consumer chậm, hoặc consumer crash.
Offset và Retention: Tại sao không xóa message sau khi đọc?
Message queue truyền thống xóa message sau khi consumer acknowledge. Kafka giữ message theo thời gian (default 7 ngày) hoặc theo dung lượng, bất kể đã đọc hay chưa. Consumer tự track offset của mình.
Tại sao thiết kế này tồn tại:
Thứ nhất: nhiều consumer groups đọc cùng tốc độ khác nhau mà không cản nhau. Thứ hai, và quan trọng hơn: bạn có thể replay.
Cái gì vỡ nếu không có retention:
14:00 - InventoryService deploy code mới với bug trừ stock sai
16:00 - Bug được phát hiện. Stock của 2000 products đã bị trừ sai
Với delete-on-consume queue:
2 giờ events đã bị xóa sau khi consume.
Bạn phải reconcile thủ công từ database backup.
Mất hàng giờ downtime để fix.
Với Kafka (7-day retention):
Reset offset của inventory-service về 14:00.
Fix bug, redeploy.
Inventory service reprocess 2 giờ events với logic đúng.
Stock tự động được correct.
Production example: programmatic offset reset
@Component
public class ConsumerOffsetService {
private final AdminClient adminClient;
private final ConsumerFactory<String, Object> consumerFactory;
public void resetGroupToTimestamp(String topic, String groupId, Instant targetTime) {
try (Consumer<String, Object> consumer =
consumerFactory.createConsumer(groupId, "offset-reset-client")) {
List<TopicPartition> partitions = adminClient
.describeTopics(List.of(topic))
.allTopicNames().get()
.get(topic)
.partitions()
.stream()
.map(p -> new TopicPartition(topic, p.partition()))
.toList();
consumer.assign(partitions);
Map<TopicPartition, Long> timestampMap = partitions.stream()
.collect(Collectors.toMap(tp -> tp, tp -> targetTime.toEpochMilli()));
consumer.offsetsForTimes(timestampMap).forEach((tp, offsetTs) -> {
if (offsetTs != null) {
consumer.seek(tp, offsetTs.offset());
}
});
consumer.commitSync();
log.info("Reset group {} on topic {} to {}", groupId, topic, targetTime);
}
}
}
Trade-off: Retention dài tốn disk. 7 ngày retention trên topic có throughput 500MB/giờ tốn 84GB/replica. Với replication factor 3, là 252GB chỉ cho topic đó. Tune retention theo nhu cầu replay thực tế. Analytics event cần replay 30 ngày? Cân nhắc log compaction thay vì time-based retention. Payment event chỉ cần replay trong SLA 24 giờ? Set retention.ms=86400000.
Producer Acknowledgment: Tại sao không fire-and-forget?
Khi producer gửi message, nó có thể chờ broker xác nhận ở ba mức độ khác nhau, được kiểm soát bởi acks.
acks=0: Gửi và không chờ. Throughput cao nhất, nhưng nếu broker crash sau khi nhận network packet mà chưa write to disk, message mất.acks=1: Chờ partition leader ghi vào local log. Nếu leader crash trước khi replicate sang follower, message mất.acks=all: Chờ tất cả in-sync replicas (ISR) xác nhận. Chậm nhất, bảo đảm durability cao nhất.
Cái gì vỡ khi chọn sai:
Scenario: payment-completed event dùng acks=0
- Producer gửi "payment #9981 completed, amount 2,500,000 VND"
- Broker nhận packet nhưng crash ngay trước khi flush to disk
- Message mất
- Downstream services (fulfillment, loyalty, accounting) không bao giờ nhận event
- Order mãi ở trạng thái PAYMENT_PENDING
- Customer đã bị trừ tiền nhưng hàng không được giao
Production config đúng:
@Configuration
public class KafkaProducerConfig {
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// Cho business-critical events (payment, order, inventory)
config.put(ProducerConfig.ACKS_CONFIG, "all");
config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // tránh duplicate khi retry
config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
return config;
}
@Bean("analyticsProducerFactory")
public ProducerFactory<String, Object> analyticsProducerFactory() {
Map<String, Object> config = new HashMap<>(producerConfigs());
// Analytics event: throughput quan trọng hơn durability tuyệt đối
config.put(ProducerConfig.ACKS_CONFIG, "1");
config.put(ProducerConfig.LINGER_MS_CONFIG, 5); // batch nhỏ để tăng throughput
return new DefaultKafkaProducerFactory<>(config);
}
}
Trade-off: acks=all thêm latency bằng thời gian replica chậm nhất trong ISR. Nếu một replica đang GC pause hoặc mạng chậm, mọi produce call đều chờ nó. Monitor ISR shrink alert. Khi ISR giảm từ 3 xuống 2 (một replica bị loại khỏi ISR vì chậm), acks=all thực chất chỉ chờ 2 replicas, tức là durability giảm. Đây là lúc cần kiểm tra infra, không phải lúc giảm acks.
At-least-once và Idempotency: Bài toán khó nhất
Kafka mặc định đảm bảo at-least-once delivery. Một message có thể được deliver nhiều hơn một lần nếu consumer crash sau khi xử lý nhưng trước khi commit offset.
Cái gì vỡ nếu không có idempotency:
Timeline:
1. Consumer đọc event "payment-completed" cho order #5566
2. Consumer gọi stripeClient.charge(orderId=5566, amount=1_200_000)
3. Stripe xử lý thành công, trả về charge ID
4. Consumer chuẩn bị commit offset -> crash (OOM, network drop, deployment)
5. Consumer restart, Kafka deliver lại cùng event vì offset chưa được commit
6. Consumer gọi stripeClient.charge(orderId=5566, amount=1_200_000) lần hai
7. Customer bị charge 2,400,000 VND thay vì 1,200,000 VND
Đây là một trong những bug tốn kém nhất trong fintech. Không có exception, không có error log, chỉ có tiền bị trừ hai lần.
Production example: idempotency key pattern
@KafkaListener(
topics = "payment-events",
groupId = "payment-processor",
containerFactory = "paymentKafkaListenerFactory"
)
public void processPayment(PaymentEvent event, Acknowledgment ack) {
String idempotencyKey = event.getEventId(); // UUID được gán lúc publish
if (processedEventRepository.existsByEventId(idempotencyKey)) {
log.info("Skipping duplicate event {}", idempotencyKey);
ack.acknowledge();
return;
}
try {
transactionTemplate.execute(status -> {
// Ghi event ID và xử lý payment trong cùng một transaction
processedEventRepository.save(new ProcessedEvent(
idempotencyKey,
Instant.now(),
event.getOrderId()
));
ChargeResult result = stripeClient.charge(
event.getOrderId(),
event.getAmount(),
idempotencyKey // Stripe cũng hỗ trợ idempotency key
);
orderRepository.updatePaymentStatus(event.getOrderId(), result.getChargeId());
return null;
});
ack.acknowledge(); // commit offset sau khi transaction thành công
} catch (DuplicateKeyException e) {
// Race condition: hai instances xử lý cùng event concurrently
// Một trong hai đã thành công, bỏ qua an toàn
log.info("Concurrent duplicate resolved for event {}", idempotencyKey);
ack.acknowledge();
} catch (Exception e) {
// Không acknowledge: Kafka sẽ redeliver
log.error("Failed to process payment event {}", idempotencyKey, e);
}
}
Lưu ý: processedEventRepository.save() và business logic phải nằm trong cùng một transaction. Nếu lưu event ID thành công nhưng stripe charge fail, transaction rollback, event ID bị xóa, và retry đúng đắn sẽ xảy ra. Nếu stripe charge thành công nhưng lưu event ID fail, transaction rollback, stripe charge không được rollback (side effect bên ngoài), nhưng Stripe idempotency key đảm bảo retry sẽ trả về kết quả cũ thay vì charge lại.
Trade-off: Exactly-once semantics tồn tại trong Kafka thông qua transactions và isolation.level=read_committed, nhưng đi kèm chi phí performance đáng kể (thêm một round-trip để commit transaction marker) và phức tạp khi vận hành. Với phần lớn use case, at-least-once với idempotent consumer là lựa chọn đúng. Chỉ dùng exactly-once khi side effect của consumer không thể tự make idempotent, ví dụ ghi vào external system không hỗ trợ idempotency key.
Tổng kết
| Concept | Tại sao tồn tại | Cài sai thì mất gì |
|---|---|---|
| Topic + Partition | Scale consumer throughput | 1 partition = không thể scale |
| Consumer Group | Nhiều services nhận toàn bộ events | Dùng chung group = mỗi service chỉ thấy 1/N events |
| Retention + Offset | Replay, nhiều consumer độc lập | Delete-on-consume = không thể reprocess khi có bug |
acks=all | Durability khi broker fail | acks=0 trên payment topic = mất giao dịch |
| Idempotency | At-least-once có thể deliver trùng | Thiếu idempotency = charge khách hàng nhiều lần |
Kafka không giải quyết distributed systems problems, nó cho bạn công cụ để giải quyết chúng theo cách có kiểm soát. Bạn vẫn phải tự xử lý idempotency, tự monitor consumer lag, và tự thiết kế cho eventual consistency. Hiểu tại sao từng cơ chế tồn tại là điều kiện tiên quyết để dùng đúng.