Kafka Deep Dive (1) — 브로커는 어디서부터 시작되는가

Published:

작년 여름에 10주짜리 Kafka Shallow Dive 시리즈를 끝냈다. 그건 “Kafka가 뭐고 왜 쓰는지”였다. 한참을 쓰다 보니 이제는 안에서 어떻게 돌아가는지가 궁금해졌고, 그래서 이번엔 트렁크 소스를 직접 따라가는 deep dive를 시작한다.

첫 편은 entry point. ./bin/kafka-server-start.sh를 치면 그 다음에 정확히 뭐가 일어나는가. 셸 → JVM → 서버 부팅까지 한 줄씩 따라간다.

이 글의 시작점 README의 Running a Kafka broker는 3단계다 — kafka-storage.sh random-uuidkafka-storage.sh formatkafka-server-start.sh. 이 글은 마지막 줄부터, 즉 이미 포맷된 클러스터를 기동하는 시점부터 따라간다. 처음 띄우는 법 자체는 공식 quickstart 참고.

큰 그림 — 4단계 체인

bin/kafka-server-start.sh           ① 셸 진입점 (44 lines)
        │  exec kafka-run-class.sh ... kafka.Kafka "$@"
        ▼
bin/kafka-run-class.sh              ② 공용 자바 런처 (354 lines)
        │  exec java $KAFKA_OPTS -cp ... kafka.Kafka server.properties
        ▼
core/.../kafka/Kafka.scala          ③ JVM 진입점
        │  object Kafka { def main(args) }
        ▼
core/.../KafkaRaftServer.scala      ④ 역할 분기 (broker / controller / combined)
        │  startup() → BrokerServer.startup() / ControllerServer.startup()
        ▼
실제 일이 일어나는 곳

① kafka-server-start.sh — 의외로 얇다

44줄짜리 스크립트의 마지막 줄은 이렇다.

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

exec로 호출하는 게 포인트다. 새 프로세스를 띄우지 않고 현재 셸을 JVM으로 교체한다. 그래서:

  • PID가 하나로 유지된다 (systemd, docker, k8s에서 깔끔)
  • SIGTERM 같은 시그널이 셸을 거치지 않고 JVM에 바로 전달된다 → Kafka.scalaExit.addShutdownHook("kafka-shutdown-hook", ...)가 정상 동작
  • 만약 bash로 다시 한 번 감쌌다면 시그널 전달과 종료 코드가 꼬여서 graceful shutdown이 깨질 수 있다

스크립트가 직접 챙기는 건 셋뿐이다.

  • LOG_DIR 디폴트 설정
  • KAFKA_HEAP_OPTS 디폴트 (-Xmx1G -Xms1G)
  • log4j2.yaml 설정 파일 경로

브로커 역할에 한정된 옵션만 남기고, 나머지는 다 위임한다. 그 위임의 대상이 kafka-run-class.sh.

② kafka-run-class.sh — 왜 한 단을 더 거치나

처음 봤을 땐 “왜 굳이 한 번 더 쉘을 거치지?” 싶다. bin/에 가서 한 줄 세보면 답이 나온다.

$ ls bin/*.sh | wc -l
43

$ grep -l "kafka-run-class.sh" bin/*.sh | wc -l
41

43개의 셸 스크립트 중 41개가 kafka-run-class.sh를 호출한다. 즉 이건 단순 wrapper가 아니라 Kafka 도구 전체가 공유하는 자바 런처다.

kafka-server-start.sh    →  kafka.Kafka                       (브로커)
kafka-topics.sh          →  org.apache.kafka.tools.TopicCommand
kafka-console-producer.sh→  kafka.tools.ConsoleProducer
kafka-storage.sh         →  org.apache.kafka.tools.StorageTool
kafka-acls.sh            →  org.apache.kafka.tools.AclCommand
...

각 도구는 자기가 띄울 메인 클래스 이름만 다르게 넘긴다. kafka-run-class.sh가 354줄에 걸쳐 처리하는 건 도구 전체에 공통된 다음 항목들이다.

책임왜 거기 있어야 하나
CLASSPATH 조립소스 트리(*/build/libs/*.jar)와 릴리스 tarball(libs/*.jar)을 자동 감지해야 한다. dev/prod에서 따로 처리하지 않게 한 곳에 모음
Scala 버전 분기 (SCALA_VERSION, SCALA_BINARY_VERSION)jar 이름에 Scala 버전이 박혀 있어서 (kafka_2.13-...jar) 빌드 시점에 맞춰 골라줘야 함
JDK 버전 감지GC 로그 옵션 형태가 JDK 8/11/17에서 다르고, 모듈 시스템(--add-opens) 옵션도 다름
KAFKA_HEAP_OPTS, KAFKA_JVM_PERFORMANCE_OPTS, KAFKA_GC_LOG_OPTS, KAFKA_OPTS 처리운영자가 스크립트 파일을 안 건드리고 환경변수만 export해서 튜닝하게 함. 도커/쿠버 환경에서 결정적
JMX (JMX_PORT, JMX_OPTS)모든 도구가 JMX를 켤 수 있어야 하므로 공용
Windows .bat 별도 분기OS 차이를 한 곳에서만 처리

이걸 43개 스크립트마다 복붙했다면 — 그리고 JDK 25 지원을 추가할 때마다 43군데를 고쳐야 했다면 — 운영팀이 무릎을 꿇었을 거다. 분리는 DRY + 관심사 분리 + 외부 커스터마이징 셋을 동시에 푼다.

같은 패턴이 HBase, Hadoop, ZooKeeper에도 있다. JVM 기반 분산 시스템의 관용구다.

두 줄 요약

  • kafka-server-start.sh“브로커를 띄운다” 라는 의도만 담은 얇은 wrapper
  • kafka-run-class.sh — 어떤 자바 클래스든 띄울 수 있는 제네릭 런처

bin/에 깔린 다른 41개 도구가 정확히 뭐고 언제 쓰는지는 본문 흐름과 결이 달라서 부록으로 뺐다.

③ Kafka.scala — JVM이 처음 만나는 진짜 main

// core/src/main/scala/kafka/Kafka.scala
object Kafka extends Logging {
  def main(args: Array[String]): Unit = {
    try {
      val serverProps = getPropsFromArgs(args)
      val server = buildServer(serverProps)

      // 시그널 핸들러 등록
      if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
        new LoggingSignalHandler().register()

      // 셧다운 훅 — SIGTERM 받으면 server.shutdown() 호출
      Exit.addShutdownHook("kafka-shutdown-hook", () => {
        try server.shutdown()
        catch { case _: Throwable => Exit.halt(1) }
      })

      server.startup()
      server.awaitShutdown()   // 블로킹 — 여기서 평생 대기
    } catch {
      case e: Throwable => Exit.exit(1)
    }
    Exit.exit(0)
  }
}

Scala objectmain은 JVM 입장에서 public static void main 으로 노출된다. 그래서 셸이 java ... kafka.Kafka server.properties로 호출할 수 있다.

main이 하는 일은 4가지뿐이다.

  1. CLI 파싱 (server.properties 경로 + --override key=value 처리)
  2. buildServer(props)new KafkaRaftServer(...) 생성
  3. 시그널 핸들러 / 셧다운 훅 등록
  4. server.startup() 호출 후 server.awaitShutdown()에서 영원히 블록

진짜 일은 4단계의 KafkaRaftServer로 넘어간다.

④ KafkaRaftServer — process.roles 분기

KRaft 시대의 Kafka는 한 JVM이 세 모드 중 하나로 뜬다.

process.roles=broker         → BrokerServer 만
process.roles=controller     → ControllerServer 만
process.roles=broker,controller → 둘 다 (combined 모드, 주로 dev/소규모)

KafkaRaftServer는 이 분기를 담당한다. 핵심만 발췌하면:

private val sharedServer = new SharedServer(config, ...)

private val broker: Option[BrokerServer] =
  if (config.processRoles.contains(ProcessRole.BrokerRole))
    Some(new BrokerServer(sharedServer))
  else None

private val controller: Option[ControllerServer] =
  if (config.processRoles.contains(ProcessRole.ControllerRole))
    Some(new ControllerServer(sharedServer, configSchema, bootstrapMetadata))
  else None

override def startup(): Unit = {
  // ★ 컨트롤러를 먼저, 브로커를 나중에
  controller.foreach(_.startup())
  broker.foreach(_.startup())
}

세 가지 디자인 포인트.

  1. SharedServer가 양쪽이 공유하는 인프라를 들고 있다. 가장 큰 게 KafkaRaftManager — KRaft 합의 클라이언트. combined 모드에서도 Raft 클라이언트는 하나만 떠야 하니까 공유한다.
  2. 컨트롤러가 먼저 startup — 브로커가 켜질 때 컨트롤러 엔드포인트 정보가 이미 KRaft 매니저에 들어가 있어야 한다.
  3. shutdown 순서는 반대로 — 브로커 먼저, 컨트롤러 나중. controlled shutdown에서 브로커가 컨트롤러에게 “나 내려간다” 알려야 하므로 컨트롤러가 살아 있어야 함.

BrokerServer.startup() — 18단계 부팅

이게 진짜 본론이다. 921줄짜리 BrokerServer.scala에서 startup() 한 메서드가 191~642 라인을 차지한다 (약 450줄). 처음 보면 압도적이지만, 순서에는 명확한 의도가 있다.

전체 흐름을 “기반 → 저장소 → 통신 → 처리기 → 등록 → 트래픽 개방” 6개 페이즈로 묶으면 이렇다.

[페이즈 A] 기반 인프라
  1. status: SHUTDOWN → STARTING
  2. sharedServer.startForBroker()           ← Raft/메타데이터 로더 시동
  3. KafkaScheduler, BrokerTopicStats, LogDirFailureChannel

[페이즈 B] 저장소
  4. KRaftMetadataCache 생성                  ← 메모리 위 메타데이터 뷰
  5. LogManager **생성만** (시작은 보류)       ← 로그 복구는 메타데이터 따라잡은 뒤에

[페이즈 C] 통신·네트워크
  6. BrokerLifecycleManager                  ← 컨트롤러에 하트비트·등록 담당
  7. clientToControllerChannelManager + ForwardingManager
  8. SocketServer 생성 (acceptor만, processor는 보류)  ★

[페이즈 D] 처리기·코디네이터
  9.  alterPartitionManager, AddPartitionsToTxnManager, AssignmentsManager
  10. ReplicaManager 생성                     ← 파티션 복제 핵심
  11. groupCoordinator, transactionCoordinator, shareCoordinator
  12. KafkaApis + KafkaRequestHandlerPool    ← 모든 API 디스패처

[페이즈 E] 메타데이터 동기화·등록
  13. BrokerMetadataPublisher 등록
  14. lifecycleManager.start(...)             ← 컨트롤러에 자기 등록 시작
  15. initialCatchUpFuture 대기               ← 메타데이터 로그 끝까지 따라잡기
  16. firstPublishFuture 대기                 ← 첫 메타데이터 발행 완료

[페이즈 F] 트래픽 개방
  17. setReadyToUnfence() 대기                ← 컨트롤러가 "OK" 응답
  18. socketServer.enableRequestProcessing()  ★ 여기서 비로소 요청 처리 개시
      status: STARTING → STARTED

알면 좋을 디자인 포인트 4가지

1. 두 단계 부팅 — 포트 열기 ≠ 트래픽 받기

8단계에서 SocketServer를 만들 때는 acceptor만 띄운다. TCP 포트는 열리지만 클라이언트 요청은 처리하지 않는다. 17단계의 enableRequestProcessing()이 호출되어야 비로소 processor가 깨어나고 요청을 받기 시작한다.

왜? 인증/인가가 준비되기 전에 클라이언트 요청을 처리해버리면 보안 구멍이 된다. 그리고 메타데이터가 없는 상태에서 Produce 요청을 받으면 “리더가 누구인지” 답을 못 한다. 그래서 “포트는 열어두되 실제 처리는 모든 준비가 끝난 뒤” 라는 분리가 들어가 있다.

2. 메타데이터 catch-up이 게이트

15단계의 initialCatchUpFuture 전까지는 어떤 클라이언트 요청도 처리되지 않는다. 즉, 부팅이 느릴 때는 컨트롤러 연결 또는 메타데이터 로그 catch-up을 의심해야 한다. 디스크 I/O가 아니라 네트워크 문제일 가능성이 크다.

3. 코디네이터 ↔ ReplicaManager 순환 의존 — 람다로 풀기

ReplicaManager는 코디네이터들을 알아야 하고, 코디네이터는 replicaManager를 알아야 한다. Kafka는 람다를 통한 지연 바인딩으로 푼다.

val addPartitionsToTxnManager = new AddPartitionsToTxnManager(
  config,
  ...,
  // 이 시점엔 transactionCoordinator가 아직 없음 → 람다로 미루기
  transactionalId => transactionCoordinator.partitionFor(transactionalId),
  time
)

이렇게 하면 생성 순서에 자유가 생긴다. 의존성 주입 프레임워크 없이도 충분히 깔끔하게 풀린다.

4. KafkaApis가 단일 진입점

12단계의 KafkaApis모든 클라이언트 RPC가 분기되는 한 곳이다. Produce, Fetch, OffsetCommit, ListOffsets, ShareFetch… 전부 여기서 case로 갈린다. 새 API를 추가한다는 건 사실상 KafkaApis.handle()에 case 하나 추가하는 일” 이다. 다음 편에서 자세히 본다.

데이터 평면의 길

부팅이 끝난 뒤 실제 요청이 흐르는 길도 그려두자.

TCP 클라이언트
   ▼
SocketServer (NIO Acceptor → Processor)
   ▼
RequestChannel (요청 큐)
   ▼
KafkaRequestHandlerPool   ← numIoThreads 개의 워커 스레드
   ▼
KafkaApis.handle()
   ├─ Produce/Fetch          → ReplicaManager → LogManager → UnifiedLog
   ├─ OffsetCommit/Join/Sync → GroupCoordinator
   ├─ InitProducerId/EndTxn  → TransactionCoordinator
   ├─ ShareFetch/ShareAck    → SharePartitionManager + ShareCoordinator
   └─ 컨트롤러 전용 API       → ForwardingManager → ControllerServer

흥미로운 게, 클라이언트가 컨트롤러 전용 API(예: CreateTopics)를 브로커에 보내도 브로커는 거절하지 않는다. ForwardingManager가 컨트롤러로 forward 해준다. KRaft 전환 이후의 통합된 인터페이스. 옛날 ZooKeeper 시절엔 클라이언트가 직접 컨트롤러를 찾아갔지만 지금은 어느 브로커에 던져도 알아서 라우팅된다.

정리

오늘 따라간 길:

$ ./bin/kafka-server-start.sh config/server.properties
       │
       ├─ kafka-server-start.sh  (얇은 wrapper, 44줄)
       ├─ kafka-run-class.sh     (제네릭 자바 런처, 354줄, 도구 41개가 공유)
       ├─ Kafka.scala main       (CLI 파싱 + shutdown hook)
       ├─ KafkaRaftServer        (process.roles 분기, 컨트롤러 먼저)
       └─ BrokerServer.startup() (18단계, 약 450줄)
              ↓
         status = STARTED
              ↓
         awaitShutdown() — 영원히 블록

다음 편은 KafkaApis.handle()로 들어간다. 클라이언트가 보낸 ProduceRequest 하나가 정확히 어떤 case 문을 거쳐 어디서 디스크에 닿는가. Kafka의 모든 API가 통과하는 좁은 문이고, 새 KIP 작업 대부분이 결국 여기 한 줄을 건드린다.


부록 — bin/의 나머지 도구들은 언제 실행되나

본문에서는 부팅 흐름만 봤지만, bin/에는 사실 셸 스크립트가 42개 더 있다 (kafka-server-start.sh 제외). 운영하다 보면 그룹별로 한두 번씩 다 만나게 되는데, “브로커가 떠있는 동안 옆에서 호출하는 도구” 라고 뭉뚱그리기엔 실행 시점과 수명이 네 가지로 갈린다. 이 구분은 사고가 났을 때 “지금 이 도구를 써도 되나”의 판단 근거가 된다.

① 부팅 실행 — 오프라인 도구

브로커가 떠있으면 안 되는 부류. 파일시스템을 직접 만진다.

스크립트언제
kafka-storage.sh format클러스터 최초 1회. 각 로그 디렉토리에 meta.properties를 쓴다. KRaft 시대의 필수 사전작업
kafka-dump-log.sh세그먼트 파일을 직접 파싱. 데이터 깨졌나 의심될 때, 보통 해당 브로커를 멈추고 본다

② 장기 데몬형 — 별도 JVM 프로세스로 계속 산다

“한 번 실행 = 영원히 떠있음”인 부류. 모두 Kafka.scala처럼 자기만의 main + awaitShutdown() 패턴을 가진다.

스크립트무엇이 뜨나
kafka-server-start.sh브로커/컨트롤러 JVM (kafka.Kafka)
connect-distributed.sh / connect-standalone.shConnect 워커 JVM. 브로커와 별도 프로세스
connect-mirror-maker.shMirrorMaker 2 JVM (클러스터 간 미러링)
trogdor.sh분산 부하·결함 주입 테스트 프레임워크

③ 단발성 CLI 클라이언트 — RPC 한 번 쏘고 종료

가장 많은 부류. 30개가 넘는다. 모두 같은 패턴이다.

$ kafka-topics.sh --bootstrap-server ... --list
        │
        ▼
  AdminClient (또는 Producer/Consumer)
        │  TCP 9092
        ▼
  브로커의 SocketServer → KafkaApis.handle()
        │
   응답 받고 → JVM 종료

내부적으로 어떤 클라이언트를 쓰느냐로 다시 나뉜다.

내부 클라이언트대표 스크립트
AdminClientkafka-topics, kafka-configs, kafka-acls, kafka-consumer-groups, kafka-reassign-partitions, kafka-log-dirs, kafka-leader-election, kafka-features, kafka-cluster, kafka-metadata-quorum, kafka-transactions, kafka-delegation-tokens, kafka-broker-api-versions
KafkaProducerkafka-console-producer, kafka-verifiable-producer, kafka-producer-perf-test
KafkaConsumerkafka-console-consumer, kafka-verifiable-consumer, kafka-consumer-perf-test, kafka-get-offsets
Producer + Consumer 동시kafka-e2e-latency, kafka-replica-verification
브로커 RPC X (JMX·메타데이터 로그 직접)kafka-jmx, kafka-metadata-shell

④ 시그널 도구

kafka-server-stop.shps로 PID를 찾아 kill -TERM만 보낸다. 그 SIGTERM이 Kafka.scala의 shutdown hook을 트리거 → graceful shutdown.

한눈에 — 호출 시나리오

[부팅 전]   storage.sh format
                │
                ▼
[부팅]      server-start.sh   ──────▶  브로커 JVM (영원히 산다)
                                          ▲
                                          │  TCP 9092
                                          │
[운영 중] ─ topics.sh         ──RPC 1번──┤   ← 단발성
          ─ consumer-groups   ──RPC 1번──┤
          ─ console-producer   ─접속·발행──┤
          ─ reassign           ─RPC 여러번┤
                                          │
          ─ connect-distributed ─────────▶ Connect JVM (별도 데몬)
                                          │
[종료]    server-stop.sh    ── kill TERM ─┘

결국 두 가지로 줄어든다.

  1. kafka-server-start.sh / connect-*.sh / trogdor.sh = 데몬을 띄우는 스크립트. 한 번 실행하면 그 JVM이 거기 계속 살아있음
  2. 나머지 30+ 개 = 이미 떠있는 클러스터에 클라이언트로 한 번 접속하는 스크립트. 명령 보내고 JVM 죽음

도구 추가가 “클래스 하나 짜고 셸 1줄 만들면 끝” 인 이유도 여기 있다. ③번 카테고리는 사실상 AdminClient/Producer/Consumer를 감싼 얇은 CLI 래퍼일 뿐이고, ②번도 kafka.Kafka처럼 main만 있으면 된다. 본문에서 본 “공용 자바 런처에 클래스 이름만 넘긴다” 라는 패턴이 도구 카탈로그 전체를 지탱하고 있는 셈이다.


참고 (트렁크 기준)

  • bin/kafka-server-start.sh:44
  • bin/kafka-run-class.sh
  • core/src/main/scala/kafka/Kafka.scala:73
  • core/src/main/scala/kafka/server/KafkaRaftServer.scala:91-99
  • core/src/main/scala/kafka/server/BrokerServer.scala:191-642