Kafka Shallow Dive - Kafka Streams 기초
Published:
Kafka를 데이터 허브로 쓰다 보면 결국 “흐르는 데이터를 중간에서 가공”하고 싶어진다. 이 영역에 Flink/Spark Streaming도 있지만, Kafka만 쓰는 환경에선 Kafka Streams가 제일 가볍게 쓸 수 있는 선택지.
Kafka Streams ?
Kafka 클라이언트 라이브러리. 별도 클러스터 필요 없이 그냥 JVM 앱에 스트림 처리 로직을 얹는다.
- Flink/Spark처럼 별도 실행 엔진 안 띄움 → 배포가 일반 서버 앱과 동일
- Kafka topic을 입력으로 읽고, 변환하고, 다시 topic에 쓰는 DAG 정의
- Kafka의 파티션 = 병렬 처리의 단위 (consumer group 재활용)
- Exactly-once 기본 지원 (processing.guarantee=exactly_once_v2)
언제 쓰나:
- Kafka 중심 아키텍처에서 topic-to-topic 변환
- 조인, 집계, 윈도잉을 실시간으로
- Flink를 도입할 만큼 규모가 크지 않을 때
핵심 추상화: KStream vs KTable
| 구분 | KStream | KTable |
|---|---|---|
| 관점 | 이벤트 스트림 | 상태 테이블 (changelog) |
| 의미 | “무슨 일이 일어났다” | “현재 상태는 이거다” |
| 같은 key 재입력 | 새 레코드 추가 | 값 업데이트 (upsert) |
| null 값 | 값이 null인 이벤트 | tombstone (삭제) |
예시로 감 잡기:
이벤트 스트림 (KStream):
(user1, login)
(user1, click)
(user1, logout)
상태 테이블 (KTable):
user1 → logout ← 가장 최근 상태만
같은 topic도 KStream으로 읽으면 “사실의 연속”, KTable로 읽으면 “key별 최신 값”이 된다.
간단한 예제
orders topic에서 읽어서 금액 100 이상만 hi_orders 로 보내기:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orders = builder.stream("orders",
Consumed.with(Serdes.String(), orderSerde));
orders
.filter((key, order) -> order.amount() >= 100)
.mapValues(order -> order.withFlag("HIGH"))
.to("hi_orders", Produced.with(Serdes.String(), orderSerde));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
설정:
application.id=order-filter-v1
bootstrap.servers=localhost:9092
default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
processing.guarantee=exactly_once_v2
num.stream.threads=4
application.id가 consumer group id 겸 내부 topic prefix로 쓰임. 변경하면 완전 새 앱으로 취급되니 조심.
Stateful 처리
단순 filter/map은 stateless. Aggregation/Join은 상태가 필요함 (지금까지의 합계, 카운트 등). Kafka Streams는 로컬 RocksDB에 상태를 저장하고, changelog topic으로 Kafka에 백업한다.
KStream → groupByKey → count()
내부적으로:
- local RocksDB store ← 읽기/쓰기 (빠름)
- changelog topic (compacted) ← 각 상태 변화 append
- 장애 시 다른 인스턴스가 changelog 재생해서 복구
상태가 큰 앱도 OK. RocksDB는 디스크까지 쓰고, changelog는 compact되므로 용량이 폭주하지 않음.
Aggregation 예시
KTable<String, Long> orderCountByUser = orders
.groupBy((key, order) -> order.userId())
.count(Materialized.as("order-count-store"));
Materialized.as(...)로 이름을 주면 해당 store를 외부에서 Interactive Queries로 조회 가능 (consumer 없이 앱 REST로 상태 노출).
Windowing
무한 스트림에 “1분 단위 집계” 같은 걸 하려면 시간 윈도우가 필요.
| 윈도우 | 설명 | 예 |
|---|---|---|
| Tumbling | 겹치지 않는 고정 크기 | 매분 0~60초 카운트 |
| Hopping | 겹치는 고정 크기 | 10분 윈도우, 1분마다 새로 |
| Sliding | 레코드 시점 기준 N분 | 직전 5분간 합계 |
| Session | 비활성 간격으로 그룹 | 유저 세션 길이 |
orders
.groupBy((k, o) -> o.userId())
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
.count()
.toStream()
.to("orders-per-minute");
Grace period — late arrival을 얼마나 허용할지. ofSizeWithNoGrace는 0. 실무에선 stream 시계 대신 event time을 쓰면서 몇 분 정도 여유를 두는 경우가 많음.
Join
스트림끼리, 스트림-테이블, 테이블-테이블 조인 지원.
| 조인 | 예 |
|---|---|
| KStream-KStream | 이벤트 쌍 매칭 (예: click + impression within 5min) |
| KStream-KTable | 이벤트에 참조 데이터 붙이기 (예: order에 고객 정보) |
| KTable-KTable | 양쪽 상태 결합 |
KTable<String, Customer> customers = builder.table("customers");
KStream<String, Order> orders = builder.stream("orders");
orders.join(customers, (order, cust) -> enrich(order, cust))
.to("enriched-orders");
KTable과 조인할 때 co-partitioning(양쪽의 key와 파티션 수 일치)이 필요. 안 맞으면 re-key + re-partition이 필요해서 추가 topic이 생긴다.
Topology와 병렬성
builder.build().describe()
topology를 프린트해보면 sub-topology 단위로 나뉜 DAG가 나옴. 각 sub-topology는 stream thread가 파티션을 나눠 처리.
- 입력 topic 파티션 수 = 최대 병렬도
num.stream.threads로 인스턴스당 스레드 수- 여러 인스턴스(pod) 띄우면 자동으로 파티션 재분배
운영 포인트
- application.id 바꾸지 말 것 — 내부 topic/group id가 다 바뀜. 옛 상태 찌꺼기 남음
- changelog topic 관리 — compacted라 무한히 커지진 않지만, 클러스터에 내부 topic이 꽤 생김
- 상태 크기 — RocksDB 디스크 사용량 모니터링
- rebalance 비용 — stateful 앱은 state 재로딩이 있어 리밸런스가 비쌈.
CooperativeStickyAssignor+ standby replica (num.standby.replicas=1) 고려 - exactly_once_v2 — 권장. v1 대비 오버헤드 감소
언제 Kafka Streams가 맞지 않나
- 입력이 Kafka가 아님 (DB/파일 입력 비중이 큼) → Flink 고려
- 정말 큰 join/aggregation 워크로드 → Flink의 cluster 모델이 더 유리
- SQL로 표현하고 싶음 → ksqlDB 또는 Flink SQL
정리
Kafka Streams는 Kafka 에코시스템 안에서 변환/집계를 해결하는 제일 가벼운 방법. “별도 클러스터 필요 없음 + 기본 EOS”가 큰 장점이다. KStream/KTable의 이중 관점과 changelog 기반 상태 복구가 핵심 아이디어. 복잡한 스트림 처리는 여전히 Flink의 영역이지만, Kafka-centric한 일반적 변환은 대부분 Streams로 충분.
다음 주는 스트림 처리 말고 “외부 시스템 연결” — Kafka Connect.
