Most of the problems people mention with their Kafka implementations come down to a lack of visibility into the configuration and the API usage. Having more visibility into the commonly required tweaks allows admins and developers to use Kafka as comfortably as a MySQL or PostgreSQL cluster.

I was recently working on implementing a custom CDC for MongoDB. With Kafka Connect, there are out-of-the-box solutions available for MongoDB CDC. However, one of our requirements needed us to gain complete control over the CDC process.

In the upcoming sections, we will discuss the overall process in brief and then explain the individual steps in detail.

What is CDC?#

Quoting the MongoDB site,

CDC is a software architecture that converts changes in a datastore into a stream of CDC events. A CDC event is a message containing a reproducible representation of a change performed on a datastore.

CDC can be used to connect databases like MongoDB to other sinks, e.g., Elasticsearch, Spark, S3, etc. Another major advantage is that it can also be used as a reliable log of all the MongoDB events in sequence. The CDC events are generated using the MongoDB oplog, which is also used to maintain replication among the members of a replica set.

Prerequisites#

Familiarity with Kafka's delivery semantics:

  • At-least-once delivery
  • Exactly-once delivery

CDC Approach#

At a minimum, we can foresee two types of workers:

  • Poller
  • Publisher

Sequence of actions#

  • The poller fetches the MongoDB oplog's resume token, if present
  • The resume token allows the poller to start from the specific offset at which the last event was received
  • The poller fetches a fixed number of events from the oplog, starting from the resume token
  • It sends the batched events to the publisher
  • The publisher sends the batched events to Kafka
  • Once the publisher has successfully sent the events to Kafka, the poller updates the resume token as well
  • Repeat the process (a rough sketch of this loop follows below)
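Here's a minimal sketch of this loop in Python, assuming pymongo change streams and the confluent-kafka client. The broker address, topic name, batch size, and the load_resume_token/save_resume_token helpers (backed by a Kafka topic, as described later) are all placeholders:

from confluent_kafka import Producer
from pymongo import MongoClient
from bson import json_util

BATCH_SIZE = 100  # assumed batch threshold

producer = Producer({"bootstrap.servers": "localhost:9092", "acks": "all"})
collection = MongoClient("mongodb://localhost:27017")["mydb"]["orders"]

def load_resume_token():
    # Hypothetical helper: reads the latest token from a Kafka topic
    # (see "How does the consumer fetch the latest resume token?").
    return None

def save_resume_token(token):
    # Hypothetical helper: publishes the token to the resume-token topic.
    pass

# resume_after=None means the change stream starts from the current point.
with collection.watch(resume_after=load_resume_token()) as stream:
    batch = []
    for change in stream:
        batch.append(json_util.dumps(change))
        if len(batch) >= BATCH_SIZE:
            for event in batch:
                producer.produce("cdc-events", value=event)
            producer.flush()  # wait until the whole batch is acknowledged
            save_resume_token(stream.resume_token)
            batch = []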

Rules of Resiliency#

The sequence is pretty simple to understand. However, there are multiple points of failure that we need to guard against here.

  • The resume token should be updated only if the event batch has been published
  • If the event publish has failed, then the resume token shouldn't be updated
  • If the resume token is not updated, the event publish should also fail
  • A publish is considered successful only if all the partition replicas have received the message
  • The poller should not update the consumer's offset until the resume token has been updated

Questions#

  • How does the consumer know which offset to start from?
  • How do we prevent the consumer from updating the offset at which the read just happened?
  • How does the consumer fetch the latest resume token?
  • How does the publisher ensure all partition replicas are updated?
  • How does the publisher batch the events without custom logic?
  • How do we ensure a transaction between the event batch publish and resume token update?

How does the consumer know which offset to start from?#

Each consumer is assigned to a Kafka consumer group by default. Kafka maintains the latest offset read per consumer group; that's one of the reasons why each partition can have only one consumer in a consumer group. If the consumer group doesn't have any offset maintained, then it starts from either the earliest or the latest partition offset, depending on the consumer's configuration as shown below.

# Update consumer config with the following
auto.offset.reset: "latest" # "earliest" is also an option
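For example, with the confluent-kafka Python client (the broker address, group id, and topic name below are placeholders):

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # assumed broker address
    "group.id": "cdc-poller",               # hypothetical consumer group
    # Used only when the group has no committed offset yet.
    "auto.offset.reset": "latest",
})
consumer.subscribe(["cdc-events"])          # hypothetical topic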

How do we prevent the consumer from updating the offset at which the read just happened?#

By default, whenever a consumer reads from an offset of a specific partition, it asynchronously updates the Kafka cluster with the read offsets, and Kafka updates the mapping. If the consumer fails unceremoniously before the offset update is completed, there is a chance of messages being re-read, leading to duplicate messages and thus violating the exactly-once delivery guarantee.

The other approach is to commit the offsets only after the underlying tasks, like pushing to Kafka, are completed. This requires us to make the consumer offset update synchronous and control it via an API. This can be configured by doing the following:

# Update consumer config to disable the automatic offset commit
enable.auto.commit: false

# When the connected tasks are completed, the offset can be updated by running
consumer.commit(asynchronous=False)
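Putting this together, a rough sketch of the synchronous-commit loop with confluent-kafka; process() is a stand-in for whatever downstream work the consumer does:

from confluent_kafka import Consumer

def process(value):
    # Stand-in for the downstream work (e.g. handing the event onwards).
    pass

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "cdc-poller",
    "enable.auto.commit": False,  # no automatic offset commits
})
consumer.subscribe(["cdc-events"])

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        continue
    process(msg.value())
    # Commit synchronously, only after the work above has succeeded.
    consumer.commit(asynchronous=False)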

How does the consumer fetch the latest resume token?#

We store the resume token per poller in a separate topic. Whenever the poller starts up, it should fetch the latest token from that topic. If the poller is starting up for the first time, there will be no resume token, so polling of the oplog will start from the latest change, as per the MongoDB configuration.

For the consumer to jump directly to the last resume token, we can follow the below approach:

  • Get the high watermark offset of the partition in the topic; the high watermark is the offset just past the last message in the partition.
  • If the partition is non-empty, assign the consumer to the partition at offset (high watermark - 1).
  • Read the message at that offset (see the sketch below).
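A sketch of this with confluent-kafka, assuming the resume tokens live in a single-partition topic named resume-tokens:

from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "cdc-poller",
    "enable.auto.commit": False,
})

tp = TopicPartition("resume-tokens", 0)  # hypothetical topic, partition 0
# low/high watermarks; high is one past the last message's offset.
low, high = consumer.get_watermark_offsets(tp)

if high > low:
    # Jump straight to the last message at (high watermark - 1).
    consumer.assign([TopicPartition("resume-tokens", 0, high - 1)])
    msg = consumer.poll(timeout=5.0)
    resume_token = msg.value() if msg is not None and not msg.error() else None
else:
    resume_token = None  # first startup: no token stored yet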

How does the publisher ensure all partition replicas are updated?#

Whenever we publish a message to Kafka without calling flush, the publish happens asynchronously. Why do we need to ensure that all in-sync replicas are updated when sending a message?

In Kafka, the reliability of the cluster depends on the partitions and their replication factors. Each partition has a leader replica where the writes happen, and the data is then synced asynchronously to the other partition replicas.

If we don't wait for the acknowledgement from all the partition replicas and the leader goes down before syncing with the others, loss of data is possible. Hence it's important to receive an acknowledgement from all the in-sync replicas, to avoid losing messages that were already acknowledged to the publisher.

Kafka provides three levels of acknowledgements from the brokers:

  • No acknowledgement
  • Acknowledgement from the leader partition
  • Acknowledgement from all the in-sync partition replicas

The same can be configured with the following setting in the publisher:

"acks": "all" # Can also be 0 or 1
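A minimal producer sketch with acks=all and a delivery callback, so that failures surface instead of passing silently (the broker address and topic name are placeholders):

from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    # Wait for acknowledgement from all in-sync replicas;
    # 0 (none) and 1 (leader only) are the weaker options.
    "acks": "all",
})

def on_delivery(err, msg):
    # Invoked once the broker has acked (or failed) the message.
    if err is not None:
        print(f"Delivery failed: {err}")

producer.produce("cdc-events", value=b"...", callback=on_delivery)
producer.flush()  # block until every outstanding message is acked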

How does the publisher batch the events without custom logic?#

Usually when we have to batch multiple events for any tool, we have a size threshold (let's call it x) and a time threshold (call it y). We buffer events until we have accumulated x of them. This helps us reduce continuous network calls and provides the benefits of batching, like block size allocations, lower compression overhead, etc. However, we also shouldn't wait beyond a certain duration to publish an event, to limit the latency or lag observed by the service. So we also have y, which takes precedence over x.

In a Kafka producer's configuration, x correlates with batch.size. The main difference is that x above is a count of events, whereas batch.size is an actual size in bytes. The default is 16384 bytes.

The y correlates with linger.ms. The linger.ms configuration controls how long a message is kept in the buffer, unless the buffer size is exceeded first.

batch.size: 1638400 # send once the batch reaches ~1.6 MB
linger.ms: 2000 # or after 2 seconds, whichever comes first
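In the confluent-kafka producer, the same two settings go straight into the config dict; the values here are just the ones from above:

from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "localhost:9092",  # assumed broker address
    "batch.size": 1638400,  # x: send once the batch reaches ~1.6 MB
    "linger.ms": 2000,      # y: or after 2 s, whichever comes first
})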

How do we ensure a transaction between the event batch publish and resume token update?#

Kafka provides the flush call, which forces the publisher to send all buffered events to the Kafka brokers with the configured acknowledgement settings.

However, a problem remains: Kafka publishers publish asynchronously, and the flush call just ensures that the local buffers are emptied. With retries around uncertain failures, this can lead to multiple instances of the same message arriving on the cluster.

After a little searching, and to my honest amusement (this is incredible), I found that Kafka has support for Transactions. It uses the two-phase commit pattern to ensure the transaction, as sketched below.
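A sketch of how this could look with confluent-kafka's transactional producer: the event batch and the resume-token update are produced inside one transaction, so they commit or abort together. The transactional.id and topic names are placeholders:

from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "acks": "all",
    # A stable transactional.id lets the broker fence zombie producers.
    "transactional.id": "cdc-poller-1",
})
producer.init_transactions()

def publish_batch(events, resume_token):
    producer.begin_transaction()
    try:
        for event in events:
            producer.produce("cdc-events", value=event)
        # The token update rides in the same transaction, so both writes
        # become visible atomically to read_committed consumers.
        producer.produce("resume-tokens", value=resume_token)
        producer.commit_transaction()
    except Exception:
        producer.abort_transaction()
        raise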

IMO, understanding all of the above should give you good control over how to configure your Kafka components.

I hope you liked the article. Please let me know if you have any queries regarding the article. Happy reading!!