한줄 소개
분산 이벤트 스트리밍 플랫폼
Broker
카프카 애플리케이션이 설치된 서버 또는 노드
특징
- 높은 처리량
- 빠른 응답 속도
- 안정성
Docker Compose로 Kafka 실행
ZooKeeper
카프카의 메타데이터 관리, 브로커의 정상상태(Health) 점검(Check) 담당
ZooKeeper 실행
https://hub.docker.com/r/confluentinc/cp-zookeeper
mkdir conf
vi compose.yml
# 3-node ZooKeeper ensemble on the host network.
# Each node gets a distinct SERVER_ID / CLIENT_PORT; ZOOKEEPER_SERVERS lists
# every member as hostname:peerport:electionport (semicolon-separated).
services:
  some-zk-1:
    container_name: some-zk-1
    hostname: zk-1
    image: confluentinc/cp-zookeeper:7.6.1
    network_mode: host
    #volumes:
    #  - ./conf/zoo-1.conf:/
    environment:
      - ZOOKEEPER_SERVER_ID=1
      - ZOOKEEPER_CLIENT_PORT=12181
      - ZOOKEEPER_SERVERS=localhost:12888:13888;localhost:22888:23888;localhost:32888:33888
  some-zk-2:
    container_name: some-zk-2
    hostname: zk-2
    image: confluentinc/cp-zookeeper:7.6.1
    network_mode: host
    #volumes:
    #  - ./conf/zoo-2.conf:/
    environment:
      - ZOOKEEPER_SERVER_ID=2
      - ZOOKEEPER_CLIENT_PORT=22181
      - ZOOKEEPER_SERVERS=localhost:12888:13888;localhost:22888:23888;localhost:32888:33888
  some-zk-3:
    container_name: some-zk-3
    hostname: zk-3
    image: confluentinc/cp-zookeeper:7.6.1
    network_mode: host
    #volumes:
    #  - ./conf/zoo-3.conf:/
    environment:
      - ZOOKEEPER_SERVER_ID=3
      - ZOOKEEPER_CLIENT_PORT=32181
      - ZOOKEEPER_SERVERS=localhost:12888:13888;localhost:22888:23888;localhost:32888:33888
- environment
- ZOOKEEPER_SERVERS
- hostname:peerport:electionport;..
- peerport: 앙상블(클러스터) 내 노드끼리 연결(쿼럼 동기화)할 때 사용 — 클라이언트 접속용 포트(ZOOKEEPER_CLIENT_PORT)와는 별개
- electionport: 앙상블 내 노드 중 대표(Leader) 선출 시 사용
- hostname:peerport:electionport;..
- ZOOKEEPER_SERVERS
docker compose up
컨테이너 내부를 살펴보자
docker exec -it some-zk-1 /bin/bash
ls -l /etc/zookeeper/secrets
total 0
ls -l /etc/kafka
total 72
-rw-rw-r-- 1 appuser root 906 Apr 2 09:31 connect-console-sink.properties
-rw-rw-r-- 1 appuser root 909 Apr 2 09:31 connect-console-source.properties
-rw-rw-r-- 1 appuser root 5489 Apr 2 09:31 connect-distributed.properties
-rw-rw-r-- 1 appuser root 883 Apr 2 09:31 connect-file-sink.properties
-rw-rw-r-- 1 appuser root 881 Apr 2 09:31 connect-file-source.properties
-rw-rw-r-- 1 appuser root 2063 Apr 2 09:31 connect-log4j.properties
-rw-rw-r-- 1 appuser root 2540 Apr 2 09:31 connect-mirror-maker.properties
-rw-rw-r-- 1 appuser root 2276 Apr 2 09:31 connect-standalone.properties
-rw-rw-r-- 1 appuser root 1221 Apr 2 09:31 consumer.properties
drwxrwxr-x 2 appuser root 4096 Apr 3 11:34 kraft
-rw-rw-r-- 1 appuser root 213 Jun 18 22:09 log4j.properties
-rw-rw-r-- 1 appuser root 2065 Apr 2 09:31 producer.properties
-rw-rw-r-- 1 appuser root 7535 Apr 2 09:31 server.properties
-rw-rw-r-- 1 appuser root 251 Jun 18 22:09 tools-log4j.properties
-rw-rw-r-- 1 appuser root 1169 Apr 2 09:31 trogdor.conf
-rw-rw-r-- 1 appuser root 136 Jun 18 22:09 zookeeper.properties
cat /etc/kafka/zookeeper.properties
dataDir=/var/lib/zookeeper/data
dataLogDir=/var/lib/zookeeper/log
clientPort=12181
initLimit=10
syncLimit=5
server.1=localhost:12888:13888
server.2=localhost:22888:23888
server.3=localhost:32888:33888
ls -l /var/lib/zookeeper
total 8
drwxrwxr-x 3 appuser root 4096 Jun 18 22:09 data
drwxrwxr-x 3 appuser root 4096 Jun 18 22:09 log
ls -l /var/lib/zookeeper/data
total 8
-rw-r--r-- 1 appuser appuser 1 Jun 18 22:09 myid
drwxr-xr-x 2 appuser appuser 4096 Jun 18 22:09 version-2
cat /var/lib/zookeeper/data/myid
1
ls -l /var/lib/zookeeper/data/version-2/
total 12
-rw-r--r-- 1 appuser appuser 1 Jun 24 13:50 acceptedEpoch
-rw-r--r-- 1 appuser appuser 1 Jun 24 13:50 currentEpoch
-rw-r--r-- 1 appuser appuser 595 Jun 24 13:50 snapshot.0
ls -l /var/lib/zookeeper/log/version-2/
total 0
ls -l /usr/bin/zookeeper*
-rwxr-xr-x 1 root root 864 Apr 2 09:31 /usr/bin/zookeeper-security-migration
-rwxr-xr-x 1 root root 1884 Apr 2 09:31 /usr/bin/zookeeper-server-start
-rwxr-xr-x 1 root root 1366 Apr 2 09:31 /usr/bin/zookeeper-server-stop
-rwxr-xr-x 1 root root 1016 Apr 2 09:31 /usr/bin/zookeeper-shell
주키퍼 CLI를 실행해보자
zookeeper-shell localhost
Connecting to localhost
Welcome to ZooKeeper!
JLine support is disabled
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
존재하지 않는 아무 명령어를 입력 후 엔터를 치면 사용법이 출력된다
(예를 들어 a 타이핑 후 엔터)
a
ZooKeeper -server host:port [-zk-tls-config-file <file>] cmd args
addWatch [-m mode] path # optional mode is one of [PERSISTENT, PERSISTENT_RECURSIVE] - default is PERSISTENT_RECURSIVE
addauth scheme auth
close
config [-c] [-w] [-s]
connect host:port
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
delete [-v version] path
deleteall path [-b batch size]
delquota [-n|-b|-N|-B] path
get [-s] [-w] path
getAcl [-s] path
getAllChildrenNumber path
getEphemerals path
history
listquota path
ls [-s] [-w] [-R] path
printwatches on|off
quit
reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]
redo cmdno
removewatches path [-c|-d|-a] [-l]
set [-s] [-v version] path data
setAcl [-s] [-v version] [-R] path acl
setquota -n|-b|-N|-B val path
stat [-w] path
sync path
version
whoami
Command not found: Command not found a
version
ZooKeeper CLI version: 3.8.4-9316c2a7a97e1666d8f4593f34dd6fc36ecc436c, built on 2024-02-12 22:16 UTC
quit
주키퍼는 AdminServer를 내장하고 있다
웹 브라우저에서 아래 URI로 이동하면
http://localhost:8080/commands
아래와 같은 링크 목록이 나온다

Kafka 실행
https://hub.docker.com/r/confluentinc/cp-kafka
mkdir data data/kafka-1 data/kafka-2 data/kafka-3
vi compose.yml
..
# 3-broker Kafka cluster (same compose file, host network).
# Each broker: unique BROKER_ID, its own data dir mounted from ./data/kafka-N,
# and a distinct advertised listener port (19092 / 29092 / 39092).
# KAFKA_ZOOKEEPER_CONNECT points at all three ZooKeeper client ports.
some-kafka-1:
  container_name: some-kafka-1
  hostname: kafka-1
  image: confluentinc/cp-kafka:7.6.1
  network_mode: host
  volumes:
    - ./data/kafka-1:/var/lib/kafka/data
  environment:
    - KAFKA_BROKER_ID=1
    - KAFKA_ZOOKEEPER_CONNECT=localhost:12181,localhost:22181,localhost:32181
    - KAFKA_ADVERTISED_LISTENERS=INTERNAL://localhost:19092
    - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
    - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT
some-kafka-2:
  container_name: some-kafka-2
  hostname: kafka-2
  image: confluentinc/cp-kafka:7.6.1
  network_mode: host
  volumes:
    - ./data/kafka-2:/var/lib/kafka/data
  environment:
    - KAFKA_BROKER_ID=2
    - KAFKA_ZOOKEEPER_CONNECT=localhost:12181,localhost:22181,localhost:32181
    - KAFKA_ADVERTISED_LISTENERS=INTERNAL://localhost:29092
    - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
    - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT
some-kafka-3:
  container_name: some-kafka-3
  hostname: kafka-3
  image: confluentinc/cp-kafka:7.6.1
  network_mode: host
  volumes:
    - ./data/kafka-3:/var/lib/kafka/data
  environment:
    - KAFKA_BROKER_ID=3
    - KAFKA_ZOOKEEPER_CONNECT=localhost:12181,localhost:22181,localhost:32181
    - KAFKA_ADVERTISED_LISTENERS=INTERNAL://localhost:39092
    - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
    - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT
컨테이너 내부를 살펴보자
docker exec -it some-kafka-1 /bin/bash
ls -l /etc/kafka
total 80
-rw-rw-r-- 1 appuser root 906 Apr 2 09:31 connect-console-sink.properties
-rw-rw-r-- 1 appuser root 909 Apr 2 09:31 connect-console-source.properties
-rw-rw-r-- 1 appuser root 5489 Apr 2 09:31 connect-distributed.properties
-rw-rw-r-- 1 appuser root 883 Apr 2 09:31 connect-file-sink.properties
-rw-rw-r-- 1 appuser root 881 Apr 2 09:31 connect-file-source.properties
-rw-rw-r-- 1 appuser root 2063 Apr 2 09:31 connect-log4j.properties
-rw-rw-r-- 1 appuser root 2540 Apr 2 09:31 connect-mirror-maker.properties
-rw-rw-r-- 1 appuser root 2276 Apr 2 09:31 connect-standalone.properties
-rw-rw-r-- 1 appuser root 1221 Apr 2 09:31 consumer.properties
-rw-r--r-- 1 appuser appuser 170 Jun 21 04:29 kafka.properties
drwxrwxr-x 2 appuser root 4096 Apr 3 11:36 kraft
-rw-rw-r-- 1 appuser root 541 Jun 21 04:29 log4j.properties
-rw-rw-r-- 1 appuser root 2065 Apr 2 09:31 producer.properties
drwxrwxr-x 2 appuser root 4096 Apr 3 11:36 secrets
-rw-rw-r-- 1 appuser root 7535 Apr 2 09:31 server.properties
-rw-rw-r-- 1 appuser root 251 Jun 21 04:29 tools-log4j.properties
-rw-rw-r-- 1 appuser root 1169 Apr 2 09:31 trogdor.conf
-rw-rw-r-- 1 appuser root 1209 Apr 2 09:31 zookeeper.properties
cat /etc/kafka/zookeeper.properties
..
# the directory where the snapshot is stored.
dataDir=/var/lib/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
cat /etc/kafka/log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.logger.kafka=INFO
log4j.logger.kafka.network.RequestChannel$=WARN
log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG
log4j.logger.kafka.request.logger=WARN
log4j.logger.kafka.controller=TRACE
log4j.logger.kafka.log.LogCleaner=INFO
log4j.logger.state.change.logger=TRACE
log4j.logger.kafka.authorizer.logger=WARN
ls -l /var/lib/zookeeper
total 0
ls -l /var/lib/kafka/data
total 4
-rw-r--r-- 1 appuser appuser 0 Jun 21 04:59 cleaner-offset-checkpoint
-rw-r--r-- 1 appuser appuser 0 Jun 21 04:59 log-start-offset-checkpoint
-rw-r--r-- 1 appuser appuser 88 Jun 21 04:59 meta.properties
-rw-r--r-- 1 appuser appuser 0 Jun 21 04:59 recovery-point-offset-checkpoint
-rw-r--r-- 1 appuser appuser 0 Jun 21 04:59 replication-offset-checkpoint
cat /var/lib/kafka/data/meta.properties
#
#Mon Jun 24 22:42:53 UTC 2024
broker.id=1
version=0
cluster.id=KRwusXZcToGLo6rsAJ-cTA
ls -l /usr/bin/zookeeper*
-rwxr-xr-x 1 root root 864 Apr 2 09:31 /usr/bin/zookeeper-security-migration
-rwxr-xr-x 1 root root 1884 Apr 2 09:31 /usr/bin/zookeeper-server-start
-rwxr-xr-x 1 root root 1366 Apr 2 09:31 /usr/bin/zookeeper-server-stop
-rwxr-xr-x 1 root root 1016 Apr 2 09:31 /usr/bin/zookeeper-shell
카프카 컨테이너 안에도 주키퍼 쉘이 있다
ls -l /usr/bin/kafka*
-rwxr-xr-x 1 root root 858 Apr 2 09:31 /usr/bin/kafka-acls
-rwxr-xr-x 1 root root 870 Apr 2 09:31 /usr/bin/kafka-broker-api-versions
-rwxr-xr-x 1 root root 868 Apr 2 09:31 /usr/bin/kafka-cluster
-rwxr-xr-x 1 root root 861 Apr 2 09:31 /usr/bin/kafka-configs
-rwxr-xr-x 1 root root 942 Apr 2 09:31 /usr/bin/kafka-console-consumer
-rwxr-xr-x 1 root root 941 Apr 2 09:31 /usr/bin/kafka-console-producer
-rwxr-xr-x 1 root root 868 Apr 2 09:31 /usr/bin/kafka-consumer-groups
-rwxr-xr-x 1 root root 956 Apr 2 09:31 /usr/bin/kafka-consumer-perf-test
-rwxr-xr-x 1 root root 879 Apr 2 09:31 /usr/bin/kafka-delegation-tokens
-rwxr-xr-x 1 root root 877 Apr 2 09:31 /usr/bin/kafka-delete-records
-rwxr-xr-x 1 root root 863 Apr 2 09:31 /usr/bin/kafka-dump-log
-rwxr-xr-x 1 root root 874 Apr 2 09:31 /usr/bin/kafka-e2e-latency
-rwxr-xr-x 1 root root 871 Apr 2 09:31 /usr/bin/kafka-features
-rwxr-xr-x 1 root root 862 Apr 2 09:31 /usr/bin/kafka-get-offsets
-rwxr-xr-x 1 root root 864 Apr 2 09:31 /usr/bin/kafka-jmx
-rwxr-xr-x 1 root root 867 Apr 2 09:31 /usr/bin/kafka-leader-election
-rwxr-xr-x 1 root root 871 Apr 2 09:31 /usr/bin/kafka-log-dirs
-rwxr-xr-x 1 root root 878 Apr 2 09:31 /usr/bin/kafka-metadata-quorum
-rwxr-xr-x 1 root root 870 Apr 2 09:31 /usr/bin/kafka-metadata-shell
-rwxr-xr-x 1 root root 859 Apr 2 09:31 /usr/bin/kafka-mirror-maker
-rwxr-xr-x 1 root root 815 Apr 2 09:31 /usr/bin/kafka-preferred-replica-election
-rwxr-xr-x 1 root root 956 Apr 2 09:31 /usr/bin/kafka-producer-perf-test
-rwxr-xr-x 1 root root 871 Apr 2 09:31 /usr/bin/kafka-reassign-partitions
-rwxr-xr-x 1 root root 882 Apr 2 09:31 /usr/bin/kafka-replica-verification
-rwxr-xr-x 1 root root 11522 Apr 2 09:31 /usr/bin/kafka-run-class
-rwxr-xr-x 1 root root 1867 Apr 2 09:31 /usr/bin/kafka-server-start
-rwxr-xr-x 1 root root 1658 Apr 2 09:31 /usr/bin/kafka-server-stop
-rwxr-xr-x 1 root root 857 Apr 2 09:31 /usr/bin/kafka-storage
-rwxr-xr-x 1 root root 953 Apr 2 09:31 /usr/bin/kafka-streams-application-reset
-rwxr-xr-x 1 root root 860 Apr 2 09:31 /usr/bin/kafka-topics
-rwxr-xr-x 1 root root 876 Apr 2 09:31 /usr/bin/kafka-transactions
-rwxr-xr-x 1 root root 955 Apr 2 09:31 /usr/bin/kafka-verifiable-consumer
-rwxr-xr-x 1 root root 955 Apr 2 09:31 /usr/bin/kafka-verifiable-producer
Kafka Topic 생성 후 Producer와 Consumer 사용해보기
Topic
카프카는 메시지 Feed들을 토픽으로 구분한다
토픽명은 카프카 내에서 고유하다
Partition
병렬 처리 및 고성능을 얻기 위해 하나의 토픽을 여러 개로 나눈 것
Producer
메시지를 생산(카프카로 메시지를 보냄)하는 클라이언트
Consumer
메시지를 소비(카프카에서 메시지를 꺼내감)하는 클라이언트
Message, Record
생산자가 브로커로 전송하거나 소비자가 읽어가는 데이터 조각
토픽 생성
kafka-topics --bootstrap-server localhost:19092 --create --topic some-topic-1
Created topic some-topic-1.
도커 컴포즈 로그에 아래와 같이 출력된다
some-kafka-1 | [2024-06-24 22:49:51,887] INFO Creating topic some-topic-1 with configuration {} and initial partition assignment HashMap(0 -> ArrayBuffer(1)) (kafka.zk.AdminZkClient)
some-kafka-1 | [2024-06-24 22:49:51,910] INFO [Controller id=1] New topics: [Set(some-topic-1)], deleted topics: [HashSet()], new partition replica assignment [Set(TopicIdReplicaAssignment(some-topic-1,Some(6vs-e0EzQGmGQQU2J3j5fg),Map(some-topic-1-0 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=))))] (kafka.controller.KafkaController)
some-kafka-1 | [2024-06-24 22:49:51,910] INFO [Controller id=1] New partition creation callback for some-topic-1-0 (kafka.controller.KafkaController)
some-kafka-1 | [2024-06-24 22:49:51,911] INFO [Controller id=1 epoch=1] Changed partition some-topic-1-0 state from NonExistentPartition to NewPartition with assigned replicas 1 (state.change.logger)
some-kafka-1 | [2024-06-24 22:49:51,911] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers HashSet() for 0 partitions (state.change.logger)
some-kafka-1 | [2024-06-24 22:49:51,912] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition some-topic-1-0 from NonExistentReplica to NewReplica (state.change.logger)
some-kafka-1 | [2024-06-24 22:49:51,912] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers HashSet() for 0 partitions (state.change.logger)
some-kafka-1 | [2024-06-24 22:49:51,934] INFO [Controller id=1 epoch=1] Changed partition some-topic-1-0 from NewPartition to OnlinePartition with state LeaderAndIsr(leader=1, leaderEpoch=0, isrWithBrokerEpoch=List(BrokerState(brokerId=1, brokerEpoch=-1)), leaderRecoveryState=RECOVERED, partitionEpoch=0) (state.change.logger)
some-kafka-1 | [2024-06-24 22:49:51,934] TRACE [Controller id=1 epoch=1] Sending become-leader LeaderAndIsr request LeaderAndIsrPartitionState(topicName='some-topic-1', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], partitionEpoch=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true, leaderRecoveryState=0) to broker 1 for partition some-topic-1-0 (state.change.logger)
some-kafka-1 | [2024-06-24 22:49:51,934] INFO [Controller id=1 epoch=1] Sending LeaderAndIsr request to broker 1 with 1 become-leader and 0 become-follower partitions (state.change.logger)
some-kafka-1 | [2024-06-24 22:49:51,936] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers HashSet(1, 2, 3) for 1 partitions (state.change.logger)
some-kafka-1 | [2024-06-24 22:49:51,936] INFO [Broker id=1] Handling LeaderAndIsr request correlationId 5 from controller 1 for 1 partitions (state.change.logger)
some-kafka-1 | [2024-06-24 22:49:51,936] TRACE [Broker id=1] Received LeaderAndIsr request LeaderAndIsrPartitionState(topicName='some-topic-1', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], partitionEpoch=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true, leaderRecoveryState=0) correlation id 5 from controller 1 epoch 1 (state.change.logger)
some-kafka-2 | [2024-06-24 22:49:51,939] TRACE [Broker id=2] Cached leader info UpdateMetadataPartitionState(topicName='some-topic-1', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]) for partition some-topic-1-0 in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 4 (state.change.logger)
some-kafka-2 | [2024-06-24 22:49:51,940] INFO [Broker id=2] Add 1 partitions and deleted 0 partitions from metadata cache in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 4 (state.change.logger)
some-kafka-3 | [2024-06-24 22:49:51,939] TRACE [Broker id=3] Cached leader info UpdateMetadataPartitionState(topicName='some-topic-1', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]) for partition some-topic-1-0 in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 3 (state.change.logger)
some-kafka-3 | [2024-06-24 22:49:51,940] INFO [Broker id=3] Add 1 partitions and deleted 0 partitions from metadata cache in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 3 (state.change.logger)
some-kafka-1 | [2024-06-24 22:49:51,941] TRACE [Controller id=1 epoch=1] Received response UpdateMetadataResponseData(errorCode=0) for request UPDATE_METADATA with correlation id 3 sent to broker localhost:39092 (id: 3 rack: null) (state.change.logger)
some-kafka-1 | [2024-06-24 22:49:51,941] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition some-topic-1-0 from NewReplica to OnlineReplica (state.change.logger)
some-kafka-1 | [2024-06-24 22:49:51,941] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers HashSet() for 0 partitions (state.change.logger)
some-kafka-1 | [2024-06-24 22:49:51,942] TRACE [Controller id=1 epoch=1] Received response UpdateMetadataResponseData(errorCode=0) for request UPDATE_METADATA with correlation id 4 sent to broker localhost:29092 (id: 2 rack: null) (state.change.logger)
some-kafka-1 | [2024-06-24 22:49:51,947] TRACE [Broker id=1] Handling LeaderAndIsr request correlationId 5 from controller 1 epoch 1 starting the become-leader transition for partition some-topic-1-0 (state.change.logger)
some-kafka-1 | [2024-06-24 22:49:51,949] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(some-topic-1-0) (kafka.server.ReplicaFetcherManager)
some-kafka-1 | [2024-06-24 22:49:51,949] INFO [Broker id=1] Stopped fetchers as part of LeaderAndIsr request correlationId 5 from controller 1 epoch 1 as part of the become-leader transition for 1 partitions (state.change.logger)
some-kafka-1 | [2024-06-24 22:49:51,976] INFO [LogLoader partition=some-topic-1-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
some-kafka-1 | [2024-06-24 22:49:51,978] INFO Created log for partition some-topic-1-0 in /var/lib/kafka/data/some-topic-1-0 with properties {} (kafka.log.LogManager)
some-kafka-1 | [2024-06-24 22:49:51,979] INFO [Partition some-topic-1-0 broker=1] No checkpointed highwatermark is found for partition some-topic-1-0 (kafka.cluster.Partition)
some-kafka-1 | [2024-06-24 22:49:51,980] INFO [Partition some-topic-1-0 broker=1] Log loaded for partition some-topic-1-0 with initial high watermark 0 (kafka.cluster.Partition)
some-kafka-1 | [2024-06-24 22:49:51,981] INFO [Broker id=1] Leader some-topic-1-0 with topic id Some(6vs-e0EzQGmGQQU2J3j5fg) starts at leader epoch 0 from offset 0 with partition epoch 0, high watermark 0, ISR [1], adding replicas [] and removing replicas [] . Previous leader None and previous leader epoch was -1. (state.change.logger)
some-kafka-1 | [2024-06-24 22:49:51,986] TRACE [Broker id=1] Completed LeaderAndIsr request correlationId 5 from controller 1 epoch 1 for the become-leader transition for partition some-topic-1-0 (state.change.logger)
some-kafka-1 | [2024-06-24 22:49:51,987] INFO [Broker id=1] Finished LeaderAndIsr request in 51ms correlationId 5 from controller 1 for 1 partitions (state.change.logger)
some-kafka-1 | [2024-06-24 22:49:51,988] TRACE [Controller id=1 epoch=1] Received response LeaderAndIsrResponseData(errorCode=0, partitionErrors=[], topics=[LeaderAndIsrTopicError(topicId=6vs-e0EzQGmGQQU2J3j5fg, partitionErrors=[LeaderAndIsrPartitionError(topicName='', partitionIndex=0, errorCode=0)])]) for request LEADER_AND_ISR with correlation id 5 sent to broker localhost:19092 (id: 1 rack: null) (state.change.logger)
some-kafka-1 | [2024-06-24 22:49:51,988] TRACE [Broker id=1] Cached leader info UpdateMetadataPartitionState(topicName='some-topic-1', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]) for partition some-topic-1-0 in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 6 (state.change.logger)
some-kafka-1 | [2024-06-24 22:49:51,988] INFO [Broker id=1] Add 1 partitions and deleted 0 partitions from metadata cache in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 6 (state.change.logger)
some-kafka-1 | [2024-06-24 22:49:51,990] TRACE [Controller id=1 epoch=1] Received response UpdateMetadataResponseData(errorCode=0) for request UPDATE_METADATA with correlation id 6 sent to broker localhost:19092 (id: 1 rack: null) (state.change.logger)
some-kafka-1 브로커가 some-topic-1-0 파티션에 대한 로그를 생성했다는 로그가 보인다
docker exec -it some-kafka-1 /bin/bash
ls -l /var/lib/kafka/data/some-topic-1-0
total 8
-rw-r--r-- 1 appuser appuser 10485760 Jun 21 05:05 00000000000000000000.index
-rw-r--r-- 1 appuser appuser 0 Jun 21 05:05 00000000000000000000.log
-rw-r--r-- 1 appuser appuser 10485756 Jun 21 05:05 00000000000000000000.timeindex
-rw-r--r-- 1 appuser appuser 8 Jun 21 05:05 leader-epoch-checkpoint
-rw-r--r-- 1 appuser appuser 43 Jun 21 05:05 partition.metadata
소비자 사용해보기
컨테이너 내부에서 실행
kafka-console-consumer --bootstrap-server localhost:19092 --topic some-topic-1
아직 생산된 메시지가 없어 아무런 출력 없이 대기한다
생산자 사용해보기
다른 컨테이너 내부에서 실행
kafka-console-producer --bootstrap-server localhost:19092 --topic some-topic-1
>Hello, world!
입력 후 엔터
소비자 실행 컨테이너에 메시지가 출력된다
Hello, world!
참고
- Listeners
- kafka.common.InconsistentClusterIdException: The Cluster ID 7ZMQpkQITnCOZpftlluSzg doesn't match stored clusterId Some(TSlUodnFRdivvc1lrtQBwg) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
- Docker Compose
'Event streaming > Kafka' 카테고리의 다른 글
| 2. Kafka > 기초 개념 (0) | 2024.06.21 |
|---|