본문 바로가기
기타/MSA

Kafka 핵심 개념

by oneny 2024. 11. 13.

 

Kafka

모든 애플리케이션은 의미가 있는 데이터를 만들고, 그 다음 처리되어야 하는 작업과 같이 뭔가 중요한 정보를 담고 있다. 따라서 해당 데이터에 알기 위해서는 데이터를 생성된 곳에서 분석할 수 있는 곳으로 옮겨야 하는 만큼 파이프라인(pipeline)이 중요한 핵심적인 요소가 된다고 할 수 있다. 실제 링크드인에서 하루 4.5조 개 이상의 이벤트 스트림을 처리하고 있기 때문에 기존의 Messaging Platform(ex: MQ)로는 처리가 불가능하여 데이터(이벤트) 스트림 처리를 위해 카프카(Kafka)가 개발되었다.

 

Kafka vs RabbitMQ

출처: https://www.confluent.io/blog/kafka-fastest-messaging-system/?utm_medium=sem&utm_source=google&utm_campaign=ch.sem_br.nonbrand_tp.prs_tgt.kafka_mt.mbm_rgn.apac_lng.eng_dv.all_con.kafka-rabbitMQ&utm_term=%2Bkafka%20%2Brabbitmq&creative=&device=c&placement=&gclid=CjwKCAjw3_KIBhA2EiwAaAAliiwLNAaE01pbjuDTGvedQ9G9qPJO-fyKJhHvGIxDq0s9RZ3oU2P9IRoCvT0QAvD_BwE

Kafka와 RabbitMQ 둘 다 메시지 전송 플랫폼을 제공하지만 위 그래프에서도 볼 수 있듯이 카프카가 초당 600MB로 처리량이 훨씬 높은 것을 확인할 수 있다. Kafka는 순차 디스크 I/O를 사용하여 처리량이 높은 메시지를 교환할 수 있으므로 초당 수백만 개의 메시지를 전송하지만, RabbitMQ는 초당 수 천 개의 메시지를 전송할 수 있어 초당 수백만 개의 메시지를 보내려면 브로커가 여러 개 필요하다.

 

Producer

카프카 클라이언트 중 하나로 메시지를 생산(Produce)해서 Kafka의 Topic으로 메시지를 보내는 애플리케이션이다. 기본적으로 프로듀서는 메시지를 쓸 때 토픽에 속한 파티션들 사이에 고르게 나눠서 쓰도록 되어 있다. 하지만 메시지 키(key)와 키 값의 해시를 특정 파티션으로 대응시켜 주는 파티셔너(Partitioner)를 사용해서 특정한 파티션으로 메시지를 발행하도록 할 수도 있다. 자세한 내용은 아래 내용을 통해 살펴볼 수 있다.

 

Record(Message) 구조

클라이언트에서 브로커로 메시지를 보내는 구조는 위 그림처럼 메타데이터 영역인 헤더와 바디 영역인 Key, Value로 구성되어 있다고 할 수 있다. Producer에서 메시지를 보내기 위해서는 반드시 Topic과 Value 정보가 있어야 하며 Serialize, Partitioner, Compress(optional)을 지정할 수 있다. Serializer는 각각 Key와 Value 용으로 설정하여 JSON, String, 아브로 등 다양한 형태로 메시지를 보낼 수 있다. 이렇게 보내진 메시지는 Kafka에서 Byte Array로 저장이 된다.

 

메시지 Producing 과정

메시지를 send() 호출하여 보내게 되면 위에서 설명했듯이 Serializer를 설정하여 직렬화를 하고, Partitioner를 통해 메시지를 어떤 파티션으로 갈지 결정한다. 이러한 과정을 거친 후 RecordAccumulator에서 Partitioner에서 정한 파티션에 보관했다 카프카 토픽으로 메시지를 전송하게 된다.

RecordAccumulator에서 단 건으로 메시지를 보낼 수 있지만, 카프카 효율성을 위해 메시지를 배치(batch) 단위로 저장한다. 배치는 그저 같은 토픽의 파티션에 쓰여지는 메시지들의 집합일 뿐이다. 메시지를 쓸 때마다 네트워크 상에서 통신하는 것은 막대한 오버헤드가 발생할 수 있기 때문에 메시지를 배치 단위로 모아서 쓰면 이것을 줄일 수 있다. 물론 이것은 지연(latency)과 처리량(throughout) 사이에 트레이드오프를 발생시킨다. Kafka 2.4 이후의 DefaultPartitioner는 Sticky 정책으로 동작하여 하나의 Batch가 닫힐 때까지 하나의 Partition에게 Record를 보내고 랜덤으로 Partition으로 선택하는 방식으로 되어있다.

카프카가 메시지를 ack를 응답받는데 성공인 경우에는 성공에 대한 메타데이터를 리턴한다. 만약 실패하게 되면 재시도 옵션을 통해 재시도하다 Retry할 수 없으면 예외가 발생하여 예외에 대한 처리를 할 수 있다.

 

Producer Acks

acks 설정은 요청이 성공할 때를 정의하는데 사용되는 Producer에 설정하는 Parameter로 다음과 같은 수준이 있다.

  • acks=0
    • ack가 필요하지 않아 메시지 손실이 다소 있더라도 빠르게 메시지를 보내야 하는 경우에 사용한다. 이 수준은 자주 사용되지 않는다.
  • acks=1
    • Leader가 메시지를 수신하면 ack를 보낸다. Leader가 Producer에게 ack를 보낸 후, Follwer가 복제하기 전에 Leader에 장애가 발생하면 메시지가 손실된다.
    • At most once(최대 한 번) 전송을 보장한다.
  • acks=-1, acks=all
    • 메시지가 Leader가 모든 Replica까지 Commit되면 ack를 보낸다. Leader가 잃어도 데이터가 살아남을 수 있도록 보장한다. 그러나 대기 시간이 더 길고 특정 실패 사례에서 반복되는 데이터 발생 가능성이 있다.
    • At least once(최소 한 번) 전송을 보장한다.
    • 데이터 유실이 없게 하기 위해서 다음과 같은 설정을 권장한다.
      • Topic: replication.factor는 최소 3이상, min.insync.replicas(최소 요구되는 ISR의 개수에 대한 옵션, default는 1)는 최소 2이상
      • Producer: acks는 all

 

acks=all의 중요성

위 그림처럼 Partition이 3개의 Replica로 구성되어 있고, Producer가 4개의 메시지(M0, M1, M2, M3)를 보냈다고 가정해보자. 메시지 M1, M2가 ISR 리스트 전체에 복제되었고 Commit(주황 화살표)되었고, Broker Y는 M3를 복제했지만 Commit을 못한 상태, Broker Z는 M2까지만 복제한 상태이다.

이 상황에서 Broker X가 장애가 나게 되면, Controller가 Y를 Leader로 선출하고, Leader Epoch가 0에서 1로 증가한다. Broker Z는 M2를 fetch하고, Broker Y는 High Water Mark를 진행, Broker Z는 fetch를 다시 수행하여 High Water Mark를 수신하는 방식으로 장애를 방지하고 있다. Broker Y에서 M2는 아직 Commit 전이라고 하는데 왜 M2를 가지고 있을 수 있을까? 한 번 들어온 데이터를 지우지 않기 때문에 Broker Y는 Leader가 되는 순간 전체 데이터를 가지고 High Water Mark(초록 화살표)를 진행하게 된다.

Producer는 M2, M3 메시지에 대한 ack를 받지 못했으므로 Broker Y로 메시지를 보내게 되면 idempotence=false이기 때문에 M2는 중복이 발생한다. 그리고 Broker Z는 M3, M4를 fetch하고, 다시 fetch를 수행해서 High Water Mark(빨강 화살표)를 수신한다.

acks=all로 설정하는 경우 위와 같은 방식으로 동작하기 때문에 At least once(최소 한 번) 메시지 전송을 보장한다고 할 수 있다.

 

acks=1에서 발생할 수 있는 문제 상황

그러면 acks=1로 설정하는 경우에는 어떤 문제가 발생할 수 있을까? 위와 비슷하게 동작하지만 Leader인 Broker X가 장애나기 전에 Producer는 M3에 대한 ack를 수신하게 되면 Producer가 M3 송신을 재시도하지 않게 되므로, M3는 영원히 손실하게 되는 결과를 초래할 수 있다. 이러한 방식으로 동작하기 때문에 acks=1로 설정하는 경우 At most once(최대 한 번) 전송을 보장한다고 할 수 있다.

 

Replica Recovery

Broker X가 복구되면 어떻게 될까? Broker X가 복구되면 ZooKeeper에 연결되고 Broker는 Controller로부터 metadata를 받는다. 그리고 Leader인 Broker Y로부터 Leader Epoch를 fetch해서 Leadership이 변경된 시점부터 메시지를 삭제(truncate)하여 Leader Y를 복제한 후 ISR 리스트로 복귀한다.

 

Producer Retry

Kafka는 네트워크 또는 시스템의 일시적인 오류를 보완하기 위해 재시도(retry)를 위한 다음과 같은 parameter를 제공한다.

  • retires(default: MAX_INT)
    • 메시지를 send하기 위해 재시도하는 횟수
  • retry.backoff.ms(default: 100)
    • 재시도 사이에 추가되는 대기 시간
  • request.timeout.ms(default: 30,000, 30초)
    • Producer가 응답을 기다리는 최대 시간
  • delivery.timeout.ms(default: 120,000, 2분)
    • send() 후 성공 또는 실패를 보고하는 시간의 상한
    • retries를 조정하는 대신에 delivery.timeout.ms 조정으로 재시도 동작을 제어한다.

 

Producer Delivery Life Cycle

Producer가 send()하면 단건으로 메시지를 전송할 수 있지만 RPC(Remote Produce Call) 수를 줄여서 Broker가 처리하는 작업을 줄여 더 나은 처리량을 제공하기 위해 Batch 처리를 할 수 있다. Batch 처리를 하기 위한 parameter는 다음과 같다. batch 처리의 일반적인 설정은 linger.ms=100, batch.size=1000000이다.

  • linger.ms(default: 0, 즉시 보냄)
    • 메시지가 함께 batch 처리될 때까지 대기 시간
  • batch.size(default: 16KB)
    • 보내기 전 Batch의 최대 크기

위 그림에서 보는 것처럼 Producer가 send()를 호출하면 linger.ms의 시간만큼 대기를 하거나 데이터 크기가 batch.size 이상으로 요청하는 경우 linger.ms와 상관없이 메시지를 전송한다. 그리고 max.in.flight.requests.per.connect=5(default)의 설정으로 동시에 batch를 처리하기 때문에 진행 중인(in-flight)인 여러 요청을 재시도하게 되면 메시지 순서가 변경될 위험이 있다. 예를 들어, batch 0은 실패했지만 batch 1은 성공하면 batch 1은 batch 0보다 먼저 Commit Log에 추가되어 순서가 달라지게 된다. 이를 방지하기 위해 Producer에서 enable.idempotencetrue로 설정하면, 하나의 batch가 실패하는 경우 같은 Partition으로 들어오는 후속 batch들도 OutOfOrderSequenceException과 함께 실패하도록 처리하여 메시지 순서를 보장할 수 있다.

 

Kafka Flush

메시지는 Log Segment File(기본값: 1GB마다 새로운 Segment 생성)로 구성된 Partition에 기록되고, 성능을 위해 Log Segment는 OS Page Cache에 기록된다. 로그 파일에 저장된 메시지의 데이터 형식은 Byte Array로 Broker가 Producer로부터 수신할 것, 그리고 Consumer에게 보내는 것과 정확히 동일하므로, Zero-Copy(데이터가 User Space에 복사되지 않고, CPU 개입 없이 Page Cache와 Network Buffer 사이에서 직접 전송되는 것을 의미. 이것을 통해 Broker Heap 메모리를 절약하고 또한 엄청난 처리량을 제공)가 가능하다.

 

Page Cache는 Broker가 완전히 종료되거나 OS Background "Flusher Thread"가 실행되면 디스크로 flush되는데 그 전에 Broker가 장애가 발생하면 Replication이 없는 경우 데이터는 영구적으로 손실될 수 있다. 따라서 Partition을 꼭 Replication 두어 Broker가 다시 온라인 상태가 되면 필요시 Leader Replica(복제본)에서 데이터가 복구될 수 있도록 해야 한다.

 

Kafka에서는 마지막 flush 이후의 메시지 수(log.flush.interval.messages) 또는 시간(log.flush.interval.ms)으로 flush(fsync)를 트리거할 수 있도록 설정할 수 있다. 하지만 Kafka는 운영 체제의 background Flush 기능(pdflush)을 더 효율적으로 허용하는 것을 선호하기 때문에 이러한 설정은 기본값으로 무한(fsync 비활성화)으로 설정하는 것을 권장한다.

 

Consumer

카프카 클라이언트 중 하나로 Consumer는 Topic의 메시지를 가져와서 소비(Consume)하는 애플리케이션을 말한다. Consumer는 1개 이상의 토픽을 구독해서 여기에 저장된 메시지들을 각 파티션에 쓰여진 순서대로 일거오고, 메시지의 오프셋(offset)을 기록함으로써 어느 메시지까지 읽었는지를 유지한다.

 

Produce와 Consumer의 분리(Decoupling)

Producer와 Consumer는 서로 알지 못하며, Producer와 Consumer는 각각 고유의 속도로 Commit Log에 Write 및 Read를 수행한다. 다른 Consumer Group에 속한 Consumer들은 서로 관련이 없으며, Commit Log에 있는 Event(Message)를 동시에 다른 위치에서 Read할 수 있다.

Commit Log는 추가만 가능하고 변경 불가능한 데이터 스트럭처로 데이터(Event)는 항상 로그 끝에 추가되고 변경되지 않는다. Offset은 Commit Log에서 Event의 위치로 위 그림체서 0부터 10까지의 Offset을 볼 수 있다. Producer가 Write하는 LOG-END-OFFSET과 Consumer Group의 Consumer가 Read하고, 처리한 후에 Commit한 CURRENT-OFFSET과의 차이(Consumer Lag)가 발생할 수 있다.

 

Consumser 관련 Position들

  • Last Commited Offset(Current Offset): Consumer가 최종 Commit한 Offset
  • Current Position: Consumer가 읽어간 위치(처리 중, Commit 전)
  • High Water Mark(Commited): ISR(Leader-Foller)간에 복제된 Offset
  • Log End Offset: Producer가 메시지를 보내서 저장된, 로그의 맨 끝 Offset

 

Consumer Offset

Consumer Offset은 Consumer Group이 읽은 위치를 표시하는 정보로 __consumer_offsets라는 Internal Topic에서 Consumer Offset을 저장하여 관리한다. Consumer Offset은 Conumser가 자동이나 수동으로 데이터를 읽은 위치를 commit하여 다시 읽는 것을 방지하기 위해 사용한다.

위 그림처럼 Conumser Group A에서 MyTopic 토픽의 Partition 0에서 offset 2까지 읽은 경우에 __consumer_offsets 라는 토픽(default 50개)에 해당 정보를 저장한다. {ConsumerGroup명}:{Topic}:{Partition Num}:{Offset 읽을 위치}가 저장되어 있는 것을 확인할 수 있다. 이를 통해 멀티 파티션으로 구성된 토픽과 멀티 Consumer가 하나의 Consumer Group에 있다고 하더라도 각 Consumer는 정확히 하나의 Partition에서 Record(Message)를 consume할 수 있다.

 

Consumer Group

Topic의 메시지를 사용하기 위해 협력하는 Consumer들의 집합이다. 하나의 동일한 group.id로 구성된 모든 Consumer들은 하나의 Consumer Group을 형성하고, Consumer Group 내의 Consumer들은 협력하여 Topic의 메시지를 분산 병렬 처리한다. 하나의 partition은 지정된 Consumer Group 내의 하나의 Consumer만 사용하고, 동일한 Key를 가진 메시지는 동일한 Consumer가 사용한다. Consumer의 설정 파라미터 중에서 partition.assignment.strategy로 할당 방식을 조정하고, Consumer Group은 Group Coordinator라는 프로세스에 의해 관리된다.

 

Partition Assignment Strategy

Consumer의 설정 파라미터 중에서 parition.assignment.strategy으로 다음과 같은 할당 방식을 통해 조정할 수 있다.

  • org.apache.kafka.clients.consumer.RangeAssignor
    • Topic별로 작동하는 Default Assignor
    • 동일한 Key를 가지고 있는 메시지들에 대한 Topic들 간에 "co-partitioning"하기 유리하다.
  • org.apache.kafka.clients.consumer.RoundRobinAssignor
    • Round Robin 방식으로 Consumer에게 Partition을 할당
    • 재할당(reassign) 후 Consumer가 동일한 Partition을 유지한다고 보장할 수 없다. 즉, Consumer 0이 Topic A의 Partition 0에 할당되었지만, 재할당이 발생하면 Topic A의 Partition 0이 다른 Consumer에게 할당될 수 있다.
  • org.apache.kafka.clients.consumer.StickyAssignor
    • Consumer들에게 할당된 Topic Partition의 수를 최대 1만큼만 차이나게 하여 최대 균형을 이루는 할당을 보장한다.
    • Consumer가 제거되어 재할당이 발생했을 때, Round Robin 방식은 전체를 재할당하지만 StickyAssignor는 기존 할당을 두고 Consumer 1에 할당된 Partition들에 대해서만 재할당을 하여 최대한 기존 할당을 보존한다.
  • org.apache.kafka.clients.consumer.CooperativeStickyAssignor
    • 동일한 StickyAssignor 논리를 따르지만 협력적인 Rebalance을 허용
  • org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
    • 인터페이스를 구현하면 사용자 지정 할당 전략을 사용할 수 있음

 

Consumer Rebalancing

위 그림처럼 4개의 파티션이 Topic를 consume하는 4개의 Consumer가 하나의 Consumer Group에 있다면, 각 Consumer는 정확히 하나의 Partition에 Record를 consume한다. 하지만 만약 하나의 Consumer에 장애가 생겨 메시지를 consume할 수 없는 상황이 발생한다면 Consumer는 주어진 Topic에 1개 이상의 많은 Partition을 사용할 수 있기 때문에 Rebalancing 작업을 통해 Partition으로 오는 Record를 consume할 수 있도록 한다.

 

출처: https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/

처음은 Subscription 과정으로 Consumer들이 JoinGroup 요청을 Group Coordinator에 보내면 리밸런싱이 시작되며, JoinGroup의 응답이 Consumer들에 전송된다. 모든 구성원은 Broker에 SyncGroup 요청을 보내야 하며, Group Leader는 각 Consumer의 Partition 할당을 계산해서 Group Coordinator에게 전송하면 Broker는 SyncGroup 응답에서 각 Consumer별 Partition 할당을 보낸다. 오랫동안 사용되었던 Eager Rebalancing 프로토콜 방식은 각 구성원이 JoinGroup 요청을 보내고, 재조정에 참여하기 전에 소유한 모든 Parition을 취소해야 하기 때문에 안전면에서는 좋지만 재조정 기간 동안 작업을 수행할 수 없다는 단점이 있다. 따라서 이를 보완하기 위해 재종이 필요한 Partition만 revoke하는 Incremental Cooperative Rebalancing Procotol이 나오게 되었다.

 

출처: https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/

위 그림처럼 ConsumerPartitionAssignor으로 설정한 상태에서 Consumer A는 Partition 1, 3을, Consumer B는 Partition 2를 구독한 상태에서 Consumer C가 추가되었다면 Rebalancing이 총 두 번 일어나게 된다. 첫 재조정 단계에서 마찬가지로 JoinGroup 요청을 보내면서 시작하지만, 소유한 모든 Partition을 보유하고 그 정보를 Group Coordinator에게 보낸다. Group Leader는 원하는 대로 Consumer에 Partition을 할당하지만, 소유권을 이전하는 Partition들만 취소한다. Partition을 취소한 Consumer는 그룹게 ReJoin하여 취소된 Partition을 할당할 수 있도록 두 번째 재조정을 트리거하는 방식으로 revoke할 Partition을 최소화한다.

빈번하게 Rebalancing되는 상황이거나 스케일 인/아웃으로 인한 다운타임이 우려가 된다면 해당 방식을 사용할 수 있다.

 

 

Topic, Partition, Segment

Topic

Kafka 안에서 메시지가 저장되는 논리적인 장소이다. Topic 생성 시 Partition 개수를 지정하고, 각 Partition은 Broker들에 분산되며 Segment File들로 구성된다. 운영 시에 개수 변경이 가능하나 변경을 권장하지 않는다.

 

 

Partition

Commit Log, 하나의 Topic은 하나 이상의 Partition으로 구성되오 병렬처리(Throughput 향상)을 위해서 Multi Partition 사용을 권장한다. Partition 번호는 0부터 시작하고 오름차순이다. Topic 내의 Partition들은 서로 독립적이다.

각 파티션 내에는 Event(Message)의 위치를 나타내는 offset이 존재한다. 파티션에 저장된 데이터(Message)는 변경이 불가능(immutable)하고 한 번 Write된 데이터는 맨 끝에 추가되어 저장되는 방식으로 Offset 값은 계속 증가하고 0으로 돌아가지 않는다.

 

또한 주의할 점으로 Event(Message)의 순서는 하나의 Partition 내에서만 보장이 가능하다. 즉, 멀티 파티션, 멀티 컨슈머를 가진 컨슈머 그룹을 가지는 경우 메시지 순서를 보장하기는 어렵다. 이를 해결하기 위해 주문 생성 -> 주문 진행 중 -> 주문 성공/실패 순서로 메시지 순서가 매우 중요한 경우, 메시지의 Key를 해시 함수를 사용하여 Topic의 어떤 Partition으로 보낼지 결정하면 메시지 순서를 보장할 수 있고, 마찬가지로 동일한 Key를 가진 메시지는 동일한 Consumer Group 내 Consumer에게 전달되도록 보장하여 메시지 순서를 보장할 수 있다.

 

Segment

메세지(데이터)가 저장되는 실제 물리 File이다. Segment File이 지정된 크기보다 크거나 지정된 기간보다 오래되면 새 파일이 열리고 메시지는 새 파일에 추가된다. 위 그림을 보는 것처럼 Partition당 오직 하나의 Segment가 활성화(Active)되어 데이터가 계속 쓰여지고 있다.

세그먼트 파일은 Rolling하여 분리/생성될 때 다음과 같은 기본 정책으로 가지고 아래 파라미터 중 하나라도 해당되면 새로운 Segment File로 Rolling된다.

  • log.segment.bytes: default 1GB
  • log.roll.hours: default 168 hours
  • log.index.size.max.bytes: default 10MB(인덱스 파일)
  • __consumer_offset(Offset Topic)의 Segment File Rolling 파라미터는 별도
    • offsets.topic.segment.bytes(default: 100MB)마다 롤링된다.

 

Kafka Log Segment File

Kafka Log Segment File은 Data File이라고 부르기도 한다. Segment File이 생성되는 위치는 각 Broker의 server.properties 파일 안에서 log.dirs 파라미터로 정의할 수 있고, 콤마(,)로 구분하여 여러 디렉터리를 지정할 수 있다. 그러면 각 Topic과 그 Partition은 log.dirs 아래에 하위 디렉터리로 구성된다. 예를 들어, test_topic의 Partition 0이 있는 경우 아래와 같은 {log.dirs 설정 디렉터리}/test_topic-0 디렉터리가 생성된다.

 

test_topic의 Partition 0 디렉터리에 생성되는 파일의 예

00000000000000123453.* 파일은 00000000000000123453 offset부터 00000000000007735203 offset까지의 메시지를 저장하고 관리하는 것을 의미한다. 그리고 위에서 확인할 수 있듯이 Partition 디렉터리에 생성되는 파일에는 최소 4가지의 타입이 있다.

  • Log Segment File(.log) - 메시지와 metadata를 저장
  • Index File(.index) - 각 메시지의 Offset을 Log Segment 파일의 Byte 위치에 매핑
  • Time-based Index File(.timeindex) - 각 메시지의 timestamp를 기반으로 메시지를 검색하는데 사용
  • Leader Epoch Checkpoint File(leader-epoch-checkpoint) - Leader Epoch과 관련 Offset 정보를 저장
  • 그 외 Idempotent Product를 사용하면 .snapshot 파일이, Transactional Producer를 사용하면 .txnindex 파일이 생성된다.

 

Checkpoint File

각 Broker에는 log.dirs 디렉터리에 위치하는 2개의 Checkpoint File이 존재한다.

  • replication-offset-checkpoint
    • 마지막으로 Commit된 메시지의 ID인 High Water Mark로 시작시 Follower가 이를 사용하여 Commit되지 않은 메시지를 Truncate한다.
  • recovery-point-offset-checkpoint
    • 데이터가 디스크로 Flush된 시점으로 복구 중 Broker는 이 시점 이후의 메시지가 손실되었는지 여부를 확인한다.

 

Log Retention & Cleanup Policy

Log는 Consume되어도 지워지지 않지만 Broker 혹은 Topic 단위로 Cleanup 정책을 설정할 수 있다. 현재 Active Segment의 Log는 cleanup 대상이 아니다.

  • log.cleanup.policy를 delete로 설정한 경우
    • segment 파일에 저장된 가장 최신의 메시지가 log.retention.ms보다 오래된 segment를 삭제하는 정책이다.
    • log.retention.ms: log 보관 주기(default: 7일)
    • log.retention.check.interval.ms: log segment를 체크하는 주기(default: 5분)
  • log.cleanup.policy를 compact로 설정한 경우
    • 주어진 key에 해당하는 value만 유지하는 정책이다. key가 있는 메시지를 사용하면 Custom Partitioner를 사용하지 않는 한, 특정 Key를 가지고 있는 모든 메시지는 동일한 Partition으로 발행하는데 Compact(압축) 정책은 Partition 별로 특정 Key의 최신 Value만 유지하며 압축한다.
    • 동일한 Key를 가진 메시지가 다른 Partition이 있는 경우, 동일한 Key를 가진 여러 메시지가 여전히 있을 수 있다.
    • 압축이 없으면 Consumer는 항상 전체 로그를 읽고 결국 각 Key에 대한 최신 상태에 도달할 수 있지만, 로그 압축을 사용하면 오래된 데이터를 읽지 않기 때문에 Consumer가 최종 상태에 더 빨리 도달할 수 있다.
    • Compaction 성능 튜닝 관련 옵션
      • log.cleaner.min.cleanable.ratio: Head 영역 데이터가 Tail 영역보다 크면(default 50%) Cleaner tlwkr
      • log.cleaner.io.max.bytes.per.second: Log Cleaner의 Read/Write의 처리량을 제한하여 시스템 리소스 보호 가능(default: 무제한)
      • 동일한 key를 갖는 메시지가 매우 많은 경우, 더 빠른 정리를 위해 아래 파라미터를 증가시켜야 한다.
        • log.cleaner.threads(default: 1)
        • log.cleaner.dedupe.buffer.size(default: 134,217,728)

 

 

Kafka Broker

Kafka Broker는 Partition에 대한 Read 및 Write를 관리하는 컴포넌트이다. Kafka Server라고 부르기도 하는데 Topic을 구성하는 Partition들은 여러 Broker 상에 분산되엉 있다. 즉, Topic 생성 시 Kafka가 자동으로 Topic을 구성하는 전체 Partition들을 모든 Broker에게 할당해주고 분배하기 때문에 위 그림처럼 Topic 데이터의 일부분(Partition)을 갖을 뿐 데이터 전체를 갖고 있지 않다.

Kafka Cluster는 여러 개의 Broker들로 구성된 컴포넌트로 Client가 특정 Broker에 연결하면 전체 클러스터에 연결된다. 하지만 특정 Broker 장애를 대비하여, 전체 Broker List(IP, Port)를 파라미터로 입력하는 것을 권장한다. 각각의 Broker는 메타데이터를 공유하기 때문에 모든 Broker, Topic, Partition에 대해 알고 있다.

Kafka Cluster를 구성할 때 ZoopKeeper 모드에서는 Upgrade 등 작업 시 Broker를 순차적으로 종료 시, 정상 동작하기 위해 4대 이상을 권장하고, ZooKeeper 모드가 아닌 KRaft 모드에서는 3대 이상을 권장한다.

 

ZooKeeper 아키텍처

ZooKeeper는 분산형 Configuration 정보 유지, 분산 동기화 서비스를 제공하고 대용량 분산 시스템을 위한 네이밍 레지스트리를 제공하는 서버로 ZooKeeper를 사용하여 멀티 Kafka Broker들 간의 정보(변경 사항 포함) 공유, 동기화 등을 수행할 수 있다. 위 그림처럼 Leader/Follwer 기반의 Master/Slave 아키텍처로 구성되어 예상치 못한 장애가 발생해도 Quorum 알고리즘 기반을 통해 분산 시스템의 일관성을 유지시키기 위해 사용한다.

Quorum(쿼럼)은 "정족수"이며, 합의체가 의사를 진행시키거나 의결을 하는데 필요한 최소한도의 인원수를 의미한다. Quorum 기반으로 일관성을 유지하기 위해 ZooKeeper는 홀수 개의 서버로 작동하게 설계되어 있다.

예를 들어, ZooKeeper 클러스터의 서버가 3대로 구성되었다면 Quorum은 2, 즉 Zookeeper 1대가 장애가 발생하더라도 정상 동작하고, 서버가 5대로 구성되었다면 Quorum은 3, 즉 Zookeeper 2대가 장애가 발생하더라도 정상 도작할 수 있다.

 

Replication

만약 Broker에 장애가 발생하여 Broker 내 Partition들을 모두 사용할 수 없게 되는 경우 어떻게 이를 해결할 수 있을까? 이러한 장애를 대비하기 위해서 Kafka에서는 Partition을 복제(Replication)하여 다른 Broker 상에서 복제물(Replicas)을 만들어서 장애를 미리 대비할 수 있다. 위 그림에서 볼 수 있듯이 Producer가 직접 Write하는 브로커에 있는 Partition을 Leader Partition이라고 하고, 나머지 복제해 가는 Partition을 Follwer Partition이라고 한다. Follower는 Broker 장애시 안정성을 제공하기 위해서만 존재하는 것으로 Producer는 Reader에만 Write하고, Consumer는 Leader로부터만 Read한다. Leader의 Commit Log에서 다음과 같은 과정으로 데이터를 가져오기 요청(Fetch Request)으로 복제한다.

  1. offset 3까지 복제가 완료되어 있는 상황에서 Producer가 메시지를 보내면 Leader가 offset 4에 새 메시지를 추가한다.
  2. 각 Follwer들의 Fetch Thread가 독립적으로 fetch를 수행하고, 가져온 메시지를 offset 4에 메시지를 Write한다.
  3. 각 Follwer들의 Fetch Thread가 독립적으로 다시 fetch를 수행하여 null을 받고, Leader는 High Water Mark를 이동시킨다.
  4. 각 Follwer들의 Fetch Thread가 독립적으로 다시 fetch를 수행하고 High Water Mark를 받는다.

 

Leader에 장애가 발생하면 Kafka Cluster는 Follwer Partition들 중 하나를 Leader로 선출하고 클라이언트(Producer, Consumer)는 자동으로 새 Leader로 전환하여 Write/Read를 한다.

Kafka Cluster는 특정 Broker에만 Leader Partition이 몰리는 Hot Spot을 방지하기 위해 다음과 같은 옵션을 제공하고 있다.

  • auto.leader.rebalance.enable: 기본값 enable
  • leader.imbalance.check.interval.seconds: 기본값 300초로 Leader Partition의 분배가 불균형되었는지 주기적으로 체크한다.
  • leader.imbalance.per.broker.percentage: 기본값 10%로 Leader Partition 분배의 불균형 상태를 확인하고, 만약 이 값을 넘어가는 경우 rebalancing이 이루어진다.

 

In-Sync Replicas(ISR)

In-Sync Replicas(ISR)는 High Water Mark라고 하는 지점까지 동일한 Replicas(Leader와 Follwer 모두)의 목록이다. 위 그림에서 replicas.lag.max.messages를 4로 설정하는 경우에 Leader Partition의 LOG-END-OFFSET 기준으로 offset이 4 미만으로 차이나는 offset을 Fully-Replicated Commited 상태라고 하고, 해당 위치를 High Water Mark라고 하여 replication-offset-checkpoint 파일에 체크포인트를 기록한다. offset이 4 이상 차이나는 Follwer Partition을 Out-of-Sync Follower(OSR)이라 한다. 따라서 해당 Partition에 대한 ISR 목록을 확인하게 되면 상위 두 개의 파티션만 해당하게 되고, Leader Partition에 있는 Broker 101이 장애 발생 시 Leader를 가장 잘 따라오고 있는 Broker 102에 있는 Follwer Partition이 Leader로 선출하게 된다.

새 Leader가 선출된 시점을 Offset으로 표시하는데 이를 Leader Epoch라고 한다. Broker 복구 중에 메시지를 체크포인트로 자른 다음 현재 Leader를 따르기 위해 사용된다. Controller가 새 Leader를 선택하면 Leader Epoch를 업데이트하고 해당 정보를 ISR 목록의 모든 구성원에게 보낸다. Leader Epoch는 leader-epoch-checkpoint 파일에 체크포인트를 기록한다.

 

Follwer 및 Leader가 실패하는 경우

Follwer가 실패하는 경우
Leader가 실패하는 경우

하지만 replica.lag.max.message로 ISR 판단하는 경우 메시지가 항상 일정한 비율로 Kafka로 들어오다 메시지 유입량이 갑자기 늘어날 경우 실제 Follower들은 정상적으로 동작하고 단지 잠깐 지연만 발생했을 뿐인데 지연으로 판단하고 OSR(Out-of-Sync Replica)로 상태를 변경시킬 수도 있다. 따라서 Follwer가 Leader로 Fetch 요청을 보내는 Interval을 체크하는 replica.lag.time.max.ms로 판단을 해야 한다. replica.lag.time.max.ms = 10000이라 하면 Follwer가 Leader로 Fetch 요청을 10000 ms 내에만 요청하면 정상으로 판단한다. Confluent에서는 replica.lag.time.max.ms 옵션만 제공하고 있다.

 

위 그림처럼 Follwer가 너무 느리거나 장애가 나는 경우 Leader는 ISR에서 Follwer를 제거하고 ZooKeeper에 ISR을 변경 목록을 알려준다. ZooKeeper에서는 Partition Metadata에 대한 변경 사항을 기록한 후 Controller에게 이를 알려준다. Leader가 장애가 나는 경우에는 위에서 설명했듯이 Controller가 새로운 Leader를 선출하고, ISR 정보를 업데이트하여 ZooKeeper에 기록한 다음 클라이언트 메타데이터 업데이트를 위해 모든 Broker에 전파한다.

 

Controller는 Kafka Cluster 내의 Broker 중 하나로 선정되는데 ZooKeeper와 상호작용하면서 클러스터의 메타데이터를 관리하고 Broker Liveness를 모니터링한다. Controller는 ZooKeeper에 Replicas 정보의 복사본을 유지한 다음 더 빠른 액세스를 위해 클러스터의 모든 Broker들에게 동일한 정보를 캐시하는 역할을 담당하여 각 Broker가 개별적으로 ZooKeeper와의 연결 수를 줄여주고, Broker들은 Replicas 정보에 빠르게 접근하여 Leader 장애 시 Leader Election을 빠르게 수행할 수 있다. Controller가 장애가 나면 다른 Active Broker들 중에서 재선출된다.

 

 

출처
Kafka와 RabbitMQ의 차이점은 무엇인가요?