-- offset_impulse_join.sql
-- Joins two windowed aggregations that read the same impulse input file.
-- One side derives its watermark from a column that trails event time by
-- 10 minutes.
-- NOTE(review): presumably this exercises a join whose two inputs advance
-- their watermarks at different rates; confirm against the expected output.
-- $input_dir and $output_path look like placeholders substituted by the test
-- harness; verify against the runner.

-- Side a: impulse events, with watermarks driven by the `timestamp` event time.
CREATE TABLE impulse_source (
    timestamp     TIMESTAMP,
    counter       BIGINT UNSIGNED NOT NULL,
    subtask_index BIGINT UNSIGNED NOT NULL
) WITH (
    connector        = 'single_file',
    path             = '$input_dir/impulse.json',
    format           = 'json',
    type             = 'source',
    event_time_field = 'timestamp'
);

-- Side b: the same file. Its watermark comes from a stored generated column
-- set to `timestamp` minus 10 minutes.
CREATE TABLE delayed_impulse_source (
    timestamp     TIMESTAMP,
    counter       BIGINT UNSIGNED NOT NULL,
    subtask_index BIGINT UNSIGNED NOT NULL,
    watermark     TIMESTAMP GENERATED ALWAYS AS (timestamp - INTERVAL '10 minute') STORED
) WITH (
    connector        = 'single_file',
    path             = '$input_dir/impulse.json',
    format           = 'json',
    type             = 'source',
    event_time_field = 'timestamp',
    watermark_field  = 'watermark'
);

-- Sink: one JSON row per joined (window start, counter) pair.
-- NOTE(review): `counter` is signed BIGINT here but BIGINT UNSIGNED in both
-- sources, so the insert relies on an implicit cast; confirm this is intended.
CREATE TABLE offset_output (
    start   TIMESTAMP,
    counter BIGINT
) WITH (
    connector = 'single_file',
    path      = '$output_path',
    format    = 'json',
    type      = 'sink'
);

-- Each side buckets rows into 1-second tumbling windows per counter.
-- GROUP BY 1, 2 means the TUMBLE expression and counter.
-- Only subquery b names its window (`window`), so the unqualified
-- `window.start` below reads b's window start. a's window and both COUNT(*)
-- columns are not used by the outer query.
INSERT INTO offset_output
SELECT
    window.start,
    a.counter AS counter
FROM (
    SELECT
        TUMBLE(INTERVAL '1 second'),
        counter,
        COUNT(*)
    FROM impulse_source
    GROUP BY 1, 2
) AS a
INNER JOIN (
    SELECT
        TUMBLE(INTERVAL '1 second') AS window,
        counter,
        COUNT(*)
    FROM delayed_impulse_source
    GROUP BY 1, 2
) AS b
    ON a.counter = b.counter;