stream_output_via_api.py
1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 """ 4 @Time : 2024/3/27 9:44 5 @Author : leiwu30 6 @File : stream_output_via_api.py 7 @Description : Stream log information and communicate over the network via web api. 8 """ 9 import asyncio 10 import json 11 import socket 12 import threading 13 from contextvars import ContextVar 14 15 from flask import Flask, Response, jsonify, request, send_from_directory 16 17 from metagpt.const import TUTORIAL_PATH 18 from metagpt.logs import logger, set_llm_stream_logfunc 19 from metagpt.roles.tutorial_assistant import TutorialAssistant 20 from metagpt.utils.stream_pipe import StreamPipe 21 22 app = Flask(__name__) 23 24 25 def stream_pipe_log(content): 26 print(content, end="") 27 stream_pipe = stream_pipe_var.get(None) 28 if stream_pipe: 29 stream_pipe.set_message(content) 30 31 32 def write_tutorial(message): 33 async def main(idea, stream_pipe): 34 stream_pipe_var.set(stream_pipe) 35 role = TutorialAssistant() 36 await role.run(idea) 37 38 def thread_run(idea: str, stream_pipe: StreamPipe = None): 39 """ 40 Convert asynchronous function to thread function 41 """ 42 asyncio.run(main(idea, stream_pipe)) 43 44 stream_pipe = StreamPipe() 45 thread = threading.Thread( 46 target=thread_run, 47 args=( 48 message["content"], 49 stream_pipe, 50 ), 51 ) 52 thread.start() 53 54 while thread.is_alive(): 55 msg = stream_pipe.get_message() 56 yield stream_pipe.msg2stream(msg) 57 58 59 @app.route("/v1/chat/completions", methods=["POST"]) 60 def completions(): 61 """ 62 data: { 63 "model": "write_tutorial", 64 "stream": true, 65 "messages": [ 66 { 67 "role": "user", 68 "content": "Write a tutorial about MySQL" 69 } 70 ] 71 } 72 """ 73 74 data = json.loads(request.data) 75 logger.info(json.dumps(data, indent=4, ensure_ascii=False)) 76 77 # Non-streaming interfaces are not supported yet 78 stream_type = True if data.get("stream") else False 79 if not stream_type: 80 return jsonify({"status": 400, "msg": "Non-streaming requests are not supported, please use `stream=True`."}) 81 82 # Only accept the last user information 83 # openai['model'] ~ MetaGPT['agent'] 84 last_message = data["messages"][-1] 85 model = data["model"] 86 87 # write_tutorial 88 if model == "write_tutorial": 89 return Response(write_tutorial(last_message), mimetype="text/plain") 90 else: 91 return jsonify({"status": 400, "msg": "No suitable agent found."}) 92 93 94 @app.route("/download/<path:filename>") 95 def download_file(filename): 96 return send_from_directory(TUTORIAL_PATH, filename, as_attachment=True) 97 98 99 if __name__ == "__main__": 100 """ 101 curl https://$server_address:$server_port/v1/chat/completions -X POST -d '{ 102 "model": "write_tutorial", 103 "stream": true, 104 "messages": [ 105 { 106 "role": "user", 107 "content": "Write a tutorial about MySQL" 108 } 109 ] 110 }' 111 """ 112 server_port = 7860 113 server_address = socket.gethostbyname(socket.gethostname()) 114 115 set_llm_stream_logfunc(stream_pipe_log) 116 stream_pipe_var: ContextVar[StreamPipe] = ContextVar("stream_pipe") 117 app.run(port=server_port, host=server_address)