-- crates/arroyo-sql-testing/src/test/queries/offset_impulse_join.sql
 1  CREATE TABLE impulse_source (
 2        timestamp TIMESTAMP,
 3        counter bigint unsigned not null,
 4        subtask_index bigint unsigned not null
 5      ) WITH (
 6        connector = 'single_file',
 7        path = '$input_dir/impulse.json',
 8        format = 'json',
 9        type = 'source',
10        event_time_field = 'timestamp'
11      );
-- Second source over the impulse.json fixture, with an explicit watermark.
-- The generated `watermark` column is each row's timestamp minus 10 minutes
-- and is registered as the watermark field, so this source's watermark trails
-- its event time by that offset.
-- `$input_dir` is a placeholder, presumably substituted by the test harness.
CREATE TABLE delayed_impulse_source (
  timestamp TIMESTAMP,
  counter BIGINT UNSIGNED NOT NULL,
  subtask_index BIGINT UNSIGNED NOT NULL,
  watermark TIMESTAMP GENERATED ALWAYS AS (timestamp - INTERVAL '10 minute') STORED
) WITH (
  connector = 'single_file',
  path = '$input_dir/impulse.json',
  format = 'json',
  type = 'source',
  event_time_field = 'timestamp',
  watermark_field = 'watermark'
);
-- Sink table: one JSON row per emitted result, holding a window start
-- timestamp and a counter value.
-- `$output_path` is a placeholder, presumably substituted by the test harness.
CREATE TABLE offset_output (
  start TIMESTAMP,
  counter BIGINT
) WITH (
  connector = 'single_file',
  path = '$output_path',
  format = 'json',
  type = 'sink'
);
-- Aggregates both sources into 1-second tumbling windows keyed by `counter`,
-- joins the two aggregates on `counter`, and writes the window start and the
-- counter to the sink.
INSERT INTO offset_output
SELECT
  -- Only subquery `b` aliases its TUMBLE column as `window`, so this
  -- unqualified reference presumably resolves to b's window (TODO confirm).
  window.start,
  a.counter AS counter
FROM (
  -- The TUMBLE column is left unaliased here, so GROUP BY names it by position.
  -- count(*) is computed but not read by the outer query.
  SELECT TUMBLE(INTERVAL '1 second'), counter, count(*)
  FROM impulse_source
  GROUP BY 1, 2
) AS a
INNER JOIN (
  -- Same aggregation over the source whose watermark is offset by 10 minutes.
  SELECT TUMBLE(INTERVAL '1 second') AS window, counter, count(*)
  FROM delayed_impulse_source
  GROUP BY 1, 2
) AS b
  ON a.counter = b.counter