Kafka Deep Dive (1) — 브로커는 어디서부터 시작되는가
Published:
작년 여름에 10주짜리 Kafka Shallow Dive 시리즈를 끝냈다. 그건 “Kafka가 뭐고 왜 쓰는지”였다. 한참을 쓰다 보니 이제는 안에서 어떻게 돌아가는지가 궁금해졌고, 그래서 이번엔 트렁크 소스를 직접 따라가는 deep dive를 시작한다.
첫 편은 entry point. ./bin/kafka-server-start.sh를 치면 그 다음에 정확히 뭐가 일어나는가. 셸 → JVM → 서버 부팅까지 한 줄씩 따라간다.
이 글의 시작점 README의 Running a Kafka broker는 3단계다 —
kafka-storage.sh random-uuid→kafka-storage.sh format→kafka-server-start.sh. 이 글은 마지막 줄부터, 즉 이미 포맷된 클러스터를 기동하는 시점부터 따라간다. 처음 띄우는 법 자체는 공식 quickstart 참고.
큰 그림 — 4단계 체인
bin/kafka-server-start.sh ① 셸 진입점 (44 lines)
│ exec kafka-run-class.sh ... kafka.Kafka "$@"
▼
bin/kafka-run-class.sh ② 공용 자바 런처 (354 lines)
│ exec java $KAFKA_OPTS -cp ... kafka.Kafka server.properties
▼
core/.../kafka/Kafka.scala ③ JVM 진입점
│ object Kafka { def main(args) }
▼
core/.../KafkaRaftServer.scala ④ 역할 분기 (broker / controller / combined)
│ startup() → BrokerServer.startup() / ControllerServer.startup()
▼
실제 일이 일어나는 곳
① kafka-server-start.sh — 의외로 얇다
44줄짜리 스크립트의 마지막 줄은 이렇다.
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
exec로 호출하는 게 포인트다. 새 프로세스를 띄우지 않고 현재 셸을 JVM으로 교체한다. 그래서:
- PID가 하나로 유지된다 (
systemd,docker, k8s에서 깔끔) SIGTERM같은 시그널이 셸을 거치지 않고 JVM에 바로 전달된다 →Kafka.scala의Exit.addShutdownHook("kafka-shutdown-hook", ...)가 정상 동작- 만약
bash로 다시 한 번 감쌌다면 시그널 전달과 종료 코드가 꼬여서 graceful shutdown이 깨질 수 있다
스크립트가 직접 챙기는 건 셋뿐이다.
LOG_DIR디폴트 설정KAFKA_HEAP_OPTS디폴트 (-Xmx1G -Xms1G)log4j2.yaml설정 파일 경로
브로커 역할에 한정된 옵션만 남기고, 나머지는 다 위임한다. 그 위임의 대상이 kafka-run-class.sh.
② kafka-run-class.sh — 왜 한 단을 더 거치나
처음 봤을 땐 “왜 굳이 한 번 더 쉘을 거치지?” 싶다. bin/에 가서 한 줄 세보면 답이 나온다.
$ ls bin/*.sh | wc -l
43
$ grep -l "kafka-run-class.sh" bin/*.sh | wc -l
41
43개의 셸 스크립트 중 41개가 kafka-run-class.sh를 호출한다. 즉 이건 단순 wrapper가 아니라 Kafka 도구 전체가 공유하는 자바 런처다.
kafka-server-start.sh → kafka.Kafka (브로커)
kafka-topics.sh → org.apache.kafka.tools.TopicCommand
kafka-console-producer.sh→ kafka.tools.ConsoleProducer
kafka-storage.sh → org.apache.kafka.tools.StorageTool
kafka-acls.sh → org.apache.kafka.tools.AclCommand
...
각 도구는 자기가 띄울 메인 클래스 이름만 다르게 넘긴다. kafka-run-class.sh가 354줄에 걸쳐 처리하는 건 도구 전체에 공통된 다음 항목들이다.
| 책임 | 왜 거기 있어야 하나 |
|---|---|
| CLASSPATH 조립 | 소스 트리(*/build/libs/*.jar)와 릴리스 tarball(libs/*.jar)을 자동 감지해야 한다. dev/prod에서 따로 처리하지 않게 한 곳에 모음 |
Scala 버전 분기 (SCALA_VERSION, SCALA_BINARY_VERSION) | jar 이름에 Scala 버전이 박혀 있어서 (kafka_2.13-...jar) 빌드 시점에 맞춰 골라줘야 함 |
| JDK 버전 감지 | GC 로그 옵션 형태가 JDK 8/11/17에서 다르고, 모듈 시스템(--add-opens) 옵션도 다름 |
KAFKA_HEAP_OPTS, KAFKA_JVM_PERFORMANCE_OPTS, KAFKA_GC_LOG_OPTS, KAFKA_OPTS 처리 | 운영자가 스크립트 파일을 안 건드리고 환경변수만 export해서 튜닝하게 함. 도커/쿠버 환경에서 결정적 |
JMX (JMX_PORT, JMX_OPTS) | 모든 도구가 JMX를 켤 수 있어야 하므로 공용 |
Windows .bat 별도 분기 | OS 차이를 한 곳에서만 처리 |
이걸 43개 스크립트마다 복붙했다면 — 그리고 JDK 25 지원을 추가할 때마다 43군데를 고쳐야 했다면 — 운영팀이 무릎을 꿇었을 거다. 분리는 DRY + 관심사 분리 + 외부 커스터마이징 셋을 동시에 푼다.
같은 패턴이 HBase, Hadoop, ZooKeeper에도 있다. JVM 기반 분산 시스템의 관용구다.
두 줄 요약
kafka-server-start.sh— “브로커를 띄운다” 라는 의도만 담은 얇은 wrapperkafka-run-class.sh— 어떤 자바 클래스든 띄울 수 있는 제네릭 런처
bin/에 깔린 다른 41개 도구가 정확히 뭐고 언제 쓰는지는 본문 흐름과 결이 달라서 부록으로 뺐다.
③ Kafka.scala — JVM이 처음 만나는 진짜 main
// core/src/main/scala/kafka/Kafka.scala
object Kafka extends Logging {
def main(args: Array[String]): Unit = {
try {
val serverProps = getPropsFromArgs(args)
val server = buildServer(serverProps)
// 시그널 핸들러 등록
if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
new LoggingSignalHandler().register()
// 셧다운 훅 — SIGTERM 받으면 server.shutdown() 호출
Exit.addShutdownHook("kafka-shutdown-hook", () => {
try server.shutdown()
catch { case _: Throwable => Exit.halt(1) }
})
server.startup()
server.awaitShutdown() // 블로킹 — 여기서 평생 대기
} catch {
case e: Throwable => Exit.exit(1)
}
Exit.exit(0)
}
}
Scala object의 main은 JVM 입장에서 public static void main 으로 노출된다. 그래서 셸이 java ... kafka.Kafka server.properties로 호출할 수 있다.
main이 하는 일은 4가지뿐이다.
- CLI 파싱 (
server.properties경로 +--override key=value처리) buildServer(props)→new KafkaRaftServer(...)생성- 시그널 핸들러 / 셧다운 훅 등록
server.startup()호출 후server.awaitShutdown()에서 영원히 블록
진짜 일은 4단계의 KafkaRaftServer로 넘어간다.
④ KafkaRaftServer — process.roles 분기
KRaft 시대의 Kafka는 한 JVM이 세 모드 중 하나로 뜬다.
process.roles=broker → BrokerServer 만
process.roles=controller → ControllerServer 만
process.roles=broker,controller → 둘 다 (combined 모드, 주로 dev/소규모)
KafkaRaftServer는 이 분기를 담당한다. 핵심만 발췌하면:
private val sharedServer = new SharedServer(config, ...)
private val broker: Option[BrokerServer] =
if (config.processRoles.contains(ProcessRole.BrokerRole))
Some(new BrokerServer(sharedServer))
else None
private val controller: Option[ControllerServer] =
if (config.processRoles.contains(ProcessRole.ControllerRole))
Some(new ControllerServer(sharedServer, configSchema, bootstrapMetadata))
else None
override def startup(): Unit = {
// ★ 컨트롤러를 먼저, 브로커를 나중에
controller.foreach(_.startup())
broker.foreach(_.startup())
}
세 가지 디자인 포인트.
SharedServer가 양쪽이 공유하는 인프라를 들고 있다. 가장 큰 게KafkaRaftManager— KRaft 합의 클라이언트. combined 모드에서도 Raft 클라이언트는 하나만 떠야 하니까 공유한다.- 컨트롤러가 먼저 startup — 브로커가 켜질 때 컨트롤러 엔드포인트 정보가 이미 KRaft 매니저에 들어가 있어야 한다.
- shutdown 순서는 반대로 — 브로커 먼저, 컨트롤러 나중. controlled shutdown에서 브로커가 컨트롤러에게 “나 내려간다” 알려야 하므로 컨트롤러가 살아 있어야 함.
BrokerServer.startup() — 18단계 부팅
이게 진짜 본론이다. 921줄짜리 BrokerServer.scala에서 startup() 한 메서드가 191~642 라인을 차지한다 (약 450줄). 처음 보면 압도적이지만, 순서에는 명확한 의도가 있다.
전체 흐름을 “기반 → 저장소 → 통신 → 처리기 → 등록 → 트래픽 개방” 6개 페이즈로 묶으면 이렇다.
[페이즈 A] 기반 인프라
1. status: SHUTDOWN → STARTING
2. sharedServer.startForBroker() ← Raft/메타데이터 로더 시동
3. KafkaScheduler, BrokerTopicStats, LogDirFailureChannel
[페이즈 B] 저장소
4. KRaftMetadataCache 생성 ← 메모리 위 메타데이터 뷰
5. LogManager **생성만** (시작은 보류) ← 로그 복구는 메타데이터 따라잡은 뒤에
[페이즈 C] 통신·네트워크
6. BrokerLifecycleManager ← 컨트롤러에 하트비트·등록 담당
7. clientToControllerChannelManager + ForwardingManager
8. SocketServer 생성 (acceptor만, processor는 보류) ★
[페이즈 D] 처리기·코디네이터
9. alterPartitionManager, AddPartitionsToTxnManager, AssignmentsManager
10. ReplicaManager 생성 ← 파티션 복제 핵심
11. groupCoordinator, transactionCoordinator, shareCoordinator
12. KafkaApis + KafkaRequestHandlerPool ← 모든 API 디스패처
[페이즈 E] 메타데이터 동기화·등록
13. BrokerMetadataPublisher 등록
14. lifecycleManager.start(...) ← 컨트롤러에 자기 등록 시작
15. initialCatchUpFuture 대기 ← 메타데이터 로그 끝까지 따라잡기
16. firstPublishFuture 대기 ← 첫 메타데이터 발행 완료
[페이즈 F] 트래픽 개방
17. setReadyToUnfence() 대기 ← 컨트롤러가 "OK" 응답
18. socketServer.enableRequestProcessing() ★ 여기서 비로소 요청 처리 개시
status: STARTING → STARTED
알면 좋을 디자인 포인트 4가지
1. 두 단계 부팅 — 포트 열기 ≠ 트래픽 받기
8단계에서 SocketServer를 만들 때는 acceptor만 띄운다. TCP 포트는 열리지만 클라이언트 요청은 처리하지 않는다. 17단계의 enableRequestProcessing()이 호출되어야 비로소 processor가 깨어나고 요청을 받기 시작한다.
왜? 인증/인가가 준비되기 전에 클라이언트 요청을 처리해버리면 보안 구멍이 된다. 그리고 메타데이터가 없는 상태에서 Produce 요청을 받으면 “리더가 누구인지” 답을 못 한다. 그래서 “포트는 열어두되 실제 처리는 모든 준비가 끝난 뒤” 라는 분리가 들어가 있다.
2. 메타데이터 catch-up이 게이트
15단계의 initialCatchUpFuture 전까지는 어떤 클라이언트 요청도 처리되지 않는다. 즉, 부팅이 느릴 때는 컨트롤러 연결 또는 메타데이터 로그 catch-up을 의심해야 한다. 디스크 I/O가 아니라 네트워크 문제일 가능성이 크다.
3. 코디네이터 ↔ ReplicaManager 순환 의존 — 람다로 풀기
ReplicaManager는 코디네이터들을 알아야 하고, 코디네이터는 replicaManager를 알아야 한다. Kafka는 람다를 통한 지연 바인딩으로 푼다.
val addPartitionsToTxnManager = new AddPartitionsToTxnManager(
config,
...,
// 이 시점엔 transactionCoordinator가 아직 없음 → 람다로 미루기
transactionalId => transactionCoordinator.partitionFor(transactionalId),
time
)
이렇게 하면 생성 순서에 자유가 생긴다. 의존성 주입 프레임워크 없이도 충분히 깔끔하게 풀린다.
4. KafkaApis가 단일 진입점
12단계의 KafkaApis는 모든 클라이언트 RPC가 분기되는 한 곳이다. Produce, Fetch, OffsetCommit, ListOffsets, ShareFetch… 전부 여기서 case로 갈린다. 새 API를 추가한다는 건 사실상 “KafkaApis.handle()에 case 하나 추가하는 일” 이다. 다음 편에서 자세히 본다.
데이터 평면의 길
부팅이 끝난 뒤 실제 요청이 흐르는 길도 그려두자.
TCP 클라이언트
▼
SocketServer (NIO Acceptor → Processor)
▼
RequestChannel (요청 큐)
▼
KafkaRequestHandlerPool ← numIoThreads 개의 워커 스레드
▼
KafkaApis.handle()
├─ Produce/Fetch → ReplicaManager → LogManager → UnifiedLog
├─ OffsetCommit/Join/Sync → GroupCoordinator
├─ InitProducerId/EndTxn → TransactionCoordinator
├─ ShareFetch/ShareAck → SharePartitionManager + ShareCoordinator
└─ 컨트롤러 전용 API → ForwardingManager → ControllerServer
흥미로운 게, 클라이언트가 컨트롤러 전용 API(예: CreateTopics)를 브로커에 보내도 브로커는 거절하지 않는다. ForwardingManager가 컨트롤러로 forward 해준다. KRaft 전환 이후의 통합된 인터페이스. 옛날 ZooKeeper 시절엔 클라이언트가 직접 컨트롤러를 찾아갔지만 지금은 어느 브로커에 던져도 알아서 라우팅된다.
정리
오늘 따라간 길:
$ ./bin/kafka-server-start.sh config/server.properties
│
├─ kafka-server-start.sh (얇은 wrapper, 44줄)
├─ kafka-run-class.sh (제네릭 자바 런처, 354줄, 도구 41개가 공유)
├─ Kafka.scala main (CLI 파싱 + shutdown hook)
├─ KafkaRaftServer (process.roles 분기, 컨트롤러 먼저)
└─ BrokerServer.startup() (18단계, 약 450줄)
↓
status = STARTED
↓
awaitShutdown() — 영원히 블록
다음 편은 KafkaApis.handle()로 들어간다. 클라이언트가 보낸 ProduceRequest 하나가 정확히 어떤 case 문을 거쳐 어디서 디스크에 닿는가. Kafka의 모든 API가 통과하는 좁은 문이고, 새 KIP 작업 대부분이 결국 여기 한 줄을 건드린다.
부록 — bin/의 나머지 도구들은 언제 실행되나
본문에서는 부팅 흐름만 봤지만, bin/에는 사실 셸 스크립트가 42개 더 있다 (kafka-server-start.sh 제외). 운영하다 보면 그룹별로 한두 번씩 다 만나게 되는데, “브로커가 떠있는 동안 옆에서 호출하는 도구” 라고 뭉뚱그리기엔 실행 시점과 수명이 네 가지로 갈린다. 이 구분은 사고가 났을 때 “지금 이 도구를 써도 되나”의 판단 근거가 된다.
① 부팅 전 실행 — 오프라인 도구
브로커가 떠있으면 안 되는 부류. 파일시스템을 직접 만진다.
| 스크립트 | 언제 |
|---|---|
kafka-storage.sh format | 클러스터 최초 1회. 각 로그 디렉토리에 meta.properties를 쓴다. KRaft 시대의 필수 사전작업 |
kafka-dump-log.sh | 세그먼트 파일을 직접 파싱. 데이터 깨졌나 의심될 때, 보통 해당 브로커를 멈추고 본다 |
② 장기 데몬형 — 별도 JVM 프로세스로 계속 산다
“한 번 실행 = 영원히 떠있음”인 부류. 모두 Kafka.scala처럼 자기만의 main + awaitShutdown() 패턴을 가진다.
| 스크립트 | 무엇이 뜨나 |
|---|---|
kafka-server-start.sh | 브로커/컨트롤러 JVM (kafka.Kafka) |
connect-distributed.sh / connect-standalone.sh | Connect 워커 JVM. 브로커와 별도 프로세스 |
connect-mirror-maker.sh | MirrorMaker 2 JVM (클러스터 간 미러링) |
trogdor.sh | 분산 부하·결함 주입 테스트 프레임워크 |
③ 단발성 CLI 클라이언트 — RPC 한 번 쏘고 종료
가장 많은 부류. 30개가 넘는다. 모두 같은 패턴이다.
$ kafka-topics.sh --bootstrap-server ... --list
│
▼
AdminClient (또는 Producer/Consumer)
│ TCP 9092
▼
브로커의 SocketServer → KafkaApis.handle()
│
응답 받고 → JVM 종료
내부적으로 어떤 클라이언트를 쓰느냐로 다시 나뉜다.
| 내부 클라이언트 | 대표 스크립트 |
|---|---|
| AdminClient | kafka-topics, kafka-configs, kafka-acls, kafka-consumer-groups, kafka-reassign-partitions, kafka-log-dirs, kafka-leader-election, kafka-features, kafka-cluster, kafka-metadata-quorum, kafka-transactions, kafka-delegation-tokens, kafka-broker-api-versions 등 |
| KafkaProducer | kafka-console-producer, kafka-verifiable-producer, kafka-producer-perf-test |
| KafkaConsumer | kafka-console-consumer, kafka-verifiable-consumer, kafka-consumer-perf-test, kafka-get-offsets |
| Producer + Consumer 동시 | kafka-e2e-latency, kafka-replica-verification |
| 브로커 RPC X (JMX·메타데이터 로그 직접) | kafka-jmx, kafka-metadata-shell |
④ 시그널 도구
kafka-server-stop.sh는 ps로 PID를 찾아 kill -TERM만 보낸다. 그 SIGTERM이 Kafka.scala의 shutdown hook을 트리거 → graceful shutdown.
한눈에 — 호출 시나리오
[부팅 전] storage.sh format
│
▼
[부팅] server-start.sh ──────▶ 브로커 JVM (영원히 산다)
▲
│ TCP 9092
│
[운영 중] ─ topics.sh ──RPC 1번──┤ ← 단발성
─ consumer-groups ──RPC 1번──┤
─ console-producer ─접속·발행──┤
─ reassign ─RPC 여러번┤
│
─ connect-distributed ─────────▶ Connect JVM (별도 데몬)
│
[종료] server-stop.sh ── kill TERM ─┘
결국 두 가지로 줄어든다.
kafka-server-start.sh/connect-*.sh/trogdor.sh= 데몬을 띄우는 스크립트. 한 번 실행하면 그 JVM이 거기 계속 살아있음- 나머지 30+ 개 = 이미 떠있는 클러스터에 클라이언트로 한 번 접속하는 스크립트. 명령 보내고 JVM 죽음
도구 추가가 “클래스 하나 짜고 셸 1줄 만들면 끝” 인 이유도 여기 있다. ③번 카테고리는 사실상 AdminClient/Producer/Consumer를 감싼 얇은 CLI 래퍼일 뿐이고, ②번도 kafka.Kafka처럼 main만 있으면 된다. 본문에서 본 “공용 자바 런처에 클래스 이름만 넘긴다” 라는 패턴이 도구 카탈로그 전체를 지탱하고 있는 셈이다.
참고 (트렁크 기준)
bin/kafka-server-start.sh:44bin/kafka-run-class.shcore/src/main/scala/kafka/Kafka.scala:73core/src/main/scala/kafka/server/KafkaRaftServer.scala:91-99core/src/main/scala/kafka/server/BrokerServer.scala:191-642
