반응형

[Kafka] Kafka 설치 (with KRaft) PUB/SUB 테스트 코드 (with Python) 실습

 

l  Kafka 3.6.1 with KRaft

 

Apache Kafka(이하 카프카”) 실시간으로 기록 스트림을 게시, 구독, 저장 처리할 있는 분산형 데이터 스트리밍 플랫폼이다. 여러 소스에서 데이터 스트림을 처리하고 여러 사용자에게 전달하도록 설계되었다. 간단히 말해 A지점에서 B지점까지 이동하는 것뿐만 아니라 A지점에서 Z지점을 비롯해 필요한 모든 곳에서 대규모 데이터를 동시에 이동할 있다. 카프카는 현재 여러 마이크로서비스 개발 환경에서 많이 사용되고 있다.

 

카프카는 3.5.2 버전까지는 분산 코디네이션으로 주키퍼(Zookeeper) 사용하였지만, 3.6.0 버전 부터는 분산 코디네이션에 KRaft 도입하였다. 현재 글을 쓰는 시점의 최신 버전인 3.6.1 버전에도 주키퍼와 KRaft 모두 지원하고 있어 사용자가 선택하여 사용할 있다. 하지만 4.0 버전부터는 주키퍼는 지원하지 않는다고 이후에는 KRaft 사용될 예정이라고 한다.

 

Kraft 장점 자세한 설명은 아래 링크를 참고한다

 

l  KRaft: Apache Kafka Without ZooKeeper : https://developer.confluent.io/learn/kraft/

 

 

이번 포스트는 카프카 3.6.1 버전을 설치하고, 주키퍼가 아닌 KRaft 사용하며, 간단히 파이썬 코드로 데이터를 게시(Producer)하고, 구독(Consumer)하는 프로그램을 만들어 본다.

 

[테스트 환경]

OS Ubuntu 22.04.3 LTS
CPU 2 Core
Memory 2 GB
Kafka Kafka 3.6.1

 

 

카프카를 설치 운영하기 위해서는 JDK 설치되어 있어야 한다. JDK 11버전이상 사용해야 하며, 이번 실습에서는 17버전을 설치하였다.

 

apt-get install openjdk-17-jdk

 

 

카프카 다운로드 페이지에서 원하는 버전을 다운로드 압축을 해제한다.

l  Kafka Download : https://kafka.apache.org/downloads

 

카프카의 경우 설치형이 아닌 압축을 해제 서비스를 실행하여 바로 사용할 있다. 실습에서는 3.6.1 버전을 설치한다. 파일을 다운로드 하고, /usr/local 경로에 압축을 해제한다.

wget https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
 
tar xvf kafka_2.13-3.6.1.tgz -C /usr/local

 

관리 사용자 편의성을 위해 심볼링크를 생성한다.

cd /usr/local
 
ln -s kafka_2.13-3.6.1/ kafka

 

 

[server.properties 세팅]

카프카를 실행하기 전에 server.properties 설정해야 한다. 아래 단계를 따라 설정한다. Properties 파일의 경로는 아래와 같다.

cd /usr/local/kafka/config/kraft/

 

vi 또는 편집기를 실행하여 server.properties 파일의 내용을 수정한다.

vi server.properties

 

외부 통신을 위해서는 advertised.listeners localhost 주소를 카프카 서버 IP 반드시 변경해야 한다. (필자도 부분을 놓쳐서 거의 이틀동안 삽질했다.)

############################# Socket Server Settings #############################
#advertised.listeners=PLAINTEXT://localhost:9092 <-주석처리
advertised.listeners=PLAINTEXT://XXX.XXXX.XXXX:9092 <- localhost 대신 카프카 서버의 IP 입력
 

 

 

log.dirs 로그 파일의 경로이다 . 기본값은 /tmp/kraft-combined-logs 되어 있지만 사용자 편의에 따라 경로를 변경할 있다. 필자는 /usr/local/kafka/kraft-combined-logs 경로에 디렉터리를 생성하고, 변경하였다.

############################# Log Basics #############################
log.dirs=/usr/local/kafka/kraft-combined-logs

 

 

여기까지 server.properties 수정이 완료 되었으면, 저장하고 편집기를 빠져나온다.

 

[Kafka 클러스터 ID 생성 포맷]

카프카 서버를 시작하기전 클러스터 ID 생성하고, 스토리지(위에서 지정한 로그 경로의 디렉터리) 포맷해야 한다.

 

클러스터 ID 아래 명령으로 생성할 있다.

cd /usr/local/kafka
 
./bin/kafka-storage.sh random-uuid

 

 

랜덤한 UUID 클러스터 ID 생성되면 해당 ID 사용하여 스토리지 디렉터리를 포맷한다.

cd /usr/local/kafka
 
./bin/kafka-storage.sh format -t J9wwo7p2RHe9-4PiP4KbBw -c ./config/kraft/server.properties

 

 

 

[카프카 서버 시작 / 중지]

단계까지 완료되었으면 이제 카프카 서버를 시작한다. 카프카 서버의 시작 중지는 아래 명령을 사용할 있다.

#Start Service
cd /usr/local/kafka
./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties
 
#Stop Service
cd cd /usr/local/kafka
./bin/kafka-server-stop.sh -daemon ./config/kraft/server.properties

 

서비스가 정상적으로 실행되었는지 확인하려면 아래 스크립트를 사용하여 데몬 활성화 상태 포트 오픈 여부를 확인할 있다.

ps -ef | grep java | grep kraft/server.properties

 

 

netstat -antp | grep 9092

 

 

netstat -antp | grep 9093

 

 

 

[Producer / Consumer Test]

카프카 서비스가 정상적으로 실행되었으면, 간단히 토픽을 하나 생성하고, 프로듀서, 컨슈머 테스트를 진행한다.

토픽 생성은 아래 스크립트를 사용한다. 실습에서는 토픽의 이름을 topic1 이라고 생성하였다.

#Create Topic
cd /usr/local/kafka
./bin/kafka-topics.sh --create --topic topic1 --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
 
#Delete Topic
./bin/kafka-topics.sh --delete --topic topic1 --bootstrap-server localhost:9092

 

현재 생성되어 있는 토픽을 확인하는 명령은 아래와 같다.

cd /usr/local/kafka
./bin/kafka-topics.sh --bootstrap-server=localhost:9092 --list

 

 

카프카에 topic1 메시지를 게시한다.

cd /usr/local/kafka
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic1
> Input message
> Input another message
 
#End Input
Ctrl+C

 

 

카프카의 topic1 메시지를 구독한다.

cd /usr/local/kafka
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic1 --from-beginning

 

 

 

[파이썬으로 Producer, Consumer 테스트]

파이썬으로 간단한 Producer, Consumer 테스트 코드를 작성해 본다. 우선 카프카 커넥터를 설치해야 한다. 다양한 커넥터가 있지만 실습에서는 kafka-python 커넥터를 사용한다.

pip3 install kafka-python

 

아래 코드는 producer 코드로 1 ~ 10까지 반복하며 topic1 메시지를 입력한다. (파이썬 코드에 관한 설명은 생략한다.)

from kafka import KafkaProducer
from json import dumps
import time
import traceback
 
producer = KafkaProducer(
    acks=0,
    compression_type='gzip',
    bootstrap_servers=['XXX.XXX.XXX.XXXX:9092'],
    value_serializer=lambda x:dumps(x).encode('utf-8')
    )
 
start = time.time()
 
try:
    for i in range(10):
        data = {'str' : 'result'+str(i)}
        producer.send('topic1', value=data)
        producer.flush()
    producer.close()
    print('[Done]', time.time() - start)
except:
    traceback.print_exc()

 

 

 

아래 코드는 Consumer 코드로 topic1 있는 메시지를 처음부터 구독한다.

from kafka import KafkaConsumer
from json import loads
 
consumer = KafkaConsumer(
    'topic1',
    bootstrap_servers=['XXX.XXX.XXX.XXX:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='1',
    value_deserializer=lambda x: loads(x.decode('utf-8')),
    consumer_timeout_ms=1000
)
 
print('[Start] get consumer')
 
for message in consumer:
    print(f'Topic : {message.topic}, Partition : {message.partition}, Offset : {message.offset}, Key : {message.key}, value : {message.value}')
 
print('[End] get consumer')

 

지금까지 카프카 설치 간단하게 프로슈머, 컨슈머 사용법에 대해서 알아보았다. 카프카는 대규모 메시징 시스템에서 많이 사용된다. 그렇기 때문에 단독 서버가 아닌 클러스터로 구성하여 사용하는 경우가 많다. 카프카의 특성을 이해하고, 토픽 파티션 최적화 설계로 안정적인 서비스 구성이 있도록 반드시 추가 학습을 진행할 것을 권장한다.

 

[참고자료]

KRaft: Apache Kafka Without ZooKeeper : https://developer.confluent.io/learn/kraft/

Mark KRaft as Production Ready : https://cwiki.apache.org/confluence/display/KAFKA/KIP-833%3A+Mark+KRaft+as+Production+Ready

 

 

 

2024-01-09 / Sungwook Kang / https://sungwookkang.com

 

 

KAFKA, 아파치 카프카, 카프카 설치, Install Kafka, Apache Kafka, Kraft, 카프카 스트림, producer, consumer

 

반응형

+ Recent posts