gerritjvv edited this page Nov 27, 2012 · 2 revisions

See also the ContinuousAggregation page.

The Kafka subproject provides a KafkaDataSource implementation that exposes the KafkaStream(s) as Iterators through the MultiThreadedDataSource implementation, so the KafkaStream(s) can be aggregated in parallel. Note that each stream carries its own aggregation.
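To illustrate what "each stream carries its own aggregation" means, here is a minimal sketch that uses only the JDK. It does not use the project's MultiThreadedDataSource or any Kafka classes; the class name PerStreamAggregation, its methods, and the choice of counting keys as the aggregation are all assumptions made for illustration. Each iterator stands in for one KafkaStream, and each gets its own task and its own result map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration class; not part of the project.
class PerStreamAggregation {

    // Counts how often each key appears on a single stream.
    static Map<String, Long> aggregate(Iterator<String> stream) {
        Map<String, Long> counts = new HashMap<>();
        while (stream.hasNext()) {
            counts.merge(stream.next(), 1L, Long::sum);
        }
        return counts;
    }

    // Runs one aggregation task per stream. Results stay separate and are
    // returned in the same order as the input streams.
    static List<Map<String, Long>> aggregateInParallel(List<Iterator<String>> streams) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, streams.size()));
        try {
            List<Future<Map<String, Long>>> futures = new ArrayList<>();
            for (Iterator<String> stream : streams) {
                Callable<Map<String, Long>> task = () -> aggregate(stream);
                futures.add(pool.submit(task));
            }
            List<Map<String, Long>> results = new ArrayList<>();
            for (Future<Map<String, Long>> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (Exception e) {
            throw new RuntimeException("aggregation failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Iterator<String>> streams = new ArrayList<>();
        streams.add(Arrays.asList("a", "b", "a").iterator());
        streams.add(Arrays.asList("a").iterator());
        // One result map per stream; the counts are not merged across streams.
        System.out.println(aggregateInParallel(streams));
    }
}
```

Because the per-stream results are never merged in this sketch, any combined view across streams would need a separate step.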

The KafkaDataSource takes a Transform instance as an argument, which lets you plug support for any message format into the data source.

For example:

// Convert each Kafka Message into the fields to aggregate on.
new KafkaDataSource(streamsList, new Transform<Message>() {
    public Object[] apply(Message m) {
        return new Object[]{ getName(m), getURL(m) };
    }
});
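As a self-contained sketch of what such a transform might look like for one concrete message format, the example below decodes a payload of UTF-8 bytes holding a name and a URL separated by a tab. The payload layout is an assumption, and the Transform interface declared here is a hypothetical stand-in with the same apply shape as in the snippet above, not the project's own type.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; not part of the project.
class TransformSketch {

    // Stand-in for the project's Transform type (assumed shape).
    interface Transform<T> {
        Object[] apply(T message);
    }

    // Assumed payload format: UTF-8 bytes of "name<TAB>url".
    static final Transform<byte[]> TAB_SEPARATED = new Transform<byte[]>() {
        @Override
        public Object[] apply(byte[] payload) {
            String line = new String(payload, StandardCharsets.UTF_8);
            // Split on the first tab only, so the URL is kept whole.
            String[] parts = line.split("\t", 2);
            String name = parts[0];
            String url = parts.length > 1 ? parts[1] : "";
            return new Object[]{ name, url };
        }
    };

    public static void main(String[] args) {
        byte[] payload = "home\thttp://example.com/".getBytes(StandardCharsets.UTF_8);
        Object[] row = TAB_SEPARATED.apply(payload);
        System.out.println(row[0] + " " + row[1]);
    }
}
```

Keeping the format-specific parsing inside the transform means the data source itself does not need to know how messages are encoded.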
