QUESTION 35
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream(“word-count-input”); KTable<String, Long> wordCounts = textLines
.mapValues(textLine -> textLine.toLowerCase())
.flatMapValues(textLine -> Arrays.asList(textLine.split(“W+”)))
.selectKey((key, word) -> word)
.groupByKey()
.count(Materialized.as(“Counts”));
wordCounts.toStream().to(“word-count-output”, Produced.with(Serdes.String(), Serdes.Long())); builder.build(); What is an adequate topic configuration for the topic word-count-output?
Result is aggregated into a table with key as the unique word and value its frequency. We have to enable log compaction for this topic to align the topic’s cleanup policy with KTable semantics.