-- most_active_driver_last_hour.sql
-- For each one-hour hopping window over the `cars` stream, emit the driver
-- with the most events in that window.
-- NOTE(review): $input_dir and $output_path look like placeholders that a
-- harness substitutes before planning — confirm.

-- Source: car events read from a single JSON file.
CREATE TABLE cars (
    timestamp TIMESTAMP,
    driver_id BIGINT,
    event_type TEXT,
    location TEXT,
    -- Watermark trails the event time by one hour.
    watermark TIMESTAMP GENERATED ALWAYS AS (timestamp - INTERVAL '1 hour') STORED
) WITH (
    connector = 'single_file',
    path = '$input_dir/cars.json',
    format = 'json',
    type = 'source',
    event_time_field = 'timestamp',
    watermark_field = 'watermark'
);

-- Sink: one JSON row per window with its bounds, the winning driver, that
-- driver's event count, and its rank (always 1 after the filter below).
CREATE TABLE most_active_driver (
    start TIMESTAMP,
    end TIMESTAMP,
    driver_id BIGINT,
    count BIGINT,
    row_number BIGINT
) WITH (
    connector = 'single_file',
    path = '$output_path',
    format = 'json',
    type = 'sink'
);

INSERT INTO most_active_driver
SELECT
    window.start,
    window.end,
    driver_id,
    count,
    row_number
FROM (
    -- Rank drivers within each window: highest count first; ties are broken
    -- by the larger driver_id so exactly one deterministic winner gets rank 1.
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY window
            ORDER BY count DESC, driver_id DESC
        ) AS row_number
    FROM (
        -- Per-driver event counts over hopping windows. Arroyo's hop takes
        -- (slide, width): 1-hour windows advancing every minute.
        SELECT
            driver_id,
            hop(INTERVAL '1' MINUTE, INTERVAL '1' HOUR) AS window,
            COUNT(*) AS count
        FROM cars
        -- Positional: 1 = driver_id, 2 = window.
        GROUP BY 1, 2
    )
)
WHERE row_number = 1;