Kafka Shallow Dive - Kafka Connect

Published:

Producer/Consumer를 직접 짜다 보면 금방 깨달음이 온다. “DB 테이블 → Kafka, Kafka → S3 같은 반복적인 연결을 매번 코드로 짜야 하나?” 그럴 필요 없게 만든 게 Kafka Connect.

Kafka Connect ?

Kafka와 외부 시스템을 설정 기반으로 연결해주는 프레임워크. 코드 대신 JSON 설정만 주면 돌아감.

  • Source connector — 외부 시스템 → Kafka (예: MySQL → orders topic)
  • Sink connector — Kafka → 외부 시스템 (예: events topic → S3)
  • 별도 프로세스(worker)로 띄움 — Kafka 브로커와 분리된 JVM 클러스터

왜 좋은가:

  • 반복되는 통합 코드 안 짜도 됨
  • 분산/장애복구 내장 (distributed mode)
  • REST API로 connector 관리
  • 수백 가지 공식/커뮤니티 커넥터 (Confluent Hub)

구조

        ┌──────── Connect Cluster ─────────┐
        │                                   │
 MySQL ─▶ JDBC Source Connector ──▶        │
        │                          ├─▶ Kafka
        │   S3 Sink Connector   ◀─┤         │
        │                                   │
        └───────────────────────────────────┘

- worker 여러 대가 connector를 나눠서 실행 (task 단위 분산)
- 상태는 내부 Kafka topic (config, offsets, status)에 저장
- 한 worker 죽으면 task가 자동으로 다른 worker로 옮겨감

내부 topic 3개:

  • connect-configs — connector 설정
  • connect-offsets — 각 task의 진행 위치 (예: MySQL binlog position)
  • connect-status — 실행 상태

이 3개가 있어서 worker 재시작/이동해도 상태가 유지됨.

실행 모드

모드특징용도
Standalone단일 프로세스, 설정 파일 기반개발, 로컬 테스트
Distributed여러 worker 클러스터, REST API운영

Distributed가 표준. 설정 파일 대신 REST로 connector 생성/수정.

Source Connector 예시: JDBC

MySQL의 orders 테이블을 Kafka로.

curl -X POST http://connect:8083/connectors -H "Content-Type: application/json" -d '
{
  "name": "orders-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://db:3306/shop",
    "connection.user": "readonly",
    "connection.password": "${file:/secrets/db.pw:pw}",
    "table.whitelist": "orders",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "mysql.shop.",
    "poll.interval.ms": "5000",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}'

이걸 올리면 mysql.shop.orders topic이 자동 생성되고, 새로 들어오는 row가 계속 Kafka로 들어옴.

JDBC의 mode:

  • incrementing — auto-increment id로 신규만 추적 (업데이트는 못 잡음)
  • timestamp — updated_at 컬럼 기반 (업데이트도 잡음, 단조 증가 필요)
  • timestamp+incrementing — 둘 조합 (권장)
  • bulk — 매 주기 전체 테이블 재수집 (소규모 참조 테이블용)

JDBC는 간단하지만 한계가 있음: DELETE는 못 잡고, 대규모에선 DB에 부담. 진짜 CDC는 Debezium이 정답.

Debezium — 진짜 CDC

DB의 트랜잭션 로그(MySQL binlog, PostgreSQL WAL, MongoDB oplog 등)를 직접 읽어서 변경 이벤트를 Kafka로 보냄.

장점:

  • INSERT/UPDATE/DELETE 전부 캡처
  • DB에 부하 거의 없음 (read replica 없이도 OK)
  • 이벤트 순서 보장
{
  "name": "orders-debezium",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "db",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "${file:/secrets/dbz.pw:pw}",
    "database.server.id": "223344",
    "topic.prefix": "dbz.shop",
    "database.include.list": "shop",
    "table.include.list": "shop.orders,shop.users",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "dbz.schema-history"
  }
}

이벤트 포맷 (간단화):

{
  "op": "u",                  // c=create, u=update, d=delete, r=read(snapshot)
  "before": { "id": 1, "status": "pending" },
  "after":  { "id": 1, "status": "paid" },
  "ts_ms":  1693564800000,
  "source": { "db": "shop", "table": "orders", "pos": 12345 }
}

before/after로 변경 전후가 다 나옴. outbox 패턴과 조합하면 end-to-end exactly-once 수준의 일관성도 가능.

Sink Connector 예시: S3

events topic을 S3에 Parquet으로 저장.

{
  "name": "events-s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "3",
    "topics": "events",
    "s3.bucket.name": "my-datalake",
    "s3.region": "us-east-1",
    "flush.size": "10000",
    "rotate.interval.ms": "60000",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "3600000",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "en-US",
    "timezone": "UTC",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage"
  }
}

Kafka에 쌓인 이벤트를 S3 데이터레이크로 내리는 가장 흔한 패턴. 시간 파티셔닝으로 Athena/Trino에서 바로 쿼리.

Converters

Kafka에 저장할 때 key/value를 어떤 포맷으로 직렬화할지.

Converter설명
StringConverter문자열 (JSON 문자열 포함)
JsonConverterJSON (+schema 옵션)
AvroConverterAvro + Schema Registry
ProtobufConverterProtobuf + Schema Registry

Source/Sink 양쪽의 converter가 호환되어야 함. Avro + Schema Registry 조합이 현업 기본. Schema Registry 편에서 자세히.

REST API

Connect는 UI가 기본은 없고 REST로 다 관리.

# connector 목록
curl http://connect:8083/connectors

# 상태
curl http://connect:8083/connectors/orders-source/status

# 재시작
curl -X POST http://connect:8083/connectors/orders-source/restart

# 태스크 재시작
curl -X POST http://connect:8083/connectors/orders-source/tasks/0/restart

# 삭제
curl -X DELETE http://connect:8083/connectors/orders-source

UI는 Confluent Control Center, Kafka UI, kafka-connect-ui 같은 걸 붙여서 씀.

SMT (Single Message Transform)

Connector 안에서 간단한 변환 — 필드 이름 바꾸기, 마스킹, route 변경 등. 별도 앱 안 만들어도 되니 유용.

"transforms": "mask,route",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "password,ssn",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "mysql.shop.(.*)",
"transforms.route.replacement": "shop.$1"

단, 복잡한 변환은 ksqlDB나 Kafka Streams로 빼는 게 낫다 — SMT는 stateless, per-record만 가능.

운영 팁

  • tasks.max 적정화 — source는 테이블 수/샤드 수, sink는 topic partition 수가 상한
  • offset flush 주기offset.flush.interval.ms 기본 10초. 너무 자주면 부담, 너무 길면 재시작 시 중복 구간 커짐
  • DLQ (Dead Letter Queue)errors.tolerance=all + errors.deadletterqueue.topic.name 설정으로 에러 레코드 격리
  • 설정 secret 관리ConfigProvider${file:...}, ${vault:...} 같은 외부 참조 사용 (평문 금지)
  • 스키마 관리 — Avro + Schema Registry로 production에선 거의 필수

정리

Kafka Connect는 “외부 시스템 ↔ Kafka” 연결의 표준화. JDBC로는 간단한 동기화, Debezium으론 진짜 CDC가 가능해진다. 내가 짜던 custom producer/consumer 중 상당 부분이 사실 Connect로 대체됐어야 했다는 걸 이번 주에 깨달음. 유지보수 대상 코드가 줄면 그게 곧 품질.

다음 주는 스키마 관리 — Avro + Schema Registry로 스키마 진화를 안전하게 하는 법.