
How to Set Up an Apache Kafka and ZooKeeper Cluster

ZooKeeper manages the Kafka cluster's metadata, and running the Kafka brokers alongside a ZooKeeper ensemble provides high availability and scalability in a distributed environment.

Test Environment

Hostname  IP             Role                      Notes
node1     192.168.0.111  ZooKeeper + Kafka broker
node2     192.168.0.112  ZooKeeper + Kafka broker
node3     192.168.0.113  ZooKeeper + Kafka broker

1. Overview of the Kafka and ZooKeeper Cluster Architecture

ZooKeeper manages the state of the Kafka cluster and coordinates the brokers.

Kafka brokers store messages and exchange data with clients (producers and consumers). Multiple brokers work together to form a cluster.

2. Prerequisites

Servers: a physical or virtual server to run each broker and ZooKeeper instance.

Kafka and ZooKeeper software: install the same Kafka and ZooKeeper versions on every server.

Java: Kafka runs on the JVM, so each server needs JDK 8 or later (this guide installs JDK 17).

3. Install Required Packages

sudo apt-get install -y wget

4. Configure Hosts

On Ubuntu, comment out the 127.0.1.1 entry in the hosts file so each node resolves to its LAN address, then add the cluster nodes.

sudo sed -i '/^127\.0\.1\.1/s/^/#/' /etc/hosts
cat <<EOF | sudo tee -a /etc/hosts

# ZooKeeper cluster
192.168.0.111 node1
192.168.0.112 node2
192.168.0.113 node3
EOF
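
To confirm that the new entries resolve, you can query each one (getent is available on Ubuntu by default):

for h in node1 node2 node3; do getent hosts "$h"; done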

5. Install Java

cd /usr/local/src

sudo wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz

sudo mkdir -p /opt/java

sudo tar xf jdk-17_linux-x64_bin.tar.gz -C /opt/java --strip-components=1

Add JAVA_HOME to your shell environment file:
echo "export JAVA_HOME=/opt/java" >> ~/.bashrc
echo "export PATH=\$JAVA_HOME/bin:\$PATH" >> ~/.bashrc

source ~/.bashrc

java --version
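
Note that the ~/.bashrc exports only apply to the current user's interactive shells; the systemd units later in this guide set JAVA_HOME explicitly for that reason. If you prefer a system-wide setting, a profile script is one option (a sketch; the /etc/profile.d/java.sh path is an assumption, not something this guide requires):

cat <<EOF | sudo tee /etc/profile.d/java.sh
export JAVA_HOME=/opt/java
export PATH=\$JAVA_HOME/bin:\$PATH
EOF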

6. Install and Configure ZooKeeper

ZooKeeper Releases

6.1. Install ZooKeeper

Download and install ZooKeeper on each server.

cd /usr/local/src
sudo wget https://downloads.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz
sudo mkdir -p /opt/zookeeper
sudo tar xf apache-zookeeper-3.8.4-bin.tar.gz -C /opt/zookeeper --strip-components=1

6.2. Edit the ZooKeeper Configuration File

On each server, edit ZooKeeper's configuration file (zoo.cfg) to define the cluster. The file lives in the conf directory.

cd /opt/zookeeper/conf

---

cp zoo_sample.cfg zoo.cfg
cat zoo_sample.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true

---

sudo mkdir -p /opt/zookeeper/data
sudo chmod -R 755 /opt/zookeeper/data

Open zoo.cfg and update it with the following settings.

sudo vim /opt/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5

dataDir=/opt/zookeeper/data

clientPortAddress=0.0.0.0
clientPort=2181

maxClientCnxns=50

minSessionTimeout=2000
maxSessionTimeout=10000

4lw.commands.whitelist=stat, ruok, conf, isro, srvr

server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888

Alternatively:

cat <<EOF | sudo tee /opt/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5

dataDir=/opt/zookeeper/data

clientPortAddress=0.0.0.0
clientPort=2181

maxClientCnxns=50

minSessionTimeout=2000
maxSessionTimeout=10000

4lw.commands.whitelist=stat, ruok, conf, isro, srvr

server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
EOF
  • tickTime : ZooKeeper's basic time unit in milliseconds.
  • initLimit : how many ticks followers may take to connect and sync with the leader initially. For example, with tickTime=2000 and initLimit=10, the initial sync may take up to 20 seconds.
  • syncLimit : how many ticks a follower may lag behind the leader before it is dropped.
  • dataDir : directory where ZooKeeper stores its snapshots.
  • clientPort : port clients use to connect to ZooKeeper.
  • maxClientCnxns : maximum number of connections per client.
  • server.X : hostname and ports of each ZooKeeper server; X must match the myid file on that host.
    • The first port (2888) is used for follower-to-leader communication.
    • The second port (3888) is used for leader election.
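
Because zoo.cfg is identical on all three nodes, you can edit it once on node1 and copy it out; the loop below is a sketch that assumes SSH access and sudo rights on node2 and node3.

for h in node2 node3; do
  scp /opt/zookeeper/conf/zoo.cfg "$h":/tmp/zoo.cfg
  ssh "$h" 'sudo mv /tmp/zoo.cfg /opt/zookeeper/conf/zoo.cfg'
done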

6.3. Set a Server ID on Each ZooKeeper Server

Create a myid file in each server's dataDir to assign each node a unique ID. The value must match the server.X entry for that host in zoo.cfg.

 

On server 1:

echo "1" | sudo tee /opt/zookeeper/data/myid

On server 2:

echo "2" | sudo tee /opt/zookeeper/data/myid

On server 3:

echo "3" | sudo tee /opt/zookeeper/data/myid

6.4. Start ZooKeeper

Start ZooKeeper on every server.

/opt/zookeeper/bin/zkServer.sh start
$ /opt/zookeeper/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

To stop ZooKeeper:

/opt/zookeeper/bin/zkServer.sh stop

Check that the cluster is working properly:

/opt/zookeeper/bin/zkServer.sh status
$ /opt/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
$ /opt/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

Each server reports its role in the cluster as leader or follower.

6.5. Verify with ZooKeeper Commands

Confirm the ensemble is responding.

  • Send the stat four-letter command to a server to check its status and version.
echo stat | nc <zookeeper-host> <zookeeper-port>
echo stat | nc localhost 2181
$ echo stat | nc localhost 2181
Zookeeper version: 3.8.4-9316c2a7a97e1666d8f4593f34dd6fc36ecc436c, built on 2024-02-12 22:16 UTC
Clients:
 /127.0.0.1:52272[0](queued=0,recved=1,sent=0)

Latency min/avg/max: 0/0.0/0
Received: 2
Sent: 1
Connections: 1
Outstanding: 0
Zxid: 0x100000002
Mode: follower
Node count: 5
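
Because the 4lw.commands.whitelist configured above includes ruok, you can also poll every node in one pass; each healthy server answers imok.

for h in node1 node2 node3; do
  printf '%s: ' "$h"
  echo ruok | nc "$h" 2181   # a healthy server replies "imok"
  echo
done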

6.6. Register ZooKeeper as a systemd Service


---

Create the ZooKeeper service file:

sudo vim /etc/systemd/system/zookeeper.service
[Unit]
Description=Apache Zookeeper Service
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Environment="JAVA_HOME=/opt/java"
WorkingDirectory=/opt/zookeeper
Type=forking
ExecStart=/opt/zookeeper/bin/zkServer.sh start
ExecStop=/opt/zookeeper/bin/zkServer.sh stop
ExecReload=/opt/zookeeper/bin/zkServer.sh restart
Restart=always
RestartSec=5s

# User and group (optional)
# User=zookeeper
# Group=zookeeper

# Log routing (optional)
# StandardOutput=syslog
# StandardError=syslog

[Install]
WantedBy=multi-user.target

Reload systemd, then start and enable the service:

sudo systemctl daemon-reload
sudo systemctl start zookeeper
sudo systemctl enable zookeeper

Check the service status:

sudo systemctl status zookeeper
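
Detailed ZooKeeper logs are written under /opt/zookeeper/logs by default; start/stop messages also land in the journal and can be checked with:

journalctl -u zookeeper --no-pager -n 50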

---


7. Configure the Kafka Cluster

Kafka Releases

7.1. Install Kafka

Download and install Kafka on each server.

cd /usr/local/src
sudo wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz
sudo mkdir -p /opt/kafka
sudo tar xf kafka_2.13-3.8.0.tgz -C /opt/kafka --strip-components=1

7.2. Edit the Kafka Configuration File

sudo mkdir -p /opt/kafka/logs
cd /opt/kafka/config
sudo vim server.properties

---

cat server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults
#

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

---

Key settings to change for this environment:

# Set a unique ID on each broker (e.g., 0, 1, 2)
broker.id=0
log.dirs=/opt/kafka/logs
# ZooKeeper cluster connection
zookeeper.connect=node1:2181,node2:2181,node3:2181
# Number of partitions per topic
num.partitions=3
# Replication factor for the offsets topic (set to the number of brokers)
offsets.topic.replication.factor=3
# Default replication factor
default.replication.factor=3
  • broker.id : a unique ID for each Kafka broker.
  • zookeeper.connect : the hosts and ports of the ZooKeeper ensemble.

Alternatively, write the whole file at once:

cat <<EOF | sudo tee /opt/kafka/config/server.properties
# Set a unique ID on each broker (e.g., 0, 1, 2)
broker.id=0

# Network and I/O settings
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

num.partitions=3
num.recovery.threads.per.data.dir=1
# Replication factor for the offsets topic (set to the number of brokers)
offsets.topic.replication.factor=3
# Default replication factor
default.replication.factor=3
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

# Log storage
log.dirs=/opt/kafka/logs
# Log retention
log.retention.hours=168
log.retention.check.interval.ms=300000

# ZooKeeper cluster connection
zookeeper.connect=node1:2181,node2:2181,node3:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF
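
This file leaves listeners and advertised.listeners at their defaults, so each broker advertises its canonical host name; that works here because every node resolves node1..node3 through /etc/hosts. If clients outside these hosts will connect, you will likely need to set the listeners explicitly (a sketch; adjust the advertised host per node):

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://node1:9092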

server.properties for node2 and node3:


---

cat <<EOF | sudo tee /opt/kafka/config/server.properties
# Set a unique ID on each broker (e.g., 0, 1, 2)
broker.id=1

# Network and I/O settings
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

num.partitions=3
num.recovery.threads.per.data.dir=1
# Replication factor for the offsets topic (set to the number of brokers)
offsets.topic.replication.factor=3
# Default replication factor
default.replication.factor=3
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

# Log storage
log.dirs=/opt/kafka/logs
# Log retention
log.retention.hours=168
log.retention.check.interval.ms=300000

# ZooKeeper cluster connection
zookeeper.connect=node1:2181,node2:2181,node3:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF
cat <<EOF | sudo tee /opt/kafka/config/server.properties
# Set a unique ID on each broker (e.g., 0, 1, 2)
broker.id=2

# Network and I/O settings
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

num.partitions=3
num.recovery.threads.per.data.dir=1
# Replication factor for the offsets topic (set to the number of brokers)
offsets.topic.replication.factor=3
# Default replication factor
default.replication.factor=3
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

# Log storage
log.dirs=/opt/kafka/logs
# Log retention
log.retention.hours=168
log.retention.check.interval.ms=300000

# ZooKeeper cluster connection
zookeeper.connect=node1:2181,node2:2181,node3:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF
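
The three files differ only in broker.id, so instead of maintaining one heredoc per node you can patch a shared template on each host; this sketch assumes the node1..node3 naming and maps node1 -> 0, node2 -> 1, node3 -> 2.

ID=$(( $(hostname | grep -o '[0-9]*$') - 1 ))
sudo sed -i "s/^broker.id=.*/broker.id=${ID}/" /opt/kafka/config/server.properties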

---

7.3. Start Kafka

Start the Kafka broker on every server.

/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

To verify that Kafka started properly, check the log files or run the kafka-broker-api-versions.sh script.
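
With this layout the broker's application log goes to /opt/kafka/logs/server.log (Kafka's scripts default the log directory to logs under the installation); to watch it during startup:

tail -f /opt/kafka/logs/server.log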

 

To stop Kafka:

/opt/kafka/bin/kafka-server-stop.sh

7.4. Check the Kafka Broker Status

On a server running Kafka, confirm the broker is working by querying its supported API versions with kafka-broker-api-versions.sh.

/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092

7.5. Register Kafka as a systemd Service


---

Create the Kafka service file:

sudo vim /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka Service
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Environment="JAVA_HOME=/opt/java"
Type=simple
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

# User and group (recommended for security)
#User=kafka
#Group=kafka

# Log routing (enable if needed)
#StandardOutput=syslog
#StandardError=syslog

[Install]
WantedBy=multi-user.target

Reload systemd, then start and enable the service:

sudo systemctl daemon-reload
sudo systemctl start kafka
sudo systemctl enable kafka

Check the service status:

sudo systemctl status kafka

---

8. Verify the Kafka Cluster

Use the following commands to confirm the Kafka cluster is working correctly.

 

8.1. Create and Verify a Topic

Create a new topic to confirm that the brokers are communicating correctly.

/opt/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server node1:9092 --replication-factor 3 --partitions 3

8.2. List Topics

Confirm the topic was created.

/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server node1:9092
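
To confirm the partitions and replicas actually landed on all three brokers, describe the topic; each partition should list three replicas with a full ISR:

/opt/kafka/bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server node1:9092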

8.3. Test a Producer and Consumer

Run a producer and send a few messages:

/opt/kafka/bin/kafka-console-producer.sh --topic test-topic --bootstrap-server node1:9092

Run a consumer against each broker and confirm the messages arrive:

/opt/kafka/bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server node1:9092
/opt/kafka/bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server node2:9092
/opt/kafka/bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server node3:9092

 

The Kafka and ZooKeeper cluster is now complete.

 

References

- Oracle: Java Downloads (Java 17)

- Apache Software Foundation: Kafka

- Apache Software Foundation: ZooKeeper

 
