-- nexmark_q5.sql
-- Nexmark query 5: for every hopping window, emit the auction(s) that
-- received the highest number of bids in that window. Ties are all emitted,
-- because the join keeps every auction whose count reaches the window maximum.
--
-- `$input_dir` and `$output_path` are placeholders; presumably the harness
-- that runs this script substitutes them before execution (TODO confirm).

-- Source: bid events read from a JSON file; `datetime` is the event-time field.
CREATE TABLE bids (
    datetime TIMESTAMP,
    auction BIGINT
) WITH (
    connector = 'single_file',
    path = '$input_dir/nexmark_bids.json',
    format = 'json',
    type = 'source',
    event_time_field = 'datetime'
);

-- Sink: one JSON row per top auction per window.
-- NOTE(review): `count` is declared INT but is populated from count(*),
-- which is usually a 64-bit value; confirm the implicit narrowing is intended.
CREATE TABLE top_auctions (
    auction BIGINT,
    count INT
) WITH (
    connector = 'single_file',
    path = '$output_path',
    format = 'json',
    type = 'sink'
);

INSERT INTO top_auctions
SELECT
    AuctionBids.auction,
    AuctionBids.num
FROM (
    -- Number of bids per auction per hopping window.
    -- NOTE(review): the hop arguments are presumably (slide, width), i.e. a
    -- 10-second window advancing every 2 seconds; verify against engine docs.
    SELECT
        auction,
        count(*) AS num,
        hop(INTERVAL '2 second', INTERVAL '10 seconds') AS window
    FROM bids
    GROUP BY auction, window
) AS AuctionBids
INNER JOIN (
    -- Highest per-auction bid count observed in each window.
    SELECT
        max(CountBids.num) AS maxn,
        CountBids.window
    FROM (
        -- Same per-auction, per-window counts as AuctionBids; the window
        -- definition must stay identical so the join keys line up.
        SELECT
            count(*) AS num,
            hop(INTERVAL '2 second', INTERVAL '10 seconds') AS window
        FROM bids
        GROUP BY auction, window
    ) AS CountBids
    GROUP BY CountBids.window
) AS MaxBids
    ON AuctionBids.window = MaxBids.window
    AND AuctionBids.num >= MaxBids.maxn;