Where the Problem Started

The hard part of introducing CDC is not only starting a connector. It is deciding how database changes become an event pipeline, how Kafka and Debezium should run inside the application cluster, and how to recover when offsets break. This post records the Strimzi setup and operational points I ran into while running Kafka and Debezium on Kubernetes.

Debezium is a widely used CDC (Change Data Capture) tool.

The point of CDC is to detect database changes quickly, emit them as events, and run downstream work from those events.

Briefly, this is the use case that led me to introduce it:

A POST API request enters the application -> information is stored in database A -> an external API is called -> if the external call succeeds, information is stored in database B -> follow-up service methods run

In that flow, database A could be updated even when the external API failed, so the whole process was not protected by one transaction boundary.

Adding @Transactional and rolling back on failure sounds like the obvious answer, but the external API call was being sent asynchronously, so that did not solve the consistency problem.

The target design was to call the external API first, then persist a detectable row only after success. Debezium could then capture that change, emit an event, and trigger the remaining workflow.
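
As a rough illustration of that ordering, here is a minimal Python sketch. It uses an in-memory SQLite table as a stand-in for the real database, and the names `handle_request`, `on_change_event`, and `completed_requests` are hypothetical, not from the actual service. The event shape assumes a Debezium-style envelope with `op` and `after` fields, optionally wrapped in `payload`, which depends on how the converter is configured.

```python
import json
import sqlite3


def handle_request(conn, payload, call_external_api):
    """Call the external API first; persist the row only if it succeeded."""
    if not call_external_api(payload):
        return False  # nothing was written, so CDC has nothing to capture
    with conn:  # commits on success, rolls back if the insert raises
        conn.execute("INSERT INTO completed_requests (body) VALUES (?)", (payload,))
    return True


def on_change_event(raw_message, run_follow_up):
    """Run follow-up work for insert events in a Debezium-style envelope."""
    event = json.loads(raw_message)
    change = event.get("payload", event)  # envelope may or may not be wrapped
    if change.get("op") == "c":  # "c" marks a newly created row
        run_follow_up(change["after"])
        return True
    return False


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE completed_requests (id INTEGER PRIMARY KEY, body TEXT)")

handle_request(conn, "order-1", lambda p: False)  # external call fails
handle_request(conn, "order-2", lambda p: True)   # external call succeeds
rows = conn.execute("SELECT body FROM completed_requests").fetchall()

handled = []
sample = json.dumps(
    {"payload": {"op": "c", "before": None, "after": {"id": 1, "body": "order-2"}}}
)
on_change_event(sample, handled.append)
```

Only the successful request leaves a row behind, so only that request can produce a change event and trigger the follow-up work.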

Several pub/sub systems or message queues can handle that event flow. This post focuses on Kafka, and more specifically on running Kafka and Debezium as pods inside the Kubernetes cluster instead of operating them as separate instances.

Strimzi is one practical way to deploy Kafka on Kubernetes. A Kafka cluster, Kafka Connect, and connector resources can be managed with a small set of YAML files, so I will organize the setup around that path.

Implementation Path

Define the namespace first.

kubectl create ns strimzi-debezium

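If you use Helm, add the Strimzi chart repository and install the operator into that namespace:

helm repo add strimzi https://strimzi.io/charts/
helm install strimzi-operator strimzi/strimzi-kafka-operator -n strimzi-debezium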

Create the Kafka cluster resource next.

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: strimzi-debezium
spec:
  kafka:
    version: 3.7.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 10Gi
          deleteClaim: false
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      ssl.endpoint.identification.algorithm: ""
      log4j.logger.org.apache.kafka: DEBUG
      log4j.logger.kafka: DEBUG
    resources:
      requests:
        memory: 1Gi
        cpu: "0.5"
      limits:
        memory: 2Gi
        cpu: "1"
    template:
      kafkaContainer:
        env:
          - name: KAFKA_HEAP_OPTS
            value: "-Xms1G -Xmx1G"
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: false
    resources:
      requests:
        memory: 512Mi
        cpu: "0.5"
      limits:
        memory: 1Gi
        cpu: "1"
    config:
      autopurge.snapRetainCount: 3
      autopurge.purgeInterval: 1
      clientCnxnSocket: org.apache.zookeeper.ClientCnxnSocketNetty
      serverCnxnFactory: org.apache.zookeeper.server.NettyServerCnxnFactory
      quorum.ssl.enabled: true
      ssl.quorum.protocol: TLSv1.2
      tickTime: 2000
      initLimit: 15
      syncLimit: 10
    logging:
      type: inline
      loggers:
        zookeeper.root.logger: "INFO,CONSOLE"
    template:
      pod:
        securityContext:
          runAsUser: 1001
          fsGroup: 1001
  entityOperator:
    topicOperator: {}
    userOperator: {}
  clusterCa:
    generateCertificateAuthority: true
  clientsCa:
    generateCertificateAuthority: true

Here, Kafka and ZooKeeper are each created with three replicas, and CPU/memory are allocated fairly generously. The replica count should follow the use case, but these resource values were close to the minimum I felt comfortable with for reliable database-change delivery.
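
To make the replica choice concrete, the arithmetic behind `default.replication.factor: 3`, `min.insync.replicas: 2`, and a three-node ZooKeeper ensemble can be sketched as below. This is a simplified model: it assumes producers write with acks=all and ignores partition placement, and the function names are only illustrative.

```python
def tolerated_broker_failures(replication_factor, min_insync_replicas):
    """Brokers that can be down while acks=all writes still succeed."""
    return max(replication_factor - min_insync_replicas, 0)


def tolerated_quorum_failures(ensemble_size):
    """ZooKeeper nodes that can be down while a majority quorum remains."""
    majority = ensemble_size // 2 + 1
    return ensemble_size - majority


kafka_tolerance = tolerated_broker_failures(3, 2)
zookeeper_tolerance = tolerated_quorum_failures(3)
```

With the values in this manifest, both Kafka and ZooKeeper can lose one node and keep accepting writes, which is why three replicas is a common floor.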

For apiVersion, older API versions such as v1alpha1 exist alongside v1beta2 depending on the Strimzi release, but the available properties differ between versions, so do not mix them casually.

Kafka is set to 3.7.0, which was the latest version at the time. When I pinned the Strimzi version and brought the stack up together, Kafka versions around 3.2 or lower were not compatible.

I exposed two Kafka listeners: one plaintext port and one TLS port. While experimenting with the settings, I saw connection failures when a client tried to connect without the expected TLS certificate, so this part is worth setting carefully from the beginning.

ssl.endpoint.identification.algorithm: ""

This setting disables hostname verification for TLS connections. It is convenient while experimenting, but it is not recommended in a production environment.

template:
  kafkaContainer:
    env:
      - name: KAFKA_HEAP_OPTS
        value: "-Xms1G -Xmx1G"

This config sets the Kafka heap allocation. Without it, the pod can run into memory pressure during startup.
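
One quick sanity check is how much of the container memory limit the heap takes. The sketch below parses the `KAFKA_HEAP_OPTS` string and the `limits.memory` value from the manifest and returns the ratio. The helper names are hypothetical, and the parsers only handle the simple forms used in this post.

```python
import re

_UNITS = {"K": 1024, "M": 1024**2, "G": 1024**3}


def parse_xmx(heap_opts):
    """Return the -Xmx value in bytes from a JVM options string."""
    match = re.search(r"-Xmx(\d+)([KMG])", heap_opts, re.IGNORECASE)
    if match is None:
        raise ValueError("no -Xmx setting found")
    return int(match.group(1)) * _UNITS[match.group(2).upper()]


def parse_k8s_memory(quantity):
    """Return bytes for a Kubernetes quantity such as '2Gi' or '512Mi'."""
    match = re.fullmatch(r"(\d+)(Ki|Mi|Gi)", quantity)
    if match is None:
        raise ValueError(f"unsupported quantity: {quantity}")
    return int(match.group(1)) * _UNITS[match.group(2)[0]]


def heap_fraction(heap_opts, memory_limit):
    """Fraction of the container memory limit taken by the maximum heap."""
    return parse_xmx(heap_opts) / parse_k8s_memory(memory_limit)


fraction = heap_fraction("-Xms1G -Xmx1G", "2Gi")
```

Here the heap takes half of the 2Gi limit. The JVM needs memory beyond the heap as well, so a heap close to the full limit is a likely cause of the container being killed.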

clusterCa:
  generateCertificateAuthority: true
clientsCa:
  generateCertificateAuthority: true

These two blocks let Strimzi generate the cluster CA and the clients CA. That is optional: you can also create the certificates yourself and provide them as secrets.

openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365 -nodes -subj "/CN=kafka-connect"
kubectl create secret generic kafka-connect-tls --from-file=tls.key=key.pem --from-file=tls.crt=cert.pem -n strimzi-debezium

This is the manual certificate creation path. The remaining details follow the standard Kubernetes secret and Strimzi certificate flow.

Next is kafka-connect.yaml.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  namespace: strimzi-debezium
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: ystc1247/kafka-strimzi:0.0.3
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider

annotations:
  strimzi.io/use-connector-resources: "true"

This annotation is important because it allows connector resources to be managed properly. For the image, I used one I built and pushed to DockerHub with Strimzi and the MySQL connector included. A different image is fine as long as it contains the required connector.

This is the Dockerfile for that image.

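# Base image used at the time; match the tag to the Strimzi and Kafka versions you actually run
FROM strimzi/kafka:0.20.1-kafka-2.5.0
USER root:root
# Kafka Connect loads connector plugins from this directory
RUN mkdir -p /opt/kafka/plugins/debezium
COPY ./debezium-connector-mysql/ /opt/kafka/plugins/debezium/
# Drop back to the non-root user for runtime
USER 1001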

The MySQL connector I used:

curl -L https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.7.0.Final/debezium-connector-mysql-2.7.0.Final-plugin.tar.gz | tar -xz

bootstrapServers: my-cluster-kafka-bootstrap:9093

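The plaintext listener on 9092 also works here, but then the tls section must be removed. Pointing a TLS-configured client at the plaintext port, or a plaintext client at 9093, fails at connection time, which can look like an authentication problem.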

Next is kafka-connector.yaml.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: inventory-connector
  namespace: strimzi-debezium
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    topic.prefix: <your-topic-prefix>
    database.hostname: dbhost
    database.port: dbport
    database.user: <db-user>
    database.password: <db-password>
    database.dbname: <db-name>
    database.server.id: 185239  # use a unique ID
    database.server.name: db server
    table.include.list: <table-to-watch>  # important: the table whose changes you want to detect
    schema.history.internal.kafka.bootstrap.servers: "my-cluster-kafka-bootstrap:9092"
    schema.history.internal.kafka.topic: "schema-changes.inventory"
    database.history.kafka.bootstrap.servers: "my-cluster-kafka-bootstrap:9092"
    database.history.kafka.topic: "database-changes.inventory"
    include.schema.changes: "true"
    database.history.kafka.reset.offset: "true"
    snapshot.mode: "when_needed"
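
One thing worth checking in a config like this is that it mixes two naming generations. As far as I know, Debezium 2.x renamed `database.history.*` to `schema.history.internal.*` and `database.server.name` to `topic.prefix`, so with the 2.7.0 connector the older keys are most likely ignored. The small sketch below flags such keys. The rename list is my assumption from the Debezium 2.0 changes, and `find_legacy_keys` is a hypothetical helper, not part of any library.

```python
# Assumed 1.x-style names that Debezium 2.x replaced (not an exhaustive list).
LEGACY_PREFIXES = ("database.history.",)
LEGACY_KEYS = ("database.server.name",)


def find_legacy_keys(config):
    """Return the config keys that still use the assumed 1.x-style names."""
    return sorted(
        key
        for key in config
        if key in LEGACY_KEYS or key.startswith(LEGACY_PREFIXES)
    )


connector_config = {
    "topic.prefix": "inventory",
    "database.server.id": "185239",
    "database.server.name": "db server",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "database.history.kafka.topic": "database-changes.inventory",
}
legacy = find_legacy_keys(connector_config)
```

Anything the helper returns is a candidate for removal once the 2.x equivalents are confirmed to be set.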

snapshot.mode is important because Debezium provides many different snapshot behaviors. Choose the mode that matches the recovery behavior you need. The detailed options are in the Debezium MySQL connector documentation.

If pods are created and deleted repeatedly and Debezium loses track of its offset, the position it tries to resume from can point at a binlog file the MySQL server no longer has.

INFO: Connected to {} at mysql-bin-changelog.400734/891 (sid:185054, cid:15583019)
2024-07-24 06:42:19,297 INFO [inventory-connector|task-0] Connected to binlog at dev.data.yeogiya.io:3306, starting at BinlogOffsetContext{sourceInfoSchema=Schema{io.debezium.co
2024-07-24 06:42:19,297 INFO [inventory-connector|task-0] Waiting for keepalive thread to start (io.debezium.connector.binlog.BinlogStreamingChangeEventSource) [debezium-mysqlco
2024-07-24 06:42:19,297 INFO [inventory-connector|task-0] Creating thread debezium-mysqlconnector-{}-binlog-client (io.debezium.util.Threads) [blc-{}
2024-07-24 06:42:19,297 ERROR [inventory-connector|task-0] Error during binlog processing. Last offset stored = null, binlog reader near position = mysql-bin-changelog.400734/89
2024-07-24 06:42:19,298 ERROR [inventory-connector|task-0] Producer failure (io.debezium.pipeline.ErrorHandler) [blc-{}:3306]
io.debezium.DebeziumException: Could not find first log file name in binary log index file Error code: 1236; SQLSTATE: HY000.

This error usually means Debezium is trying to read a binary log file that no longer exists, such as mysql-bin-changelog.400734. A MySQL version change can also create related offset/snapshot mismatches.

For the missing-binlog case, start here.

SHOW VARIABLES LIKE 'log_bin';

Check that binary logging is enabled.

Next:

mysql> SHOW BINARY LOGS;
+----------------------------+-----------+-----------+
| Log_name                   | File_size | Encrypted |
+----------------------------+-----------+-----------+
| mysql-bin-changelog.400803 |      2436 | No        |
| mysql-bin-changelog.400804 |      2436 | No        |
| mysql-bin-changelog.400805 |       891 | No        |
+----------------------------+-----------+-----------+

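Compare the binlog file Debezium is trying to read (mysql-bin-changelog.400734 in the log above) with the files the server still has. Here the oldest remaining file is mysql-bin-changelog.400803, so the position Debezium wants has already been purged.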
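
That comparison is easy to script. The sketch below takes the file name from the Debezium error and the `Log_name` column from `SHOW BINARY LOGS`, then reports whether the file is still present and how far behind it is. It assumes binlog names end in a numeric sequence after the last dot, as they do in this output, and the function names are only illustrative.

```python
def binlog_sequence(file_name):
    """Return the numeric suffix of a binlog file name."""
    return int(file_name.rsplit(".", 1)[1])


def offset_is_available(offset_file, server_files):
    """True if the file named in the connector's position is still on the server."""
    return offset_file in set(server_files)


def files_behind_oldest(offset_file, server_files):
    """How many sequence numbers the position lags the oldest remaining file."""
    oldest = min(binlog_sequence(name) for name in server_files)
    return max(oldest - binlog_sequence(offset_file), 0)


server_files = [
    "mysql-bin-changelog.400803",
    "mysql-bin-changelog.400804",
    "mysql-bin-changelog.400805",
]
available = offset_is_available("mysql-bin-changelog.400734", server_files)
gap = files_behind_oldest("mysql-bin-changelog.400734", server_files)
```

For the values in this post the file is not available and sits 69 files behind the oldest one the server still holds.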

Retention settings such as expire_logs_days and binlog_expire_logs_seconds can also be involved, because they decide how long binlog files are kept before being purged. If you have not changed them from their defaults, they are probably not the cause.

Then force Kafka/Debezium to move the consumer offset to a valid position.

apiVersion: v1
kind: Pod
metadata:
  name: kafka-client
  namespace: default
spec:
  containers:
    - name: kafka
      image: confluentinc/cp-kafka:latest
      command:
        - sleep
        - "3600"

Bring up this helper pod first. If the Kafka shell tools are already available elsewhere, this step can be skipped.

After that:

kubectl exec -it kafka-client -n default -- /bin/sh
cd /usr/bin
./kafka-consumer-groups --bootstrap-server my-cluster-kafka-bootstrap.strimzi-debezium.svc.cluster.local:9092 --group inventory-connector --reset-offsets --to-latest --all-topics --execute

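This resets the connector group's offsets for all topics to the latest position. Any database changes between the old offset and the new one will not be captured, so treat it as a recovery operation and use it carefully. Kafka Connect also keeps source connector offsets in its own offset storage topic (connect-cluster-offsets in this setup), so if the same error returns after the reset, that stored offset is the next place to look.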
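
Because the reset is destructive, it can help to preview it first. `kafka-consumer-groups` supports `--dry-run` for `--reset-offsets`, which prints the planned offsets without applying them. The sketch below only builds the argument list so that preview is the default and `--execute` has to be requested explicitly. `reset_offsets_command` is a hypothetical helper, and the bootstrap address and group name are the ones used in this post.

```python
import shlex


def reset_offsets_command(bootstrap_server, group, execute=False):
    """Build kafka-consumer-groups arguments; preview unless execute=True."""
    return [
        "kafka-consumer-groups",
        "--bootstrap-server", bootstrap_server,
        "--group", group,
        "--reset-offsets", "--to-latest", "--all-topics",
        "--execute" if execute else "--dry-run",
    ]


BOOTSTRAP = "my-cluster-kafka-bootstrap.strimzi-debezium.svc.cluster.local:9092"
preview = reset_offsets_command(BOOTSTRAP, "inventory-connector")
apply = reset_offsets_command(BOOTSTRAP, "inventory-connector", execute=True)

# Print the preview command so it can be pasted into the helper pod's shell.
print(shlex.join(preview))
```

Running the printed preview command first shows which partitions would move and where, before anything is changed.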

How I Verified It

Here are the results.

[Screenshot 01: Running Kafka and Debezium on Kubernetes with Strimzi]

kubectl exec -it my-cluster-kafka-0 -n strimzi-debezium -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic {topic-name} --from-beginning

Running this shows the messages Kafka is producing.

Operational Takeaway

Strimzi lifts Kafka operation into Kubernetes objects, but it does not remove operational responsibility. TLS, heap sizing, connector images, topic replication, binlog retention, and offset reset all connect directly to real failure modes. Once a CDC pipeline is attached, it becomes part of the data flow, so failure and recovery need to be designed from the beginning.