fn_cs_server_queue.py 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. # Tests client ability to query queues and read server queue messages.
  2. import time
  3. import flexitest
  4. SERV_QUEUE_NAME = "server:homeserv_notif"
  5. @flexitest.register
  6. class Test(flexitest.Test):
  7. def __init__(self, ctx: flexitest.InitContext):
  8. ctx.set_env("basic")
  9. def main(self, ctx: flexitest.RunContext):
  10. serv = ctx.get_service("serv")
  11. client = ctx.get_service("cli0")
  12. srpc = serv.create_rpc()
  13. crpc = client.create_rpc()
  14. # List queues.
  15. queues = crpc.aspc_list_device_queues()
  16. print("queues", queues)
  17. assert SERV_QUEUE_NAME in queues, "missing expected queue"
  18. # Sending a message.
  19. res = srpc.asphs_queue_server_msg("hello")
  20. print("hello id", res)
  21. res = srpc.asphs_queue_server_msg("world")
  22. print("world id", res)
  23. res = srpc.asphs_queue_server_msg("asdf")
  24. print("asdf id", res)
  25. time.sleep(0.1)
  26. # Make sure it arrived.
  27. res = crpc.aspc_get_server_messages()
  28. assert len(res) == 3, "unexpected number of server messages"
  29. assert res[0] == "hello", "unexpected msg contents (0)"
  30. assert res[1] == "world", "unexpected msg contents (1)"
  31. assert res[2] == "asdf", "unexpected msg contents (2)"
  32. print("server messages", res)