rpc.py 1023 B

123456789101112131415161718192021222324252627282930313233343536373839
  1. import json
  2. from websockets.sync.client import connect as wsconnect
  class RpcError(Exception):
      """Exception carrying the message of a failed remote procedure call."""

      def __init__(self, message: str):
          """Create the error.

          Args:
              message: Human-readable description of the failure; it becomes
                  the exception's single argument.
          """
          super().__init__(message)
  6. def _make_req_payload(method: str, rid: int, params) -> str:
  7. j = {"jsonrpc": "2.0", "method": method, "id": rid, "params": params}
  8. return json.dumps(j)
  def _send_ws_req(url: str, request: str) -> str:
      """Send one message over a fresh WebSocket connection and return the reply.

      A new connection to ``url`` is opened for every call; the ``with`` block
      scopes it to this single send/receive exchange.

      Args:
          url: WebSocket endpoint to connect to.
          request: Message to send.

      Returns:
          The first message received after sending ``request``.
      """
      with wsconnect(url) as connection:
          connection.send(request)
          return connection.recv()
  14. def _handle_reply(resp: str):
  15. j = json.loads(resp)
  16. if "error" in j:
  17. e = j["error"]
  18. raise RpcError(e["message"])
  19. return j["result"]
  class RpcClient:
      """Minimal JSON-RPC client that exposes remote methods as attributes.

      ``client.call("add", (1, 2))`` performs a call explicitly, and
      ``client.add(1, 2)`` is the equivalent attribute-style shorthand.
      """

      def __init__(self, url: str):
          """Remember the endpoint ``url`` that every call is sent to."""
          self.url = url

      def call(self, method: str, args):
          """Invoke ``method`` on the server and return its result.

          Args:
              method: Name of the remote method.
              args: Parameters for the method; sent as the request's
                  ``params`` member.

          Returns:
              Whatever ``_handle_reply`` extracts from the server's reply.
          """
          # The request id is always 1: each call is a single, independent
          # request/reply exchange.
          payload = _make_req_payload(method, 1, args)
          resp = _send_ws_req(self.url, payload)
          return _handle_reply(resp)

      def __getattr__(self, name):
          """Return a callable that forwards to the remote method ``name``.

          Only consulted when normal attribute lookup fails. The returned
          callable accepts positional arguments only.

          Raises:
              AttributeError: For dunder names (``__x__``). These are protocol
                  probes (``copy``, ``pickle``, ``hasattr`` checks, ...), not
                  remote methods, and answering them with a proxy would
                  trigger bogus remote calls.
          """
          if name.startswith("__") and name.endswith("__"):
              raise AttributeError(
                  f"{type(self).__name__!r} object has no attribute {name!r}"
              )

          def remote_method(*args):
              return self.call(name, args)

          # Make the proxy identify itself by the RPC method it stands for.
          remote_method.__name__ = name
          return remote_method