/ examples / langgraph / main.py
main.py
  1  # Copyright 2025 Alibaba Group Holding Ltd.
  2  #
  3  # Licensed under the Apache License, Version 2.0 (the "License");
  4  # you may not use this file except in compliance with the License.
  5  # You may obtain a copy of the License at
  6  #
  7  #     http://www.apache.org/licenses/LICENSE-2.0
  8  #
  9  # Unless required by applicable law or agreed to in writing, software
 10  # distributed under the License is distributed on an "AS IS" BASIS,
 11  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12  # See the License for the specific language governing permissions and
 13  # limitations under the License.
 14  
 15  import os
 16  from datetime import timedelta
 17  from typing import TypedDict
 18  
 19  from langchain_anthropic import ChatAnthropic
 20  from langgraph.graph import END, StateGraph
 21  from opensandbox import Sandbox
 22  from opensandbox.config import ConnectionConfig
 23  
 24  
class WorkflowState(TypedDict):
    """State dictionary passed between the nodes of the workflow graph."""

    # Live sandbox handle; set by create_sandbox, reset to None by cleanup_sandbox.
    sandbox: Sandbox | None
    # Formatted output of the most recent command run (written by run_job).
    run_output: str
    # LLM response content written by inspect_results.
    summary: str
    # "<name>: <value>" of the most recent execution error, or "" when the
    # last run reported no error.
    last_error: str
    # Number of command runs performed so far; incremented by run_job.
    attempt: int
    # Retry bound: decide_next loops back to run only while attempt < max_attempts.
    max_attempts: int
    # Command the next run_job call executes.
    command: str
    # Command run_job schedules after a failed run while attempts remain.
    fallback_command: str
    # Set to True by cleanup_sandbox; main() checks it before doing its own cleanup.
    cleaned: bool
 35  
 36  
 37  def _configure_anthropic_env() -> None:
 38      api_key = os.getenv("ANTHROPIC_API_KEY")
 39      auth_token = os.getenv("ANTHROPIC_AUTH_TOKEN")
 40  
 41      if auth_token:
 42          os.environ["ANTHROPIC_AUTH_TOKEN"] = auth_token
 43          os.environ.pop("ANTHROPIC_API_KEY", None)
 44          return
 45  
 46      if api_key:
 47          os.environ["ANTHROPIC_API_KEY"] = api_key
 48          os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)
 49          return
 50  
 51      raise RuntimeError("ANTHROPIC_API_KEY or ANTHROPIC_AUTH_TOKEN is required")
 52  
 53  
 54  def _build_llm() -> ChatAnthropic:
 55      _configure_anthropic_env()
 56      anthropic_base_url = os.getenv("ANTHROPIC_BASE_URL")
 57      model_name = os.getenv("ANTHROPIC_MODEL", "claude-3-5-sonnet-latest")
 58  
 59      return ChatAnthropic(
 60          model=model_name,
 61          anthropic_api_url=anthropic_base_url,
 62      )
 63  
 64  
 65  def _format_execution(execution) -> str:
 66      stdout = "\n".join(msg.text for msg in execution.logs.stdout)
 67      stderr = "\n".join(msg.text for msg in execution.logs.stderr)
 68  
 69      if execution.error:
 70          stderr = "\n".join(
 71              [
 72                  stderr,
 73                  f"[error] {execution.error.name}: {execution.error.value}",
 74              ]
 75          ).strip()
 76  
 77      output = stdout.strip()
 78      if stderr:
 79          output = "\n".join([output, f"[stderr]\n{stderr}"]).strip()
 80      return output or "(no output)"
 81  
 82  
 83  async def create_sandbox(state: WorkflowState) -> WorkflowState:
 84      print("[create] Creating sandbox")
 85      domain = os.getenv("SANDBOX_DOMAIN", "localhost:8080")
 86      api_key = os.getenv("SANDBOX_API_KEY")
 87      image = os.getenv(
 88          "SANDBOX_IMAGE",
 89          "sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/code-interpreter:v1.0.2",
 90      )
 91  
 92      config = ConnectionConfig(
 93          domain=domain,
 94          api_key=api_key,
 95          request_timeout=timedelta(seconds=120),
 96      )
 97  
 98      sandbox = await Sandbox.create(
 99          image,
100          connection_config=config,
101      )
102  
103      print(f"[create] Sandbox ready: {sandbox.id}")
104  
105      return {**state, "sandbox": sandbox}
106  
107  
async def prepare_workspace(state: WorkflowState) -> WorkflowState:
    """Graph node: write the job script and the notes file into the sandbox.

    Returns:
        ``state`` unchanged.

    Raises:
        RuntimeError: If the state holds no sandbox.
    """
    print("[prepare] Writing job files")
    box = state["sandbox"]
    if box is None:
        raise RuntimeError("Sandbox not initialized")

    # Written in this order: the script that run_job executes by default,
    # then the notes that inspect_results reads back.
    job_files = (
        ("/tmp/math.py", "result = 137 * 42\nprint(result)\n"),
        ("/tmp/notes.txt", "LangGraph + OpenSandbox\n"),
    )
    for path, content in job_files:
        await box.files.write_file(path, content)

    print("[prepare] Files written")
    return state
126  
127  
async def run_job(state: WorkflowState) -> WorkflowState:
    """Graph node: run the current command in the sandbox and record the result.

    The command is ``state["command"]``, or ``python3 /tmp/math.py`` when that
    is missing or empty. When the execution reports an error and attempts
    remain, the fallback command is scheduled for the next run.

    Returns:
        A copy of ``state`` with ``run_output``, ``last_error``, ``attempt``
        and ``command`` (the command for the next run) updated.

    Raises:
        RuntimeError: If the state holds no sandbox.
    """
    attempt = state["attempt"] + 1
    max_attempts = state["max_attempts"]
    command = state.get("command") or "python3 /tmp/math.py"
    print(f"[run] Executing job (attempt {attempt}/{max_attempts})")
    sandbox = state["sandbox"]
    if sandbox is None:
        raise RuntimeError("Sandbox not initialized")

    execution = await sandbox.commands.run(command)
    run_output = _format_execution(execution)
    last_error = ""
    next_command = command

    if execution.error:
        last_error = f"{execution.error.name}: {execution.error.value}"
        if attempt < max_attempts:
            # Use `or` (as for `command` above) rather than a .get() default:
            # the default only covers a missing key, so an empty fallback
            # would be logged and scheduled as a blank command.
            next_command = state.get("fallback_command") or "python /tmp/math.py"
            print(f"[run] Failed, scheduling fallback: {next_command}")

    print(f"[run] Output: {run_output}")

    return {
        **state,
        "run_output": run_output,
        "last_error": last_error,
        "attempt": attempt,
        "command": next_command,
    }
157  
158  
def decide_next(state: WorkflowState) -> str:
    """Choose the node that follows ``run``.

    Returns:
        ``"run"`` when the last run recorded an error and
        ``attempt < max_attempts``; otherwise ``"inspect"``.
    """
    failed = bool(state.get("last_error"))
    # The attempt counters are only consulted when the last run failed.
    should_retry = failed and state["attempt"] < state["max_attempts"]
    if not should_retry:
        print("[decide] Proceeding to inspect")
        return "inspect"

    print("[decide] Retry with fallback command")
    return "run"
166  
167  
async def inspect_results(state: WorkflowState) -> WorkflowState:
    """Graph node: read the notes file and ask the LLM for a one-sentence summary.

    Returns:
        A copy of ``state`` with ``"summary"`` set to the LLM response content.

    Raises:
        RuntimeError: If the state holds no sandbox.
    """
    print("[inspect] Reading notes and summarizing")
    box = state["sandbox"]
    if box is None:
        raise RuntimeError("Sandbox not initialized")

    notes_text = await box.files.read_file("/tmp/notes.txt")
    model = _build_llm()

    run_output = state.get("run_output", "")
    request = (
        "Summarize the sandbox run result and notes in one sentence. "
        + f"Run output: {run_output}. "
        + f"Notes: {notes_text.strip()}."
    )
    reply = await model.ainvoke(request)
    summary = reply.content

    print(f"[inspect] Summary: {summary}")
    return {**state, "summary": summary}
186  
187  
async def cleanup_sandbox(state: WorkflowState) -> WorkflowState:
    """Graph node: kill and close the sandbox, then mark the state as cleaned.

    Returns:
        A copy of ``state`` with ``"sandbox"`` set to ``None`` and
        ``"cleaned"`` set to ``True``.

    Raises:
        Exception: Whatever ``sandbox.kill()`` or ``sandbox.close()`` raises
            is propagated unchanged.
    """
    print("[cleanup] Cleaning up sandbox")
    sandbox = state.get("sandbox")
    if sandbox is not None:
        try:
            await sandbox.kill()
        finally:
            # close() must still run when kill() raises; previously a failed
            # kill skipped it entirely.
            await sandbox.close()

    print("[cleanup] Done")

    return {**state, "sandbox": None, "cleaned": True}
198  
199  
async def main() -> None:
    """Build the workflow graph, run it once, and print the results.

    Node order is create -> prepare -> run -> inspect -> cleanup, with the
    ``run`` node looping back to itself while ``decide_next`` returns
    ``"run"``. If streaming stops before the state has been marked as
    cleaned, the sandbox recorded in the last seen state is shut down here
    instead.
    """
    graph = StateGraph(WorkflowState)
    graph.add_node("create", create_sandbox)
    graph.add_node("prepare", prepare_workspace)
    graph.add_node("run", run_job)
    graph.add_node("inspect", inspect_results)
    graph.add_node("cleanup", cleanup_sandbox)
    graph.set_entry_point("create")
    graph.add_edge("create", "prepare")
    graph.add_edge("prepare", "run")
    graph.add_conditional_edges(
        "run",
        decide_next,
        {
            "run": "run",
            "inspect": "inspect",
        },
    )
    graph.add_edge("inspect", "cleanup")
    graph.add_edge("cleanup", END)
    app = graph.compile()

    initial_state = {
        "sandbox": None,
        "run_output": "",
        "summary": "",
        "last_error": "",
        "attempt": 0,
        "max_attempts": 2,
        "command": "python3 /tmp/math.py",
        "fallback_command": "python /tmp/math.py",
        "cleaned": False,
    }

    state = initial_state
    try:
        async for update in app.astream(initial_state, stream_mode="values"):
            # Keep the latest streamed state so the finally block can see a
            # sandbox that was created before a later failure.
            state = update
    finally:
        if not state.get("cleaned"):
            sandbox = state.get("sandbox")
            if sandbox is not None:
                try:
                    await sandbox.kill()
                finally:
                    # close() must still run when kill() raises.
                    await sandbox.close()

    print(f"Run output: {state['run_output']}")
    print(f"Summary: {state['summary']}")
247  
248  
# Script entry point: run the async workflow when this file is executed directly.
if __name__ == "__main__":
    import asyncio

    asyncio.run(main())