sliding_window_end.sql
-- Source table: impulse records read from a JSON file under $input_dir.
-- The `timestamp` column is declared as the event-time field.
CREATE TABLE impulse_source (
    timestamp TIMESTAMP,
    counter BIGINT UNSIGNED NOT NULL,
    subtask_index BIGINT UNSIGNED NOT NULL
) WITH (
    connector = 'single_file',
    path = '$input_dir/impulse.json',
    format = 'json',
    event_time_field = 'timestamp',
    type = 'source'
);

-- Sink table: one JSON row per emitted window, written to $output_path.
CREATE TABLE impulse_sink (
    count BIGINT,
    min BIGINT,
    max BIGINT,
    start TIMESTAMP,
    end TIMESTAMP
) WITH (
    connector = 'single_file',
    path = '$output_path',
    format = 'json',
    type = 'sink'
);

-- For every hop window, emit the row count, the smallest and largest
-- `counter` value, and the window's start/end bounds.
INSERT INTO impulse_sink
SELECT
    count,
    min,
    max,
    window.start,
    window.end
FROM (
    SELECT
        -- NOTE(review): presumably a 10-second window advancing every
        -- 2 seconds; confirm against the engine's hop() argument order.
        hop(INTERVAL '2 second', INTERVAL '10 second') AS window,
        count(*) AS count,
        min(counter) AS min,
        max(counter) AS max
    FROM impulse_source
    -- Positional reference: groups by the first select item (the window).
    GROUP BY 1
);