30.07.2021 | Sascha Müller

Kafka Producer and Consumer in Java

In the last part of the series I showed you how to set up a Kafka cluster for development with Docker. This time it’s all about producing and consuming messages with good ol’ Java.

Shall we?

Create a new project

I’m going to use IntelliJ IDEA, but you can use Eclipse, NetBeans, VS Code, or even a plain text editor as long as you’re comfortable programming and running Java programs with it. Also, I’m using Java 15 and Gradle for my projects.

First of all, we create a new Java project named “kafka-java” with the group “com.exxeta”. Then we need to add the Kafka dependencies to “build.gradle”. As we used “2.7.0” for the Kafka containers in our docker-compose.yml file, we should use the corresponding Java client library version. We should also add a dependency on SLF4J to log messages to the command line and on Lombok to make our Java classes more concise. Here’s a complete “build.gradle”, already including the tasks to run our producer and consumer later on:
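Apart from kafka-clients 2.7.0, the dependency versions and the task names “runConsumer” and “runProducer” in this sketch are assumptions, so adjust them to your setup:

plugins {
    id 'java'
}

group 'com.exxeta'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    // Kafka client library, matching the broker version from docker-compose.yml
    implementation 'org.apache.kafka:kafka-clients:2.7.0'
    // SLF4J API plus a simple binding that logs to the console
    implementation 'org.slf4j:slf4j-api:1.7.30'
    implementation 'org.slf4j:slf4j-simple:1.7.30'
    // Lombok for concise classes (e.g. @Slf4j)
    compileOnly 'org.projectlombok:lombok:1.18.20'
    annotationProcessor 'org.projectlombok:lombok:1.18.20'
}

task runConsumer(type: JavaExec) { // <1>
    classpath = sourceSets.main.runtimeClasspath
    mainClass.set('com.exxeta.consumer.Main')
}

task runProducer(type: JavaExec) { // <2>
    classpath = sourceSets.main.runtimeClasspath
    mainClass.set('com.exxeta.producer.Main')
}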

<1> Create a Gradle task to run the consumer

<2> Create a Gradle task to run the producer


Don’t forget to enable “Annotation Processing” if you use IntelliJ IDEA, otherwise Lombok’s annotations won’t be processed.

Annotation Processor Settings in IDEA

The producer

If you want to consume messages, you first need to create them. So let’s start off by writing our producer.

Create a new package “com.exxeta.producer” under “src/main/java” and put a “Main.java” and a “ProducerDemo.java” in it. We’ll use the “Main.java” only for creating the producer and as the general entry point.

This is what the “Main.java” looks like:
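A minimal sketch; the no-argument run() method on ProducerDemo is an assumption:

package com.exxeta.producer;

public class Main {

    public static void main(String[] args) {
        new ProducerDemo().run(); // <1>
    }
}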

<1> Create a new ProducerDemo instance and call the run method

The “ProducerDemo.java” should look like this:
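The following sketch matches the annotated steps below; the log messages and the error handling details are assumptions:

package com.exxeta.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@Slf4j
public class ProducerDemo {

    public void run() {
        // <1> parse the producer.properties file from the classpath
        Properties properties = new Properties();
        try (InputStream stream = getClass().getClassLoader()
                .getResourceAsStream("producer.properties")) {
            properties.load(stream);
        } catch (IOException e) {
            log.error("Could not load producer.properties", e);
            return;
        }

        CountDownLatch latch = new CountDownLatch(10);

        // <2> configure the producer with these properties
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) { // <3> create ten messages individually
                // <4> topic, key and payload of the message
                ProducerRecord<String, String> record = new ProducerRecord<>(
                        "exxeta", Integer.toString(i), "I said 'Hello Kafka' " + i + " times.");
                producer.send(record, (metadata, exception) -> { // <5> send asynchronously
                    if (exception == null) { // <6> log metadata on success, error otherwise
                        log.info("Sent record to topic {}, partition {}, offset {}",
                                metadata.topic(), metadata.partition(), metadata.offset());
                    } else {
                        log.error("Error while producing", exception);
                    }
                    latch.countDown();
                });
            }
            latch.await(10, TimeUnit.SECONDS); // <7> wait at most 10 seconds
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}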

There’s a lot to unravel here.

<1> This block parses the “producer.properties” file. (We’ll also need one for the consumer later.)

<2> Then we use these properties to configure our producer

<3> We then create ten messages individually

<4> For that we need a “ProducerRecord” specifying the topic the message should be written to (I used “exxeta” in this example, but you could name it anything else – also see Kafka Cluster Setup), the key (“Integer.toString(i)”) we’d like to give the message, and the message payload (“I said ‘Hello Kafka’ X times.”). In the real world this could be temperature data from an IoT device.

<5> Then we tell the producer to send the record to Kafka. This method call returns a Future<RecordMetadata> on which we will listen.

<6> If the future completed without an error, we log the metadata that Kafka returned; otherwise we log the error.

<7> Using a CountDownLatch, we wait at most 10 seconds for the operation to complete. This should be plenty of time to receive the desired data.

Finally, we have to create the properties file that holds the configuration of the producer. We create a new file “producer.properties” in “src/main/resources” and add the following content to it:
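A sketch of the file; the single broker on localhost:9092 and the client id “demo-producer” are assumptions, so use the broker addresses from your docker-compose setup:

# <1> list of known Kafka brokers
bootstrap.servers=localhost:9092
# <2> required acknowledgements
acks=all
# <3> key serializer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
# <4> value serializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# <5> unique name of the producer
client.id=demo-producer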

<1> List of known Kafka brokers

<2> Number of acknowledgements the producer requires for a request to be considered complete (can be 0, 1, or ‘all’; with ‘all’, every in-sync replica has to confirm that the message was received successfully)

<3> Define how the key has to be serialized

<4> Define how the value has to be serialized

<5> The unique name of the producer

See http://kafka.apache.org/documentation/#configuration for more information on these. 

The consumer

The general structure of the consumer is not that different from the producer. Like above, we just add three files: a “Main.java” and a “ConsumerDemo.java” in the package “com.exxeta.consumer” under “src/main/java”, and a “consumer.properties” under “src/main/resources”.

This “Main.java” is as simple as the one for the producer:
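Again a minimal sketch, mirroring the producer’s entry point:

package com.exxeta.consumer;

public class Main {

    public static void main(String[] args) {
        new ConsumerDemo().run();
    }
}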

The “ConsumerDemo.java” looks like this:
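A sketch matching the annotated steps below; the poll duration of one second and the log format are assumptions:

package com.exxeta.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

@Slf4j
public class ConsumerDemo {

    public void run() {
        Properties properties = new Properties();
        try (InputStream stream = getClass().getClassLoader()
                .getResourceAsStream("consumer.properties")) {
            properties.load(stream);
        } catch (IOException e) {
            log.error("Could not load consumer.properties", e);
            return;
        }

        // <1> create a KafkaConsumer with the given properties
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // <2> subscribe to one or more topics, passed as a list
        consumer.subscribe(Collections.singletonList("exxeta"));

        while (true) { // <3> loop forever (a real application would add a shutdown hook)
            // <4> poll the brokers for new messages on the subscribed topics
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                // <5> process the messages, here: log offset and payload
                log.info("offset = {}, value = {}", record.offset(), record.value());
            }
            consumer.commitSync(); // <6> commit the offsets of the processed messages
        }
    }
}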

<1> First we create a KafkaConsumer with the given properties, just like we did with the producer above

<2> Then we subscribe to the “exxeta” topic. Note that a consumer can subscribe to multiple topics; therefore, we need to pass a list to the subscribe method

<3> Next, we loop infinitely, because we want to handle all future events. Normally, you would implement a shutdown hook and close the consumer gracefully when the JVM terminates, but to keep this demo simple we skip that step.

<4> Now, we poll the broker for new messages on the subscribed topics (see https://kafka.apache.org/documentation/#design_pull why we need to pull the data). Depending on the amount and size of messages you expect, you could set a higher or lower duration here. 

<5> Then, we actually start processing the received messages. In this case we log the offset and the message payload to the console.

<6> After processing the messages, we commit the offsets to Kafka. This way Kafka keeps track of the messages we have already processed and won’t send them again. (We could also let Kafka do the heavy lifting with the consumer property ‘enable.auto.commit=true’, but we would lose control over what gets committed and when.)

The corresponding properties file looks like this:
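Again a sketch; the group id “exxeta-demo” is an arbitrary choice, and enable.auto.commit=false is an assumption that matches the manual commit in the code above:

# <1> the Kafka brokers we want to use
bootstrap.servers=localhost:9092
# <2> key deserializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# <3> value deserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# <4> the consumer group this consumer belongs to
group.id=exxeta-demo
# assumption: commit offsets manually instead of auto-committing
enable.auto.commit=false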

<1> the Kafka servers we want to use

<2> this tells Kafka how the key of the message should be deserialized

<3> same as above, but for value deserialization

<4> group.id is the consumer group our consumer belongs to (so each event gets processed by only one consumer of the group)

More on the consumer config can be found at http://kafka.apache.org/documentation/#configuration.

Conclusion

This time I demonstrated how to implement a rudimentary producer and consumer in plain Java. We covered the basics of configuring consumers and producers as well as some of Kafka’s inner workings.

