123456789101112131415161718192021222324252627282930313233343536373839404142 |
- # Tests client ability to query queues and read server queue messages.
- import time
- import flexitest
- SERV_QUEUE_NAME = "server:homeserv_notif"
- @flexitest.register
- class Test(flexitest.Test):
- def __init__(self, ctx: flexitest.InitContext):
- ctx.set_env("basic")
- def main(self, ctx: flexitest.RunContext):
- serv = ctx.get_service("serv")
- client = ctx.get_service("cli0")
- srpc = serv.create_rpc()
- crpc = client.create_rpc()
- # List queues.
- queues = crpc.aspc_list_device_queues()
- print("queues", queues)
- assert SERV_QUEUE_NAME in queues, "missing expected queue"
- # Sending a message.
- res = srpc.asphs_queue_server_msg("hello")
- print("hello id", res)
- res = srpc.asphs_queue_server_msg("world")
- print("world id", res)
- res = srpc.asphs_queue_server_msg("asdf")
- print("asdf id", res)
- time.sleep(0.1)
- # Make sure it arrived.
- res = crpc.aspc_get_server_messages()
- assert len(res) == 3, "unexpected number of server messages"
- assert res[0] == "hello", "unexpected msg contents (0)"
- assert res[1] == "world", "unexpected msg contents (1)"
- assert res[2] == "asdf", "unexpected msg contents (2)"
- print("server messages", res)
|