Kafka Shallow Dive - Consumer & Consumer Group
Published:
Producer는 뿌리기만 하면 끝이지만 Consumer는 “어디까지 읽었는가”를 스스로 책임져야 함. 이번 주의 키워드는 consumer group, 리밸런싱, offset commit.
Consumer Group ?
같은 topic을 나눠서 읽는 consumer들의 묶음. group.id 로 식별.
Topic orders (partitions=6)
Consumer Group "billing":
consumer A → orders-0, orders-1
consumer B → orders-2, orders-3
consumer C → orders-4, orders-5
Consumer Group "analytics":
consumer X → orders-0,1,2,3,4,5 (혼자 다)
규칙:
- 한 파티션은 그룹 내 한 consumer에게만 할당됨 (순서 보장 위해)
- 그룹끼리는 독립. 같은 topic을 여러 그룹이 각자 읽을 수 있음 (pub-sub 효과)
- consumer 수 > partition 수 면 남는 consumer는 놀게 됨
- partition 수가 그룹의 최대 병렬성
스케일 아웃 = consumer 추가 + 파티션 수 충분히 확보.
리밸런싱 (Rebalance)
그룹 구성이 바뀔 때 파티션을 다시 분배하는 과정.
트리거:
- 새 consumer가 그룹에 join
- 기존 consumer가 leave/crash (heartbeat 끊김)
- topic의 파티션 수가 늘어남
- subscribe 대상 topic 변경
Eager vs Cooperative
예전 프로토콜은 “stop-the-world”. 지금 권장은 cooperative sticky.
| 구분 | Eager (RangeAssignor, RoundRobin) | Cooperative (Sticky, CooperativeSticky) |
|---|---|---|
| 방식 | 전원 revoke → 재할당 | 필요한 파티션만 revoke |
| 일시정지 | 그룹 전체 소비 멈춤 | 대부분은 계속 소비 |
| 지연 | 수 초 | 거의 0 |
| 기본값 | < 2.4 | 3.0+ 부터 기본 |
설정:
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
운영 중 스케일링할 때 체감이 완전히 다름. Eager는 배포 한 번에 consumer lag이 튀고, cooperative는 조용히 넘어간다.
Static Membership
group.instance.id 를 주면 consumer가 잠깐 죽었다가 돌아왔을 때 리밸런스를 스킵함 (session.timeout 안에 복귀하는 조건). K8s에서 pod가 잠깐 재시작되는 경우에 좋음.
Offset 관리
Consumer는 읽은 위치를 어딘가에 기록해야 다음에 이어서 읽을 수 있음. Kafka는 __consumer_offsets 라는 내부 topic에 compacted log로 저장.
auto commit
enable.auto.commit=true
auto.commit.interval.ms=5000
poll() 호출 시점에 주기적으로 자동 커밋. 편하지만 at-most-once에 가까운 함정이 있음 — 메시지 처리 전에 커밋될 수 있어서 크래시 시 메시지 유실 가능.
manual commit
while (true) {
var records = consumer.poll(Duration.ofMillis(1000));
for (var r : records) {
process(r);
}
consumer.commitSync(); // 또는 commitAsync()
}
- commitSync — 블록, 실패 시 재시도. 안전
- commitAsync — 비블록, 콜백으로 결과 받음. 빠름
- 보통 루프에선 async, 종료 직전이나 파티션 revoke 때만 sync
auto.offset.reset
처음 join하거나 기존 offset이 만료됐을 때 어디부터 읽을지.
| 값 | 의미 | 언제 씀 |
|---|---|---|
latest (기본) | 지금 이후 신규만 | 실시간 소비, 과거 무관 |
earliest | 보존된 모든 메시지 | 재처리, 백필 |
none | 예외 던짐 | 안전 확인용 |
실수 포인트: latest인데 컨슈머 새로 띄우면서 “왜 과거 데이터 안 읽지?” 하고 당황함. 백필 목적이면 earliest + 새 group.id.
Consumer Lag
파티션의 최신 offset과 consumer의 현재 offset 차이. 처리가 밀리고 있는지 보는 가장 중요한 지표.
lag = LEO(log end offset) - committed offset
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group billing
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID
# orders 0 123456 123500 44 consumer-A
# orders 1 120000 130000 10000 consumer-B ← 문제
Lag이 쌓이는 원인:
- 처리 속도 < 생산 속도 — consumer 추가 또는 파티션 늘리기
- 특정 파티션에 key 편중 — partitioner/key 설계 재검토
- GC pause나 외부 의존성(DB) 지연 — 처리 로직 최적화
- poll 주기가 너무 긺 —
max.poll.interval.ms넘기면 그룹에서 쫓겨남
Poll loop 주의사항
Consumer는 “poll loop”로 돌아감. 이 루프가 멈추거나 오래 걸리면 heartbeat이 끊겨서 리밸런스 유발.
max.poll.records=500 # 한 번에 가져올 레코드 수
max.poll.interval.ms=300000 # poll 간격 한계 (5분)
session.timeout.ms=45000 # heartbeat 끊김 감지 (45초)
heartbeat.interval.ms=3000 # heartbeat 주기
처리 시간이 길면:
max.poll.records를 줄여서 한 배치 크기 감소- 오래 걸리는 작업은 별도 스레드 풀로 분리 +
pause()/resume()사용 max.poll.interval.ms를 넉넉히 (단, 장애 감지는 늦어짐)
소비 전략별 패턴
| 전략 | 커밋 타이밍 | 보장 | 유스케이스 |
|---|---|---|---|
| Auto commit | 주기적 | At-most-once 경향 | 유실 허용 로그 |
| Commit after process | 처리 후 sync | At-least-once | 일반적 |
| Commit before process | 처리 전 | At-most-once | 처리 실패 = skip |
| Transactional (Exactly-once 편) | 처리+commit을 tx로 | Exactly-once | 금융, 정합성 중요 |
“At-least-once + 멱등 처리”가 가장 흔한 조합. 소비 쪽에서 멱등키(예: 이벤트 ID)로 중복을 걸러내면 실용적으로 안전.
ConsumerRebalanceListener
리밸런스 직전에 훅을 걸 수 있음. 처리 중이던 것 마무리 + 커밋하고 넘기기 좋은 타이밍.
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
public void onPartitionsRevoked(Collection<TopicPartition> parts) {
consumer.commitSync(); // 잃기 전에 먼저 커밋
}
public void onPartitionsAssigned(Collection<TopicPartition> parts) {
// 필요 시 offset seek
}
});
정리
Consumer group은 “파티션을 어떻게 나눌지”의 컴포넌트이고, 리밸런싱/offset commit/lag 세 가지가 운영 품질을 결정한다. CooperativeStickyAssignor + manual commit + lag 모니터링이 실용적 기본 셋. 지난 달에 겪은 “배포하면 lag 튐” 문제가 eager rebalance 때문이었다는 걸 이제야 알았다.
다음 주는 Replication과 ISR — 브로커가 죽어도 데이터가 살아남는 원리.
