How to use Kafka's producer API in librdkafka with C++ code on Windows

CodeOverflow · Jun 10, 2017 · Viewed 9.6k times

I am trying to write a client that acts as a producer. Following the examples, I created a new Win32 console project. I found that the API doesn't work for me unless I add a getline() call at the end of my program.

If I remove the getline(), the produce(..) method still returns success, but no message shows up in the kafka-console-consumer command window.

I am a bit confused. Is that expected? How can I send a message without calling getline()?

I found out why it doesn't work: the producer object is deleted so quickly that the producer has no chance to send the messages to the broker.

When I add a sleep(1000) between the produce call and deleting the producer object, the producer sends the message correctly.

So the question is: how do I send a message immediately? How can I make sure all messages have been sent before I destroy the producer object?

How can I fix this? I would rather not add sleep() calls to my source code.

Environment: win10 + vs2015 + kafka_2.10-0.9.0.1 + zookeeper-3.4.6 + librdkafka. Please check out the following code:

// kafka_test_win32_nomfc.cpp

#include "stdafx.h"
#include <cstdlib>   // exit
#include <iostream>
#include <string>
#include "librdkafka/rdkafkacpp.h"


int static producer_1()
{
    std::string brokers = "127.0.0.1";
    std::string errstr;
    std::string topic_str = "linli";
    std::string mode;
    std::string debug;
    int32_t partition = RdKafka::Topic::PARTITION_UA;
    int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
    bool do_conf_dump = false;
    int opt;
    // MyHashPartitionerCb hash_partitioner;
    int use_ccb = 0;

    /*
    * Create configuration objects
    */
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    conf->set("metadata.broker.list", brokers, errstr);

    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        exit(1);
    }

    std::cout << "% Created producer " << producer->name() << std::endl;

    /*
    * Create topic handle.
    */
    RdKafka::Topic *topic = NULL;
    if (!topic_str.empty()) {
        topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
        if (!topic) {
            std::cerr << "Failed to create topic: " << errstr << std::endl;
            exit(1);
        }
    }

    RdKafka::ErrorCode resp = producer->produce(topic, partition,
        RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
        const_cast<char *>("hello worlf"), 11,
        NULL, NULL);

    delete topic;
    delete producer;
    return 0;
}


int static producer_2()
{
    std::string brokers = "127.0.0.1";
    std::string errstr;
    std::string topic_str = "linli";
    std::string mode;
    std::string debug;
    int32_t partition = RdKafka::Topic::PARTITION_UA;
    int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
    bool do_conf_dump = false;
    int opt;
    // MyHashPartitionerCb hash_partitioner;
    int use_ccb = 0;

    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    conf->set("metadata.broker.list", brokers, errstr);

    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        exit(1);
    }

    std::cout << "% Created producer " << producer->name() << std::endl;

    RdKafka::ErrorCode resp = producer->produce(topic_str, partition,
        RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
        (void *)"hi", 2,
        NULL, 0, 0, NULL);



    std::string errs(RdKafka::err2str(resp));
    std::cout << errs << std::endl;
    //producer->poll(0);


    delete producer;

    return 0;
}


int main()
{

    producer_2();

    return 0;
}

Answer

Edenhill · Jun 11, 2017

The librdkafka produce() API (both C and C++) is asynchronous. Your message is initially only enqueued on the internal producer queue; only later (see the queue.buffering.max.ms configuration property, default 1 second) is it combined with other messages into a message batch (MessageSet) and sent to the broker from a background thread.

Your program calls produce() and then quickly exits, long before the background producer thread has had a chance to send the message to the broker, much less receive an acknowledgement from the broker.

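To make sure all outstanding messages have been sent, call flush() before terminating your application. In the question's code, that means a call such as producer->flush(10000) ahead of delete producer. The argument is a timeout in milliseconds (10000 is only an example value), and flush() returns once the queue has been emptied or the timeout expires.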
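
To see why the message disappears, here is a minimal toy model of the behaviour described above, written with the C++ standard library only. It is not librdkafka: ToyProducer, its linger interval, the broker vector and send_one() are all hypothetical names invented for this sketch. It assumes the destructor drops whatever is still queued, which is the effect observed in the question.

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Toy stand-in for an asynchronous producer. This is NOT librdkafka; the
// class, its members and the "broker" vector are all hypothetical.
class ToyProducer {
public:
    ToyProducer(std::vector<std::string>& broker, std::chrono::milliseconds linger)
        : broker_(broker), linger_(linger), worker_([this] { run(); }) {}

    // Stops the background thread at once. Messages still in the queue are
    // dropped, which mirrors deleting the producer right after produce().
    ~ToyProducer() {
        {
            std::lock_guard<std::mutex> lock(mu_);
            stopping_ = true;
        }
        wake_cv_.notify_all();
        worker_.join();
    }

    // Enqueue only; returns immediately, before anything is sent.
    void produce(std::string payload) {
        std::lock_guard<std::mutex> lock(mu_);
        queue_.push_back(std::move(payload));
    }

    // Ask the worker to send now and wait until the queue is empty.
    // Returns false if the timeout expires first.
    bool flush(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mu_);
        send_now_ = true;
        wake_cv_.notify_all();
        return drained_cv_.wait_for(lock, timeout, [this] { return queue_.empty(); });
    }

private:
    void run() {
        std::unique_lock<std::mutex> lock(mu_);
        while (!stopping_) {
            // Sleep for the linger interval unless flush() or the destructor wakes us.
            wake_cv_.wait_for(lock, linger_, [this] { return stopping_ || send_now_; });
            if (stopping_) break;
            // "Send" the whole batch to the toy broker.
            for (std::string& msg : queue_) broker_.push_back(std::move(msg));
            queue_.clear();
            send_now_ = false;
            drained_cv_.notify_all();
        }
    }

    std::mutex mu_;
    std::condition_variable wake_cv_;
    std::condition_variable drained_cv_;
    std::deque<std::string> queue_;
    bool stopping_ = false;
    bool send_now_ = false;
    std::vector<std::string>& broker_;
    std::chrono::milliseconds linger_;
    std::thread worker_;  // declared last so it starts after the other members exist
};

// Returns what the toy broker received when the producer is destroyed
// right after produce(), optionally calling flush() first.
std::vector<std::string> send_one(bool call_flush) {
    std::vector<std::string> broker;
    {
        ToyProducer producer(broker, std::chrono::seconds(1));
        producer.produce("hi");
        if (call_flush) producer.flush(std::chrono::seconds(5));
    }  // producer destroyed here
    return broker;
}
```

Calling send_one(false) models the question's program: the producer is destroyed well inside the one-second linger interval, so the toy broker receives nothing. Calling send_one(true) waits in flush() until the background thread has handed the message over, with no sleep() needed.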

If your application is long-lived, you should call poll() at regular intervals to serve any delivery report callbacks you've registered.
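
The reason poll() matters can be sketched the same way: outcomes are recorded by the background thread, but the callback only runs when the application thread asks for it. The block below is again a standard-library toy, not librdkafka code. ToyReportQueue, post() and run_demo() are hypothetical names, and this poll() takes no timeout argument.

```cpp
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Toy model of delivery reports that are served only from poll().
// NOT librdkafka: every name here is hypothetical.
class ToyReportQueue {
public:
    using Callback = std::function<void(const std::string& payload, bool delivered)>;

    explicit ToyReportQueue(Callback cb) : cb_(std::move(cb)) {}

    // Called from the (simulated) background thread when a send finishes.
    // It only records the outcome; the callback does not run here.
    void post(std::string payload, bool delivered) {
        std::lock_guard<std::mutex> lock(mu_);
        pending_.push_back(Report{std::move(payload), delivered});
    }

    // Called from the application thread. Runs the callback once per pending
    // report and returns how many reports were served.
    int poll() {
        std::deque<Report> batch;
        {
            std::lock_guard<std::mutex> lock(mu_);
            batch.swap(pending_);
        }
        // Callbacks run outside the lock, on the caller's thread.
        for (const Report& r : batch) cb_(r.payload, r.delivered);
        return static_cast<int>(batch.size());
    }

private:
    struct Report {
        std::string payload;
        bool delivered;
    };

    std::mutex mu_;
    std::deque<Report> pending_;
    Callback cb_;
};

// Usage sketch: two reports are posted from another thread, but the callback
// does not fire until the application calls poll().
std::vector<std::string> run_demo() {
    std::vector<std::string> log;
    ToyReportQueue reports([&log](const std::string& payload, bool delivered) {
        log.push_back(payload + (delivered ? ": delivered" : ": failed"));
    });

    std::thread background([&reports] {
        reports.post("a", true);
        reports.post("b", false);
    });
    background.join();

    log.push_back("before poll");
    int served = reports.poll();
    log.push_back("served " + std::to_string(served));
    return log;
}
```

In run_demo() the log starts with "before poll" even though both sends finished earlier, because nothing invokes the callback until poll() is called. A long-lived application that never polls would keep accumulating unserved reports in the same way.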