728x90
Python으로 Kafka Consumer 생성
Python에서는 confluent_kafka 라이브러리를 주로 사용하여 Kafka 클러스터와 상호작용합니다.
Apache Kafka - Cluster Architecture
1. 간단한 Kafka 컨슈머를 만들기
confluent_kafka 라이브러리를 설치합니다.
pip install confluent_kafka
Python Kafka Consumer 예제
- Topic : test-topic-1
- Kafka Broker Server : node1:9092, node2:9092, node3:9092
- Consumers Group ID : my-consumer-group
vim python_kafka_consumer.py
from confluent_kafka import Consumer, KafkaException
# Kafka Consumer 설정
conf = {
'bootstrap.servers': 'node1:9092,node2:9092,node3:9092', # Kafka broker 목록
'group.id': 'my-consumer-group', # 컨슈머 그룹
'auto.offset.reset': 'earliest', # 시작 시점 (earliest: 처음부터, latest: 최신 메시지부터)
}
# Consumer 객체 생성
consumer = Consumer(conf)
# 구독할 토픽 설정(토픽 이름)
consumer.subscribe(['test-topic-1'])
try:
while True:
# 메시지 소비
msg = consumer.poll(timeout=1.0) # 1초 동안 메시지를 기다림
if msg is None:
continue
if msg.error():
raise KafkaException(msg.error())
# 메시지 출력
print(f"Received message: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
pass
finally:
# 컨슈머 종료
consumer.close()
실행 권한 부여
chmod +x python_kafka_consumer.py
2. 프로듀서 메시지 전송(Send Message)
Topic : test-topic-1
/opt/kafka/bin/kafka-console-producer.sh --topic test-topic-1 --bootstrap-server node1:9092
3. 컨슈머 메시지 확인(Consume Message)
python python_kafka_consumer.py
$ python python_kafka_consumer.py
Received message: value consumer test
Kafka 클러스터의 브로커 주소와 토픽 이름을 지정하고 메시지를 실시간으로 소비하여 출력하는 방식으로 작동합니다.
728x90
'리눅스' 카테고리의 다른 글
[draft] 우분투에서 PHP 8.3으로 업그레이드하고 기존의 PHP 8.1을 삭제하는 방법 (0) | 2024.10.14 |
---|---|
[draft] Shell 스크립트에서 CRLF와 LF 문제를 해결하는 방법 (0) | 2024.10.11 |
[draft] 우분투에서 apt를 사용하여 최신 Python 버전 설치 및 기본 설정하는 방법 (0) | 2024.10.11 |
[draft] Kafdrop 도구를 사용하여 Kafka 클러스터를 관리하는 방법 (0) | 2024.10.11 |
[draft] Kafka-UI 도구를 사용하여 Kafka 클러스터를 관리하는 방법 (0) | 2024.10.11 |