How to create a Topic in Kafka through Java

Jaya Ananthram · Nov 20, 2014 · Viewed 37.3k times

I want to create a topic in Kafka (kafka_2.8.0-0.8.1.1) through Java. Everything works if I create the topic from the command prompt and then push messages through the Java API, but I want to create the topic through the Java API as well. After a long search I found the code below:

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

I tried the code above, and it reports that the topic is created, but I am not able to push messages to the topic. Is anything wrong in my code? Or is there another way to achieve this?

Answer

Jaya Ananthram · Nov 20, 2014

Edit: ZooKeeper is not required in newer versions of Kafka. Please see the answer by @Neeleshkumar Srinivasan Mannur for API version 0.11.0+.
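
For readers on Kafka 0.11.0 or later, topic creation without ZooKeeper might look roughly like the sketch below, which uses the AdminClient class from the kafka-clients library. This is a minimal sketch and not the code from the answer mentioned above. The broker address localhost:9092, the topic name, the partition count, and the replication factor of 1 are all assumptions chosen for illustration.

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicWithAdminClient {
    public static void main(String[] args) throws Exception {
        Properties config = new Properties();
        // Assumption: a broker is reachable at localhost:9092.
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // AdminClient talks to the brokers directly, so no ZkClient is involved.
        try (AdminClient admin = AdminClient.create(config)) {
            // Illustrative values: 2 partitions, replication factor 1.
            NewTopic topic = new NewTopic("testTopic", 2, (short) 1);

            // createTopics is asynchronous; get() blocks until the brokers
            // confirm the topic or throws if creation failed.
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```

If the topic already exists, the get() call is expected to fail with an ExecutionException, so callers that want an idempotent "create if missing" step would need to handle that case.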



Original answer

I fixed it after a lot of research.

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

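With the code above, ZkClient does create the topic, but Kafka is not aware of the topic information. When no serializer is passed, ZkClient writes the data using Java object serialization, whereas Kafka expects plain strings in ZooKeeper. So we need to create the ZkClient object with Kafka's own string serializer.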

First, add the following import:

import kafka.utils.ZKStringSerializer$;

Then create the ZkClient object like this:

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
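
To illustrate why the serializer matters, here is a small standalone sketch using only the JDK. It compares the bytes produced by Java object serialization (which is what ZkClient falls back to when no serializer is given) with the plain UTF-8 bytes that a string serializer writes. The class name, method names, and the sample JSON are hypothetical and only mimic the two behaviours; they are not part of ZkClient or Kafka.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;

public class SerializerDemo {

    // Mimics a serializer based on Java object serialization.
    public static byte[] javaSerialize(String data) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(data);
        }
        return buffer.toByteArray();
    }

    // Mimics a plain string serializer: the raw UTF-8 bytes of the text.
    public static byte[] utf8Serialize(String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Illustrative payload, loosely shaped like topic metadata.
        String topicJson = "{\"version\":1,\"partitions\":{\"0\":[0]}}";

        byte[] javaBytes = javaSerialize(topicJson);
        byte[] utf8Bytes = utf8Serialize(topicJson);

        // Java serialization prepends a binary stream header and type marker,
        // so the stored value is no longer plain JSON text.
        System.out.printf("Java-serialized: %d bytes, first two: %02X %02X%n",
                javaBytes.length, javaBytes[0] & 0xFF, javaBytes[1] & 0xFF);

        // The UTF-8 form is exactly the text and round-trips unchanged.
        System.out.printf("UTF-8: %d bytes, round trip: %s%n",
                utf8Bytes.length, new String(utf8Bytes, StandardCharsets.UTF_8));

        // A reader that expects UTF-8 text does not get the original string back.
        String misread = new String(javaBytes, StandardCharsets.UTF_8);
        System.out.println("Misread equals original? " + misread.equals(topicJson));
    }
}
```

A consumer of the ZooKeeper node that expects JSON text cannot parse the Java-serialized form, which is consistent with the topic appearing to be created while remaining unusable.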

Edit 1 (in response to @ajkret's comment):

The code above won't work for Kafka > 0.9 because the API has changed. Use the code below for Kafka > 0.9:


import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

public class KafkaTopicCreationInJava
{
    public static void main(String[] args) throws Exception {
        ZkClient zkClient = null;
        ZkUtils zkUtils = null;
        try {
            // For multiple ZooKeeper nodes, use a comma-separated list, e.g. "192.168.20.1:2181,192.168.20.2:2181"
            String zookeeperHosts = "192.168.20.1:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

            String topicName = "testTopic";
            int noOfPartitions = 2;
            int noOfReplication = 3; // must not exceed the number of running brokers
            Properties topicConfiguration = new Properties(); // empty: no topic-level config overrides

            AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
}