# examples/stream_output_via_api.py
  1  #!/usr/bin/env python
  2  # -*- coding: utf-8 -*-
  3  """
  4  @Time    : 2024/3/27 9:44
  5  @Author  : leiwu30
  6  @File    : stream_output_via_api.py
  7  @Description    : Stream log information and communicate over the network via web api.
  8  """
  9  import asyncio
 10  import json
 11  import socket
 12  import threading
 13  from contextvars import ContextVar
 14  
 15  from flask import Flask, Response, jsonify, request, send_from_directory
 16  
 17  from metagpt.const import TUTORIAL_PATH
 18  from metagpt.logs import logger, set_llm_stream_logfunc
 19  from metagpt.roles.tutorial_assistant import TutorialAssistant
 20  from metagpt.utils.stream_pipe import StreamPipe
 21  
 22  app = Flask(__name__)
 23  
 24  
 25  def stream_pipe_log(content):
 26      print(content, end="")
 27      stream_pipe = stream_pipe_var.get(None)
 28      if stream_pipe:
 29          stream_pipe.set_message(content)
 30  
 31  
 32  def write_tutorial(message):
 33      async def main(idea, stream_pipe):
 34          stream_pipe_var.set(stream_pipe)
 35          role = TutorialAssistant()
 36          await role.run(idea)
 37  
 38      def thread_run(idea: str, stream_pipe: StreamPipe = None):
 39          """
 40          Convert asynchronous function to thread function
 41          """
 42          asyncio.run(main(idea, stream_pipe))
 43  
 44      stream_pipe = StreamPipe()
 45      thread = threading.Thread(
 46          target=thread_run,
 47          args=(
 48              message["content"],
 49              stream_pipe,
 50          ),
 51      )
 52      thread.start()
 53  
 54      while thread.is_alive():
 55          msg = stream_pipe.get_message()
 56          yield stream_pipe.msg2stream(msg)
 57  
 58  
 59  @app.route("/v1/chat/completions", methods=["POST"])
 60  def completions():
 61      """
 62      data: {
 63          "model": "write_tutorial",
 64          "stream": true,
 65          "messages": [
 66              {
 67                  "role": "user",
 68                  "content": "Write a tutorial about MySQL"
 69              }
 70          ]
 71      }
 72      """
 73  
 74      data = json.loads(request.data)
 75      logger.info(json.dumps(data, indent=4, ensure_ascii=False))
 76  
 77      # Non-streaming interfaces are not supported yet
 78      stream_type = True if data.get("stream") else False
 79      if not stream_type:
 80          return jsonify({"status": 400, "msg": "Non-streaming requests are not supported, please use `stream=True`."})
 81  
 82      # Only accept the last user information
 83      # openai['model'] ~ MetaGPT['agent']
 84      last_message = data["messages"][-1]
 85      model = data["model"]
 86  
 87      # write_tutorial
 88      if model == "write_tutorial":
 89          return Response(write_tutorial(last_message), mimetype="text/plain")
 90      else:
 91          return jsonify({"status": 400, "msg": "No suitable agent found."})
 92  
 93  
 94  @app.route("/download/<path:filename>")
 95  def download_file(filename):
 96      return send_from_directory(TUTORIAL_PATH, filename, as_attachment=True)
 97  
 98  
if __name__ == "__main__":
    """
    curl https://$server_address:$server_port/v1/chat/completions -X POST -d '{
        "model": "write_tutorial",
        "stream": true,
        "messages": [
          {
               "role": "user",
               "content": "Write a tutorial about MySQL"
          }
        ]
      }'
    """
    server_port = 7860
    # Resolve this machine's address so the server is reachable from the network.
    # NOTE(review): gethostbyname(gethostname()) can resolve to 127.0.0.1 on some
    # hosts depending on /etc/hosts — confirm, or bind 0.0.0.0 if that happens.
    server_address = socket.gethostbyname(socket.gethostname())

    # Route all LLM streaming output through stream_pipe_log (stdout + per-context pipe).
    set_llm_stream_logfunc(stream_pipe_log)
    # Context-local pipe looked up by stream_pipe_log and set by write_tutorial.
    # NOTE(review): this name is only defined when the file runs as a script; importing
    # the module and calling stream_pipe_log/write_tutorial would raise NameError —
    # consider moving this ContextVar to module level.
    stream_pipe_var: ContextVar[StreamPipe] = ContextVar("stream_pipe")
    app.run(port=server_port, host=server_address)