123456789101112131415161718192021222324252627282930313233343536373839 |
- import json
- from websockets.sync.client import connect as wsconnect
class RpcError(Exception):
    """Raised when the remote end answers a request with a JSON-RPC error."""

    def __init__(self, message: str):
        """Store *message* as the exception's only argument."""
        super().__init__(message)
- def _make_req_payload(method: str, rid: int, params) -> str:
- j = {"jsonrpc": "2.0", "method": method, "id": rid, "params": params}
- return json.dumps(j)
def _send_ws_req(url: str, request: str) -> str:
    """Send *request* over a fresh WebSocket to *url* and return one reply.

    A new connection is opened for this single exchange and is closed when
    the ``with`` block exits, before the received message is handed back.
    """
    with wsconnect(url) as connection:
        connection.send(request)
        return connection.recv()
- def _handle_reply(resp: str):
- j = json.loads(resp)
- if "error" in j:
- e = j["error"]
- raise RpcError(e["message"])
- return j["result"]
class RpcClient:
    """Minimal JSON-RPC client that exposes remote methods as attributes.

    ``client.call("name", args)`` performs a request explicitly, while
    ``client.name(*args)`` is shorthand for the same thing.
    """

    def __init__(self, url: str):
        """Remember the WebSocket *url* that requests are sent to."""
        self.url = url

    def call(self, method: str, args):
        """Invoke the remote *method* with positional *args*.

        Args:
            method: Name of the remote method.
            args: JSON-serializable value sent as the request ``params``.

        Returns:
            Whatever ``_handle_reply`` extracts from the server's response.
        """
        # Every request is sent with the fixed id 1.
        payload = _make_req_payload(method, 1, args)
        resp = _send_ws_req(self.url, payload)
        return _handle_reply(resp)

    def __getattr__(self, name):
        """Return a callable that forwards to the remote method *name*.

        Only invoked for attributes that normal lookup did not find.

        Raises:
            AttributeError: For dunder names, so protocol probes such as
                ``hasattr(client, "__deepcopy__")`` are not mistaken for
                remote methods.
        """
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            )

        def invoke(*args):
            return self.call(name, args)

        # Name the proxy after the remote method to aid debugging.
        invoke.__name__ = name
        return invoke
|