Trang chủ Engineering

Kafka cho người mới bắt đầu: Hoạt động như thế nào và tại sao tồn tại

03 March 2026 · 19 phút đọc
Mục lục

Hầu hết tài liệu về Kafka bắt đầu bằng câu “Kafka là một nền tảng event streaming phân tán.” Câu đó chính xác nhưng không giúp ích được gì. Bài này bắt đầu từ vấn đề Kafka được xây dựng để giải quyết, sau đó giải thích từng thành phần từ góc độ tại sao nó tồn tại, không phải từ tên gọi của nó.


Phần 1: Tại sao Kafka tồn tại

Vấn đề với việc gọi trực tiếp giữa các service

Giả sử bạn có một hệ thống e-commerce. Khi người dùng đặt hàng, nhiều việc phải xảy ra: inventory service trừ tồn kho, notification service gửi email xác nhận, analytics service ghi lại đơn hàng, và fraud detection service chạy kiểm tra.

Cách đơn giản nhất: order service gọi trực tiếp đến từng downstream service.

Order Service
    |
    |-- HTTP --> Inventory Service
    |-- HTTP --> Notification Service
    |-- HTTP --> Analytics Service
    |-- HTTP --> Fraud Detection Service

Cách này hoạt động cho đến khi không còn hoạt động nữa. Những gì bị phá vỡ:

Tight coupling. Order service phải biết về mọi downstream consumer. Khi team analytics muốn thêm event mới, họ sửa order service. Khi fraud detection thay đổi API, order service cũng phải thay đổi. Mỗi consumer mới là một dependency mới mà order service phải quản lý.

Chuỗi khả dụng. Nếu notification service đang down, order service hoặc phải fail toàn bộ đơn hàng, hoặc phải tự implement retry logic cho từng downstream service. Bốn consumer nghĩa là bốn failure mode khác nhau cần xử lý.

Tốc độ không đồng đều. Fraud detection có thể mất 300ms. Khách hàng không nên đợi 300ms chỉ vì một downstream service chậm. Order service hoặc block chờ tất cả, hoặc tự quản lý parallel call với timeout riêng cho từng service.

Không thể replay. Nếu analytics bị down hai tiếng và bỏ lỡ 10,000 event, những event đó mất đi vĩnh viễn. Không có cách nào gửi lại chúng.

Kafka thay đổi điều gì

Kafka đưa một broker vào giữa producer và consumer. Order service publish một event. Mọi downstream service đọc event đó độc lập.

Order Service --> Kafka --> Inventory Service
                       --> Notification Service
                       --> Analytics Service
                       --> Fraud Detection Service

Lúc này:

  • Order service không biết và không quan tâm ai đọc event. Thêm consumer mới không cần thay đổi producer.
  • Nếu notification bị down, event của nó tích lũy trong Kafka. Khi phục hồi, nó đọc từ chỗ nó dừng lại.
  • Nếu analytics bỏ lỡ hai tiếng, nó replay hai tiếng đó từ log được lưu trong Kafka.
  • Fraud detection chạy độc lập theo tốc độ của nó mà không làm chậm response của đơn hàng.

Trade-off: Bạn đã đổi sự đơn giản đồng bộ lấy sự phức tạp bất đồng bộ. Bạn không còn nhận được xác nhận ngay lập tức rằng mọi downstream service đã thành công. Đơn hàng được chấp nhận, nhưng email có được gửi hay không là một câu hỏi riêng được trả lời bất đồng bộ. Đây là trade-off đúng đắn cho hệ thống throughput cao. Đây là trade-off sai khi bạn cần phản hồi đồng bộ ngay lập tức từ downstream service trước khi tiếp tục.


Phần 2: Topic và Partition

Topic: kênh được đặt tên cho các event

Topic là một log được đặt tên của các event. Producer ghi vào topic; consumer đọc từ topic. Hãy nghĩ topic như một bảng trong database, ngoại trừ bạn chỉ có thể append vào nó và việc đọc không xóa bản ghi.

// Producer ghi vào topic "orders"
ProducerRecord<String, String> record = new ProducerRecord<>("orders", orderId, orderJson);
producer.send(record);

Tại sao partition tồn tại

Một topic có thể nhận hàng triệu event mỗi giây. Một máy không thể ghi hoặc đọc nhanh như vậy. Kafka chia topic thành các partition, mỗi partition là một log độc lập có thứ tự được lưu trên một broker khác nhau.

Topic: "orders"

Partition 0: [msg0] [msg3] [msg6] [msg9] ...
Partition 1: [msg1] [msg4] [msg7] [msg10] ...
Partition 2: [msg2] [msg5] [msg8] [msg11] ...

Trong một partition, thứ tự được đảm bảo. Giữa các partition, không có đảm bảo thứ tự. Đây là trade-off cơ bản: parallelism đổi lấy thứ tự toàn cục.

Điều gì xảy ra nếu không có partition: Nếu một topic chỉ có một partition, một consumer có thể xử lý nó và throughput bị giới hạn bởi một máy. Với 10 partition, 10 consumer có thể đọc song song, tăng throughput lên khoảng 10 lần.

Kafka quyết định message nào vào partition nào như thế nào:

// Nếu cung cấp key, Kafka hash key để chọn partition
// Tất cả message có cùng key luôn vào cùng partition
// -> thứ tự được đảm bảo theo từng key
ProducerRecord<String, String> record = new ProducerRecord<>(
    "orders",
    customerId,   // key: cùng customer luôn vào cùng partition
    orderJson     // value
);

// Nếu không có key, Kafka phân phối round-robin qua các partition
ProducerRecord<String, String> record = new ProducerRecord<>("orders", orderJson);

Production example: Order service gửi event với key là customerId. Mọi event của customer 42 đều vào partition 3 theo thứ tự. Fraud detection consumer đọc partition 3 thấy tất cả event của customer 42 theo đúng thứ tự, điều cần thiết để phát hiện velocity patterns (ba đơn hàng trong một phút).

Drag · Scroll to zoom

Trade-off: Số partition được đặt khi tạo topic và khó thay đổi sau này (tăng partition làm thay đổi ánh xạ hash cho key, phá vỡ thứ tự per-key cho consumer hiện tại). Bắt đầu với nhiều partition hơn nhu cầu hiện tại. Điểm khởi đầu phổ biến cho topic traffic trung bình là 12 đến 24 partition.


Phần 3: Broker và Cluster

Kafka cluster là một nhóm server broker. Mỗi broker lưu một số partition. Kafka replicate mỗi partition qua nhiều broker để nếu một broker bị lỗi, dữ liệu không bị mất.

Broker 1: Partition 0 (leader), Partition 1 (replica)
Broker 2: Partition 1 (leader), Partition 2 (replica)
Broker 3: Partition 2 (leader), Partition 0 (replica)

Mỗi partition có một leader broker xử lý tất cả reads và writes cho partition đó. Các broker khác giữ bản sao gọi là follower, replicate từ leader.

Điều gì xảy ra nếu không có replication: Nếu Broker 1 giữ bản sao duy nhất của Partition 0 và Broker 1 bị crash, tất cả message trong Partition 0 mất đi và producer không thể ghi message mới cho đến khi broker phục hồi. Với replication factor 3, Broker 1 bị lỗi không phải là thảm họa: Broker 3 (đang giữ replica) được tự động promote lên làm leader.

Replication factor kiểm soát số bản sao tồn tại. Replication factor 1 nghĩa là không có dự phòng. Replication factor 3 nghĩa là dữ liệu sống sót qua việc mất bất kỳ một broker nào. Hầu hết topic production dùng replication factor 3.


Phần 4: Producer

Producer làm gì

Producer là bất kỳ code nào ghi message vào Kafka. Java producer client tự động xử lý quản lý kết nối, serialization, chọn partition, batching, và retry.

// Cấu hình producer tối giản
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// send là bất đồng bộ theo mặc định
Future<RecordMetadata> future = producer.send(
    new ProducerRecord<>("orders", customerId, orderJson)
);

// Luôn close để flush các record đang buffer và giải phóng kết nối
producer.close();

Mức độ acknowledgment

Cài đặt acks kiểm soát số broker phải xác nhận write trước khi Kafka thông báo producer thành công. Đây là cấu hình producer quan trọng nhất cho độ tin cậy.

// acks=0: fire and forget
// Producer không đợi bất kỳ xác nhận nào
// Nhanh nhất, nhưng message có thể mất nếu broker crash ngay sau khi nhận
props.put(ProducerConfig.ACKS_CONFIG, "0");

// acks=1: leader xác nhận
// Broker leader xác nhận write vào log của nó
// Message mất nếu leader crash trước khi replicate sang follower
props.put(ProducerConfig.ACKS_CONFIG, "1");

// acks=all (hoặc -1): tất cả in-sync replica xác nhận
// Message bền vững miễn là có ít nhất một replica còn sống
// Chậm nhất, nhưng không mất dữ liệu khi một broker đơn bị lỗi
props.put(ProducerConfig.ACKS_CONFIG, "all");

Quy tắc production: Dùng acks=all cho bất kỳ dữ liệu quan trọng nào. Dùng acks=1 cho metrics hoặc log volume cao nơi mất mát occasional là chấp nhận được. Không bao giờ dùng acks=0 trừ khi đang benchmark throughput và có thể chấp nhận mất message.

Batching

Producer không gửi một network request cho mỗi message. Nó buffer record và gửi theo batch, giảm đáng kể network overhead.

// Producer đợi bao lâu để fill batch trước khi gửi
props.put(ProducerConfig.LINGER_MS_CONFIG, "5"); // đợi tối đa 5ms

// Kích thước batch tối đa tính bằng bytes
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 16KB mặc định

// Bật nén snappy cho batch
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

Với linger.ms=5, producer đợi 5ms trước khi gửi. Nếu 1,000 message đến trong 5ms đó, chúng đi ra trong một network call thay vì 1,000 lần. Trade-off: mỗi message đợi tối đa 5ms trước khi được gửi. Với path nhạy cảm về latency, giữ linger.ms ở 0 hoặc 1.


Phần 5: Consumer và Consumer Group

Offset: Kafka theo dõi những gì bạn đã đọc như thế nào

Mỗi message trong partition có một số tuần tự gọi là offset. Offset 0 là message đầu tiên được ghi, offset 1 là message thứ hai, cứ thế tiếp tục. Khi consumer đọc message, nó commit offset hiện tại về Kafka. Nếu consumer crash và khởi động lại, nó tiếp tục từ offset đã commit cuối cùng.

Partition 0 offsets:
  0       1       2       3       4       5
[msg0] [msg1] [msg2] [msg3] [msg4] [msg5]
                      ^
              committed offset = 2
              (consumer đã xử lý đến msg2)
              lần đọc tiếp theo bắt đầu từ offset 3

Điều gì xảy ra nếu không commit offset: Nếu bạn đọc message nhưng không bao giờ commit offset, mỗi lần khởi động lại đều đọc từ đầu topic. Nếu topic có 6 tháng lịch sử đơn hàng, mỗi lần restart sẽ xử lý lại tất cả.

Điều gì xảy ra với auto-commit: Cài đặt mặc định của Kafka (enable.auto.commit=true) commit offset mỗi 5 giây bất kể code của bạn có xử lý thành công message hay không. Nếu code của bạn đọc một batch, auto-commit kích hoạt, rồi việc xử lý crash trước khi hoàn thành: những message đó được đánh dấu là xong dù không được xử lý. Bạn mất chúng.

// An toàn hơn: tắt auto-commit và commit thủ công sau khi xử lý
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    processOrder(record.value()); // xử lý trước
}
consumer.commitSync(); // rồi commit: chỉ đến đây nếu xử lý thành công

Consumer group: đọc song song

Consumer group là tập hợp consumer cùng hợp tác đọc một topic. Kafka gán mỗi partition cho đúng một consumer trong group. Không có hai consumer trong cùng group đọc cùng partition tại cùng thời điểm.

Topic "orders" với 3 partition
Consumer Group "fraud-detection" với 3 consumer:

  Partition 0 --> Consumer A
  Partition 1 --> Consumer B
  Partition 2 --> Consumer C

Mỗi consumer xử lý partition được gán độc lập. Để tăng gấp đôi throughput, thêm consumer vào group (tối đa bằng số partition). Để có hai hệ thống độc lập đọc cùng topic, dùng hai consumer group riêng biệt: mỗi group nhận một bản sao độc lập đầy đủ của mọi message.

Topic "orders" với 3 partition

Group "fraud-detection":         Group "analytics":
  Partition 0 --> Consumer A       Partition 0 --> Consumer X
  Partition 1 --> Consumer B       Partition 1 --> Consumer Y
  Partition 2 --> Consumer C       Partition 2 --> Consumer Z

Điều gì xảy ra nếu thêm nhiều consumer hơn partition: Consumer thừa ngồi idle. Topic với 3 partition có thể dùng tối đa 3 consumer trong cùng group cho xử lý song song.

Drag · Scroll to zoom

Rebalancing

Khi consumer gia nhập hoặc rời group, Kafka tái phân phối partition giữa các consumer còn lại. Đây gọi là rebalance. Trong quá trình rebalance, tất cả consumer trong group tạm dừng đọc cho đến khi phân công mới hoàn thành.

Điều gì xảy ra nếu không xử lý rebalance: Nếu consumer đang giữ một database transaction mở hoặc HTTP request đang chạy khi rebalance bắt đầu, Kafka thu hồi partition trong lúc đang xử lý. Consumer tiếp theo nhận partition từ offset đã commit cuối cùng và xử lý lại các message đó. Nếu logic xử lý của bạn không idempotent, bạn sẽ gặp duplicate insert, duplicate email, và duplicate charge.

consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Được gọi trước khi partition bị thu hồi
        // Hoàn thành công việc đang xử lý và commit offset hiện tại
        consumer.commitSync(currentOffsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Được gọi sau khi partition mới được gán
        // Load state per-partition cho các partition mới được gán
    }
});

Phần 6: Đảm bảo Delivery

Kafka cung cấp ba delivery semantics. Hiểu hệ thống của bạn đang dùng cái nào theo mặc định không phải là tùy chọn.

At-most-once

Message có thể mất nhưng không bao giờ được xử lý hai lần. Đạt được bằng cách commit offset trước khi xử lý.

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
consumer.commitSync(); // commit trước
for (ConsumerRecord<String, String> record : records) {
    processOrder(record.value()); // nếu crash ở đây, message bị bỏ qua
}

Use case: metrics, log, và bất kỳ dữ liệu nào mà mất mát occasional là chấp nhận được và duplicate thì tốn kém.

At-least-once

Message không bao giờ mất nhưng có thể được xử lý nhiều hơn một lần. Đạt được bằng cách commit offset chỉ sau khi xử lý.

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    processOrder(record.value()); // nếu crash ở đây, message được retry từ offset đã commit
}
consumer.commitSync(); // commit sau: không mất dữ liệu, nhưng có thể xử lý lại khi crash

Đây là mặc định Kafka cho hầu hết hệ thống production. Logic xử lý của bạn phải idempotent: gọi nó hai lần với cùng message phải cho kết quả giống như gọi một lần.

// Idempotent: an toàn khi gọi nhiều lần với cùng đơn hàng
public void processOrder(Order order) {
    // Dùng INSERT ... ON CONFLICT DO NOTHING hoặc INSERT ... ON CONFLICT DO UPDATE
    // để đảm bảo event duplicate không tạo ra bản ghi duplicate
    orderRepository.upsert(order);
}

Exactly-once

Message được xử lý đúng một lần. Kafka hỗ trợ điều này qua transaction, ghi vào Kafka và commit consumer offset cùng nhau một cách atomic. Phức tạp hơn để implement và có latency cao hơn.

// Phía producer: bật idempotence và transaction
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-1");

producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("processed-orders", key, value));
    // Commit consumer offset cùng lúc với produce một cách atomic
    producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}

Dùng exactly-once khi bạn đang transform một Kafka topic thành topic khác và duplicate thực sự không thể chấp nhận được. Với hầu hết use case consumer (ghi vào database, gửi email), at-least-once với xử lý idempotent đơn giản hơn và đủ dùng.


Phần 7: Ví dụ Spring Boot hoàn chỉnh

// Cấu hình Producer
@Configuration
public class KafkaProducerConfig {

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
    }
}

// Publish order event
@Service
public class OrderService {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    public Order placeOrder(OrderRequest request) {
        Order order = createOrder(request);
        orderRepository.save(order);

        try {
            String payload = objectMapper.writeValueAsString(order);
            // key = orderId: tất cả event của cùng order vào cùng partition
            kafkaTemplate.send("orders", order.getId(), payload);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize order event", e);
        }

        return order;
    }
}
// Cấu hình Consumer
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "fraud-detection");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Manual offset commit: Spring chỉ commit sau khi listener method return thành công
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
        return factory;
    }
}

// Xử lý order event
@Service
public class FraudDetectionConsumer {

    @KafkaListener(topics = "orders", groupId = "fraud-detection")
    public void onOrder(ConsumerRecord<String, String> record) {
        Order order = deserialize(record.value());

        FraudResult result = fraudEngine.evaluate(order);

        if (result.isSuspicious()) {
            alertRepository.save(new FraudAlert(order.getId(), result.getReason()));
        }
        // Spring commit offset sau khi method này return mà không throw exception
        // Nếu exception được throw, offset không được commit và message được retry
    }
}

application.yml cho consumer:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: fraud-detection
      auto-offset-reset: earliest
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      ack-mode: record

Phần 8: Retention và Replay

Kafka lưu trữ message trên disk trong một khoảng thời gian có thể cấu hình. Mặc định là 7 ngày. Trong khoảng thời gian đó, bất kỳ consumer nào cũng có thể replay từ bất kỳ offset nào.

# Tạo topic với retention 30 ngày
kafka-topics.sh --create \
  --topic orders \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=2592000000  # 30 ngày tính bằng milliseconds

# Xem cấu hình topic
kafka-topics.sh --describe --topic orders

Production example về replay: Analytics service crash và mất state. Nó reset committed offset về 7 ngày trước và replay tất cả. Order service (producer) không cần thay đổi gì và thậm chí không biết điều này đã xảy ra.

// Reset offset của consumer group về đầu (trong code, cho use case cụ thể)
consumer.subscribe(List.of("orders"));
consumer.poll(Duration.ofMillis(0)); // kích hoạt partition assignment
consumer.seekToBeginning(consumer.assignment());

Trade-off: Retention dài nghĩa là tốn nhiều disk hơn trên broker. Topic nhận 1 GB mỗi giờ với retention 30 ngày dùng 720 GB mỗi partition replica. Cần tính toán disk broker phù hợp hoặc dùng tiered storage (có trong Kafka 3.6+) để offload dữ liệu cũ hơn sang object storage như S3.


Phần 9: Những lỗi phổ biến của người mới

Lỗi 1: Không đặt auto.offset.reset rõ ràng

# Mặc định là "latest": consumer group mới bắt đầu đọc từ HIỆN TẠI
# Message đến trước khi consumer start bị bỏ qua

# Cho consumer mới cần đọc tất cả message lịch sử:
auto-offset-reset: earliest

# Cho consumer mới chỉ cần đọc message mới từ đây trở đi:
auto-offset-reset: latest

Nếu bạn deploy analytics consumer mới và thắc mắc tại sao không có dữ liệu từ tuần trước, auto.offset.reset=latest hầu như luôn là lý do.

Lỗi 2: Ít partition hơn consumer

Topic với 3 partition, consumer group với 5 consumer:
  Partition 0 --> Consumer A  (đang hoạt động)
  Partition 1 --> Consumer B  (đang hoạt động)
  Partition 2 --> Consumer C  (đang hoạt động)
  (không có partition) Consumer D  (idle)
  (không có partition) Consumer E  (idle)

Consumer D và E tốn bộ nhớ và CPU nhưng không xử lý gì. Bạn không thể tăng parallelism vượt quá số partition mà không repartition lại topic.

Lỗi 3: Dùng auto-commit với xử lý không idempotent

Auto-commit kích hoạt theo timer. Nếu việc xử lý của bạn mất lâu hơn auto.commit.interval.ms (mặc định 5 giây), Kafka commit offset cho message mà code của bạn vẫn đang xử lý. Crash sau khi commit nhưng trước khi xử lý hoàn thành gây mất message âm thầm.

Luôn dùng enable.auto.commit=false và commit rõ ràng.

Lỗi 4: Xử lý tốn thời gian bên trong poll loop

Kafka có session timeout (session.timeout.ms, mặc định 45 giây) và maximum poll interval (max.poll.interval.ms, mặc định 5 phút). Nếu consumer không gọi poll() trong khoảng thời gian đó, broker coi nó đã chết và kích hoạt rebalance.

// SAI: xử lý chậm bên trong poll loop
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        externalApiClient.process(record.value()); // 10 giây mỗi call
        // Nếu có 100 record, poll không được gọi trong 1000 giây -> rebalance
    }
}

// ĐÚNG: xử lý bất đồng bộ hoặc tăng max.poll.interval.ms
// và giảm max.poll.records để giữ batch size nhỏ
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000); // 1 phút

Lỗi 5: Không xử lý lỗi deserialization

Nếu một message không hợp lệ rơi vào topic và deserializer của bạn throw exception, hầu hết consumer loop sẽ retry message đó vô hạn, chặn tất cả message tiếp theo trong partition.

// Spring Kafka: cấu hình dead-letter topic cho message fail sau nhiều lần retry
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, String> template) {
    DeadLetterPublishingRecoverer recoverer =
        new DeadLetterPublishingRecoverer(template,
            (record, ex) -> new TopicPartition(record.topic() + ".DLT", record.partition()));

    // Retry tối đa 3 lần với backoff 1 giây, sau đó gửi sang DLT
    return new DefaultErrorHandler(recoverer,
        new FixedBackOff(1000L, 3));
}

Tổng kết

Khái niệmChức năngCài đặt quan trọng
TopicLog event được đặt tênretention.ms
PartitionĐơn vị parallelism và orderingSố lượng đặt lúc tạo, khó thay đổi
ProducerGhi vào topicacks, linger.ms
ConsumerĐọc từ topicenable.auto.commit, auto.offset.reset
Consumer groupCác reader song song chia sẻ topicgroup.id
OffsetVị trí trong partitionCommit sau khi xử lý thành công
ReplicationĐộ bền khi broker bị lỗireplication.factor

Mental model: Kafka là một append-only log, không phải queue. Message không bị xóa khi đọc. Chúng hết hạn sau retention window. Nhiều consumer độc lập đọc cùng message theo tốc độ của riêng mình. Mỗi consumer group có offset riêng cho từng partition. Thêm consumer group không bao giờ ảnh hưởng đến group khác. Đây là hành vi cốt lõi làm cho Kafka hữu ích: producer và consumer hoàn toàn tách biệt về thời gian và deployment.