문제의 시작

CDC를 도입할 때 어려운 부분은 connector를 띄우는 것만이 아니다. Database change를 어떤 event pipeline으로 옮길 것인지, Kafka와 Debezium을 application cluster 안에서 어떻게 운영할 것인지, offset이 깨졌을 때 어떻게 복구할 것인지까지 함께 고민해야 한다. 이 글은 Strimzi로 Kafka와 Debezium을 Kubernetes에 올리며 겪은 설정과 운영 포인트를 정리한 기록이다.

Debezium은 대표적인 CDC(Change Data Capture) 툴이다.

CDC를 쓰는 이유로는 데이터베이스의 변화를 빠르게 감지하여 이벤트를 발생시켜 작업을 함에 있다.

도입하게 된 유즈케이스를 간단히 설명하자면,

어플리케이션에서 POST api 인입 -> A 데이터베이스에 정보 저장 -> 외부 api 호출 -> 호출 성공시 B 데이터베이스에 정보 저장 -> 다른 service method들 실행

의 과정에서, 외부 api 가 실패하더라도 A 데이터베이스가 업데이트되어 한 트랜잭션에 묶이지 않았기 때문이다.

@Transactional 을 걸고 실패시 롤백 핸들링을 하면 되지 않나 싶겠지만, 외부 api 호출을 async로 던지고 있었기에 불가능했다.

목표는 외부 API 호출을 먼저 수행하고, 성공한 경우에만 Debezium이 감지할 수 있는 table에 정보를 저장한 뒤 event를 발생시켜 후속 작업을 실행하는 것이었다.

여기서 이벤트를 발생시키고 받는 과정에 여러 pub/sub이나 MQ가 사용 가능한데, Kafka를 사용하는 방법을 작성하고자 하며, 그 중에서도 Kafka와 Debezium을 따로 인스턴스로 띄우는 것이 아닌 k8s 클러스터 내에 pod으로 띄워 기존의 어플리케이션과의 연결이 쉽도록 한다.

Kafka를 k8s에 쉽게 배포하는 방법으로 Strimzi가 있다. 총 세 개의 yaml 파일로 Kafka cluster, connect, connector를 배포하여 사용할 수 있는데, 그 과정을 정리해보겠다.

우선 사용할 namespace를 정의해주자.

구현하면서 확인한 흐름

kubectl create ns strimzi-debezium

추가적으로 helm을 사용하고 있다면

helm install strimzi-operator strimzi/strimzi-kafka-operator -n strimzi-debezium

다음으로 Kafka를 특수 object로 가지는 클러스터를 생성해보자.

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: strimzi-debezium
spec:
  kafka:
    version: 3.7.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 10Gi
          deleteClaim: false
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      ssl.endpoint.identification.algorithm: ""
      log4j.logger.org.apache.kafka: DEBUG
      log4j.logger.kafka: DEBUG
    resources:
      requests:
        memory: 1Gi
        cpu: "0.5"
      limits:
        memory: 2Gi
        cpu: "1"
    template:
      kafkaContainer:
        env:
          - name: KAFKA_HEAP_OPTS
            value: "-Xms1G -Xmx1G"
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: false
    resources:
      requests:
        memory: 512Mi
        cpu: "0.5"
      limits:
        memory: 1Gi
        cpu: "1"
    config:
      autopurge.snapRetainCount: 3
      autopurge.purgeInterval: 1
      clientCnxnSocket: org.apache.zookeeper.ClientCnxnSocketNetty
      serverCnxnFactory: org.apache.zookeeper.server.NettyServerCnxnFactory
      quorum.ssl.enabled: true
      ssl.quorum.protocol: TLSv1.2
      tickTime: 2000
      initLimit: 15
      syncLimit: 10
    logging:
      type: inline
      loggers:
        zookeeper.root.logger: "INFO,CONSOLE"
    template:
      pod:
        securityContext:
          runAsUser: 1001
          fsGroup: 1001
  entityOperator:
    topicOperator: {}
    userOperator: {}
  clusterCa:
    generateCertificateAuthority: true
  clientsCa:
    generateCertificateAuthority: true

여기에서는 kafka와 zookeeper를 각각 3개씩 생성하였고, cpu와 memory 할당을 작지 않게 함을 볼 수 있는데, replica 개수는 용도에 맞게 세팅해도 되겠으나, cpu와 memory는 database 변화 감지를 정상적으로 전달하는데 있어 최소 스펙인것 같기도 하다.

또 apiVersion에는 v1beta2 외에도 v1alpha 등을 사용할 수 있지만, version마다 지원하지 않는 conf property가 있으므로 임의로 혼용하지 않는 편이 안전하다.

Kafka 버전이 3.7.0인 최신 버전으로 적용되어 있는데, 이후 strimzi 버전을 명시하고 모두 올렸을 때 3.2~ 이하 버전은 호환이 되지 않았다.

Kafka listeners에 tls를 해제한 포트와 사용한 포트를 두개 열어두었는데, 이것저것 만지다 보면 tls cert 없이 접근하려 하는 경우 막히는 경우가 있었다. 미리 설정을 잘해두면 좋을 것 같다.

ssl.endpoint.identification.algorithm: ""

부분에서는 SSL connection을 위한 hostname verification을 해제해둔 부분인데, prod 환경에서는 권장하지 않는다.

template:
  kafkaContainer:
    env:
      - name: KAFKA_HEAP_OPTS
        value: "-Xms1G -Xmx1G"

는 Kafka heap allocation 설정이다. 이 값을 지정하지 않고 배포했을 때 문제가 발생한 경험이 있다.

clusterCa:
  generateCertificateAuthority: true
clientsCa:
  generateCertificateAuthority: true

부분은 옵션인데, Ca와 pem 등을 직접 만들어 사용해도 무관하다.

openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365 -nodes -subj "/CN=kafka-connect"
kubectl create secret generic kafka-connect-tls --from-file=tls.key=key.pem --from-file=tls.crt=cert.pem -n strimzi-debezium

직접 만들어 사용하는 부분의 일부인데, 나머지는 상세히 검색하면 잘 나와있다.

다음으로 kafka-connect.yaml이다.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  namespace: strimzi-debezium
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: ystc1247/kafka-strimzi:0.0.3
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
annotations:
  strimzi.io/use-connector-resources: "true"

부분은 좀 중요한데, 이걸 해둬야 connector가 정상적으로 돌아간다. image같은 경우에는 난 직접 구성하여 DockerHub에 올린걸 사용했는데, strimzi + mysql-connector 가 달린 이미지다. 다른걸 찾아서 넣어도 관계없다.

Dockerfile을 첨부한다.

FROM strimzi/kafka:0.20.1-kafka-2.5.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/debezium
COPY ./debezium-connector-mysql/ /opt/kafka/plugins/debezium/
USER 1001

사용한 MySQL connector:

curl -L https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.7.0.Final/debezium-connector-mysql-2.7.0.Final-plugin.tar.gz | tar -xz
bootstrapServers: my-cluster-kafka-bootstrap:9093

부분에 no-tls로 설정한 9092를 써도 되나, 인증 이슈가 터질수도 있다.

다음으로 kafka-connector.yaml 이다.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: inventory-connector
  namespace: strimzi-debezium
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    topic.prefix: ${원하는 topic prefix}
    database.hostname: dbhost
    database.port: dbport
    database.user: 유저
    database.password: 비번
    database.dbname: db이름
    database.server.id: 185239 (고유한 id를 사용)
    database.server.name: db server
    table.include.list: 중요! 변화를 감지하고자 하는 table
    schema.history.internal.kafka.bootstrap.servers: "my-cluster-kafka-bootstrap:9092"
    schema.history.internal.kafka.topic: "schema-changes.inventory"
    database.history.kafka.bootstrap.servers: "my-cluster-kafka-bootstrap:9092"
    database.history.kafka.topic: "database-changes.inventory"
    include.schema.changes: "true"
    database.history.kafka.reset.offset: "true"
    snapshot.mode: "when_needed"

snapshot.mode 는 좀 중요한데, 사용할 수 있는 설정이 많으니 필요한 동작에 맞춰 선택하면 된다. 자세한 옵션은 Debezium MySQL connector documentation에서 확인할 수 있다.

pod 생성과 소멸을 반복하다 Debezium에서 offset을 놓친 경우, 현재 snapshot과 이용 가능한 snapshot에 불일치가 발생할 수 있다.

 INFO: Connected to {} at mysql-bin-changelog.400734/891 (sid:185054, cid:15583019)                                                                          │
 2024-07-24 06:42:19,297 INFO [inventory-connector|task-0] Connected to binlog at dev.data.yeogiya.io:3306, starting at BinlogOffsetContext{sourceInfoSchema=Schema{io.debezium.co
 2024-07-24 06:42:19,297 INFO [inventory-connector|task-0] Waiting for keepalive thread to start (io.debezium.connector.binlog.BinlogStreamingChangeEventSource) [debezium-mysqlco │
│ 2024-07-24 06:42:19,297 INFO [inventory-connector|task-0] Creating thread debezium-mysqlconnector-{}-binlog-client (io.debezium.util.Threads) [blc-{} │
│ 2024-07-24 06:42:19,297 ERROR [inventory-connector|task-0] Error during binlog processing. Last offset stored = null, binlog reader near position = mysql-bin-changelog.400734/89 │
│ 2024-07-24 06:42:19,298 ERROR [inventory-connector|task-0] Producer failure (io.debezium.pipeline.ErrorHandler) [blc-{}:3306]                                    │
│ io.debezium.DebeziumException: Could not find first log file name in binary log index file Error code: 1236; SQLSTATE: HY000.

이런게 나왔다면 당첨이다. mysql-bin-changelog.400734가 현재 없으나 Debezium에서 읽고 싶어하는 경우가 에러의 원인의 다수이다. 외에도 MySQL version가 업데이트된 경우도 있을 수 있다.

우선 전자의 경우 해결법이다.

SHOW VARIABLES LIKE 'log_bin';

먼저 binary logging이 ON인지 확인한다.

다음으로

mysql> SHOW BINARY LOGS;
+----------------------------+-----------+-----------+
| Log_name                   | File_size | Encrypted |
+----------------------------+-----------+-----------+
| mysql-bin-changelog.400803 |      2436 | No        |
| mysql-bin-changelog.400804 |      2436 | No        |
| mysql-bin-changelog.400805 |       891 | No        |
+----------------------------+-----------+-----------+

현재 Debezium에서 읽는 버전과 현재 최신 버전의 차이를 보자.

추가적으로 expire_logs_days, binlog_expire_logs_seconds 같은 global variable 들에 문제가 있을 수 있으나, 변경한 적이 없고 default로 되어있다면 문제될건 없을 것이다.

이제 Kafka, Debezium에서 읽고자 하는 offset을 강제로 변경해줘야 하는데, 다음의 절차를 밟는다.

apiVersion: v1
kind: Pod
metadata:
  name: kafka-client
  namespace: default
spec:
  containers:
    - name: kafka
      image: confluentinc/cp-kafka:latest
      command:
        - sleep
        - "3600"

먼저 helper pod를 띄운다. Kafka shell tool을 이미 사용할 수 있다면 생략해도 된다.

이후

kubectl exec -it kafka-client -n default -- /bin/sh
cd /usr/bin
./kafka-consumer-groups --bootstrap-server my-cluster-kafka-bootstrap.strimzi-debezium.svc.cluster.local:9092 --group inventory-connector --reset-offsets --to-latest --all-topics --execute

이후 아래 command를 실행하면 모든 topic의 offset이 latest로 이동한다. 그 사이의 database change event는 감지하지 못하고 유실되므로, recovery 상황에서만 신중하게 사용해야 한다.

다음은 결과들이다.

결과를 어떻게 검증했는가

Strimzi를 사용하여 Kafka, Debezium을 Kubernetes에 띄우기 이미지 01

kubectl exec -it my-cluster-kafka-0 -n strimzi-debezium -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic {토픽명} --from-beginning

운영 기준

Strimzi는 Kafka 운영을 Kubernetes object로 끌어올려주지만, 운영 책임을 없애주지는 않는다. TLS, heap, connector image, topic replication, binlog retention, offset reset은 모두 실제 장애와 연결된다. CDC pipeline은 한 번 붙이면 데이터 흐름의 일부가 되므로, 처음부터 실패와 복구를 같이 설계해야 한다.

이를 실행하면 Kafka가 produce하고 있는 message들이 나온다.