前回はstatelessな操作を試してみたので、statefulな操作のうち簡単なものだけ動かしてみた。
sourceとなるtopicには以下のようにしてレコードを流している。
% echo "a:naoty" | kcat -b localhost:9092 -t streams-plaintext-input -K :
% echo "a:naoty" | kcat -b localhost:9092 -t streams-plaintext-input -K :
% echo "b:naoty" | kcat -b localhost:9092 -t streams-plaintext-input -K :
count
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> source = builder.stream("streams-plaintext-input");
source.groupByKey()
.count()
.toStream()
.print(Printed.toSysOut());
[KTABLE-TOSTREAM-0000000003]: a, 1
[KTABLE-TOSTREAM-0000000003]: a, 2
[KTABLE-TOSTREAM-0000000003]: b, 1
reduce
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> source = builder.stream("streams-plaintext-input");
source.groupByKey()
.reduce((result, value) -> result + value)
.toStream()
.print(Printed.toSysOut());
[KTABLE-TOSTREAM-0000000003]: a, naoty
[KTABLE-TOSTREAM-0000000003]: a, naotynaoty
[KTABLE-TOSTREAM-0000000003]: b, naoty
aggregate
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> source = builder.stream("streams-plaintext-input");
source.groupByKey()
.aggregate(() -> "", (resultKey, value, result) -> result + value.toUpperCase())
.toStream()
.print(Printed.toSysOut());
[KTABLE-TOSTREAM-0000000003]: a, �������NAOTY
[KTABLE-TOSTREAM-0000000003]: a, �������NAOTYNAOTY
[KTABLE-TOSTREAM-0000000003]: b, �������NAOTY
なぜか先頭にゴミが入ってしまった。
整理
groupByKey
,group
などのメソッドで集約する単位を定義したKGroupedStream
を返す。KGroupedStream
に対してcount
などの集約するメソッドが実行でき、これらはKTable
を返す。上流から流れてきたレコードはState Storeと呼ばれるローカルのキーバリューストアに保存され、更新があったレコードを下流に流す。KTable
に対してtoStream()
を実行すると、更新されたレコードが流れるKStream
が返る。