main.py
# Copyright 2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""LangGraph workflow that runs a small job inside an OpenSandbox sandbox.

The graph is: create -> prepare -> run (optionally retried once with a
fallback command) -> inspect (LLM summary) -> cleanup.
"""

import asyncio
import os
from datetime import timedelta
from typing import TypedDict

from langchain_anthropic import ChatAnthropic
from langgraph.graph import END, StateGraph
from opensandbox import Sandbox
from opensandbox.config import ConnectionConfig


class WorkflowState(TypedDict):
    """State dictionary passed between the graph nodes."""

    # Live sandbox handle; None before creation and after cleanup.
    sandbox: Sandbox | None
    # Formatted stdout/stderr of the most recent command run.
    run_output: str
    # One-sentence LLM summary produced by the inspect node.
    summary: str
    # "<name>: <value>" of the last execution error, or "" on success.
    last_error: str
    # Number of run attempts performed so far.
    attempt: int
    # Upper bound on run attempts.
    max_attempts: int
    # Command the next run node invocation will execute.
    command: str
    # Command scheduled after a failed attempt.
    fallback_command: str
    # True once the cleanup node has run.
    cleaned: bool


def _configure_anthropic_env() -> None:
    """Leave exactly one Anthropic credential in the process environment.

    ``ANTHROPIC_AUTH_TOKEN`` takes precedence over ``ANTHROPIC_API_KEY``;
    whichever one is selected is kept and the other is removed from
    ``os.environ``. Empty values are treated as unset.

    Raises:
        RuntimeError: If neither variable holds a non-empty value.
    """
    api_key = os.getenv("ANTHROPIC_API_KEY")
    auth_token = os.getenv("ANTHROPIC_AUTH_TOKEN")

    if auth_token:
        os.environ["ANTHROPIC_AUTH_TOKEN"] = auth_token
        os.environ.pop("ANTHROPIC_API_KEY", None)
        return

    if api_key:
        os.environ["ANTHROPIC_API_KEY"] = api_key
        os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)
        return

    raise RuntimeError("ANTHROPIC_API_KEY or ANTHROPIC_AUTH_TOKEN is required")


def _build_llm() -> ChatAnthropic:
    """Build the chat model from environment configuration.

    Reads ``ANTHROPIC_BASE_URL`` (optional) and ``ANTHROPIC_MODEL``
    (default ``claude-3-5-sonnet-latest``) after normalizing credentials.

    Raises:
        RuntimeError: Propagated from ``_configure_anthropic_env`` when no
            credential is configured.
    """
    _configure_anthropic_env()
    anthropic_base_url = os.getenv("ANTHROPIC_BASE_URL")
    model_name = os.getenv("ANTHROPIC_MODEL", "claude-3-5-sonnet-latest")

    return ChatAnthropic(
        model=model_name,
        anthropic_api_url=anthropic_base_url,
    )


def _format_execution(execution) -> str:
    """Render an execution result as a single printable string.

    Stdout messages are joined with newlines. Stderr messages, followed by an
    ``[error] <name>: <value>`` line when ``execution.error`` is set, are
    appended under a ``[stderr]`` header. Returns ``"(no output)"`` when
    nothing was produced.
    """
    stdout = "\n".join(msg.text for msg in execution.logs.stdout)
    stderr = "\n".join(msg.text for msg in execution.logs.stderr)

    if execution.error:
        stderr = "\n".join(
            [
                stderr,
                f"[error] {execution.error.name}: {execution.error.value}",
            ]
        ).strip()

    output = stdout.strip()
    if stderr:
        output = "\n".join([output, f"[stderr]\n{stderr}"]).strip()
    return output or "(no output)"


async def _destroy_sandbox(sandbox: Sandbox | None) -> None:
    """Kill *sandbox* and close it, doing nothing when it is ``None``.

    ``close()`` is awaited even when ``kill()`` raises, so a failed kill
    does not skip the close step; the ``kill()`` exception still propagates.
    """
    if sandbox is None:
        return

    # NOTE(review): presumably kill() terminates the remote instance and
    # close() releases local client resources -- confirm against the SDK.
    try:
        await sandbox.kill()
    finally:
        await sandbox.close()


async def create_sandbox(state: WorkflowState) -> WorkflowState:
    """Create a sandbox and store its handle in the state.

    Configuration is read from ``SANDBOX_DOMAIN`` (default
    ``localhost:8080``), ``SANDBOX_API_KEY`` and ``SANDBOX_IMAGE``; requests
    use a 120-second timeout.
    """
    print("[create] Creating sandbox")
    domain = os.getenv("SANDBOX_DOMAIN", "localhost:8080")
    api_key = os.getenv("SANDBOX_API_KEY")
    image = os.getenv(
        "SANDBOX_IMAGE",
        "sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/code-interpreter:v1.0.2",
    )

    config = ConnectionConfig(
        domain=domain,
        api_key=api_key,
        request_timeout=timedelta(seconds=120),
    )

    sandbox = await Sandbox.create(
        image,
        connection_config=config,
    )

    print(f"[create] Sandbox ready: {sandbox.id}")

    return {**state, "sandbox": sandbox}


async def prepare_workspace(state: WorkflowState) -> WorkflowState:
    """Write the job script and a notes file into the sandbox.

    Raises:
        RuntimeError: If the state holds no sandbox.
    """
    print("[prepare] Writing job files")
    sandbox = state["sandbox"]
    if sandbox is None:
        raise RuntimeError("Sandbox not initialized")

    await sandbox.files.write_file(
        "/tmp/math.py",
        "result = 137 * 42\nprint(result)\n",
    )
    await sandbox.files.write_file(
        "/tmp/notes.txt",
        "LangGraph + OpenSandbox\n",
    )

    print("[prepare] Files written")

    return state


async def run_job(state: WorkflowState) -> WorkflowState:
    """Run the current command once and record the outcome.

    Increments ``attempt``, stores the formatted output in ``run_output`` and
    sets ``last_error`` to ``"<name>: <value>"`` when the execution reports
    an error (``""`` otherwise). When the run failed and attempts remain,
    ``command`` is replaced by the fallback command for the next invocation.

    Raises:
        RuntimeError: If the state holds no sandbox.
    """
    attempt = state["attempt"] + 1
    max_attempts = state["max_attempts"]
    command = state.get("command") or "python3 /tmp/math.py"
    print(f"[run] Executing job (attempt {attempt}/{max_attempts})")
    sandbox = state["sandbox"]
    if sandbox is None:
        raise RuntimeError("Sandbox not initialized")

    execution = await sandbox.commands.run(command)
    run_output = _format_execution(execution)
    last_error = ""
    next_command = command

    if execution.error:
        last_error = f"{execution.error.name}: {execution.error.value}"
        if attempt < max_attempts:
            # Use `or` (as for `command` above) so an empty or None fallback
            # falls back to the default instead of scheduling an empty command.
            next_command = state.get("fallback_command") or "python /tmp/math.py"
            print(f"[run] Failed, scheduling fallback: {next_command}")

    print(f"[run] Output: {run_output}")

    return {
        **state,
        "run_output": run_output,
        "last_error": last_error,
        "attempt": attempt,
        "command": next_command,
    }


def decide_next(state: WorkflowState) -> str:
    """Choose the node that follows a run.

    Returns ``"run"`` when the last run failed and attempts remain,
    otherwise ``"inspect"`` (including when the final attempt failed).
    """
    if state.get("last_error") and state["attempt"] < state["max_attempts"]:
        print("[decide] Retry with fallback command")
        return "run"

    print("[decide] Proceeding to inspect")
    return "inspect"


async def inspect_results(state: WorkflowState) -> WorkflowState:
    """Read the notes file and ask the LLM for a one-sentence summary.

    The summary is stored under ``summary`` in the returned state.

    Raises:
        RuntimeError: If the state holds no sandbox, or if no Anthropic
            credential is configured.
    """
    print("[inspect] Reading notes and summarizing")
    sandbox = state["sandbox"]
    if sandbox is None:
        raise RuntimeError("Sandbox not initialized")

    notes = await sandbox.files.read_file("/tmp/notes.txt")
    llm = _build_llm()
    prompt = (
        "Summarize the sandbox run result and notes in one sentence. "
        f"Run output: {state.get('run_output', '')}. "
        f"Notes: {notes.strip()}."
    )
    response = await llm.ainvoke(prompt)

    print(f"[inspect] Summary: {response.content}")

    return {**state, "summary": response.content}


async def cleanup_sandbox(state: WorkflowState) -> WorkflowState:
    """Tear down the sandbox and mark the state as cleaned.

    Returns the state with ``sandbox`` set to ``None`` and ``cleaned`` set
    to ``True``.
    """
    print("[cleanup] Cleaning up sandbox")
    await _destroy_sandbox(state.get("sandbox"))

    print("[cleanup] Done")

    return {**state, "sandbox": None, "cleaned": True}


async def main() -> None:
    """Build the workflow graph, run it, and print the results."""
    graph = StateGraph(WorkflowState)
    graph.add_node("create", create_sandbox)
    graph.add_node("prepare", prepare_workspace)
    graph.add_node("run", run_job)
    graph.add_node("inspect", inspect_results)
    graph.add_node("cleanup", cleanup_sandbox)
    graph.set_entry_point("create")
    graph.add_edge("create", "prepare")
    graph.add_edge("prepare", "run")
    graph.add_conditional_edges(
        "run",
        decide_next,
        {
            "run": "run",
            "inspect": "inspect",
        },
    )
    graph.add_edge("inspect", "cleanup")
    graph.add_edge("cleanup", END)
    app = graph.compile()

    initial_state = {
        "sandbox": None,
        "run_output": "",
        "summary": "",
        "last_error": "",
        "attempt": 0,
        "max_attempts": 2,
        "command": "python3 /tmp/math.py",
        "fallback_command": "python /tmp/math.py",
        "cleaned": False,
    }

    # Track the latest streamed state so that, if a node raises midway, the
    # finally clause can still tear down a sandbox that was already created.
    state = initial_state
    try:
        async for update in app.astream(initial_state, stream_mode="values"):
            state = update
    finally:
        if not state.get("cleaned"):
            await _destroy_sandbox(state.get("sandbox"))

    print(f"Run output: {state['run_output']}")
    print(f"Summary: {state['summary']}")


if __name__ == "__main__":
    asyncio.run(main())