I am trying to write a client as an producer. I followed examples to create a new win32 console project. I found the API doesn't work for me unless I add getline() function at the end of my program.
If I remove the getline(), the produce(..) method still returns the result of success. However, I cannot see any response in the command window of kafka-console-consumer
I got a little bit confused. Is that right? How can I send message without using getline()? Anyone knows?
I found why it doesn't work. It seems too fast to delete the producer object causing the producer cannot send messages to broker.
When I add sleep 1000 between produce method and delete producer object, producer can send message correctly.
So, the question is How to send message immediately. How can I make sure those message have been send totally before I destroy the producer object?
How to fix this problem, actually I don't like to add some sleep() in my source code.
win10+vs2015+kafka_2.10-0.9.0.1+zookeeper-3.4.6+librdkafka Please check out the following code
// kafka_test_win32_nomfc.cpp
//
#include "stdafx.h"
#include <iostream>
#include "librdkafka/rdkafkacpp.h"
int static producer_1()
{
std::string brokers = "127.0.0.1";
std::string errstr;
std::string topic_str = "linli";
std::string mode;
std::string debug;
int32_t partition = RdKafka::Topic::PARTITION_UA;
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
bool do_conf_dump = false;
int opt;
// MyHashPartitionerCb hash_partitioner;
int use_ccb = 0;
/*
* Create configuration objects
*/
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
conf->set("metadata.broker.list", brokers, errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
exit(1);
}
std::cout << "% Created producer " << producer->name() << std::endl;
/*
* Create topic handle.
*/
RdKafka::Topic *topic = NULL;
if (!topic_str.empty()) {
topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
exit(1);
}
}
RdKafka::ErrorCode resp = producer->produce(topic, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
const_cast<char *>("hello worlf"), 11,
NULL, NULL);
delete topic;
delete producer;
return 0;
}
int static producer_2()
{
std::string brokers = "127.0.0.1";
std::string errstr;
std::string topic_str = "linli";
std::string mode;
std::string debug;
int32_t partition = RdKafka::Topic::PARTITION_UA;
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
bool do_conf_dump = false;
int opt;
// MyHashPartitionerCb hash_partitioner;
int use_ccb = 0;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
conf->set("metadata.broker.list", brokers, errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
exit(1);
}
std::cout << "% Created producer " << producer->name() << std::endl;
RdKafka::ErrorCode resp = producer->produce(topic_str, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
(void *)"hi", 2,
NULL, 0, 0, NULL);
std::string errs(RdKafka::err2str(resp));
std::cout << errs << std::endl;
//producer->poll(0);
delete producer;
return 0;
}
int main()
{
producer_2();
return 0;
}
The librdkafka produce() API (both C and C++) is asynchronous, your message will initially only be enqueued on the internal producer queue and only later (see the queue.buffering.max.ms
configuration property - default 1 second) be combined with other messages into a message batch (MessageSet) and sent to the broker from a background thread.
Your program calls produce()
and then quickly exits, long before the background producer thread has had a chance to send the message to the broker, much less receive an acknowledgement from the broker.
To make sure all outstanding messages have been sent, call flush()
, before terminating your application.
If your application is long-lived you should call poll()
at regular intervals to serve any delivery report callbacks you've registered.