Kafka Producers & Consumers

  • Writer: rahulshettigar13
  • Jan 25
  • 7 min read

Kafka Producers

Producers write data to topics, i.e. they create the messages. Producers know which partition to write to and which broker hosts it. A message is produced to a specific topic, and by default the producer balances messages evenly over all partitions of that topic.


The message key can be used to direct messages to a specific partition. The partitioner generates a hash of the key and maps it to a specific partition. This ensures that all messages produced with a given key get written to the same partition.
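
The key-to-partition mapping can be sketched in a few lines. This is only an illustration: the function name choose_partition is made up for this post, and crc32 stands in for the hash (Kafka's default partitioner uses murmur2, so real partition numbers will differ).

```python
import zlib


def choose_partition(key: bytes, num_partitions: int) -> int:
    """Map a serialized key to a partition index.

    Assumption: crc32 is a stand-in hash, not the one Kafka uses.
    """
    return zlib.crc32(key) % num_partitions


# The same key always maps to the same partition.
p1 = choose_partition(b"customer-42", 6)
p2 = choose_partition(b"customer-42", 6)
```

Because the result depends on the number of partitions, adding partitions to a topic changes where a given key lands.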


In case of Kafka broker failure, producers will automatically recover.



We start producing messages to Kafka by creating a "ProducerRecord", which must include the following:

  • the topic we want to send the record to, and a value

  • Optionally, we can also specify a key, a partition, a timestamp, and headers.

Once we send the ProducerRecord, the first thing the producer will do is serialize the key and value objects to byte arrays so that they can be sent over the network.
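
As a rough sketch of the record and the serialization step, the Python dataclass below mirrors the fields described above. It is not the Java ProducerRecord class: the field names and the serialize helper are assumptions for illustration, and UTF-8 string encoding stands in for a configured serializer.

```python
from dataclasses import dataclass, field
from typing import Optional, Tuple


@dataclass
class ProducerRecord:
    topic: str                          # mandatory
    value: str                          # mandatory
    key: Optional[str] = None           # optional
    partition: Optional[int] = None     # optional
    timestamp_ms: Optional[int] = None  # optional
    headers: dict = field(default_factory=dict)  # optional


def serialize(record: ProducerRecord) -> Tuple[Optional[bytes], bytes]:
    """Turn the key and value into byte arrays (a missing key stays None)."""
    key_bytes = None if record.key is None else record.key.encode("utf-8")
    return key_bytes, record.value.encode("utf-8")


record = ProducerRecord(topic="orders", value="created", key="customer-42")
key_bytes, value_bytes = serialize(record)
```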


If we don't specify a partition, the data is sent to the partitioner. The partitioner then chooses a partition, usually based on the ProducerRecord key. Once a partition is selected, the producer knows which topic and partition the record will go to. It then adds the record to a batch of records that will also be sent to the same topic and partition. A separate thread is responsible for sending those batches of records to the appropriate Kafka brokers.
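
The batching step can be pictured as a map from (topic, partition) to a list of pending records. The class below is a hypothetical sketch written for this post, not Kafka's internal implementation; in a real producer a background sender thread would do the draining.

```python
from collections import defaultdict


class BatchAccumulator:
    """Hypothetical sketch: groups serialized records per topic-partition."""

    def __init__(self):
        self._batches = defaultdict(list)

    def append(self, topic, partition, payload):
        # Records for the same topic and partition share one batch.
        self._batches[(topic, partition)].append(payload)

    def drain(self):
        # Hand over all pending batches and start with empty ones.
        ready = dict(self._batches)
        self._batches = defaultdict(list)
        return ready


accumulator = BatchAccumulator()
accumulator.append("orders", 0, b"a")
accumulator.append("orders", 1, b"b")
accumulator.append("orders", 0, b"c")
batches = accumulator.drain()
```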


When the broker receives the messages, it sends back a response. If the messages were successfully written to Kafka, it will return a RecordMetadata object with the topic, partition, and offset of the record within the partition. If the broker fails to write the messages, it will return an error.


When the producer receives an error, it may retry sending the message a few more times before giving up and returning an error.


Constructing a Kafka Producer


A Kafka producer has three mandatory properties:

bootstrap.servers: List of host:port pairs of brokers that the producer will use to establish initial connections to the Kafka cluster. This list doesn't need to include all the brokers, as the producer will get more information after the initial connection.


Kafka only accepts bytes as input from producers and sends bytes out as output to the consumer.

Message serialization means transforming objects/data into bytes. Serializers are applied to both the value and the key.

Common Serializers

  • String

  • Int, Float

  • Avro

  • Protobuf


key.serializer: Name of the class that will be used to serialize the keys of the records that will be produced to Kafka. key.serializer should be set to the name of a class that implements the org.apache.kafka.common.serialization.Serializer interface. The producer will use this class to serialize the key object to a byte array.


value.serializer: Name of the class that will be used to serialize the values of the records that will be produced to Kafka.
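
To make the three mandatory properties concrete, here they are as a plain dictionary with a small check for missing entries. The broker hostnames are invented, and the missing_properties helper is a hypothetical name used only in this sketch; the serializer values are the Java class names a Java producer would be given.

```python
REQUIRED_PROPERTIES = ("bootstrap.servers", "key.serializer", "value.serializer")

producer_config = {
    # Hypothetical hosts; two brokers are enough for the initial connection.
    "bootstrap.servers": "broker1:9092,broker2:9092",
    "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer": "org.apache.kafka.common.serialization.StringSerializer",
}


def missing_properties(config):
    """Return the mandatory property names that are absent from config."""
    return [name for name in REQUIRED_PROPERTIES if name not in config]


missing = missing_properties(producer_config)
```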


Once the producer is initialized, we can start sending messages to the Kafka broker. There are three methods of sending a message:

Fire and forget:

We send a message to the server and don't worry whether it arrives successfully or not.

Synchronous send:

The Kafka producer is always asynchronous: we send a message and the send() method returns a Future object. To send synchronously, we call get() on the Future to wait and see whether the send() was successful before sending the next record.

Asynchronous send:

We call the send() method with a callback function, which gets triggered when the producer receives a response from the Kafka broker.
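
The three styles can be compared side by side with a small simulation. FakeProducer is a hypothetical stand-in built on Python's concurrent.futures; it contacts no broker, and the metadata dictionary it returns is invented for the example. Note that a Python Future is awaited with result(), where the Java client uses get().

```python
from concurrent.futures import Future, ThreadPoolExecutor


class FakeProducer:
    """Hypothetical stand-in for a producer; no broker is contacted."""

    def __init__(self):
        # One background thread plays the role of the sender thread.
        self._sender = ThreadPoolExecutor(max_workers=1)
        self._next_offset = 0

    def _write(self, topic, value):
        # Pretend the broker appended the record and returned its metadata.
        offset = self._next_offset
        self._next_offset += 1
        return {"topic": topic, "partition": 0, "offset": offset}

    def send(self, topic, value, callback=None) -> Future:
        future = self._sender.submit(self._write, topic, value)
        if callback is not None:
            def on_done(done):
                error = done.exception()
                callback(done.result() if error is None else None, error)
            future.add_done_callback(on_done)
        return future

    def close(self):
        self._sender.shutdown(wait=True)


producer = FakeProducer()

# 1. Fire and forget: ignore the returned Future.
producer.send("orders", b"first")

# 2. Synchronous send: block on the Future before sending anything else.
metadata = producer.send("orders", b"second").result(timeout=5)

# 3. Asynchronous send: the callback runs once the "broker" responds.
responses = []
producer.send("orders", b"third", callback=lambda md, err: responses.append((md, err)))

producer.close()  # waits for outstanding sends to finish
```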


Configuring Producers:

client.id: A logical identifier for the client and the application it is used in. This can be any string and will be used by the brokers to identify messages sent from the client. It is used in logging and metrics and for quotas. Choosing a good client name will make troubleshooting much easier.


Producer Acks 

The producer can choose to receive acknowledgment of data writes. That means the producer gets a confirmation from the broker that the write did happen.

acks=0: The producer won't wait for acknowledgment (possible data loss).

acks=1: The producer will wait for the leader's acknowledgment.

acks=all: The producer will wait for acknowledgment from the leader and the in-sync replicas.


acks=0

When acks=0 is set, the producer considers a message written successfully the moment it is sent, without waiting for the broker to accept it at all.


If the broker goes offline or an exception happens, we will not know, and the data will be lost.

Use case: useful where data loss is acceptable, such as metrics collection.

Throughput is high because network overhead is low: since the producer doesn't wait for any response from the server, it can send messages as fast as the network will support.


acks=1

The producer considers messages as “written successfully” when the message is acknowledged by only the leader. This is the default for Kafka v1.0 to 2.8.

Replication is not guaranteed. If the leader goes offline before the replicas have copied the data, the data is lost.

If ack is not received, the producer may retry the request.

acks=all (acks=-1)

The producer considers messages as “successful” when the message is received by all in-sync replicas. This is the default from Kafka 3.0 onwards.


acks=all and min.insync.replicas


With acks=all, the leader replica for a partition checks whether there are enough in-sync replicas to safely write the message. If there are not, the write is rejected.

min.insync.replicas=1: only the broker leader needs to successfully ack.

min.insync.replicas=2: at least the broker leader and one replica need to ack.
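
The rules for the three acks settings can be written down as a small decision function. This is a simplified model for illustration, not broker code: write_acknowledged is a made-up name, and in_sync_replicas is assumed to count the leader itself.

```python
def write_acknowledged(acks, leader_alive, in_sync_replicas, min_insync_replicas=1):
    """Return True if the producer would treat the write as successful."""
    if acks == "0":
        return True   # no response awaited, so failures go unnoticed
    if not leader_alive:
        return False  # nobody is there to acknowledge the write
    if acks == "1":
        return True   # the leader wrote it; followers may still lag behind
    if acks == "all":
        # min.insync.replicas only matters when acks=all.
        return in_sync_replicas >= min_insync_replicas
    raise ValueError(f"unsupported acks value: {acks!r}")


# Only the leader is in sync, but two in-sync replicas are required.
rejected = write_acknowledged("all", True, in_sync_replicas=1, min_insync_replicas=2)
accepted = write_acknowledged("all", True, in_sync_replicas=2, min_insync_replicas=2)
```

Note how acks=0 reports success even when the leader is down, which is exactly the silent data loss described above.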



Message Delivery Time


We divide the time spent sending a ProducerRecord into two time intervals that are handled separately:

  • Time until an async call to send() returns. During this interval, the thread that called send() will be blocked.

  • From the time an async call to send() returned successfully until the callback is triggered (with success or failure). This is the same as from the point a ProducerRecord was placed in a batch for sending until Kafka responds with success, non-retriable failure, or we run out of time allocated for sending.


max.block.ms: Controls how long the producer may block when calling send() and when explicitly requesting metadata via partitionsFor().


delivery.timeout.ms: Limits the amount of time spent from the point a record is ready for sending until either the broker responds or the client gives up, including time spent on retries. This value should be greater than or equal to linger.ms plus request.timeout.ms. If you try to create a producer with an inconsistent timeout configuration, you will get an exception.
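
The consistency rule is easy to check by hand. The function below is a sketch of that check with a made-up name; the Java client performs its own validation and raises its own exception type, and the numbers used here are example values only.

```python
def check_timeouts(delivery_timeout_ms, linger_ms, request_timeout_ms):
    """Raise ValueError if the delivery timeout cannot cover one full attempt."""
    minimum = linger_ms + request_timeout_ms
    if delivery_timeout_ms < minimum:
        raise ValueError(
            f"delivery.timeout.ms ({delivery_timeout_ms}) must be at least "
            f"linger.ms + request.timeout.ms ({minimum})"
        )


# Consistent: 120000 >= 5 + 30000, so no exception is raised.
check_timeouts(delivery_timeout_ms=120_000, linger_ms=5, request_timeout_ms=30_000)

# Inconsistent: 10000 < 5 + 30000.
try:
    check_timeouts(delivery_timeout_ms=10_000, linger_ms=5, request_timeout_ms=30_000)
    inconsistent_rejected = False
except ValueError:
    inconsistent_rejected = True
```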


request.timeout.ms: Controls how long the producer will wait for a reply from the server when sending data. Note that this is the time spent waiting on each producer request before giving up; it does not include retries, time spent before sending, and so on. If the timeout is reached without reply, the producer will either retry sending or complete the callback with a TimeoutException.


retries and retry.backoff.ms: When the producer receives an error message from the server, the error could be transient. In this case, the value of the retries parameter will control how many times the producer will retry sending the message before giving up and notifying the client of an issue. retry.backoff.ms controls how long the producer waits between retries.
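
A retry loop with a fixed backoff might look like the sketch below. RetriableError and send_with_retries are hypothetical names, and the sketch leaves out the overall delivery.timeout.ms limit that also bounds retries in a real producer.

```python
import time


class RetriableError(Exception):
    """Hypothetical marker for transient errors that are worth retrying."""


def send_with_retries(send_once, retries, retry_backoff_ms, sleep=time.sleep):
    """Call send_once(), retrying up to `retries` times on transient errors."""
    attempt = 0
    while True:
        try:
            return send_once()
        except RetriableError:
            if attempt >= retries:
                raise  # give up and surface the error to the caller
            attempt += 1
            sleep(retry_backoff_ms / 1000)  # wait before the next attempt


# A send that fails twice with a transient error, then succeeds.
attempts = {"count": 0}


def flaky_send():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RetriableError("leader not available")
    return "ok"


waits = []  # record the backoff instead of actually sleeping
result = send_with_retries(flaky_send, retries=5, retry_backoff_ms=100, sleep=waits.append)
```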


linger.ms: Controls the amount of time to wait for additional messages before sending the current batch. KafkaProducer sends a batch of messages either when the current batch is full or when the linger.ms limit is reached. By setting linger.ms higher than 0, we instruct the producer to wait a few milliseconds to add additional messages to the batch before sending it to the brokers.
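
The "full or linger.ms reached" rule boils down to a single condition. The function below is a simplified sketch with made-up names; sizes are in bytes and times in milliseconds.

```python
def batch_ready(batch_bytes, batch_size, waited_ms, linger_ms):
    """A batch is sent when it is full or has waited for linger.ms."""
    return batch_bytes >= batch_size or waited_ms >= linger_ms


# With linger.ms=0, even a nearly empty batch is sent right away.
sent_immediately = batch_ready(batch_bytes=200, batch_size=16_384, waited_ms=0, linger_ms=0)

# With linger.ms=10, the same batch waits for more records first.
still_waiting = not batch_ready(batch_bytes=200, batch_size=16_384, waited_ms=3, linger_ms=10)
```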


buffer.memory: Sets the amount of memory that the producer will use to buffer messages waiting to be sent to the brokers. If messages are sent by the application faster than they can be delivered to the server, the producer may run out of space, and additional send() calls will block for max.block.ms and wait for space to free up before throwing an exception. 


compression.type

By default, messages are sent uncompressed. This parameter can be set to snappy, gzip, lz4, or zstd, in which case the corresponding compression algorithms will be used to compress the data before sending it to the brokers.
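
To get a feel for what compression buys, we can compress a batch of similar messages with Python's built-in gzip module. This only illustrates the idea on invented sensor data; the producer does its own compression when compression.type is set, and actual ratios depend on the payload.

```python
import gzip

# A batch of 100 similar JSON messages (invented data).
batch = b"".join(b'{"sensor": "s1", "reading": 21.5}\n' for _ in range(100))

compressed = gzip.compress(batch)
restored = gzip.decompress(compressed)

# Repetitive data compresses well, and decompression is lossless.
saved_bytes = len(batch) - len(compressed)
```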


batch.size

When multiple records are sent to the same partition, the producer will batch them together. This parameter controls the amount of memory in bytes (not messages!) that will be used for each batch. When the batch is full, all the messages in the batch will be sent. However, this does not mean that the producer will wait for the batch to become full. The producer will send half-full batches and even batches with just a single message in them. Therefore, setting the batch size too large will not cause delays in sending messages; it will just use more memory for the batches. Setting the batch size too small will add some overhead because the producer will need to send messages more frequently.


max.in.flight.requests.per.connection

This controls how many message batches the producer will send to the server without receiving responses. Higher settings can increase memory usage while improving throughput. 


max.request.size: Controls the size of a produce request sent by the producer. It caps both the size of the largest message that can be sent and the number of messages that the producer can send in one request.


message.max.bytes: A broker-side setting that controls the maximum size, in bytes, of the messages the broker will accept.

Consumers

Consumers request and read data from a topic using the pull model. They know which broker to read from, and they keep track of their position in each partition using message offsets. Consumers know how to recover in case of any broker failures. Data is read in order from low to high offset, within each partition.
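
The pull model and offset tracking can be sketched with a list standing in for one partition. The poll function here is a toy written for this post, not the consumer API: it returns the next few records starting at the given offset, along with the new position.

```python
def poll(partition_log, offset, max_records=2):
    """Return up to max_records starting at offset, plus the next offset."""
    records = partition_log[offset:offset + max_records]
    return records, offset + len(records)


log = ["m0", "m1", "m2", "m3", "m4"]  # one partition, offsets 0..4
position = 0
consumed = []

# The consumer keeps asking for more until nothing is left.
while True:
    records, position = poll(log, position)
    if not records:
        break
    consumed.extend(records)
```

After the loop, the messages have been read in offset order and the position points just past the last record.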


Consumer Groups

All the consumers in an application read data as a consumer group. Each consumer within a group reads from exclusive partitions. This mapping of consumers to partitions is called ownership of the partition by the consumer.


If you have more consumers than partitions, then some consumers will be inactive. You can also have multiple consumer groups on the same topic.
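
A simple round-robin assignment shows why extra consumers sit idle. This is a simplified sketch with a made-up function name; Kafka's actual assignment strategies are more involved, but the one-owner-per-partition rule within a group is the same.

```python
def assign_partitions(consumers, partitions):
    """Give each partition exactly one owner within the group, round-robin."""
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Three partitions, four consumers: the fourth consumer owns nothing.
assignment = assign_partitions(["c1", "c2", "c3", "c4"], [0, 1, 2])
idle = [consumer for consumer, owned in assignment.items() if not owned]
```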


To create distinct consumer groups, use the consumer property group.id.
