[Kafka] Kafka 컨슈머(Consumer) 이해하기
l Kafka
카프카는 메시지를 생산, 발송하는 프로듀서(producer)와 메시지를 수신, 소비하는 컨슈머(consumer)가 있으며, 프로듀서와 컨슈머 사이에서 메시지를 중개하는 브로커(broker)로 구성된다. 이번 포스트는 컨슈머에 대해서 알아본다.
프로듀서가 카프카의 토픽으로 메시지를 보내면 그 토픽의 메시지를 가져와 소비하는 역할을 하는 애플리케이션, 서버 등을 지칭해서 컨슈머(Consumer)라고 한다. 컨슈머의 주요 기능은 파티션 리더에게 메시지를 가져오기를 요청하는 것이다. 각 요청은 로그의 오프셋을 명시하고 그 위치로부터 로그 메시지를 수신한다. 그래서 컨슈머는 수신할 메시지의 위치를 조정할 수 있으며, 이미 수신한 메시지를 다시 수신할 수도 있다.
카프카 컨슈머의 또 하나 특징은 하나의 토픽에 서로 다른 컨슈머 애플리케이션이 동시에 구독할 수 있다는 것이다. 위 그림을 살펴보면, 각 토픽에 여러 컨슈머가 동시에 구독하는 것을 볼 수 있다. 단일 토픽에 대한 멀티 컨슈밍이 가능한 이유는 컨슈머가 메시지를 읽어도 브로커에서 메시지가 유지되기 때문이다. 대신에 각 컨슈머는 파티션의 어느 오프셋까지 읽었는지를 구분하기 위해 컨슈머 오프셋(_consumer_offset) 정보를 토픽에 저장한다. 그렇기 때문에 컨슈머는 오프셋에 저장된 정보를 통해 안정적으로 메시지를 읽어올 수 있다.
[컨슈머 그룹]
컨슈머를 실행할 때는 항상 컨슈머 그룹이라는 것이 필요하다. 만약 컨슈머를 실행할 때 옵션으로 그룹을 설정하지 않으면 자동적으로 console-consumer-xxxxx(숫자형태) 형식의 컨슈머 그룹이 생성된다. 컨슈머 그룹은 하나의 토픽에 여러 컨슈머 그룹이 동시에 접속해 메시지를 가져올 수 있다.
컨슈머 그륩은 확장도 가능하다. 만약 프로듀서가 토픽에 보내는 메시지 속도가 증가하여 컨슈머가 메시지를 가져가는 속도보다 빨라지면, 컨슈머가 처리하지 못한 메시지들이 점점 많아지기 때문에 컨슈머를 확장하여 이 문제를 해결해야 한다. 이때 단순히 컨슈머만 확장한다면 기존의 컨슈머어 오프셋 정보와 새로 추가된 컨슈머의 오프셋 정보가 뒤섞이면서 수신되는 메시지들이 엉망이 될 수 있다. 그래서 카프카에서는 동일한 토픽에 대해 여러 컨슈머가 메시지를 가져갈 수 있도록 컨슈머 그룹이라는 기능을 제공한다.
컨슈머 그룹안에서는 컨슈머들은 메시지를 가져오고 있는 토픽의 파티션에 대해 소유권을 공유한다. 위 그림을 보면, Consumer2를 추가함으로써 Partition1의 소유권이 Consumer2로 이동하는데 이렇게 소유권을 이동하는 것을 리밸런스(rebalance)라고 한다. 이러한 리밸런스를 통해 컨슈머 그룹에서는 컨슈머를 쉽고 안전하게 추가할 수 있다. 하지만 리밸런스를 하는 동안 일시적으로 컨슈머 그룹 전체는 메시지를 가져올 수 없다는것에 주의하자. 그 이유는 컨슈머 그룹 내에서 리밸런스가 일어나면 토픽의 각 파티션마다 하나의 컨슈머가 연결되기 때문이다.
그렇다면, 메시지 수가 많아 토픽에 메시지가 점점 쌓이는 상황일 때, 단순히 컨슈머만 추가하면 메시지 적체 현상이 해소될까? 정답은 “아니오” 이다. 위에서 설명할 때, 컨슈머는 하나의 파티션에 매핑 된다고 했다. 그렇기 때문에 파티션개수보다 컨슈머가 많을 경우 일부 컨슈머는 아무일도 하지 않고 대기하게 된다. 그래서 파티션 개수와 컨슈머의 개수를 함께 늘려줘야 한다. 이때 컨슈머의 노드ID는 중복되지 않도록 주의한다.
컨슈머는 일정한 주기로 하트비트를 보내는데, 이를 통해서 메시지를 잘 처리하고 있다는 것을 판단한다. 기존의 하트비트 방식은 컨슈머가 poll할 때와 가져간 메시지를 커밋할 때 보낸다. 만약 컨슈머가 오랫동안 하트비트를 보내지 않으면 세션은 타임아웃되고 해당 컨슈머가 문제가 있다고 판단하여 리밸런스를 시작한다. 이러한 상황을 지속되면 메시지를 늦게 가져오는 현상이 발생할 수 있기 때문에 모니터링을 통해 컨슈머의 장애 상황을 인지하고 새로운 컨슈머를 추가하여 정상적인 상태를 유지할 수 있도록 해야한다.
새로운 하트비트 방식 (KIP-62)에서는 백그라운드 스레드가 도입되어 사용자가 poll()을 호출하기를 기다리는 대신 이 스레드는 Kafka에 우리가 잘 작동하고 있음을 알리는 역할을 한다. 따라서 전체 프로세스가 하트비트 스레드와 함께 종료되면 Kafka는 이를 신속하게 발견하고 하트비트가 비동기식으로 더 자주 전송되기 때문에 레코드 처리에 소요되는 시간에 더 이상 영향을 받지 않는다.
[커밋과 오프셋]
컨슈머 그룹의 컨슈머들은 각각의 파티션에 자신이 가져가 메시지의 위치 정보(오프셋)을 기록한다. 이렇게 현재의 위치를 업데이트 하는 동작을 커밋(commit) 이라고 한다. 컨슈머에 장애가 발생하거나 새로운 컨슈머가 추가되었을 때 리밸런싱이 발생하고, 컨슈머는 새로운 파티션에 대해 가장 최근 커밋된 오프셋을 읽고 그 이후부터 메시지들을 가져온다. 만약 커밋된 오프셋이 컨슈머가 실제 마지막으로 처리한 오프셋보다 작으면 메시지는 중복으로 처리되고, 커밋된 오프셋이 컨슈머가 실제 마지막으로 처리한 오프셋 보다 크다면 마지막으로 처리된 오프셋과 커밋된 오프셋 사이의 모든 메시지는 누락된다. 이러한 동작방식 때문에 커밋은 매우 중요하다.
l 자동 커밋 : 기본값 5초마다 컨슈머가 poll()을 호출할 때마다 가장 마지막 오프셋 값을 커밋한다. Enable.atuo.commit=true로 설정이 가능하며, auto.commit.interval.ms 옵션을 통해 조정이 가능하다. 자동으로 오프셋을 커밋하는 방법은 매우 편리하지만 리밸런싱으로 인한 중복이 발생할 수 있기 때문에 동작에 대해 잘 이해하고 사용하는 것이 좋다.
l 수동 커밋 : 이 경우는 메시지 처리가 완료 될 때까지 메시지를 가져온 것으로 간주되어서는 안되는 경우에 사용한다. 컨슈머 객체의 commitAsync()혹은 commitSync()를 통해 직접 커밋해야 한다. 수동 커밋의 경우에도 중복은 발생할 수 있다.
[컨슈머 주요 옵션]
l bootstrap.servers : 카프카 클러스터에 처음 연결을 하기 위한 호스트와 포트 정보로 구성된 리스트 정보. 특정 호스트 정보만 사용시, 해당 호스트만 장애가 발생하는 경우에는 접속이 불가하기 때문에, 리스트 전체를 입력하는 방식을 권장한다.
l fetch.min.bytes : 한번에 가져올 수 있는 최소 데이터 사이즈로, 지정한 사이즈보다 작은 경우, 요청에 대해 응답하지 않고 데이터가 누적될때까지 기다린다.
l group.id : 컨슈머가 속한 컨슈머 그룹을 식별하는 식별자로, 그룹 아이디이다.
l enable.auto.commit : 백그라운드로 주기적으로 오프셋을 커밋한다.
l auto.offset.reset : 카프카에서 초기 오프셋이 없거나 현재 오프셋이 더이상 존재하지 않은 경우(데이터가 삭제)에 다음 옵션으로 리셋한다. (- earliest : 가장 초기의 오프셋값으로 설정 - lastest : 가장 마지막의 오프셋값으로 설정 - none : 이전 오프셋값을 찾지 못하면 에러를 리턴)
l fetch.max.bytes : 한번에 가져올 수 있는 최대 데이터 사이즈이다.
l request.timeout.ms : 요청에 대해 응답을 기다리는 최대 시간이다
l session.timeout.ms : 컨슈머와 브로커 사이의 세션 타임 아웃 시간(기본값 10초)으로 브로커가 컨슈머가 살아있는 것으로 판단하는 시간이다.
l heartbeat.interval.ms : 그룹 코디네이터에게 얼마나 자주 KafkaConsumer poll() 메소드로 하트비트를 보낼 것인지 조정 (기본값은 3초) 한다.
l max.poll.records : 단일 호출 poll()에 대한 최대 레코드 수를 조정한다. 이 옵션을 통해 애플리케이션이 폴링 루프에서 데이터 양을 조정할 수 있다.
l max.poll.interval.ms : 컨슈머가 살아있는지를 체크하기 위해 하트비트를 주기적으로 보내는데, 컨슈머가 계속해서 하트비트만 보내고 실제로 메시지를 가져가지 않는 경우가 있을 수도 있다. 이러한 경우 컨슈머가 무한정 해당 파티션을 점유할 수 없도록 주기적으로 poll을 호출하지 않으면 장애라고 판단하고 컨슈머 그룹에서 제외한 후 다른 컨슈머가 해당 파티션에서 메시지를 가져갈 수 있게 한다.
l auto.commit.interval.ms : 주기적으로 오프셋을 커밋하는 시간이다.
l fetch.max.wait.ms : fetch.min.bytes에 의해 설정된 데이터보다 적은 경우 요청에 응답을 기다리는 최대 시간이다.
[참고자료]
l Kafka Consumers : https://ibm-cloud-architecture.github.io/refarch-eda/technology/kafka-consumers/
l https://docs.confluent.io/kafka/design/consumer-design.html
l What does the heartbeat thread do in Kafka Consumer? : https://chrzaszcz.dev/2019/06/kafka-heartbeat-thread/
l KIP-62: Allow consumer to send heartbeats from a background thread : https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
2024-01-23 / Sungwook Kang / https://sungwookkang.com
KAFKA, 아파치 카프카, Apache Kafka, 카프카 토픽, 카프카 파티션, Kafka Topic, Kafka Partition, 카프카 복제, Kafka Replication, 컨슈머, Consumer, Kafka Consumer, 카프카 컨슈머
'SW Engineering > DevOps, SRE' 카테고리의 다른 글
[Kafka] ksqlDB를 사용하여, SQL 구문으로 쉽고 빠르게 카프카 데이터 조회하기 (0) | 2024.03.26 |
---|---|
[Kafka] Kafka와 Debezium을 활용하여 SQL Server의 데이터를 실시간으로 PostgreSQL로 복제하기 (0) | 2024.02.26 |
[Kafka] Kafka 프로듀서(Producer) 이해하기 (0) | 2024.01.22 |
[Kafka] Kafka 리더, 팔로워, 복제 및 복구 이해하기 (0) | 2024.01.19 |
[Kafka] Kafka 데이터 모델인 Topic과 Partition 이해하기 (0) | 2024.01.18 |