Kafka Shallow Dive - Kafka Connect
Published:
Producer/Consumer를 직접 짜다 보면 금방 깨달음이 온다. “DB 테이블 → Kafka, Kafka → S3 같은 반복적인 연결을 매번 코드로 짜야 하나?” 그럴 필요 없게 만든 게 Kafka Connect.
Kafka Connect ?
Kafka와 외부 시스템을 설정 기반으로 연결해주는 프레임워크. 코드 대신 JSON 설정만 주면 돌아감.
- Source connector — 외부 시스템 → Kafka (예: MySQL →
orderstopic) - Sink connector — Kafka → 외부 시스템 (예:
eventstopic → S3) - 별도 프로세스(worker)로 띄움 — Kafka 브로커와 분리된 JVM 클러스터
왜 좋은가:
- 반복되는 통합 코드 안 짜도 됨
- 분산/장애복구 내장 (distributed mode)
- REST API로 connector 관리
- 수백 가지 공식/커뮤니티 커넥터 (Confluent Hub)
구조
┌──────── Connect Cluster ─────────┐
│ │
MySQL ─▶ JDBC Source Connector ──▶ │
│ ├─▶ Kafka
│ S3 Sink Connector ◀─┤ │
│ │
└───────────────────────────────────┘
- worker 여러 대가 connector를 나눠서 실행 (task 단위 분산)
- 상태는 내부 Kafka topic (config, offsets, status)에 저장
- 한 worker 죽으면 task가 자동으로 다른 worker로 옮겨감
내부 topic 3개:
connect-configs— connector 설정connect-offsets— 각 task의 진행 위치 (예: MySQL binlog position)connect-status— 실행 상태
이 3개가 있어서 worker 재시작/이동해도 상태가 유지됨.
실행 모드
| 모드 | 특징 | 용도 |
|---|---|---|
| Standalone | 단일 프로세스, 설정 파일 기반 | 개발, 로컬 테스트 |
| Distributed | 여러 worker 클러스터, REST API | 운영 |
Distributed가 표준. 설정 파일 대신 REST로 connector 생성/수정.
Source Connector 예시: JDBC
MySQL의 orders 테이블을 Kafka로.
curl -X POST http://connect:8083/connectors -H "Content-Type: application/json" -d '
{
"name": "orders-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://db:3306/shop",
"connection.user": "readonly",
"connection.password": "${file:/secrets/db.pw:pw}",
"table.whitelist": "orders",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "mysql.shop.",
"poll.interval.ms": "5000",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}'
이걸 올리면 mysql.shop.orders topic이 자동 생성되고, 새로 들어오는 row가 계속 Kafka로 들어옴.
JDBC의 mode:
incrementing— auto-increment id로 신규만 추적 (업데이트는 못 잡음)timestamp— updated_at 컬럼 기반 (업데이트도 잡음, 단조 증가 필요)timestamp+incrementing— 둘 조합 (권장)bulk— 매 주기 전체 테이블 재수집 (소규모 참조 테이블용)
JDBC는 간단하지만 한계가 있음: DELETE는 못 잡고, 대규모에선 DB에 부담. 진짜 CDC는 Debezium이 정답.
Debezium — 진짜 CDC
DB의 트랜잭션 로그(MySQL binlog, PostgreSQL WAL, MongoDB oplog 등)를 직접 읽어서 변경 이벤트를 Kafka로 보냄.
장점:
- INSERT/UPDATE/DELETE 전부 캡처
- DB에 부하 거의 없음 (read replica 없이도 OK)
- 이벤트 순서 보장
{
"name": "orders-debezium",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "db",
"database.port": "3306",
"database.user": "debezium",
"database.password": "${file:/secrets/dbz.pw:pw}",
"database.server.id": "223344",
"topic.prefix": "dbz.shop",
"database.include.list": "shop",
"table.include.list": "shop.orders,shop.users",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "dbz.schema-history"
}
}
이벤트 포맷 (간단화):
{
"op": "u", // c=create, u=update, d=delete, r=read(snapshot)
"before": { "id": 1, "status": "pending" },
"after": { "id": 1, "status": "paid" },
"ts_ms": 1693564800000,
"source": { "db": "shop", "table": "orders", "pos": 12345 }
}
before/after로 변경 전후가 다 나옴. outbox 패턴과 조합하면 end-to-end exactly-once 수준의 일관성도 가능.
Sink Connector 예시: S3
events topic을 S3에 Parquet으로 저장.
{
"name": "events-s3-sink",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "3",
"topics": "events",
"s3.bucket.name": "my-datalake",
"s3.region": "us-east-1",
"flush.size": "10000",
"rotate.interval.ms": "60000",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "en-US",
"timezone": "UTC",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"storage.class": "io.confluent.connect.s3.storage.S3Storage"
}
}
Kafka에 쌓인 이벤트를 S3 데이터레이크로 내리는 가장 흔한 패턴. 시간 파티셔닝으로 Athena/Trino에서 바로 쿼리.
Converters
Kafka에 저장할 때 key/value를 어떤 포맷으로 직렬화할지.
| Converter | 설명 |
|---|---|
| StringConverter | 문자열 (JSON 문자열 포함) |
| JsonConverter | JSON (+schema 옵션) |
| AvroConverter | Avro + Schema Registry |
| ProtobufConverter | Protobuf + Schema Registry |
Source/Sink 양쪽의 converter가 호환되어야 함. Avro + Schema Registry 조합이 현업 기본. Schema Registry 편에서 자세히.
REST API
Connect는 UI가 기본은 없고 REST로 다 관리.
# connector 목록
curl http://connect:8083/connectors
# 상태
curl http://connect:8083/connectors/orders-source/status
# 재시작
curl -X POST http://connect:8083/connectors/orders-source/restart
# 태스크 재시작
curl -X POST http://connect:8083/connectors/orders-source/tasks/0/restart
# 삭제
curl -X DELETE http://connect:8083/connectors/orders-source
UI는 Confluent Control Center, Kafka UI, kafka-connect-ui 같은 걸 붙여서 씀.
SMT (Single Message Transform)
Connector 안에서 간단한 변환 — 필드 이름 바꾸기, 마스킹, route 변경 등. 별도 앱 안 만들어도 되니 유용.
"transforms": "mask,route",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "password,ssn",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "mysql.shop.(.*)",
"transforms.route.replacement": "shop.$1"
단, 복잡한 변환은 ksqlDB나 Kafka Streams로 빼는 게 낫다 — SMT는 stateless, per-record만 가능.
운영 팁
- tasks.max 적정화 — source는 테이블 수/샤드 수, sink는 topic partition 수가 상한
- offset flush 주기 —
offset.flush.interval.ms기본 10초. 너무 자주면 부담, 너무 길면 재시작 시 중복 구간 커짐 - DLQ (Dead Letter Queue) —
errors.tolerance=all+errors.deadletterqueue.topic.name설정으로 에러 레코드 격리 - 설정 secret 관리 —
ConfigProvider로${file:...},${vault:...}같은 외부 참조 사용 (평문 금지) - 스키마 관리 — Avro + Schema Registry로 production에선 거의 필수
정리
Kafka Connect는 “외부 시스템 ↔ Kafka” 연결의 표준화. JDBC로는 간단한 동기화, Debezium으론 진짜 CDC가 가능해진다. 내가 짜던 custom producer/consumer 중 상당 부분이 사실 Connect로 대체됐어야 했다는 걸 이번 주에 깨달음. 유지보수 대상 코드가 줄면 그게 곧 품질.
다음 주는 스키마 관리 — Avro + Schema Registry로 스키마 진화를 안전하게 하는 법.
