TIL

[Kafka] Kafka Consumer의 commit을 제어하는 enable.auto.commit과 AckMode

우리가 Kafka Consumer의 commit을 제어한다고 했을 때 보통 2가지 설정값을 떠올린다. enable.auto.commit과 AckMode다. 그렇다면 enable.auto.commit도 Consumer의 commit을 제어하는 설정값이고, AckMode도 Consumer의 commit을 제어하는데 이 둘이 따로 관리되는 걸까? 

 

일단 consumer가 offset을 commit 하는 것은 Kafka의 내부 Topic인 __consumer_offsets에 저장하는 행위이며, consumer는 이 offset 값을 참조해서 다음에 처리한 메시지를 읽어온다는 것이 기본 개념이다. Commit에는 자동커밋과 수동커밋이 있는데 enable.auto.commit으로 Kafka 클라이언트의 커밋방식 제어를 해서 자동커밋과 수동커밋을 설정할 수 있다.

 

Setting enable.auto.commit means that offsets are committed automatically with a frequency controlled by the config 
auto.commit.interval.ms.

출처) https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

 

enable.auto.commit은 Kafka 클라이언트의 자동 커밋(true, default), 수동 커밋(false)을 세팅한다. auto.commit.interval.ms와 함께 쓰이는데 여기서 설정한 시간 간격마다 commit이 된다.

private Map<String, Object> consumerConfigs() {
    Map<String, Object> configs = new HashMap<>();
    // ...
    // enable.auto.commit 설정
    configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    return configs;
}

 

enable.auto.commit=true로 했을 때 많이 얘기하는게 중복/유실 문제인데, Consumer에서 메시지를 처리했던 못했던 상관없이 일정 시간이 지나면 알아서 offset을 갱신(commit)하기 때문이다.

  • 중복 - poll해서 가져온 메시지들을 아직 다 처리를 못했고 commit 타이밍도 안됐는데 오류가 나면 Consumer는 한번 poll해왔던 메시지를 다시 가져올 것이다.
  • 유실 - poll해서 가져온 메시지들을 아직 다 처리를 못한 상황에서 commit 타이밍이 돼서 commit을 했는데 메시지 처리중 오류가 나버리면 처리못한 메시지를 다시 읽어오지 않기 때문에 유실된다.

이런 문제를 방지하기 위해 enable.auto.commit=false로 설정해주면 자동으로 커밋되지 않도록 할 수 있다.

 

그런데 enable.auto.commit=false로 해줘도 offset commit 이 일어나는 걸 알 수 있다. Commit도 정상적으로 이뤄지고 lag이 쌓이지 않는다. 이는 Kafka 클라이언트의 자동 커밋은 비활성화 되지만, Spring Kafka에 의해 수동으로 offset commit을 하기 때문이다. 이것이 AckMode 설정이고 기본값은 BATCH로 들어간다. 그래서 자동 커밋처럼 auto.commit.interval.ms의 영향을 받지는 않지만 listener 메서드가 완료되면 배치단위(default가 BATCH)로 commit이 일어난다. 즉, enable.auto.commit=false로 해줬을 때는 AckMode 설정을 함께 사용해서 Spring Kafka의 commit까지 제어해서 진정한 수동 커밋을 해주게 된다.

 

AckMode의 종류는 다음과 같다.

AcksMode 설명
RECORD 레코드 단위로 프로세싱 이후 커밋한다.
BATCH (default) poll() 메서드로 호출된 레코드가 모두 처리된 이후 커밋한다.
TIME 특정 시간 이후에 커밋한다.
이 옵션을 사용할 경우에는 시간 간격을 선언하는 `AckTime`옵션을 설정해야 한다.
COUNT 특정 개수만큼 레코드가 처리된 이후에 커밋한다.
이 옵션을 사용할 경우에는 레코드 개수를 선언하는 `AckCount`옵션을 설정해야 한다.
COUNT_TIME TIME, COUNT 옵션 중 맞는 조건이 하나라도 나올 경우 커밋한다.
MANUAL Acknowledgement.acknowledge() 메서드가 호출되면 다음번 poll()때 커밋한다.
매번 acknowledge() 메서드를 호출하면 BATCH 옵션과 동일하게 동작한다.
이 옵션을 사용할 경우 AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용해야 한다.
MANUAL_IMMEDIATE Acknowledgement.acknowledge() 메서드를 호출한 즉시 커밋한다.
이 옵션을 사용할 경우 AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용해야 한다.

 

보통 설정은 ContainerFactory를 통해 해준다.

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> testKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> testFactory = new ConcurrentKafkaListenerContainerFactory<>();
    testFactory.getContainerProperties().setAckMode(AckMode.MANUAL);
    testFactory.setConsumerFactory(testConsumerFactory());
    return testFactory;
}

 

AckMode는 acknowledgement.acknowledge() 메서드 호출을 통해 개발자가 코드상에서 수동커밋을 할 수 있다.

@KafkaListener(topics = "topic_name", ackMode = "manual")
public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
    // 메시지 처리
    acknowledgment.acknowledge(); // 메시지를 수동으로 ack
}

 

이렇게 @KafkaListener를 사용해서 Acknowledgment를 인자로 받아와서 수동 commit을 한다. 그럼 메시지 처리가 성공했을 경우만 acknowledge()를 호출하고, 실패했을 경우는 재처리하는 식으로 활용할 수 있을 것이다.

 

다만, 수동 커밋으로 설정하고 acknowledge()를 호출하지 않으면 offset이 커밋되지 않아 메시지가 처리되지 않은 상태가 되고, 오류가 났을 경우 중복/유실의 문제가 enable.auto.commit=true일 경우와 같이 생길 수 있어 명시적으로 acknowledge()를 호출해주는게 중요하다. (물론 acknowledge()를 호출하지 않았다고 consumer가 다음 메시지를 못읽는 건 아니다. 다만 재처리를 해야하는 경우 문제가 발생할 수 있다.)

 

정리하자면, enable.auto.commit은 Kafka 클라이언트의 커밋 방식에 대한 설정이고, AckMode는 Spring Kafka 프레임워크 내에서 메시지 처리 후 커밋을 관리하는 설정이다. 둘은 상호 보완적인 역할을 하며, 각각의 설정이 Kafka 클라이언트와 Spring Kafka의 서로 다른 레벨에서 작동하기 때문에 사용법을 명확히 알고 쓰는게 중요하다.

 

 

 

728x90
반응형