Networking mindset hint: try to think of messages as if they were UDP datagrams, of supervisors as different network IP addresses (which might or might not belong to the same host), and of actors as open ports (or as endpoints, i.e. as IP-address:port pairs).
While the request-response approach is widely known, it has its own specifics in the actor model:
-# the response (message) arrives asynchronously, and it needs to be matched with the original request (message)
-# the response might not arrive at all (e.g. the target actor is down)
The first issue is solved in rotor by including the full original message (via an intrusive pointer) into the response message. This also means that the receiver (the "server") replies not to the message with the original user-defined payload, but to a slightly enriched one; the same applies to the response on the "client" side.
The second issue is solved by spawning a timer. Obviously, the timer should be spawned on the client side. In the case of a timeout, the client side should receive a response message with a timeout error (and if the actual response arrives a moment later, it should be discarded). All the under-the-hood mechanics are performed by the supervisor, and generic request/response matching is needed, which is done by introducing a synthetic message id per request. Hence, the request can't be just the original user-defined payload either; it needs to be enriched a little bit too.
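Conceptually, the enriched request can be pictured roughly like this (a simplification for illustration only, not rotor's actual definition):
template <typename T> struct wrapped_request_t {
    std::uint32_t id;          // synthetic id, used to match the response with the request
    r::address_ptr_t reply_to; // where the response should be delivered
    T request_payload;         // the original user-defined payload
};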
rotor provides support for the request-response pattern.
First, you need to define your payloads in the request and response messages, linking both types:
namespace payload {
struct my_response_t {
// my data fields
};
struct my_request_t {
using response_t = my_response_t;
// my data fields
};
} // namespace payload
Second, you need to wrap them to let rotor know that this is a request/response pair:
namespace message {
using request_t = rotor::request_traits_t<payload::my_request_t>::request::message_t;
using response_t = rotor::request_traits_t<payload::my_request_t>::response::message_t;
}
Third, on the client side, the request method should be used (or request_via, if the answer is expected on a non-default address), and a slightly special access to the user-defined payload is needed, i.e.:
struct client_actor_t : public r::actor_base_t {
r::address_ptr_t server_addr;
void init_start() noexcept override {
subscribe(&client_actor_t::on_reply);
r::actor_base_t::init_start();
}
void on_start(r::message::start_trigger_t &msg) noexcept override {
auto timeout = r::pt::milliseconds{10};
request<payload::my_request_t>(server_addr /*, fields-forwarded-for-request-payload */)
.send(timeout);
}
void on_reply(message::response_t& msg) noexcept override {
if (msg.payload.ec) {
// react somehow to the error, i.e. timeout
return;
}
auto& req = msg.payload.req->payload; // original request payload
auto& res = msg.payload.res; // original response payload
}
};
Fourth, on the server side, the reply_to or reply_with_error methods should be used, i.e.:
struct server_actor_t : public r::actor_base_t {
r::address_ptr_t server_addr;
void init_start() noexcept override {
subscribe(&server_actor_t::on_request);
r::actor_base_t::init_start();
}
void on_request(message::request_t& msg) noexcept override {
auto& req = msg.payload.request_payload; // original request payload
if (some_condition) {
reply_to(msg /*, fields-forwarded-for-response-payload */);
return;
}
std::error_code ec = /* ... construct an app-specific error code */;
reply_with_error(msg, ec);
}
};
However, the story does not end here. As you might have already guessed, the response message arrives at the client's supervisor first, where it might be discarded (if the timeout timer has already been triggered), or it might be delivered further to the client. As the rotor library should not modify the user-defined message at will, a new response message is created by copying the original one. As this might not be desirable, rotor is able to handle that: instead of copying the content, an intrusive pointer to it can be used, i.e.:
namespace payload {
struct my_response_t: r::arc_base_t<my_response_t> { // intrusive pointer support
// my data fields
explicit my_response_t(int value_) { ... } // the constructor must be provided
virtual ~my_response_t() {} // the virtual destructor must be provided
};
struct my_request_t {
using response_t = r::intrusive_ptr_t<my_response_t>; // that's also changed
// my data fields
};
} // namespace payload
That way, responses with heavy-to-copy payloads can be created.
See examples/boost-asio/request-response.cpp for a full example.
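With the intrusive-pointer payload, the server side allocates the response once, and only the pointer is copied when the supervisor re-creates the response message. A minimal sketch, assuming the server-side handler from above (the constructor argument 42 is illustrative):
void on_request(message::request_t& msg) noexcept {
    // allocate the payload once; delivery copies the intrusive pointer only
    auto res = r::intrusive_ptr_t<payload::my_response_t>{new payload::my_response_t(42)};
    reply_to(msg, std::move(res));
}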
A message is delivered to an address independently of the subscriber or subscribers: many actors can be subscribed to one address, and messages can be sent to the same address from multiple sources.
It should be noted that message delivery order is only source-actor sequenced, so it is a wrong assumption that the same message will be delivered simultaneously to different subscribers (actors) if they belong to different supervisors/threads. Never assume that, nor assume that a message will be delivered within some guaranteed timeframe.
Technically, in rotor it is implemented the following way: an address is produced by some supervisor. A message sent to the address is processed by that supervisor: if the actor-subscriber is local (i.e. created on that supervisor), the message is delivered to it immediately; otherwise the message is wrapped and forwarded to the supervisor of the actor (i.e. to some foreign supervisor), where it is unwrapped and delivered to the actor.
Each actor has its own address. Due to the MPMC feature above, it is possible that one actor receives messages for processing, while some other (foreign) actor subscribes to the same kind of messages and observes them (with some latency). It is even possible to observe rotor "internal" messages, which are part of the API. In other words, it is possible to do something like:
namespace r = rotor;
struct observer_t: public r::actor_base_t {
r::address_ptr_t observable;
void set_observable(r::address_ptr_t addr) { observable = std::move(addr); }
void init_start() noexcept override {
subscribe(&observer_t::on_target_initialize, observable);
subscribe(&observer_t::on_target_start, observable);
subscribe(&observer_t::on_target_shutdown, observable);
r::actor_base_t::init_start();
}
void on_target_initialize(r::message_t<r::payload::initialize_actor_t> &msg) noexcept {
// ...
}
void on_target_start(r::message_t<r::payload::start_actor_t> &) noexcept {
// ...
}
void on_target_shutdown(r::message_t<r::payload::shutdown_request_t> &) noexcept {
// ...
}
};
int main() {
...
auto observer = sup->create_actor<observer_t>();
auto target_actor = sup->create_actor<...>();
observer->set_observable(target_actor->get_address());
...
}
It should be noted that a subscription request is a regular rotor message, i.e. the arrival order of messages generated in different places is undefined; hence, an observer might be subscribed too late, when the original messages have already been delivered to the original recipient, and the observer "misses" them. See the pattern below for how to synchronize actors.
The distinction between foreign and non-foreign actors, or the MPMC pattern, is purely architectural and application-specific, i.e. whether it is known a priori that there are multiple subscribers (MPMC), or there is a single subscriber and the other subscribers are hidden from the original message flow. There is no difference between them at the rotor core, i.e.:
// MPMC: an address is shared between actors
auto dest1 = supervisor->make_address();
auto actor_a = sup->create_actor<...>();
auto actor_b = sup->create_actor<...>();
actor_a->set_destination(dest1);
actor_b->set_destination(dest1);
// observer: actor_c's own address is exposed to actor_d
auto actor_c = sup->create_actor<...>();
auto actor_d = sup->create_actor<...>();
actor_d->set_c_addr(actor_c->get_address());
Of course, actors can dynamically subscribe to and unsubscribe from addresses at runtime.
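For example (a sketch; my_actor_t, on_message, and extra_addr are illustrative names):
// inside some my_actor_t, at runtime
void enable_listening(const r::address_ptr_t& extra_addr) noexcept {
    subscribe(&my_actor_t::on_message, extra_addr);   // start receiving on extra_addr
}
void disable_listening(const r::address_ptr_t& extra_addr) noexcept {
    unsubscribe(&my_actor_t::on_message, extra_addr); // stop receiving on extra_addr
}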
Every actor has its "main" address; however, it is possible for it to have several. This makes "inside-actor" routing, or polymorphism, possible, which is useful when the same type of message arrives in response to different queries.
For example, let's assume there is an "http-actor", which is able to "execute" http requests in a generic way and return the replies. For a SOAP/WSDL web service, the first query would be "get the list of services", and the second query would be "execute action X". Both responses will be HTTP replies.
Something like the following can be done:
struct client_t: public r::actor_base_t {
r::address_ptr_t http_client;
r::address_ptr_t wsdl_addr;
r::address_ptr_t action_addr;
void init_start() noexcept override {
...
wsdl_addr = create_address();
action_addr = create_address();
subscribe(&client_t::on_wsdl, wsdl_addr);
subscribe(&client_t::on_action, action_addr);
r::actor_base_t::init_start();
}
void on_wsdl(http_message_t& msg) noexcept {
...
auto timeout = r::pt::seconds(1);
request_via<http::request_t>(http_client, action_addr /*, request params */).send(timeout);
}
void on_action(http_message_t& msg) noexcept {
...
}
void on_start(r::message_t<r::payload::start_actor_t> &msg) noexcept override {
...
auto timeout = r::pt::seconds(1);
request_via<http::request_t>(http_client, wsdl_addr /*, request params */).send(timeout);
}
};
There is a known get-actor-address problem: how should one actor know the address of another actor? The well-known way is to carefully craft the initialization, taking the addresses of just-created actors and passing them in constructors to the other actors, etc. This approach works under certain circumstances; however, it leads to boilerplate and fragile code which "smells bad", as some initialization is performed inside actors and some outside; it also does not handle well the case of dynamic (virtual) addresses.
The better solution is to have a "registry" actor, known to all other actors. Each actor which provides some service registers its main or virtual address in the registry under some application-known string name; upon termination, it undoes the registration. Each "client actor" asks for the predefined service point by its name during the actor initialization phase; once all addresses for the needed services have been found, the initialization can continue, and the actor then becomes "operational".
Since the registry does not perform any I/O and can be implemented in a loop-agnostic way, it has been included in rotor since v0.06.
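The interaction might look roughly like the following sketch (the registration_request_t / discovery_request_t message names and the exact request signatures are assumptions here; consult the rotor registry documentation for the actual API):
// a sketch under assumptions: registry_addr is injected from outside
struct service_actor_t : r::actor_base_t {
    r::address_ptr_t registry_addr;
    void init_start() noexcept override {
        auto timeout = r::pt::seconds{1};
        // register the main address under an application-known name
        request<r::payload::registration_request_t>(registry_addr, "my-service", get_address()).send(timeout);
        // finish init (r::actor_base_t::init_start()) in the reply handler
    }
};
struct client_actor_t : r::actor_base_t {
    r::address_ptr_t registry_addr;
    void init_start() noexcept override {
        auto timeout = r::pt::seconds{1};
        // discover the service address by its name
        request<r::payload::discovery_request_t>(registry_addr, "my-service").send(timeout);
        // finish init in the reply handler, once the address is known
    }
};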
Let's assume there are two actors which need to communicate:
namespace r = rotor;
struct payload{};
struct actor_A_t: public r::actor_base_t {
void on_start(r::message_t<r::payload::start_actor_t> &msg) noexcept override {
r::actor_base_t::on_start(msg);
subscribe(&actor_A_t::on_message);
}
void on_message(r::message_t<payload> &msg) noexcept {
//processing logic is here
}
};
struct actor_B_t : public r::actor_base_t {
void set_target_addr(const r::address_ptr_t &addr) { target_addr = addr; }
void on_start(r::message_t<r::payload::start_actor_t> &msg) noexcept override {
r::actor_base_t::on_start(msg);
send<payload>(target_addr);
}
r::address_ptr_t target_addr;
};
int main() {
...;
auto supervisor = ...;
auto actor_a = supervisor->create_actor<actor_A_t>();
auto actor_b = supervisor->create_actor<actor_B_t>();
actor_b->set_target_addr(actor_a->get_address());
supervisor->start();
...;
};
However, there is a problem: since message delivery order is source-actor sequenced, it might happen that actor_b starts before actor_a, and the message with payload will be lost.
The following trick is possible:
struct actor_A_t: public r::actor_base_t {
// instead of on_start
void init_start() noexcept override {
subscribe(&actor_A_t::on_message);
r::actor_base_t::init_start();
}
};
or even this way:
struct actor_A_t: public r::actor_base_t {
// instead of on_start / on_initialize
void do_initialize(r::system_context_t* ctx) noexcept override {
r::actor_base_t::do_initialize(ctx);
subscribe(&actor_A_t::on_message);
}
};
That tricky way will definitely work under certain circumstances, i.e. when the actors are created sequentially, use the same supervisor, etc.; however, in the general case there will be an unavoidable race, and the approach will not work when different supervisors / event loops are used, or when some I/O is involved in the scheme (e.g. a connection needs to be established before subscription). Nor is this the networking mindset.
The more robust approach is to start actor_b as usual, observe the on_start event of actor_a from on_initialize, and poll (request) the actor_a status. Then actor_b will either first receive the on_start event from actor_a, which means that actor_a is ready, or it will receive r::message::state_response_t, which should be analyzed further (i.e. whether the status is initialized or started, etc.).
Further, if it is desirable to scale this pattern, then actor_b should not even start unless actor_a has started; actor_b should suspend its init message. The following code demonstrates this approach:
struct actor_A_t: public r::actor_base_t {
// we need to be ready to accept messages, when on_start message arrives
void init_start() noexcept override {
subscribe(&actor_A_t::on_message);
r::actor_base_t::init_start();
}
};
struct actor_B_t : public r::actor_base_t {
r::intrusive_ptr_t<r::message_t<r::payload::initialize_actor_t>> init_message; // suspended init message
void init_start() noexcept override {
// we are not finished initialization:
// r::actor_base_t::init_start();
subscribe(&actor_B_t::on_a_state);
subscribe(&actor_B_t::on_a_start, target_addr);
poll_a_state();
}
void poll_a_state() noexcept {
auto sup_addr = target_addr->supervisor.get_address();
// ask actor_a's supervisor about the actor_a state; the reply arrives back to us
auto timeout = r::pt::seconds{1};
request<r::payload::state_request_t>(sup_addr, target_addr).send(timeout);
}
void finish_init() noexcept {
r::actor_base_t::init_start();
unsubscribe(&actor_B_t::on_a_state); // optional
unsubscribe(&actor_B_t::on_a_start, target_addr); // optional
}
void on_a_state(r::message::state_response_t &msg) noexcept {
if (state == r::state_t::INITIALIZED) {
return; // we are already initialized
}
if (msg.payload.ec) {
return do_shutdown(); // something bad happen
}
auto target_state = msg.payload.res.state;
if (target_state == r::state_t::OPERATIONAL) {
finish_init();
} else {
poll_a_state();
}
}
void on_a_start(r::message_t<r::payload::start_actor_t> &msg) noexcept {
if (init_message) {
finish_init();
}
}
};
We covered some cases, i.e. when no actor at all is listening on the address, or when the supervisor does not reply within the certain timeframe; in those cases the response_t will contain an error code with a timeout. However, a few things still need to be done: infinite polling should be prevented. To do that, some finite counter should be used; if all attempts fail, actor_B shutdown should be initiated.
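For example, poll_a_state from the sample above could be guarded like this (a sketch; max_poll_attempts and the counter are illustrative additions to actor_B_t):
std::uint32_t poll_attempts = 0;
static constexpr std::uint32_t max_poll_attempts = 10; // illustrative limit
void poll_a_state() noexcept {
    if (++poll_attempts > max_poll_attempts) {
        return do_shutdown(); // give up: actor_a never became ready
    }
    auto sup_addr = target_addr->supervisor.get_address();
    auto timeout = r::pt::seconds{1};
    request<r::payload::state_request_t>(sup_addr, target_addr).send(timeout);
}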
The sample also does not cover the case when actor_A decides to shut down; actor_B should be notified and take appropriate actions. As this seems to be a generic pattern (i.e. actor_B uses actor_A as a client), it will probably be supported in a future version of the rotor core.
sobjectizer ships with built-in message box protection: when the inbound message queue hits a certain threshold, a predefined action can be performed: a message can be silently dropped (the newest one), it can be transformed into some other kind of message, an actor or the whole application can be shut down, etc.
In rotor there is no "inbound" queue, and the sobjectizer approach is not flexible enough: overload is not always measured in the number of unprocessed messages; it can be measured in the time needed to process a single message. For example, consider a queue of requests to compute the N-th prime number. If N lies within 1000, then a queue size of 1000 messages is probably OK; however, if there is a request to compute the 10,000,000-th prime number, the actor will certainly be overloaded.
There are at least two approaches, depending on how fast the reaction to overload should be triggered. In the simplest case, when there is no timeframe guarantee for the overload reaction, it can be done as follows: a custom supervisor should be written, and messages to the protected supervisor should be delivered not immediately, but with some delay (i.e. loop->postpone([&](){ supervisor->do_process(); })); before message delivery to the actor, the queue size (or another criterion for the overload condition) should be checked, and the overload reaction should be performed if needed.
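The idea can be sketched as follows; loop, postpone, and queue_size() are stand-ins for the concrete event loop API and supervisor internals (only do_process() appears in the text above):
// schedule queue processing on the loop instead of doing it inline,
// so the overload check runs before the actual delivery
loop->postpone([&]() {
    if (supervisor->queue_size() > max_queue_size) {
        // overload reaction: drop a message, transform it, shut down, etc.
    }
    supervisor->do_process();
});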
Another approach is to write a front-actor which runs on a dedicated supervisor / thread. The actor forwards requests to the protected worker-actor if the worker-actor answers within a certain timeframe, or immediately reacts with the overload action. This works if the request message contains a reply address, which is remembered and overwritten by the front-actor before forwarding the message to the worker-actor; in the reply message the address might need to be overwritten too. The strategy can be extended to use several workers and, hence, provide application-specific load balancing.
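A schematic sketch of such a front-actor (all names here are illustrative, including the job_t / job_result_t payloads and the reply_to field; timeouts and subscriptions in init_start() are elided for brevity):
namespace payload {
    struct job_t { r::address_ptr_t reply_to; /* job fields */ };
    struct job_result_t { /* result fields */ };
}

struct front_actor_t : r::actor_base_t {
    r::address_ptr_t worker_addr;
    std::deque<r::address_ptr_t> clients; // original reply addresses, FIFO
    std::size_t max_pending = 100;        // illustrative overload threshold

    void on_request(r::message_t<payload::job_t>& msg) noexcept {
        if (clients.size() >= max_pending) {
            // overload reaction, e.g. reply immediately with an error
            return;
        }
        // remember the original reply address and substitute our own
        clients.push_back(msg.payload.reply_to);
        send<payload::job_t>(worker_addr, get_address() /*, job fields */);
    }
    void on_result(r::message_t<payload::job_result_t>& msg) noexcept {
        // restore the original reply address and forward the result
        auto client = clients.front();
        clients.pop_front();
        send<payload::job_result_t>(client /*, result fields */);
    }
};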
This has not been started yet; however, a lot of building blocks for networking are already here: location transparency, message passing, and reactiveness. The missing blocks are: service discovery, handshake, and message serialization.
The final goal is: send<payload>(destination_address, args...) should send the message to some local destination_address, which is the representative of some remote peer actor address; the addresses will be NAT-ed, and the message will be serialized and transferred over the wire to the remote host, where it (the request) will be deserialized, processed, and replied to, and then the reverse procedure will happen.
Whilst the actual network transmission cannot be implemented in an event-loop-agnostic way, I think the abovementioned protocol is quite loop-independent. This is the area of further rotor research & development.