Consumer
카프카에서 데이터를 읽는 애플리케이션을 토픽을 구독(subscribe)하고 구독한 토픽들로부터 메시지를 받기 위해 KafkaConsumer를 사용한다. 카프카에서 데이터를 읽는 것은 다른 메시지 브로커들과 조금 달라 이러한 개념들을 먼저 이해하지 않다면 컨슈머 API 사용법을 이해하는 것은 어려울 수 있다. 따라서 이러한 개념을 먼저 살펴보자.
KafkaConsumer 생성
public class SimpleConsumer {
public static void main(String[] args) {
String topicName = "simple-topic";
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group_01");
try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props)) {
kafkaConsumer.subscribe(List.of(topicName)); // 여러 개의 토픽 구독 가능
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("record key: " + record.key() + ", record value: " + record.value() + ", partition: " + record.partition());
}
}
}
}
}
위 코드는 KafkaConsumer를 생성하고 토픽을 구독하여 메시지를 poll() 메서드를 통해 주기적으로 메시지를 가져오는 방법을 보여준다. KafkaProducer를 설정했을 때와 비슷하지만 새로 추가된 속성으로는 컨슈머가 속하는 컨슈머 그룹의 이름을 지정하는 group.id가 있다.
토픽 구독하기
kafkaConsumer.subscribe(List.of(topicName)); // 여러 개의 토픽 구독 가능
kafkaConsumer.subscribe(Pattern.compile("test.*"); // 정규식으로 여러 개의 토픽 구독 가능
컨슈머를 생성하고 나서 1개 이상의 토픽을 구독하는 것이 가능한데 토픽 목록을 List로 받는 것도 가능하지만, 위처럼 정규식을 매개변수로 사용해서 subscribe를 호출하는 것도 가능하다. 정규식은 다수의 토픽 이름에 매치될 수도 있으며, 만약 누군가가 정규식과 매치되는 이름을 가진 새로운 토픽을 생성할 경우, 거의 즉시 리밸런스가 발생하면서 컨슈머들은 새로운 토픽으로부터의 읽기 작업을 시작하게 된다. 이는 다수의 토픽에서 레코드를 읽어와서 토픽이 포함하는 서로 다른 유형의 데이터를 처리해야 하는 애플리케이션의 경우 편리하다.
Polling
위 코드에서 poll() 메서드에 전달한 매개변수는 컨슈머 버퍼에 데이터가 없을 경우 poll()이 블록될 수 있는 최대 시간을 결정한다. 만약 이 값이 0으로 지정되거나 버퍼 안에 이미 레코드가 준비되어 있을 경우 poll() 메서드는 즉시 반환한다. 그게 아닐 경우 지정된 Timer만큼 기다린다.
poll() 메서드는 위 설명처럼 프로듀서가 배치 단위로 메시지를 전송하는 것처럼 컨슈머도 메시지를 가져올 때 배치 단위로 가져오기 때문에 레코드들이 저장된 List 객체를 반환한다. 각각의 레코드는 레코드가 저장되어 있는 토픽, 파티션, 파티션에서의 오프셋과 키값, 밸류값을 포함한다.
poll 메서드 살펴보기
public class KafkaConsumer<K, V> implements Consumer<K, V> {
// 데이터를 폴링하는 메서드
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
do {
// 파티션의 메시지를 가져옴
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
// 로컬 캐시에 데이터가 있으면 ConsumerRecords를 반환
if (!records.isEmpty()) {
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
} while (timer.notExpired()); // Timer가 만료되지 않았다면 반복해서 로컬 캐시 확인 및 데이터 요청
}
// 실제 데이터를 가져오는 메서드
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
// 로컬 캐시에 이미 가져온 데이터 확인
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty()) {
return records; // 로컬 캐시에 데이터가 있으면 즉시 반환
}
// 로컬 캐시에 데이터가 없으면 브로커에 새로운 데이터 요청
// 이미 진행 중인 파티션에 대한 요청은 제외하고 브로커에 요청
fetcher.sendFetches();
// 로컬 캐시를 다시 확인하고, 데이터가 있으면 반환
return fetcher.fetchedRecords();
}
}
KafkaConsumer 객체로부터 poll() 메서드를 실행하면 pollForFetches() 메서드를 통해 실제 데이터를 가져온다. pollForFeches() 메서드가 실행되면 먼저, Fetcher 객체를 통해 버퍼에 이미 가져온 레코드가 있는지 확인한다. 버퍼가 비어있지 않다면(이미 메시지가 있다면), 즉시 데이터를 반환한다.
만약 캐시에 데이터가 없다면 fetcher.sendFetches() 메서드를 통해 이미 요청이 진행 중인 파티션은 제외하고, 요청하지 않은 파티션에 대해서만 브로커에 데이터를 요청한다. sendFetches() 메서드는 ConsumerNetworkClient 객체를 통해 비동기로 데이터를 요청하기 때문에 요청한 데이터가 아직 도착하지 않았을 경우 다시 fetcher.fetchedRecords() 메서드를 확인하여도 빈 결과를 반환할 수 있다. 따라서 KafkaConsumers 객체를 반환받지 못하고 Timer가 만료되기까지(poll() 메서드 호출 시 매개변수로 넘긴 시간) 루프를 돌아 버퍼를 재확인하고 만료되기까지 대기한 후, ConsumerRecords를 반환하고 있다.
poll() 메서드는 데이터를 가져오는 것보다 훨씬 더 많은 일을 한다. 새 컨슈머에서 poll()을 호출하면 컨슈머는 GroupCoordinator를 찾아서 컨슈머 그룹에 참가하고, 파티션을 할당받는다. 리밸런스 역시 연관된 콜백들과 함께 여기서 처리된다. 즉, 컨슈머 혹은 콜백에서 뭔가 잘못될 수 있는 거의 모든 것들은 poll()에서 예외의 형태로 발생되는 것이다.
poll()이 max.poll.interval.ms에 지정된 시간 이상으로 호출되지 않을 경우, 컨슈머는 죽은 것으로 판정되어 컨슈머 그룹에 퇴출되기 때문에 예측 불가능한 시간 동안 블록된느 작업을 수행하는 것은 피해야 한다.
Heartbeat Thread
KafkaConsumer 객체를 생성하면 위 사진처럼 heartbeat thread 이름의 별도 스레드가 만들어지는 것을 확인할 수 있다. 해당 스레드는 컨슈머의 정상적인 활동을 그룹 코디네이터(Group Coordinator)에게 보고하는 역할을 수행하는데 그룹 코디네이터가 제한 시간 내에 heartbeat thread로부터 응답을 받지 못하면 컨슈머 그룹 내에서 Rebalancing을 수행한다.
오프셋과 커밋
카프카에서는 파티션에서의 현재 위치를 업데이트하는 작업을 오프셋 커밋(offset commit)이라고 한다. 카프카에 특수 토픽인 __consumer_offset 토픽에 각 파티션별로 커밋된 오프셋을 업데이트하도록 하는 메시지를 보냄으로써 이루어진다. 모든 컨슈머들이 정상적으로 실행중일 때는 아무런 영향을 주지 않지만 컨슈머가 그룹에 추가/제외될 경우 리밸런스되면 각각의 컨슈머는 리밸랜스 이전에 처리하고 있던 것과는 다른 파티션들을 할당받을 수 있는데 이 때 컨슈머는 각 파티션의 마지막으로 커밋된 메시지를 읽어온 뒤 거기서부터 처리를 재개한다.
__consumer_offset
컨슈머는 subscribe()를 호출하여 읽어들이려는 토픽을 등록한다. 메시지를 성공적으로 가져왔으면 commit을 통해서 __consumer_offset이라는 내부 토픽에서 컨슈머가 다음에 읽을 offset 위치를 기록한다.
위 그림처럼 Consumer Group A에서 MyTopic 토픽의 파티션 0에서 offset 2까지 읽은 경우에 __consumer_offsets라는 토픽에 다음에 읽을 offset 위치를 저장한 것을 확인할 수 있다. {ConsumerGroup명}:{Topic명}:{Partition}:{다음에 읽을 offset 위치}로 저장한다. 만약 컨슈머에 장애가 발생하거나 컨슈머 그룹에 컨슈머가 추가되어 리밸런싱이 이뤄지는 경우 새로운 파티션을 할당받은 컨슈머는 해당 정보에서 offset 위치부터 데이터를 읽어 가져온다.
자동 커밋
오프셋을 커밋하는 가장 쉬운 방법은 컨슈머가 대신하도록 하는 것이다. enable.auto.commit 설정을 true로 잡아주면 컨슈머는 auto.commit.interval.ms(default: 5000) 시간에 한 번 poll()을 통해 받은 메시지 중 마지막 메시지의 오프셋을 커밋한다. 하지만 auto.commit.interval.ms으로 커밋 전에 리밸런스가 일어나면 오프셋은 업데이트가 되지 않았기 때문에 마지막으로 처리된 오프셋과 커밋된 오프셋 사이의 메시지들은 중복으로 두 번 처리할 수도 있다.
현재 오프셋 커밋하기
Duration timeout = Duration.ofMillis(100);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(timeout);
for (ConsumerRecord<String, String> record : records) {
// ...
}
try {
consumer.commitSync();
} catch (CommitFailedException e) {
log.error("commit failed", e);
}
}
enable.auto.commit = false로 설정해줌으로써 애플리케이션이 명시적으로 커밋하려 할 때만 오프셋이 커밋되게 할 수 있다. 가장 간단하고 신뢰성 있는 커밋 API는 commitSync()이다. 이 API는 poll()이 리턴한 마지막 오프셋을 커밋한 뒤 커밋이 성공적으로 완료되면 반환, 어떠한 이유로 실패하면 예외를 발생시킨다.
commitSync()도 애플리케이션이 아직 레코드들을 처리하는 와중에 장애가 발생할 경우, 마지막 메시지 배치의 맨 앞 레코드에서부터 리밸런스 시작 시점까지의 모든 레코드들은 두 번 처리될 것이다.
비동기적 커밋
Duration timeout = Duration.ofMillis(100);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(timeout);
for (ConsumerRecord<String, String> record : records) {
// ...
}
consumer.commitASync();
}
수동 커밋의 단점 중 하나는 브로커가 커밋 요청에 응답할 때까지 애플리케이션이 블록된다는 점이다. 이것은 애플리케이션의 처리량을 제한하게 된다. 이 방식의 단점은 commitSync()가 성공하거나 재시도 불가능한 실패가 발생할 때까지 재시도하는 반면, commitAsync()는 재시도를 하지 않는다는 점이다. commitAsync()가 재시도가 가능하다면 다음 배치 3000을 커밋하고 난 후 실패한 2000 오프셋이 재시도로 커밋되어 커밋 순서 관련된 문제가 발생할 수도 있다.
컨슈머와 컨슈머 그룹
만약 데이터를 읽고 처리하는 컨슈머가 하나뿐이라면 애플리케이션은 새로 추가되는 메시지의 속도를 따라잡을 수 없기 때문에 메시지 처리가 계속해서 뒤로 밀리게 될 것이다. 따라서, 우리는 토픽으로부터 데이터를 읽어 오는 작업을 확장(scale)할 수 있어야 한다. 여러 개의 프로듀서가 동일한 토픽에 메시지를 쓰듯이, 여러 개의 컨슈머가 컨슈머 그룹(Consumer Group)을 통해 같은 토픽으로부터 서로 다른 파티션의 메시지를 읽어올 수 있도록 한다.
위 그림처럼 네 개의 파티션을 갖는 토픽이 있다고 가정하면 컨슈머가 추가/제거되는 상황에 따라 각각의 컨슈머가 하나 이상의 파티션에서 메시지를 읽어올 수 있게 된다. 만약 하나의 토픽을 구독하는 하나의 컨슈머 그룹에 파티션 수보다 더 많은 컨슈머를 추가한다면, 컨슈머 중 몇몇은 유휴 상태가 되어 메시지를 전혀 받지 못한다. 즉, 토픽에 설정된 파티션 수 이상으로 컨슈머를 투입하는 것은 아무 의미없다는 것을 명심해야 한다.
이처럼 커슈머 그룹에 컨슈머를 추가하는 것은 카프카 토픽에서 읽어오는 데이터 양을 확장하는 주된 방법이다. 카프카 컨슈머가 지연 시간이 긴 작업(DB에 쓰기 작업, 데이터에 대해 시간이 오래 걸리는 연산 작업)을 수행하는 경우 하나의 컨슈머로 토픽에 들어오는 데이터의 속도를 감당할 수 없을 수도 있기 때문에 컨슈머를 추가함으로써 단위 컨슈머가 처리하는 파티션과 메시지의 수를 분산시키는 것이 일반적인 규모 확장 방식이다. 이것은 토픽을 생성할 때 파티션 수를 크게 잡아주는 게 좋은 이유이기도 한데, 부하가 증가함에 따라 더 많은 컨슈머를 추가할 수 있게 해주기 때문이다.
파티션 할당
컨슈머가 그룹에 참여하고 싶을 때는 그룹 코디네이터에게 JoinGroup 요청을 보낸다. 가장 먼저 그룹에 참여한 컨슈머가 그룹 리더가 된다. 리더는 그룹 코디네이터로부터 해당 그룹 안에 있는 모든 컨슈머의 목록을 받아서 각 컨슈머에게 파티션의 일부를 할당해 준다. 어느 파티션이 어느 컨슈머에게 할당되어야 하는지를 결정하기 위해서는 PartitionAssignor 인터페이스의 구현체가 사용된다.
partition.assignment.strategy
카프카에는 몇 개의 파티션 할당 정책이 기본적으로 내장되어 있다. 파티션 할당이 결정되면 컨슈머 그룹 리더는 할당 내역을 그룹 코디네이터에게 전달하고 그룹 코디네이터는 다시 이 정보를 모든 컨슈머에게 전파한다. 각 컨슈머 입장에서는 자기에게 할당된 내역만 보인다. 즉, 리더만 클라이언트 프로세스 중에서 유일하게 그룹 내 컨슈머와 할당 내역 전부를 볼 수 있는 것이다. 파티션 할당 정책에는 다음과 같은 정책이 있다.
- Range
- 토픽별 동일한 파티션을 특정 컨슈머에게 할당하는 전략이다.
- RoundRobin
- 파티션별로 컨슈머들이 균등하게 분배될 수 있도록 모든 구독된 토픽의 모든 파티션을 가져다 순차적으로 하나씩 컨슈머에 할당해 준다.
- Sticky
- 파티션들을 가능한 한 균등하게 할당하고, 리밸런스가 발생했을 때 가능하면 많은 파티션들이 같은 컨슈머에 할당되게 함으로써 할당된 파티션을 하나의 컨슈머에서 다른 컨슈머로 옮길 때 발생하는 오버헤드를 최소화하는 전략이다.
- Cooperative Sticky
- Sticky 전략과 기본적으로 동일하지만, 컨슈머가 재할당되지 않은 파티션으로부터 레코드를 계속해서 읽어올 수 있도록 해주는 협력적 리밸런스 기능을 지원한다.
컨슈머 그룹과 파티션 리밸런싱
컨슈머는 컨슈머 그룹이 있듯이 브로커 내에 그룹의 상태를 관리하는 컨슈머 코디네이터(Consumer Coodinator)가 있다. 그룹 코디네이터는 컨슈머들의 Join Group 정보, 파티션 매핑 정보 등을 관리하고 컨슈머들의 하트비트를 관리한다. 컨슈머 그룹 내에 새로운 컨슈머가 추가/종료되거나 토픽에 새 파티션을 추가하는 경우 그룹 코디네이터는 컨슈머들에게 리밸런싱(Rebalancing)을 수행하도록 지시하여 메시지를 읽는 작업을 중지하고 리밸런싱에 집중한다. 리더 컨슈머는 파티션 할당 전략에 따라 컨슈머들에게 파티션을 할당한다.
파티션 할당 전략에는 2가지가 있다.
조급한 리밸런스(Eager Rebalance)
조급한 리밸런스는 위 그림처럼 모든 컨슈머가 읽기 작업을 멈추고 자신에게 할당된 모든 파티션에 대한 소유권을 포기한 뒤, 컨슈머 그룹에 다시 참여(rejoin)하여 완전히 새로운 파티션 할당을 전달받는 방식을 말한다. 이러한 방식은 전체 컨슈머 그룹에 대해 짧은 시간 동안 작업을 멈추게 된다. 파티션 할당 전략(partition.assignment.strategy) 중 Range, Round Robin, Sticky 방식이 여기에 해당한다.
협렵적 리밸런스(Cooperative Rebalance)
협력적 리밸런스의 경우, 한 컨슈머에게 할당되어 있던 파티션만을 다른 컨슈머에 재할당한다. 재할당되지 않은 파티션에서 레코드를 읽어서 처리하던 컨슈머들은 작업에 방해받지 않고 하던 일을 계속 활 수 있는 것이다. 이 경우 리밸런싱은 위 그림처럼 2개 이상의 단계에 걸쳐서 수행된다. 즉, 우선 컨슈머 그룹 리더가 다른 컨슈머들에게 각자에게 할당된 파티션 중 일부가 재할당될 것이라고 통보하면, 컨슈머들은 해당 파티션에서 데이터를 읽어오는 작업을 멈추고 해당 파티션에 대한 소유권을 포기한다. 두 번째 단계에서는 컨슈머 그룹 리더가 포기된 파티션들을 새로 할당한다. 이 점진적인 방식은 안정적으로 파티션이 할당될 때까지 몇 번 반복될 수 있지만 위에서 살펴본 조급한 리밸런스와 같이 전체 작업이 중단되는 사태는 발생하지 않는다.
버전 3.1 이후의 컨슈머 리밸런스는 협력적 리밸런스를 기본값으로 설정하고, 조급한 리밸런스는 추후 삭제될 예정이다. 파티션 할당 전략(partition.assignment.strategy) 중 Cooperative Sticky가 여기에 해당한다.
컨슈머 설정
fetch.min.bytes(default: 1)
- 이 속성은 컨슈머가 브로커로부터 레코드를 얻어올 때 받는 데이터의 최소량(바이트)를 지정알 수 있게 해준다.
- 만약 브로커가 컨슈모루벝 레코드 요청을 받았는데 새로 보낼 레코드의 양이 fetch.min.bytes보다 작을 경우, 브로커는 충분한 메시지를 보낼 수 있을 때까지 fetch.max.wait.ms만큼 기다린 뒤 컨슈머에게 레코드를 보내준다.
fetch.max.bytes(default: 52428800, 50MB)
- 컨슈머가 브로커를 폴링할 때 카프카가 리턴하는 최대 바이트 수를 지정한다.
- 컨슈머가 브로커로부터 받은 데이터를 저장하기 위해 사용하는 메모리의 양을 제한하기 위해 사용된다.
max.partition.fetch.bytes(default: 1048576, 1MB)
- 서버가 파티션별로 반환하는 최대 바이트 수를 결정한다.
- 만약 파티션별 한 번에 1MB씩 가져온다고 했을때 100개의 파티션에 대해 메시지를 가져온다고 해서 100MB를 한 번에 가져올 수 있는 것이 아닌 그때는 위 fetch.max.bytes의 크기만큼만 최대로 데이터를 가져올 수 있다.
max.poll.records(default: 500)
- poll()을 호출할 때마다 리턴되는 최대 레코드 수를 지정한다. 애플리케이션이 폴링 루프를 반복할 떄마다 처리해야 하는 레코드의 개수를 제어하려면 이 설정을 사용하면 된다.
fetch.max.wait.ms(default: 500, 0.5초)
- fetch.min.bytes를 설정함으로써 카프카가 컨슈머에게 응답하기 전 충분한 데이터가 모일 떄까지 해당 설정 시간만큼 대기한다.
- poll() 메서드에서 Timer를 매개변수로 넘기는 것은 큐에 레코드가 없으면 지정 시간동안 대기하는 것이고, fetch.max.wait.ms는 브로커가 fetch.min.bytes 이상의 메시지가 쌓일 때까지의 fetch.max.wait.ms만큼 기다린 후 데이터를 반환하는 시간이다.
session.timeout.ms(default: 45000, 45초)와 heartbeat.interval.ms(default: 3000, 3초)
- session.timeout.ms는 컨슈머가 브로커와 신호를 주고받지 않고도 살아 있는 것으로 판정되는 최대 시간이다. 만약 컨슈머가 그룹 코디네이터에게 하트비트를 보내지 않은 채로 session.timeout.ms가 지나가면 그룹 코디네이터는 해당 컨슈머를 죽은 것으로 간주하고 리밸런싱을 실행시킨다.
- heartbeat.interval.ms는 그룹 코디네이터에게 하트비트를 보내는 시간 주기이다. heartbeat.interval.ms는 session.timeout.ms보다 더 낮은 값이어야 하며 대체로 1/3으로 결정하는 것이 보통이다.
max.poll.interval.ms(default: 300000, 5분)
- 컨슈머가 폴링을 하지 않고도 죽은 것으로 판정되지 않을 수 있는 최대 시간을 지정할 수 있게 해 준다.
- 하트비트는 위에서 살펴본 백그라운드 스레드에 의해 전송된다. 카프카에서 레코드를 읽어오는 메인 스레드는 데드락이 걸렸는데 백그라운드 스레드는 멀쩡히 하트비트를 전송하고 있을 수도 있다. 타임아웃이 발생하면 백그라운드 스레드는 브로커로 하여금 컨슈머가 죽어서 리밸런스가 수행되어야 한다는 걸 알 수 있도록 'leave group' 요청을 보낸 뒤, 하트비트 전송을 중단한다.
request.timeout.ms(default: 30000, 30초)
- 컨슈머가 브로커로부터의 응답을 기다릴 수 있는 최대 시간이다.
- 만약 브로커가 이 설정에 지정된 시간 사이에 응답하지 않을 경우, 클라이언트는 브로커가 완전히 응답하지 않을 것이라고 간주하고 연경릉 닫은 뒤 재연결을 시도한다.
auto.offset.reset(default: lastest)
- 파티션을 읽기 시작할 때의 작동을 정의한다.
- lastest는 유효한 오프셋이 없을 경우 컨슈머는 가장 최신 레코드부터 읽기 시작한다.
- earliest는 유효한 오프셋이 없을 경우 파티션의 맨 처음부터 모든 데이터를 읽는 방식인데 Consumer Group으로 컨슈머가 새롭게 접속할 시 __consumer_offsets에 있는 offset 정보를 기반으로 메시지를 가져오기 때문에 무조건 0번 오프셋부터 읽는 것은 아니다.
출처
KafkaConsumer Client Internals
'기타 > kafka' 카테고리의 다른 글
Kafka Producer (1) | 2024.11.24 |
---|