Print Kafka Stream Input out to console?

Zeliax picture Zeliax · Sep 5, 2016 · Viewed 16.1k times · Source

I've been looking through a lot of the Kafka documentation for a java application that I am working on. I've tried getting into the lambda syntax introduced in Java 8, but I am a little sketchy on that ground and don't feel too confident that it should be what I use as of yet.

I've a Kafka/Zookeeper Service running without any troubles, and what I want to do is write a small example program that based on the input will write it out, but not do a wordcount as there are so many examples of already.

As for sample data I will be getting a string of following structure:

Example data

This a sample string containing some keywords such as GPS, GEO and maybe a little bit of ACC.

Question

I want to be able to extract the 3 letter keywords and print them with a System.out.println. How do I get a string variable containing the input? I know how to apply regular expressions or even just searching through the string to get the keywords.

Code

public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app_id");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "0:0:0:0:0:0:0:1:9092");
    props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "0:0:0:0:0:0:0:1:2181");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    final Serde<String> stringSerde = Serdes.String();

    KStreamBuilder builder = new KStreamBuilder();

    KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream");

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

    //How do I assign the input from in-stream to the following variable?
    String variable = ?
}

I have zookeeper, kafka, producer and consumer running all hooked up to the same topic so I want to basically see the same String appear on all of the instances (producer, consumer and stream).

Answer

Matthias J. Sax picture Matthias J. Sax · Sep 6, 2016

If you use Kafka Streams, you need to apply functions/operators on your data streams. In your case, you create a KStream object, thus, you want to apply an operator to source.

Depending on what you want to do, there are operators that apply a function to each record in the stream independently (eg. map()), or other operators that apply a function to multiple record together (eg. aggregateByKey()). You should have a look into the documentation: http://docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-streams-dsl and examples https://github.com/confluentinc/kafka-streams-examples

Thus, you never create local variables using Kafka Streams as you show in your example above, but rather embed everything in operators/functions that get chained together.

For example, if you want to print all input record to stdout, you could do

KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream");
source.foreach(new ForeachAction<String, String>() {
    void apply(String key, String value) {
        System.out.println(key + ": " + value);
    }
 });

Thus, after you start your application via streams.start(), it will consumer the records from you input topic and for each record of your topic, a call to apply(...) is done, which prints the record on stdout.

Of course, a more native way for printing the stream to the console would be to use source.print() (which internally is basically the same as the shown foreach() operator with an already given ForeachAction.)

For your example with assigning the string to a local variable, you would need to put your code into apply(...) and do your regex-stuff etc. there to "extract the 3 letter keywords".

The best way to express this, would however be via a combination of flatMapValues() and print() (ie, source.flatMapValues(...).print()). flatMapValues() is called for each input record (in your case, I assume key will be null so you can ignore it). Within your flatMapValue function, you apply your regex and for each match, you add the match to a list of values that you finally return.

source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(String value) {
        ArrayList<String> keywords = new ArrayList<String>();

        // apply regex to value and for each match add it to keywords

        return keywords;
    }
}

The output of flatMapValues will be a KStream again, containing a record for each found keyword (ie, the output stream is a "union" over all lists your return in ValueMapper#apply()). Finally, you just print your result to console via print(). (Of course, you could also use a single foreach instead of flatMapValue+print but this would be less modular.)