IoT data pipeline 회고

Published:

SSAFY 프로젝트로 IoT 디바이스에서 올라오는 좌표 데이터를 실시간으로 처리하는 파이프라인을 만들었다. 프로젝트가 일단락되고, 때마침 Kafka Shallow Dive 10주 시리즈도 끝난 타이밍이라 한 번 돌아봄. “뭘 만들었고 / 뭘 놓쳤고 / 다음엔 뭘 하지” 세 줄로 회고.

만든 것

[IoT 디바이스] ──MQTT──▶ Mosquitto
                            │
                            ▼
                        Telegraf
                            │
                            ▼
                       Kafka (KRaft)
                            │
                            ▼
                 kafka-redis-consumer (Py, consumer group)
                            │
                            ▼
                         Redis (24h TTL)
                            │
                            ▼
                 WebSocket Server (Spring Boot)
                            │
                            ▼
                      브라우저 (실시간 지도)

MQTT로 들어온 좌표 이벤트를 Telegraf가 Kafka에 밀어넣고, Python consumer가 Redis로 옮기고, Spring Boot 서버가 WebSocket으로 클라이언트에 밀어주는 구조. Device ID를 Kafka 파티션 key로 써서 같은 기기의 메시지는 순서가 보장되도록 했다.

처음엔 그냥 로그 파일이었다

처음 버전은 Kafka 없었다. 기기 한 대 붙여놓고 테스트하는 단계라 MQTT 메시지를 받아서 파일에 그냥 append 하고 있었음.

mosquitto_sub -t 'devices/+/coord' >> coords.log

쉘 리다이렉션 >> 한 줄로 끝. 파일 하나에 계속 쌓이고, 필요하면 tail -f 로 보고, grep으로 훑으면 되니까 개발 중엔 이게 제일 빠름. “굳이 뭘 더 넣지?” 싶었다.

실제 프로젝트 요구사항이 “수천 대 동시 접속”은 아니었다. 그런데 혼자 굴려보다가 “이거 기기 수십, 수백, 수천 대 올라오면 그대로 버틸 수 있나?” 하는 의문이 들었고, 머릿속으로 시뮬레이션 돌려보니 이 방식이 전부 무너졌다.

문제한 대일 때수천 대일 때
쓰기 경합괜찮음여러 프로세스가 한 파일 append → flock 지옥
파일 크기괜찮음수 GB/시간. rotate 안 하면 디스크 폭발
순서자연히 시간순프로세스 여러 개면 섞이고 깨짐
소비 병렬화tail -f 하나면 됨같은 파일을 N명이 읽으면 offset 관리 불가
장애 복구재시작하면 끝“어디부터 읽어야 하지?”가 매번 문제
리플레이head/sed구조가 없어서 찾기 힘듦

파일 append는 본질적으로 단일 writer, 단일 reader 가정의 도구였다. 그래서 “여러 writer가 동시에, 여러 reader가 각자 속도로, 안전하게” 가 필요해진 순간 파일 로그의 분산 버전이 필요 해졌고, 그게 Kafka였다.

나중에 Shallow Dive 첫 편에서 “Kafka는 분산된 append-only 로그”라고 정리했을 때 “그거 내가 처음에 쓰던 >> coords.log 의 확장판이잖아” 싶어서 오히려 개념이 바로 꽂혔다. 시작이 파일 append였던 게 결과적으로 Kafka를 이해하는 가장 좋은 진입점이었다.

왜 이 구성이 됐나

“파일 append 말고 뭘?”로 고민 시작하니까 요구사항이 자연스럽게 정리됐다.

  • 기기 수가 늘어나도 처리량이 선형으로 버텨야 함
  • 잠깐 consumer가 죽어도 이벤트가 유실되면 안 됨
  • 나중에 “특정 기간 데이터 리플레이”도 해야 함
  • 최신 좌표는 매우 자주 조회됨 (웹 클라이언트 N명이 계속 요청)

이 4개를 풀려고 컴포넌트가 하나씩 늘었다.

요구파일 append로는?최종 해결
처리량 확장단일 파일 writer 경합Kafka 파티션 + consumer group
유실 방지fsync 관리 직접Kafka가 24시간 보존 (consumer 복구 시 이어서)
리플레이head/sed로 수동offset 되감기 (Kafka 특성상 공짜)
빠른 최신값 조회매번 파일 스캔 불가Redis에 최신 좌표만 캐시

결국 Kafka를 넣은 이유가 “큐가 아니라 로그” 라는 한 줄로 정리됨. 파일 append의 장점(순차 기록, 재처리 가능)은 그대로 가져가면서 분산/병렬/내구성만 추가한 도구가 Kafka였다는 걸 프로젝트 끝나고 나서 딥다이브하며 정식으로 납득.

잘한 것 / 손에 붙은 것

  • KRaft로 시작 — Zookeeper 안 쓰고 바로 KRaft. 운영할 게 하나 줄었고, 컨테이너 수도 적음
  • Device ID 파티션 키 — 같은 기기 메시지 순서 깨질 일 없음. 이건 나중에 consumer 쪽 코드가 많이 단순해진 원인
  • Consumer group 설계coordinate-consumer-group 한 그룹으로 여러 pod 병렬 소비. 인스턴스 늘려서 스케일 아웃이 바로 됨
  • 벤치마크 디렉토리 — 배치 사이즈별 성능, consumer group 유무로 offset 유지되는지 여부 등을 직접 스크립트로 돌려봄. “감”이 아니라 “숫자”로 확인해보려는 습관이 생김
  • Telegraf를 브릿지로 선택 — 직접 MQTT consumer 짜서 Kafka로 보내는 코드 짤까 했다가, Telegraf의 mqtt_consumer → kafka output plugin 조합이 설정 몇 줄로 끝나길래 그쪽으로 감. “있는 도구 쓰면 코드 안 짜도 된다”는 걸 이번에 체감
  • MQTT Source Connector도 한 번 붙여봄 — Confluent의 MQTT Source Connector를 Kafka Connect 위에 붙여서 Telegraf 대안으로도 돌려봄. 결과적으론 Telegraf가 더 가벼워서 채택했지만, “같은 일을 다른 방식으로 풀어보는” 경험은 도구 선택 감각에 도움됨

놓친 것 / Shallow Dive 끝나고 보이는 구멍

시리즈 공부하면서 “어… 이거 내 프로젝트에서 안 했는데” 싶었던 것들.

1. 내구성 설정이 반쪽 (Producer / Replication 편)

Producer에 acks=all은 넣었는데, broker는 RF=1 단일이라 의미 없음. ISR=1이니까 broker 한 대 죽으면 그냥 끝. 개발/데모용으로는 괜찮지만 “내구성 있다”고 말하기엔 부족했다.

enable.idempotence=true도 빼먹음. 재시도로 인한 중복이 이론적으로 가능했는데 consumer 쪽에서 딱히 멱등 처리도 안 했으니 동일 좌표가 두 번 쌓일 수 있었던 듯.

2. Consumer offset 관리가 대충 (Consumer 편)

enable_auto_commit=True + 5초 간격. 처리 중에 죽으면 이미 커밋된 메시지가 날아갈 수도, 덜 커밋된 게 다시 올 수도 있는 애매한 상태. 프로젝트 특성상 좌표는 “최신값만 맞으면 되니까” 큰 문제는 없었지만, 진짜 안전을 원하면 manual commit + rebalance listener 조합이 맞다.

CooperativeStickyAssignor도 안 썼다. 배포할 때마다 rebalance로 lag이 튀는 걸 그냥 “pod 수 작으니까 괜찮아” 하고 넘겼는데, 지금 같으면 바로 넣을 설정.

3. 모니터링이 블랙박스 (운영 편)

docker compose logskafka-console-consumer로 동작만 확인한 수준. JMX 노출도, Prometheus exporter도, lag exporter도 없었다. 문제가 생기면 “느려진 것 같아”에서 멈추고 숫자가 없음. 진짜 운영 환경이었으면 새벽에 불려나왔을 것.

4. 스키마가 JSON 문자열 (Schema Registry 편)

“Avro 쓰면 안전하겠지” 수준의 막연한 느낌이 아니라, 실제로 지금 코드를 열어보니 스키마 없음 때문에 이미 잔잔한 구멍이 여러 개 있었다.

(1) 같은 프로젝트 안에서 필드명이 통일 안 됨

소스payload 필드
README 샘플{"x": 123.45, "y": 67.89}
테스트 코드{"x": 99.99, "y": 88.88}
벤치마크 producer{"coordX": 37.5..., "coordY": 127.0...}
Consumer가 실제 검증coordX, coordY

즉 README 예시대로 x/y 로 publish하면 consumer validation을 통과 못한다. Avro + Schema Registry가 있었으면 PR 단계에서 바로 터졌을 이야기.

(2) 이중 JSON 중첩

Telegraf 설정을 data_format="value" + data_type="string" 으로 두는 바람에, MQTT payload가 파싱 안 된 채 문자열로 감싸져서 Kafka에 들어감.

{
  "tags":   {"mqtt_topic": "sensors/ORIN001/coordinates", "device_id": "ORIN001"},
  "fields": {"value": "{\"coordX\": 37.5, \"coordY\": 127.0}"},    문자열
  "timestamp": 1691234567
}

Consumer는 message.value 를 JSON 파싱하고, 그 안의 fields.value 를 다시 JSON 파싱해야 좌표에 닿는다. Telegraf에서 data_format="json" 으로 받아 field를 flatten 했으면 훨씬 깔끔했을 설계 실수.

(3) 관례만 있고 강제는 없는 것들

  • timestamp 가 초(README)와 밀리초(벤치마크)로 섞여 있음
  • device_id 는 payload 바깥(토픽 경로)에 있어서 토픽 네이밍 바꾸면 전부 터짐
  • 위 모두 코드 주석/README 메모로만 관리됨

이걸 Avro로 묶었다면:

  • 필드명 불일치 → 스키마 등록 시점에 reject
  • 이중 중첩 → Telegraf 쪽 포맷이 스키마에 안 맞아 당장 조정
  • timestamp/device_id → record 안에 logicalType: timestamp-millisdevice_id: string 필수 필드로 명시

스키마 없음의 진짜 비용은 “버그가 조용히 쌓인다” 는 거였다. 프로젝트 규모라 버틴 게 아니라, 버티는 중에 이미 새고 있었던 것.

5. 복잡한 흐름은 Python consumer에 몰아넣음 (Streams 편)

좌표 검증, 범위 체크, Redis 적재를 Python consumer 코드에 직접 짰다. 간단한 로직이라 그랬는데, 나중에 “직전 5분 평균 속도” 같은 stateful aggregation이 필요해지면 Kafka Streams로 갔어야 할 영역. Python으로 상태 관리하면 장애 복구 로직을 직접 짜야 해서 복잡해짐.

Kafka 외적으로도 배운 것

  • depends_on 만으론 부족하다 — docker-compose의 depends_on 은 “컨테이너 시작 순서”만 보장하지 “서비스가 준비됐는지“는 안 봄. Kafka 컨테이너는 떠 있어도 브로커가 아직 listener를 열기 전인데 producer가 먼저 붙어서 실패하는 케이스가 반복됨. 결국 Python script로 health check + 토픽 초기화를 따로 만들어서 make up 뒤에 끼워 넣었음. “컨테이너 기동 ≠ 서비스 준비”라는 감각이 생긴 포인트
  • benchmark를 프로젝트의 서브모듈처럼 — 설정 바꾸면 벤치마크 돌려보는 습관. 숫자가 없으면 설정 고칠 근거가 없음

다음에 같은 걸 만든다면

지금 다시 설계한다고 가정하고 고칠 것.

레이어BeforeAfter
Broker단일 (RF=1)3노드 (RF=3, min.ISR=2)
Produceracks=all only+ enable.idempotence=true
Consumerauto commit 5smanual commit + CooperativeSticky
포맷JSON 문자열Avro + Schema Registry
집계Python consumer에서 직접Kafka Streams (최소한 windowed count)
모니터링없음JMX exporter + Grafana + lag exporter
장애 테스트없음broker 강제 종료/네트워크 지연 시뮬레이션

다 한꺼번에 넣는 건 overkill이지만, 최소한 RF≥2 + manual commit + lag 모니터링 이 3개는 “돌아가는 파이프라인”이 아니라 “운영 가능한 파이프라인”의 최소치라는 감이 생겼다.

시니어 데이터 엔지니어라면 (ChatGPT와 대화)

위 “다음에 같은 걸 만든다면” 이 점진적 개선이라면, 이 섹션은 “처음부터 제대로 설계한다면” — 스케일·조직·거버넌스까지 고려한 아키텍처. 현재는 overkill일 수 있지만 “방향”으로 알고 있으면 의사결정 기준이 생긴다.

전체 그림

 ┌──────────────────────── INGEST ──────────────────────────┐
 │                                                           │
 │  IoT devices  ─── mTLS/MQTT ───▶  EMQX Cluster (HA)       │
 │                                        │                  │
 │                                        ▼                  │
 │                               EMQX→Kafka Bridge           │
 │                               (backpressure, retry)       │
 └────────────────────────────────────────┬──────────────────┘
                                          │
 ┌──────────────────────── MESSAGING ─────┼──────────────────┐
 │                                        ▼                  │
 │  Kafka Cluster (3 brokers, KRaft, RF=3, min.ISR=2, rack)  │
 │                                                            │
 │    iot.coord.raw.v1        ─┐                             │
 │    iot.coord.validated.v1  ─┼── Schema Registry           │
 │    iot.coord.enriched.v1   ─┤   (Avro, BACKWARD)          │
 │    iot.coord.dlq.v1        ─┘                             │
 └────────────────────────────────────────┬──────────────────┘
                                          │
 ┌──────────────────────── PROCESSING ────┼──────────────────┐
 │                                        ▼                  │
 │   Kafka Streams  (processing.guarantee = exactly_once_v2) │
 │    ├─ validate (range, finite, dedup by event_id)         │
 │    ├─ enrich   (device-meta KTable join)                  │
 │    ├─ window   (5분 평균/이동거리 등 집계)                   │
 │    └─ route    (invalid → DLQ, valid → validated 토픽)     │
 └────────────────────────────────────────┬──────────────────┘
                                          │
 ┌──────────────────────── SERVE ─────────┼──────────────────┐
 │                                        ▼                  │
 │   Kafka Connect (distributed)                              │
 │    ├─ Redis Sink         (latest coord, TTL)              │
 │    ├─ S3/MinIO Sink      (Parquet, hourly 파티션)           │
 │    ├─ TimescaleDB Sink   (시계열 분석, Grafana 직결)        │
 │    └─ Debezium           (device-meta DB CDC → Kafka)     │
 │                                        │                  │
 │                                        ▼                  │
 │         WebSocket Server ◀── Redis (Spring Boot)           │
 │                                        │                  │
 │                                        ▼                  │
 │                               브라우저 (실시간 지도)         │
 └───────────────────────────────────────────────────────────┘

 ┌───────────── OBSERVABILITY (cross-cutting) ──────────────┐
 │  JMX → Prometheus JMX Exporter                            │
 │  kafka-lag-exporter (consumer group lag)                  │
 │  Micrometer (앱 메트릭)                                    │
 │  OpenTelemetry (MQTT → Kafka headers로 trace 전파)         │
 │  Grafana dashboards + Alertmanager + Loki                 │
 └───────────────────────────────────────────────────────────┘

 ┌────────────── PLATFORM (cross-cutting) ──────────────────┐
 │  Kubernetes + Strimzi (Kafka operator)                    │
 │  TLS everywhere + SASL + Kafka ACLs                       │
 │  Vault (secrets) + SOPS + GitOps (ArgoCD)                 │
 └───────────────────────────────────────────────────────────┘

레이어별 변경점

INGEST — Mosquitto/Telegraf → EMQX + mTLS

  • Mosquitto 단일 → EMQX 클러스터 (HA, auto-reconnect, built-in Kafka bridge)
  • 장치 인증 username/password → mTLS. 장치별 인증서로 revoke 가능
  • Telegraf의 애매한 data_format=value 대신 EMQX rule engine에서 JSON 필드를 바로 파싱해 Kafka 헤더/키/값으로 매핑 → 이중 JSON 중첩 문제 원천 제거

MESSAGING — 단일 토픽 → 처리 단계별 토픽 분리 + Schema Registry

  • 1 broker → 3 brokers, RF=3, min.ISR=2, rack-aware (Replication 편)
  • 단일 mqtt-messagesraw → validated → enriched 단계별 토픽. 단계마다 contract가 다름
  • DLQ 토픽으로 실패 레코드 격리 (alert 연결)
  • Avro + Schema Registry (Schema Registry 편). BACKWARD 호환성으로 CI에서 스키마 변경 사전 검증. x/y vs coordX/coordY 같은 불일치는 PR에서 reject

PROCESSING — Python 모놀리스 → Kafka Streams (EOS v2)

  • 좌표 검증, enrich, 집계를 Python consumer 안에 섞어놓은 걸 Kafka Streams 앱으로 분리 (Streams 편)
  • processing.guarantee=exactly_once_v2 (Exactly-once 편) — validated 토픽에 중복 없이
  • 디바이스 메타데이터는 KTable로 들고, 좌표 스트림과 join해서 enrich
  • 5분 windowed aggregation 같은 stateful 로직이 자연스럽게 붙음 (직접 상태 관리 X)
  • 잘못된 레코드는 DLQ 토픽으로 route, 재처리 가능

SERVE — custom consumer → Kafka Connect sinks 조합

  • 현재: Python consumer가 Redis 직접 쓰기
  • 목표: Redis Sink Connector 로 Kafka → Redis 자동화. 앱 코드 제거
  • S3/MinIO Sink (Parquet, 시간 파티션) — 데이터 레이크, Athena/Trino로 historical query
  • TimescaleDB — 시계열 분석 (직전 24시간 궤적, heatmap 등)
  • Debezium — device 메타 DB 변경을 Kafka로 (KTable join용)
  • WebSocket 서버는 Redis를 그대로 읽음 (변경 없음)

OBSERVABILITY — 로그 의존 → 풀 스택 관측 (운영 편)

  • JMX Exporter → Prometheus (broker 메트릭: UnderReplicatedPartitions 등)
  • kafka-lag-exporter — consumer lag alert 기준
  • Micrometer + OpenTelemetry — 앱 메트릭 + 분산 trace
  • MQTT payload에 trace id 심어서 디바이스 → 브라우저까지 end-to-end trace

PLATFORM — docker-compose → K8s + Strimzi

  • docker-compose는 단일 머신 한계. Kubernetes + Strimzi operator로 선언형 Kafka 관리
  • TLS + SASL + Kafka ACLs — 현재는 평문 + 토픽 전체 공개
  • Vault 로 secret, GitOps (ArgoCD) 로 배포

의사결정 매트릭스

“이 중 뭘 먼저?” 를 실제 운영 리스크 기준으로 나열.

우선순위항목이유
1RF=3 + min.ISR=2브로커 한 대 죽으면 끝나는 현재 상태가 제일 큰 리스크
2Prometheus + lag exporter“문제 생겼을 때 숫자로 볼 수 있어야” — 나머지 투자의 전제
3Schema Registry + Avro이미 x/y vs coordX/coordY 구멍이 생긴 상태라 CI 게이트 필요
4Kafka Streams로 validate/enrich 분리앱 코드 줄이고 stateful 요구사항에 대비
5Connect Sink 교체 (Redis/S3)운영 코드 감소
6EMQX + mTLS디바이스 수/보안 요구 커지면
7K8s + Strimzi팀 규모/멀티 환경 생기면

각 항목은 바로 앞 항목이 있어야 가치가 커진다 — 예를 들어 Kafka Streams(4)를 EOS로 쓰려면 Schema Registry(3)가 있어야 효과적이고, Schema Registry는 관측(2) 없이는 스키마 문제를 조용히 넘어감.

무엇이 overkill인가

시니어 관점이라고 전부 도입하자는 게 아니다. 각 선택의 손익분기를 알아야 함.

  • EMQX: 디바이스 수 < 수천대면 Mosquitto로 충분. HA가 핵심 아니면 불필요
  • TimescaleDB: “시계열 분석 쿼리가 실제로 필요한가?” 부터. Redis + S3만으로 커버되면 skip
  • Kafka Streams: JVM 팀 없으면 ksqlDB 또는 Flink 고려. Python에 남기는 것도 선택지
  • Kubernetes: 팀 1명, 환경 1개면 docker-compose가 오히려 합리적. 복잡도 비용 무시 못함
  • OpenTelemetry end-to-end trace: 정말 “왜 이 디바이스 이벤트가 늦지?”를 자주 디버깅해야 할 때. 그렇지 않으면 lag 메트릭만으로도 충분

“뭘 넣을까”보다 “왜 아직 넣지 않는가”를 설명할 수 있는 게 시니어의 선택 감각 이라는 걸 이 시리즈 통해 배움.

정리

프로젝트를 하고 나서 개념 공부(Kafka Shallow Dive)를 한 순서가 결과적으로 좋았다. 프로젝트에서 “대충 이렇게 하면 돌아간다”로 넘어갔던 부분들이 딥다이브할 때마다 하나씩 “아 그때 그게 그래서 그랬구나” 로 채워졌다. 다음 프로젝트는 지금의 체크리스트를 들고 시작할 수 있음. 그리고 회고 쓸 때 구멍이 뭔지 구체적으로 댈 수 있게 된 것 자체가 수확.

한 줄 요약: “돌아가는 파이프라인”을 만들어 봤고, 이제 “운영 가능한 파이프라인”은 뭐가 달라야 하는지 말할 수 있게 됐다.