1+ package com .javahelps .siddhimigrationdemo ;
2+
3+ import java .util .Random ;
4+
5+ import org .wso2 .siddhi .core .ExecutionPlanRuntime ;
6+ import org .wso2 .siddhi .core .SiddhiManager ;
7+ import org .wso2 .siddhi .core .event .Event ;
8+ import org .wso2 .siddhi .core .stream .input .InputHandler ;
9+ import org .wso2 .siddhi .core .stream .output .StreamCallback ;
10+
11+ public class Main {
12+ // SiddhiDe
13+ private static final String [] CUSTOMERS = {"Alice" , "Barby" , "Carol" , "Diana" };
14+ private static final String [] ITEMS = {"Cocoa-Butter Lotion" , "Purse-XL" , "Purse-L" , "Beer" , "Biscuit" ,
15+ "Chocolate" , "ZMA" };
16+ private static final Random RANDOM = new Random ();
17+
18+ public static void main (String [] args ) throws InterruptedException {
19+ SiddhiManager siddhiManager = new SiddhiManager ();
20+
21+ String streams = "define stream purchaseStream (customerName string, item string, timestamp long); " ;
22+ String query = "partition with (customerName of purchaseStream) " +
23+ "begin " +
24+ "from purchaseStream[item == 'Cocoa-Butter Lotion' OR item == 'Purse-XL' OR item == 'ZMA']#window" +
25+ ".unique:externalTimeBatch(item, timestamp, 500 milliseconds) " +
26+ "select customerName, convert(count(item), 'double') / 3.0 * 100.0 as noOfPurchases insert into " +
27+ "possiblePregnant; " +
28+ "end " ;
29+
30+ // Create ExecutionPlanRuntime using stream definition and query
31+ ExecutionPlanRuntime executionPlanRuntime = siddhiManager .createExecutionPlanRuntime (streams + query );
32+
33+ try {
34+ // Receive the output events
35+ executionPlanRuntime .addCallback ("possiblePregnant" , new StreamCallback () {
36+ @ Override
37+ public void receive (Event [] events ) {
38+ String output = String .format ("\t \t \t %s is pregnant with %.2f%% confidence." , events [0 ].getData (0 ),
39+ events [0 ].getData (1 ));
40+ System .out .println (output );
41+ }
42+ });
43+
44+ // Send input events
45+ InputHandler purchaseStream = executionPlanRuntime .getInputHandler ("purchaseStream" );
46+ executionPlanRuntime .start ();
47+ for (int i = 0 ; i < 1000 ; i ++) {
48+ Object [] event = generateEvent ();
49+ purchaseStream .send (event );
50+ Thread .sleep (10 ); // Delay for 10 milliseconds
51+ }
52+ } finally {
53+ executionPlanRuntime .shutdown ();
54+ }
55+ }
56+
57+ private static Object [] generateEvent () {
58+ String name = CUSTOMERS [RANDOM .nextInt (CUSTOMERS .length )];
59+ String item = ITEMS [RANDOM .nextInt (ITEMS .length )];
60+ long time = System .currentTimeMillis (); // Current time
61+
62+ System .out .println (name + " buys " + item );
63+ Object [] event = new Object []{name , item , time };
64+ return event ;
65+ }
66+
67+ }
0 commit comments