rpc.py
# This file is part of DarkFi (https://dark.fi)
#
# Copyright (C) 2020-2025 Dyne.org foundation
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import json
import random
import asyncio


class JsonRpc:
    """Newline-delimited JSON-RPC 2.0 client on top of an asyncio stream.

    Each request is one JSON object serialized on a single line; each reply
    is read back as a single line. Call `start()` before issuing requests
    and `stop()` when done.
    """

    def __init__(self):
        # Populated by start(); None means "not connected", which lets
        # stop() be called safely even if start() never ran or failed.
        self.reader = None
        self.writer = None

    async def start(self, server, port):
        """Open the TCP connection to `server`:`port`."""
        # Raise the StreamReader buffer limit to 1 GiB so readline() can
        # return very long single-line replies instead of overrunning the
        # default limit.
        self.reader, self.writer = await asyncio.open_connection(
            server, port, limit=1024 ** 3
        )

    async def stop(self):
        """Close the connection and wait until it is fully closed.

        Does nothing if the client was never connected.
        """
        if self.writer is None:
            return
        self.writer.close()
        await self.writer.wait_closed()

    async def _send(self, method, params):
        """Serialize one request with a random id, write it and flush."""
        request = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
            "id": random.randint(0, 2**16),
        }
        # One request per line: the trailing newline terminates the message.
        self.writer.write((json.dumps(request) + "\n").encode())
        await self.writer.drain()

    async def _make_request(self, method, params):
        """Send a request and return the next line from the peer, decoded.

        The reply's id is not compared against the request's id; the next
        line received is taken to be the reply.

        Raises:
            json.JSONDecodeError: if the line read is not valid JSON. This
                includes the empty line produced when the peer has closed
                the connection.
        """
        await self._send(method, params)
        line = await self.reader.readline()
        return json.loads(line.decode().strip())

    async def _subscribe(self, method, params):
        """Send a request without reading any reply.

        Whatever the peer sends afterwards stays in `self.reader` for the
        caller to consume. Returns None.
        """
        await self._send(method, params)

    async def ping(self):
        """Call the `ping` method and return the decoded reply."""
        return await self._make_request("ping", [])

    async def deg_switch(self, state):
        """Call `deg.switch` with `state` as its only parameter."""
        return await self._make_request("deg.switch", [state])

    async def deg_subscribe_events(self):
        """Send `deg.subscribe_events`; no reply is read here."""
        return await self._subscribe("deg.subscribe_events", [])