-- crates/arroyo-controller/queries/controller_queries.sql
  1  --! all_jobs : Job(ttl_micros?, state?, start_time?, finish_time?, tasks?, failure_message?, run_id?, pipeline_path?, wasm_path?)
  2  SELECT
  3      c.id as id,
  4      c.organization_id as org_id,
  5      pipeline_name,
  6      pipeline_id,
  7      checkpoint_interval_micros,
  8      ttl_micros,
  9      parallelism_overrides,
 10      stop,
 11      state,
 12      start_time,
 13      finish_time,
 14      tasks,
 15      failure_message,
 16      restarts,
 17      run_id,
 18      pipeline_path,
 19      wasm_path,
 20      c.restart_nonce as config_restart_nonce,
 21      s.restart_nonce as status_restart_nonce,
 22      restart_mode
 23  FROM job_configs c
 24  LEFT JOIN job_statuses s ON c.id = s.id;
 25  
 26  --! update_job_status (start_time?, finish_time?, tasks?, failure_message?, pipeline_path?, wasm_path?)
 27  UPDATE job_statuses
 28  SET state = :state,
 29      start_time = :start_time,
 30      finish_time = :finish_time,
 31      tasks = :tasks,
 32      failure_message = :failure_message,
 33      restarts = :restarts,
 34      pipeline_path = :pipeline_path,
 35      wasm_path = :wasm_path,
 36      run_id = :run_id,
 37      restart_nonce = :restart_nonce
 38  WHERE id = :job_id;
 39  
 40  --! get_program
 41  SELECT program, proto_version FROM pipelines WHERE id = :id;
 42  
 43  --! mark_checkpoints_compacted
 44  UPDATE checkpoints
 45      set state = 'compacted'
 46  WHERE job_id = :job_id AND epoch < :epoch;
 47  
 48  --! drop_old_checkpoint_rows
 49  DELETE FROM checkpoints
 50  WHERE job_id = :job_id AND epoch < :epoch;
 51  
 52  --! create_checkpoint
 53  INSERT INTO checkpoints
 54  (pub_id, organization_id, job_id, state_backend, epoch, min_epoch, start_time)
 55  VALUES (:pub_id, :organization_id, :job_id, :state_backend, :epoch, :min_epoch, :start_time);
 56  
 57  --! update_checkpoint (finish_time?)
 58  UPDATE checkpoints
 59  SET
 60      operators = :operators,
 61      finish_time = :finish_time,
 62      state = :state
 63  WHERE pub_id = :pub_id;
 64  
 65  --! commit_checkpoint
 66  UPDATE checkpoints
 67  SET
 68      finish_time = :finish_time,
 69      state = 'ready'
 70  WHERE pub_id = :pub_id;
 71  
 72  --! mark_compacting
 73  UPDATE checkpoints
 74  SET
 75      state = 'compacting'
 76  WHERE job_id = :job_id AND epoch >= :min_epoch AND epoch < :epoch;
 77  
 78  --! mark_failed
 79  UPDATE checkpoints
 80  SET
 81      state = 'failed'
 82  WHERE job_id = :job_id AND epoch >= :epoch;
 83  
 84  --! last_successful_checkpoint
 85  SELECT pub_id, epoch, min_epoch, state = 'committing' as needs_commits
 86  FROM checkpoints
 87  WHERE job_id = :job_id AND (state = 'ready' or state = 'committing')
 88  ORDER BY epoch DESC
 89  LIMIT 1;
 90  
 91  --! create_job_log_message
 92  INSERT INTO job_log_messages (pub_id, job_id, operator_id, task_index, log_level, message, details)
 93  VALUES (:pub_id, :job_id, :operator_id, :task_index, :log_level, :message, :details);
 94  
 95  --! clean_preview_pipelines
 96  DELETE FROM pipelines WHERE id in (
 97    SELECT jc.pipeline_id
 98    FROM job_configs jc
 99    INNER JOIN job_statuses js ON jc.id = js.id
100    WHERE (js.state = 'Finished' OR js.state = 'Stopped' OR js.state = 'Failed')
101      AND jc.ttl_micros > 0
102      AND jc.created_at < :created_at);