beast-scrapper.cpp 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526
  1. //
  2. // Copyright (c) 2019 Ivan Baidakou (basiliscos) (the dot dmol at gmail dot com)
  3. //
  4. // Distributed under the MIT Software License
  5. //
  6. /*
  7. The current example to some extent mimics curl usage example in CAF
  8. ( https://github.com/actor-framework/actor-framework/blob/master/examples/curl/curl_fuse.cpp ),
  9. but instead of CAF + curl pair, the rotor + beast pair is used.
  10. There is a single client (can be multiple), there is a single http-manager (supervisor),
  11. which holds a pool of multiple http-workers.
  12. client -> (timerC_1) request_1 -> http-manager -> (timerM_1) request_1 -> http-worker_1
  13. (timerC_2) request_2 -> -> (timerM_2) request_1 -> http-worker_2
  14. ... ... ...
  15. (timerC_n) request_n -> -> (timerM_n) request_1 -> http-worker_n
  16. 1. The client makes a request, which contains the URL of the remote resource, and the buffer
  17. where the parsed reply is put.
  18. 2. The reply is intrusive pointer to the real reply, i.e. to avoid unnecessary copying.
  19. 3. The client makes as many requests as it needs, and the manager just forwards them
  20. to free http-workers. The amounts of simultaneous requests and http-workers are "coordinated",
  21. as well as MAX_FAILURES = 1. If it is not desirable, it can be improved:
  22. 3.1. If MAX_FAILURES != 1, then it might be the case when timeout timer just has triggered
  23. on the client, but corresponding timer wasn't triggered on the manager. In the current code
  24. there is just an assert, but the situation where client asks more than http-manager is
  25. capable to serve can be handled, i.e. via queueing.
  26. 3.2. If the pool size and client's simultaneous requests are uncoordinated, then http-manager
  27. can queue requests. However, some *back-pressure* mechanisms should be imposed into http-manager
  28. to prevent the queue to grow infinitely.
  29. 3.3. The http-requests are stateless (i.e. no http/1.1). This cannot be improved without
  30. internal protocol change, as the socket should not be closed, the same http-client
  31. should continue serving the requests on the same host from the client etc.
  32. 3.4. The http-requests do not contain headers/cookies etc. It can be improved via additional
  33. fields in the http-requests.
  34. 3.5. There is no cancellation facilities. Again, the protocol should be changed to support
  35. that: client have to know *what* to cancel (i.e. some work_id/request_id should be returned
  36. immediately from http-worker to http-client). As the result in the current code the
  37. client *have to wait* until all requests will be finished either successfully or via
  38. timeout triggering. The noticeable delay in shutdown can be observed, and it is described
  39. here.
  40. 4. The care should be taken to properly shut down the application: only when there is
  41. no I/O activities from client perspective, the shut down is initiated.
  42. 5. There are 2 timers per request: the 1st one validates manager's responsibilities
  43. on client, the 2nd one validates worker's responsibilities on manager.
  44. Technically, there could be only one timer per request, however, the main question
  45. is how reliable do you treat your actors. If they are unreliable (i.e. might have
  46. bugs etc.), then there should be 2 timers. On the later stage of the development,
  47. it might be switched to one timer per request if reliability has been proven
  48. and it is desirable to get rid of additional timer from performance point of view.
  49. 6. There should be no crashes, no memory leaks
  50. The output sample for the localhost is:
  51. ./beast-scrapper --workers_count=50 --timeout=5000 --max_requests=50000 --url=http://127.0.0.1:80/index.html
  52. using 50 workers for 127.0.0.1:80/index.html, timeout: 00:00:05
  53. starting shutdown
  54. client_t::shutdown_start
  55. client_t::shutdown_finish, stats: 50000/50000 requests, in 2.96149s, rps = 16883.4
  56. client_t::shutdown_finish
  57. http_manager_t::shutdown_finish()
  58. */
  59. #include "rotor.hpp"
  60. #include "rotor/asio.hpp"
  61. #include <iostream>
  62. #include <chrono>
  63. #include <regex>
  64. #include <unordered_set>
  65. #include <unordered_map>
  66. #include <memory>
  67. #include <boost/program_options.hpp>
  68. #include <boost/beast/core.hpp>
  69. #include <boost/beast/http.hpp>
  70. #include <boost/beast/version.hpp>
  71. #include <boost/lexical_cast.hpp>
  72. namespace po = boost::program_options;
  73. namespace http = boost::beast::http;
  74. namespace sys = boost::system;
  75. namespace asio = boost::asio;
  76. namespace pt = boost::posix_time;
  77. namespace ra = rotor::asio;
  78. namespace r = rotor;
  79. using tcp = asio::ip::tcp;
  80. struct URL {
  81. std::string host;
  82. std::string port;
  83. std::string path;
  84. };
// Beast HTTP response type used throughout this example; the body is a string_body.
using raw_http_response_t = http::response<http::string_body>;
  86. constexpr const std::uint32_t RX_BUFF_SZ = 10 * 1024;
  87. constexpr const std::uint32_t MAX_HTTP_FAILURES = 1;
  88. namespace payload {
  89. struct http_response_t : public r::arc_base_t<http_response_t> {
  90. raw_http_response_t response;
  91. std::size_t bytes;
  92. http_response_t(raw_http_response_t &&response_, std::size_t bytes_)
  93. : response(std::move(response_)), bytes{bytes_} {}
  94. };
  95. struct http_request_t : public r::arc_base_t<http_response_t> {
  96. using rx_buff_t = boost::beast::flat_buffer;
  97. using rx_buff_ptr_t = std::shared_ptr<rx_buff_t>;
  98. using duration_t = r::pt::time_duration;
  99. using response_t = r::intrusive_ptr_t<http_response_t>;
  100. http_request_t(const URL &url_, rx_buff_ptr_t rx_buff_, std::size_t rx_buff_size_, duration_t timeout_)
  101. : url{url_}, rx_buff{rx_buff_}, rx_buff_size{rx_buff_size_}, timeout{timeout_} {}
  102. URL url;
  103. rx_buff_ptr_t rx_buff;
  104. std::size_t rx_buff_size;
  105. duration_t timeout;
  106. };
  107. } // namespace payload
// Message types for the HTTP request/response exchange, both taken from
// rotor's request traits instantiated with payload::http_request_t.
namespace message {
// Message carrying an HTTP request (handled by the manager and by the workers).
using http_request_t = r::request_traits_t<payload::http_request_t>::request::message_t;
// Message carrying the reply (handled by the manager and by the client).
using http_response_t = r::request_traits_t<payload::http_request_t>::response::message_t;
} // namespace message
  112. static_assert(r::details::is_constructible_v<payload::http_response_t, raw_http_response_t, std::size_t>, "zzz");
  113. static_assert(std::is_constructible_v<payload::http_response_t, raw_http_response_t, std::size_t>, "zzz");
  114. struct http_worker_t : public r::actor_base_t {
  115. using r::actor_base_t::actor_base_t;
  116. using request_ptr_t = r::intrusive_ptr_t<message::http_request_t>;
  117. using tcp_socket_ptr_t = std::unique_ptr<tcp::socket>;
  118. using resolve_results_t = tcp::resolver::results_type;
  119. using resolve_it_t = resolve_results_t::iterator;
  120. explicit http_worker_t(r::supervisor_t &sup)
  121. : r::actor_base_t{sup}, strand{static_cast<ra::supervisor_asio_t &>(sup).get_strand()}, resolver{
  122. strand.context()} {}
  123. inline asio::io_context::strand &get_strand() noexcept { return strand; }
  124. void init_start() noexcept override {
  125. subscribe(&http_worker_t::on_request);
  126. rotor::actor_base_t::init_start();
  127. }
  128. void init_finish() noexcept override { init_request.reset(); }
  129. bool try_shutdown() {
  130. if (shutdown_request) {
  131. if (resolver_active) {
  132. resolver.cancel();
  133. } else if (sock) {
  134. sys::error_code ec;
  135. sock->cancel(ec);
  136. assert(!ec);
  137. sock->close(ec);
  138. assert(!ec);
  139. } else {
  140. r::actor_base_t::shutdown_start();
  141. }
  142. return true;
  143. }
  144. return false;
  145. }
  146. void shutdown_start() noexcept override { try_shutdown(); }
  147. void on_request(message::http_request_t &req) noexcept {
  148. assert(!orig_req);
  149. assert(!shutdown_request);
  150. orig_req.reset(&req);
  151. response.clear();
  152. conditional_start();
  153. }
  154. void conditional_start() noexcept {
  155. if (sock) {
  156. auto self = r::intrusive_ptr_t<http_worker_t>(this);
  157. asio::defer(strand, [self = self]() {
  158. self->conditional_start();
  159. self->get_supervisor().do_process();
  160. });
  161. }
  162. start_request();
  163. }
  164. void start_request() noexcept {
  165. auto &url = orig_req->payload.request_payload.url;
  166. auto fwd = ra::forwarder_t(*this, &http_worker_t::on_resolve, &http_worker_t::on_resolve_error);
  167. resolver.async_resolve(url.host, url.port, std::move(fwd));
  168. resolver_active = true;
  169. }
  170. void request_fail(const sys::error_code &ec) noexcept {
  171. reply_with_error(*orig_req, ec);
  172. orig_req.reset();
  173. sock.reset();
  174. }
  175. void on_resolve_error(const sys::error_code &ec) noexcept {
  176. resolver_active = false;
  177. request_fail(ec);
  178. try_shutdown();
  179. }
  180. void on_resolve(resolve_results_t results) noexcept {
  181. resolver_active = false;
  182. if (try_shutdown()) {
  183. return;
  184. }
  185. sock = std::make_unique<tcp::socket>(strand.context());
  186. sock->open(tcp::v4());
  187. auto fwd = ra::forwarder_t(*this, &http_worker_t::on_connect, &http_worker_t::on_tcp_error);
  188. asio::async_connect(*sock, results.begin(), results.end(), std::move(fwd));
  189. }
  190. void on_tcp_error(const sys::error_code &ec) noexcept {
  191. request_fail(ec);
  192. try_shutdown();
  193. }
  194. void on_connect(resolve_it_t) noexcept {
  195. auto &url = orig_req->payload.request_payload.url;
  196. if (try_shutdown()) {
  197. return;
  198. }
  199. request.method(http::verb::get);
  200. request.version(11);
  201. request.target(url.path);
  202. request.set(http::field::host, url.host);
  203. request.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
  204. auto fwd = ra::forwarder_t(*this, &http_worker_t::on_request_sent, &http_worker_t::on_tcp_error);
  205. http::async_write(*sock, request, std::move(fwd));
  206. }
  207. void on_request_sent(std::size_t /* bytes */) noexcept {
  208. if (try_shutdown())
  209. return;
  210. auto fwd = ra::forwarder_t(*this, &http_worker_t::on_request_read, &http_worker_t::on_tcp_error);
  211. auto &rx_buff = orig_req->payload.request_payload.rx_buff;
  212. rx_buff->prepare(orig_req->payload.request_payload.rx_buff_size);
  213. http::async_read(*sock, *rx_buff, response, std::move(fwd));
  214. }
  215. void on_request_read(std::size_t bytes) noexcept {
  216. reply_to(*orig_req, std::move(response), bytes);
  217. orig_req.reset();
  218. sock->cancel();
  219. sock->close();
  220. sock.reset();
  221. try_shutdown();
  222. }
  223. asio::io_context::strand &strand;
  224. tcp::resolver resolver;
  225. tcp_socket_ptr_t sock;
  226. request_ptr_t orig_req;
  227. http::request<http::empty_body> request;
  228. http::response<http::string_body> response;
  229. bool resolver_active = false;
  230. };
  231. struct http_manager_config_t : public ra::supervisor_config_asio_t {
  232. using strand_ptr_t = ra::supervisor_config_asio_t::strand_ptr_t;
  233. std::size_t worker_count;
  234. r::pt::time_duration init_timeout;
  235. http_manager_config_t(std::size_t worker_count_, const r::pt::time_duration &init_timeout_,
  236. const r::pt::time_duration &shutdown_duration_, strand_ptr_t srtand_)
  237. : ra::supervisor_config_asio_t{shutdown_duration_, srtand_}, worker_count{worker_count_}, init_timeout{
  238. init_timeout_} {}
  239. };
  240. struct http_manager_t : public ra::supervisor_asio_t {
  241. using workers_set_t = std::unordered_set<r::address_ptr_t>;
  242. using request_id = r::supervisor_t::timer_id_t;
  243. using request_ptr_t = r::intrusive_ptr_t<message::http_request_t>;
  244. using req_mapping_t = std::unordered_map<request_id, request_ptr_t>;
  245. http_manager_t(supervisor_t *parent_, const http_manager_config_t &config_)
  246. : ra::supervisor_asio_t{parent_, config_} {
  247. worker_count = config_.worker_count;
  248. init_timeout = config_.init_timeout;
  249. }
  250. void init_start() noexcept override {
  251. for (std::size_t i = 0; i < worker_count; ++i) {
  252. auto addr = create_actor<http_worker_t>(init_timeout)->get_address();
  253. workers.emplace(std::move(addr));
  254. }
  255. subscribe(&http_manager_t::on_request);
  256. subscribe(&http_manager_t::on_response);
  257. ra::supervisor_asio_t::init_start();
  258. }
  259. void shutdown_finish() noexcept override {
  260. ra::supervisor_asio_t::shutdown_finish();
  261. std::cerr << "http_manager_t::shutdown_finish()\n";
  262. }
  263. void on_request(message::http_request_t &req) noexcept {
  264. auto it = workers.begin();
  265. if (it == workers.end()) {
  266. std::abort();
  267. }
  268. auto worker_addr = *it;
  269. workers.erase(it);
  270. auto &payload = req.payload.request_payload;
  271. auto request_id = request<payload::http_request_t>(worker_addr, payload).send(payload.timeout);
  272. req_mapping.emplace(request_id, &req);
  273. }
  274. void on_response(message::http_response_t &res) noexcept {
  275. auto it = req_mapping.find(res.payload.request_id());
  276. auto worker_addr = res.payload.req->address;
  277. workers.emplace(std::move(worker_addr));
  278. reply_to(*it->second, res.payload.ec, std::move(res.payload.res));
  279. req_mapping.erase(it);
  280. }
  281. std::size_t worker_count;
  282. r::pt::time_duration init_timeout;
  283. workers_set_t workers;
  284. req_mapping_t req_mapping;
  285. };
  286. struct client_t : r::actor_base_t {
  287. using r::actor_base_t::actor_base_t;
  288. using timepoint_t = std::chrono::time_point<std::chrono::high_resolution_clock>;
  289. void init_start() noexcept override {
  290. subscribe(&client_t::on_status);
  291. subscribe(&client_t::on_response);
  292. poll_status();
  293. }
  294. void shutdown_start() noexcept override { r::actor_base_t::shutdown_start(); }
  295. void shutdown_finish() noexcept override {
  296. auto end = std::chrono::high_resolution_clock::now();
  297. std::chrono::duration<double> diff = end - start;
  298. r::actor_base_t::shutdown_finish();
  299. double rps = success_requests / diff.count();
  300. std::cerr << "client_t::shutdown_finish, stats: " << success_requests << "/" << total_requests
  301. << " requests, in " << diff.count() << "s, rps = " << rps << "\n";
  302. send<r::payload::shutdown_trigger_t>(supervisor.get_address(), manager_addr);
  303. manager_addr.reset();
  304. supervisor.do_shutdown(); /* trigger all system to shutdown */
  305. }
  306. void poll_status() noexcept {
  307. if (poll_attempts < 3) {
  308. request<r::payload::state_request_t>(manager_addr, manager_addr).send(timeout);
  309. ++poll_attempts;
  310. } else {
  311. supervisor.do_shutdown();
  312. }
  313. }
  314. void on_status(r::message::state_response_t &msg) noexcept {
  315. if (msg.payload.ec) {
  316. return supervisor.do_shutdown();
  317. }
  318. if (msg.payload.res.state != r::state_t::OPERATIONAL) {
  319. poll_status();
  320. } else {
  321. r::actor_base_t::init_start();
  322. }
  323. }
  324. void make_request() noexcept {
  325. if (!shutdown_request) {
  326. if (active_requests < concurrency && total_requests < max_requests) {
  327. auto rx_buff = std::make_shared<payload::http_request_t::rx_buff_t>();
  328. request<payload::http_request_t>(manager_addr, url, std::move(rx_buff), RX_BUFF_SZ, timeout)
  329. .send(timeout);
  330. ++active_requests;
  331. ++total_requests;
  332. }
  333. }
  334. }
  335. void on_start(r::message::start_trigger_t &msg) noexcept override {
  336. r::actor_base_t::on_start(msg);
  337. start = std::chrono::high_resolution_clock::now();
  338. for (std::size_t i = 0; i < concurrency; ++i) {
  339. make_request();
  340. }
  341. }
  342. void on_response(message::http_response_t &msg) noexcept {
  343. --active_requests;
  344. bool err = false;
  345. if (!msg.payload.ec) {
  346. auto &res = msg.payload.res->response;
  347. if (res.result() == http::status::ok) {
  348. // std::cerr << "." << std::flush;
  349. ++success_requests;
  350. } else {
  351. std::cerr << "http error: " << res.result_int() << "\n";
  352. err = true;
  353. }
  354. } else {
  355. std::cerr << "request error: " << msg.payload.ec.message() << "\n";
  356. err = true;
  357. }
  358. if (err) {
  359. ++http_errors;
  360. }
  361. if (http_errors < MAX_HTTP_FAILURES) {
  362. make_request();
  363. }
  364. if (active_requests == 0) {
  365. std::cerr << "starting shutdown\n";
  366. do_shutdown();
  367. }
  368. }
  369. std::size_t poll_attempts = 0;
  370. std::size_t http_errors = 0;
  371. std::size_t active_requests = 0;
  372. std::size_t success_requests = 0;
  373. std::size_t total_requests = 0;
  374. std::size_t max_requests = 0;
  375. r::address_ptr_t manager_addr;
  376. URL url;
  377. r::pt::time_duration timeout;
  378. std::size_t concurrency;
  379. timepoint_t start;
  380. };
  381. int main(int argc, char **argv) {
  382. using url_ptr_t = std::unique_ptr<URL>;
  383. // clang-format off
  384. /* parse command-line & config options */
  385. po::options_description cmdline_descr("Allowed options");
  386. cmdline_descr.add_options()
  387. ("help", "show this help message")
  388. ("url", po::value<std::string>()->default_value("http://www.example.com:80/index.html"), "URL to poll")
  389. ("workers_count", po::value<std::size_t>()->default_value(10), "concurrency (number of http workers)")
  390. ("timeout", po::value<std::size_t>()->default_value(1000), "generic timeout (in milliseconds)")
  391. ("max_requests", po::value<std::size_t>()->default_value(100), "maximum amount of requests before shutting down");
  392. // clang-format on
  393. po::variables_map vm;
  394. po::store(po::parse_command_line(argc, argv, cmdline_descr), vm);
  395. po::notify(vm);
  396. bool show_help = vm.count("help");
  397. if (show_help) {
  398. std::cout << cmdline_descr << "\n";
  399. return 1;
  400. }
  401. url_ptr_t url;
  402. auto url_str = vm["url"].as<std::string>();
  403. std::regex re("(\\w+)://([^/ :]+):(\\d+)(/.+)");
  404. std::smatch what;
  405. if (regex_match(url_str, what, re)) {
  406. auto host = std::string(what[2].first, what[2].second);
  407. auto port = std::string(what[3].first, what[3].second);
  408. auto path = std::string(what[4].first, what[4].second);
  409. url = std::make_unique<URL>(URL{std::move(host), std::move(port), std::move(path)});
  410. } else {
  411. std::cout << "wrong url format. It should be like http://www.example.com:80/index.html"
  412. << "\n";
  413. return 1;
  414. }
  415. auto req_timeout = r::pt::milliseconds{vm["timeout"].as<std::size_t>()};
  416. auto workers_count = vm["workers_count"].as<std::size_t>();
  417. std::cerr << "using " << workers_count << " workers for " << url->host << ":" << url->port << url->path
  418. << ", timeout: " << req_timeout << "\n";
  419. asio::io_context io_context;
  420. auto system_context = ra::system_context_asio_t::ptr_t{new ra::system_context_asio_t(io_context)};
  421. auto strand = std::make_shared<asio::io_context::strand>(io_context);
  422. auto man_timeout = req_timeout + r::pt::milliseconds{workers_count * 2};
  423. ra::supervisor_config_asio_t conf{man_timeout, strand};
  424. auto sup = system_context->create_supervisor<ra::supervisor_asio_t>(conf);
  425. auto worker_timeout = req_timeout * 2;
  426. http_manager_config_t http_conf{workers_count, worker_timeout, worker_timeout, strand};
  427. auto man = sup->create_actor<http_manager_t>(man_timeout, http_conf);
  428. auto client = sup->create_actor<client_t>(worker_timeout);
  429. client->manager_addr = man->get_address();
  430. client->timeout = req_timeout;
  431. client->concurrency = workers_count;
  432. client->url = *url;
  433. client->max_requests = vm["max_requests"].as<std::size_t>();
  434. sup->start();
  435. io_context.run();
  436. return 0;
  437. }