I am very much new to Kafka
and we are using Kafka 0.8.1
.
What I need to do is to consume a message from topic. For that, I will have to write one consumer in Java which will consume a message from topic and then save that message to database. After a message is saved, some acknowledgement will be sent to Java consumer. If acknowledgement is true, then next message should be consumed from the topic. If acknowldgement is false(which means due to some error message,read from the topic, couldn't be saved into the database), then again that message should be read.
I think I need to use Simple Consumer
,to have control over message offset and have gone through the Simple Consumer example as given in this link https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example.
In this example, offset is evaluated in run method as 'readOffset
'. Do I need to play with that? For e.g. I can use LatestTime()
instead of EarliestTime()
and in case of false, I will reset the offset to the one before using offset - 1
.
Is this how I should proceed?
I think you can get along with using the high level consumer (http://kafka.apache.org/documentation.html#highlevelconsumerapi), that should be easier to use than the SimpleConsumer. I don't think the consumer needs to reread messages from Kafka on database failure, as the consumer already has those messages and can resend them to the DB or do anything else it sees fit.
High-level consumers store the last offset read from a specific partition in Zookeeper (based on the consumer group name), so that when a consumer process dies and is later restarted (potentially on an other host), it can continue processing messages where it left off. It's possible to autosave this offset to Zookeeper periodically (see the consumer properties auto.commit.enable and auto.commit.interval.ms), or have it saved by application logic by calling ConsumerConnector.commitOffsets
. See also https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example .
I suggest you turn auto-commit off and commit your offsets yourselves once you received DB acknowledgement. Thus, you can make sure unprocessed messages are reread from Kafka in case of consumer failure and all messages commited to Kafka will eventually reach the DB at least once (but not 'exactly once').