msg_server.c 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. /* This file is part of libepistle.
  2. *
  3. * libepistle is free software: you can redistribute it and/or modify
  4. * it under the terms of the GNU Lesser General Public License as published by
  5. * the Free Software Foundation, either version 3 of the License, or
  6. * (at your option) any later version.
  7. *
  8. * libepistle is distributed in the hope that it will be useful,
  9. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. * GNU Lesser General Public License for more details.
  12. *
  13. * You should have received a copy of the GNU Lesser General Public License
  14. * along with libepistle. If not, see <http://www.gnu.org/licenses/>.
  15. */
  16. #include <unistd.h>
  17. #include <sys/epoll.h>
  18. #include "err.h"
  19. #include "polling.h"
  20. #include "msg_server.h"
  21. struct tmp_extra {
  22. Msg_server *server;
  23. Add_msgr add;
  24. Remove_msgr remove;
  25. React_msg react;
  26. void *extra;
  27. };
  28. int
  29. msg_server_init(Msg_server *server, int epoll_fd, uint32_t max_len,
  30. Disposer *extra_dspr, Err *err)
  31. {
  32. server->max_len = max_len;
  33. if (msgrs_init(&server->msgrs, extra_dspr, err) < 0)
  34. return -1;
  35. if (epoll_fd < 0 && (epoll_fd = epoll_create1(0)) < 0) {
  36. err_std(err);
  37. map_dispose(&server->msgrs);
  38. return -1;
  39. }
  40. server->epoll_fd = epoll_fd;
  41. return 0;
  42. }
  43. void
  44. msg_server_dispose(Msg_server *server)
  45. {
  46. close(server->epoll_fd);
  47. map_dispose(&server->msgrs);
  48. }
  49. int
  50. msg_server_on(Msg_server *server, int socket, Err *err)
  51. {
  52. return poll_add(server->epoll_fd, socket, err);
  53. }
  54. int
  55. msg_server_off(Msg_server *server, int socket, Err *err)
  56. {
  57. return poll_remove(server->epoll_fd, socket, err);
  58. }
  59. static int
  60. s_check(int fd, void *extra)
  61. {
  62. struct tmp_extra *tmp_extra = extra;
  63. /* If msgrs doesn't contains the fd it is a server */
  64. return !set_contains(&tmp_extra->server->msgrs, sizeof(fd), &fd);
  65. }
  66. static int
  67. msg_react(int epoll_fd, int fd, int server_fd, int pm_event,
  68. Err *err, void *extra)
  69. {
  70. struct tmp_extra *tmp_extra = extra;
  71. Msg_server *server = tmp_extra->server;
  72. Msgr *msgr;
  73. (void) epoll_fd;
  74. switch (pm_event) {
  75. case PM_ADD:
  76. return tmp_extra->add(server, fd, server_fd,
  77. err, tmp_extra->extra);
  78. case PM_REMOVE:
  79. return tmp_extra->remove(server, fd, err,
  80. tmp_extra->extra);
  81. case PM_INPUT:
  82. msgr = msgr_get(&server->msgrs, sizeof(fd), &fd);
  83. /* we haven't yet dealt with previous message! */
  84. if (msgr->wip && msg_tnsfd(msgr->wip))
  85. return tmp_extra->react(server, msgr, err,
  86. tmp_extra->extra);
  87. if (msgr_recv(msgr, server->max_len, 0, err) < 0)
  88. return tmp_extra->remove(server, fd, err,
  89. tmp_extra->extra);
  90. if (msgr->wip && msg_tnsfd(msgr->wip))
  91. return tmp_extra->react(server, msgr, err,
  92. tmp_extra->extra);
  93. return POLLING_OKAY;
  94. case PM_OUTPUT:
  95. msgr = msgr_get(&server->msgrs, sizeof(fd), &fd);
  96. if (msgr_send(msgr, server->epoll_fd, 0, err) < 0)
  97. return tmp_extra->remove(server, fd, err,
  98. tmp_extra->extra);
  99. return POLLING_OKAY;
  100. }
  101. err_epistle(err, EPISTLE_ERR_TYPE);
  102. return POLLING_FATAL;
  103. }
  104. Msgr *
  105. msg_server_add(Msg_server *server, int fd, void *extra, Err *err)
  106. {
  107. return msgr_add(&server->msgrs, sizeof(fd),
  108. &fd, fd, server->epoll_fd, extra, err);
  109. }
  110. int
  111. msg_server_remove(Msg_server *server, int fd, void **extra, Err *err)
  112. {
  113. return msgr_remove(&server->msgrs, sizeof(fd), &fd, extra, err);
  114. }
  115. Msgr *
  116. msg_server_get(Msg_server *server, int fd)
  117. {
  118. return msgr_get(&server->msgrs, sizeof(fd), &fd);
  119. }
  120. int
  121. msg_server(Msg_server *server, Add_msgr add, Remove_msgr remove,
  122. React_msg react, Err *err, void *extra)
  123. {
  124. struct tmp_extra tmp_extra = {
  125. .server = server, .add = add, .remove = remove,
  126. .react = react, .extra = extra
  127. };
  128. return polling_mode(msg_react, &server->epoll_fd,
  129. NULL, s_check, err, &tmp_extra);
  130. }