controller_queries.sql
1 --! all_jobs : Job(ttl_micros?, state?, start_time?, finish_time?, tasks?, failure_message?, run_id?, pipeline_path?, wasm_path?) 2 SELECT 3 c.id as id, 4 c.organization_id as org_id, 5 pipeline_name, 6 pipeline_id, 7 checkpoint_interval_micros, 8 ttl_micros, 9 parallelism_overrides, 10 stop, 11 state, 12 start_time, 13 finish_time, 14 tasks, 15 failure_message, 16 restarts, 17 run_id, 18 pipeline_path, 19 wasm_path, 20 c.restart_nonce as config_restart_nonce, 21 s.restart_nonce as status_restart_nonce, 22 restart_mode 23 FROM job_configs c 24 LEFT JOIN job_statuses s ON c.id = s.id; 25 26 --! update_job_status (start_time?, finish_time?, tasks?, failure_message?, pipeline_path?, wasm_path?) 27 UPDATE job_statuses 28 SET state = :state, 29 start_time = :start_time, 30 finish_time = :finish_time, 31 tasks = :tasks, 32 failure_message = :failure_message, 33 restarts = :restarts, 34 pipeline_path = :pipeline_path, 35 wasm_path = :wasm_path, 36 run_id = :run_id, 37 restart_nonce = :restart_nonce 38 WHERE id = :job_id; 39 40 --! get_program 41 SELECT program, proto_version FROM pipelines WHERE id = :id; 42 43 --! mark_checkpoints_compacted 44 UPDATE checkpoints 45 set state = 'compacted' 46 WHERE job_id = :job_id AND epoch < :epoch; 47 48 --! drop_old_checkpoint_rows 49 DELETE FROM checkpoints 50 WHERE job_id = :job_id AND epoch < :epoch; 51 52 --! create_checkpoint 53 INSERT INTO checkpoints 54 (pub_id, organization_id, job_id, state_backend, epoch, min_epoch, start_time) 55 VALUES (:pub_id, :organization_id, :job_id, :state_backend, :epoch, :min_epoch, :start_time); 56 57 --! update_checkpoint (finish_time?) 58 UPDATE checkpoints 59 SET 60 operators = :operators, 61 finish_time = :finish_time, 62 state = :state 63 WHERE pub_id = :pub_id; 64 65 --! commit_checkpoint 66 UPDATE checkpoints 67 SET 68 finish_time = :finish_time, 69 state = 'ready' 70 WHERE pub_id = :pub_id; 71 72 --! mark_compacting 73 UPDATE checkpoints 74 SET 75 state = 'compacting' 76 WHERE job_id = :job_id AND epoch >= :min_epoch AND epoch < :epoch; 77 78 --! mark_failed 79 UPDATE checkpoints 80 SET 81 state = 'failed' 82 WHERE job_id = :job_id AND epoch >= :epoch; 83 84 --! last_successful_checkpoint 85 SELECT pub_id, epoch, min_epoch, state = 'committing' as needs_commits 86 FROM checkpoints 87 WHERE job_id = :job_id AND (state = 'ready' or state = 'committing') 88 ORDER BY epoch DESC 89 LIMIT 1; 90 91 --! create_job_log_message 92 INSERT INTO job_log_messages (pub_id, job_id, operator_id, task_index, log_level, message, details) 93 VALUES (:pub_id, :job_id, :operator_id, :task_index, :log_level, :message, :details); 94 95 --! clean_preview_pipelines 96 DELETE FROM pipelines WHERE id in ( 97 SELECT jc.pipeline_id 98 FROM job_configs jc 99 INNER JOIN job_statuses js ON jc.id = js.id 100 WHERE (js.state = 'Finished' OR js.state = 'Stopped' OR js.state = 'Failed') 101 AND jc.ttl_micros > 0 102 AND jc.created_at < :created_at);