/ bin / deg / src / rpc.py
rpc.py
 1  # This file is part of DarkFi (https://dark.fi)
 2  #
 3  # Copyright (C) 2020-2025 Dyne.org foundation
 4  #
 5  # This program is free software: you can redistribute it and/or modify
 6  # it under the terms of the GNU Affero General Public License as
 7  # published by the Free Software Foundation, either version 3 of the
 8  # License, or (at your option) any later version.
 9  #
10  # This program is distributed in the hope that it will be useful,
11  # but WITHOUT ANY WARRANTY; without even the implied warranty of
12  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  # GNU Affero General Public License for more details.
14  #
15  # You should have received a copy of the GNU Affero General Public License
16  # along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  
18  import json
19  import random
20  import asyncio
21  
class JsonRpc:
    """Minimal line-delimited JSON-RPC 2.0 client on top of asyncio streams.

    ``start`` must be awaited before any other method: it is what creates
    the ``reader`` and ``writer`` attributes the other methods rely on.
    """

    async def start(self, server, port):
        """Connect to ``server``:``port`` and store the resulting streams.

        ``limit=1024 ** 3`` (1 GiB) is passed to ``asyncio.open_connection``
        as the stream buffer limit, which bounds how long a single line
        read with ``readline`` may be.
        """
        self.reader, self.writer = await asyncio.open_connection(
            server, port, limit=1024 ** 3
        )

    async def stop(self):
        """Close the writer and wait until the close has completed."""
        self.writer.close()
        await self.writer.wait_closed()

    async def _send_request(self, method, params):
        """Build one JSON-RPC 2.0 request, write it and flush the writer.

        The request is encoded as a single newline-terminated line of JSON
        and carries a random integer id drawn from ``[0, 2**16]``.
        Shared by ``_make_request`` and ``_subscribe`` so the wire format
        is defined in exactly one place.
        """
        request = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
            "id": random.randint(0, 2**16),
        }
        self.writer.write((json.dumps(request) + "\n").encode())
        await self.writer.drain()

    async def _make_request(self, method, params):
        """Send a request and return the next line received, parsed as JSON.

        The id in the reply is not compared with the id that was sent:
        whichever line arrives next is treated as the response. An empty
        read (nothing before EOF) is handed to ``json.loads`` unchanged and
        therefore raises ``json.JSONDecodeError``.
        """
        await self._send_request(method, params)
        line = await self.reader.readline()
        return json.loads(line.decode().strip())

    async def _subscribe(self, method, params):
        """Send a request without reading anything back; returns ``None``.

        Any lines the server sends afterwards are left on ``self.reader``
        for the caller to consume.
        """
        await self._send_request(method, params)

    async def ping(self):
        """Call the ``ping`` method with no parameters and return the reply."""
        return await self._make_request("ping", [])

    async def deg_switch(self, state):
        """Call ``deg.switch`` with ``state`` as its only parameter."""
        return await self._make_request("deg.switch", [state])

    async def deg_subscribe_events(self):
        """Send ``deg.subscribe_events`` without waiting for a reply."""
        return await self._subscribe("deg.subscribe_events", [])