msgr.c 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. #include <unistd.h>
  2. #include "err.h"
  3. #include "polling.h"
  4. #include "msgr.h"
  5. static void
  6. msgr_dispose(void *dat, void *extra)
  7. {
  8. Msgr *msgr = dat;
  9. Disposer *dspr = extra;
  10. /* If this fails, whatever. */
  11. if (msgr->fd >= 0)
  12. close(msgr->fd);
  13. list_dispose(&msgr->inbox);
  14. if (msgr->wip) {
  15. msg_dispose(msgr->wip);
  16. free(msgr->wip);
  17. }
  18. if (dspr)
  19. dspr->dispose(msgr->extra, dspr->extra);
  20. free(msgr);
  21. }
  22. int
  23. msgrs_init(Msgrs *msgrs, Disposer *dspr, Err *err)
  24. {
  25. return map_init(msgrs, dspr, 0, err);
  26. }
  27. void
  28. msgrs_dispose(Msgrs *msgrs)
  29. {
  30. Disposer msgr_dspr = {
  31. .dispose = msgr_dispose,
  32. .extra = msgrs->disposer
  33. };
  34. msgrs->disposer = &msgr_dspr;
  35. map_dispose(msgrs);
  36. }
  37. static void
  38. dspr_msg_dispose(void *dat, void *extra)
  39. {
  40. (void) extra;
  41. msg_dispose(dat);
  42. free(dat);
  43. }
  44. static Msgr *
  45. msgr_new(int fd, void *extra, Err *err)
  46. {
  47. static Disposer msg_dspr = {
  48. .dispose = dspr_msg_dispose,
  49. .extra = NULL
  50. };
  51. Msgr *msgr = malloc(sizeof(*msgr));
  52. if (!msgr) {
  53. err_std(err);
  54. return NULL;
  55. }
  56. msgr->fd = fd;
  57. list_init(&msgr->inbox, &msg_dspr);
  58. msgr->wip = NULL;
  59. msgr->extra = extra;
  60. return msgr;
  61. }
  62. Msgr *
  63. msgr_add(Msgrs *msgrs, uint32_t key_len, const void *key,
  64. int fd, int epoll_fd, void *extra, Err *err)
  65. {
  66. Msgr *msgr = msgr_new(fd, extra, err);
  67. Err tmp_err;
  68. if (!msgr)
  69. return NULL;
  70. if (map_add(msgrs, key_len, key, msgr, err) < 0) {
  71. msgr_dispose(msgr, NULL);
  72. return NULL;
  73. }
  74. if (epoll_fd >= 0 && fd >= 0 &&
  75. poll_add(epoll_fd, fd, err) < 0) {
  76. map_remove(msgrs, key_len, key, (void **) msgr, &tmp_err);
  77. msgr_dispose(msgr, NULL);
  78. return NULL;
  79. }
  80. return msgr;
  81. }
  82. int
  83. msgr_remove(Msgrs *msgrs, uint32_t key_len, const void *key,
  84. void **extra, Err *err)
  85. {
  86. Msgr *msgr;
  87. if (map_remove(msgrs, key_len, key, (void **) &msgr, err) < 0)
  88. return -1;
  89. if (extra)
  90. *extra = msgr->extra;
  91. msgr_dispose(msgr, NULL);
  92. return 0;
  93. }
  94. Msgr *
  95. msgr_get(Msgrs *msgrs, uint32_t key_len, const void *key)
  96. {
  97. Msgr *msgr;
  98. if (!map_get(msgrs, key_len, key, (void **) &msgr))
  99. return NULL;
  100. return msgr;
  101. }
  102. int
  103. msgr_recv(Msgr *msgr, uint32_t max_len, int wait, Err *err)
  104. {
  105. if (!msgr->wip) {
  106. if (!(msgr->wip = malloc(sizeof(*msgr->wip)))) {
  107. err_std(err);
  108. return -1;
  109. }
  110. msg_recv_init(msgr->wip);
  111. }
  112. return msg_recv(msgr->fd, msgr->wip, max_len, wait, err);
  113. }
  114. int
  115. msgr_post(Msgr *msgr, Msg *msg, Err *err)
  116. {
  117. return list_add(&msgr->inbox, msgr->inbox.tail, msg, err);
  118. }
  119. int
  120. msgr_send(Msgr *msgr, int epoll_fd, int wait, Err *err)
  121. {
  122. Msg *msg;
  123. if (!msgr->inbox.len)
  124. return 0;
  125. msg = *(Msg **) list_peek(&msgr->inbox);
  126. if (msg_send(msgr->fd, msg, wait, err) < 0)
  127. return -1;
  128. if (!msg_tnsfd(msg)) {
  129. poll_out(epoll_fd, msgr->fd, err);
  130. return 0;
  131. }
  132. /* If this fails, it's no big deal. This msgr_send() will probably get
  133. called again and when it is, only this function will be called */
  134. if (poll_out_off(epoll_fd, msgr->fd, err) < 0)
  135. return 0;
  136. list_remove(&msgr->inbox, NULL, (void **) &msg, err);
  137. msg_dispose(msg);
  138. free(msg);
  139. return 0;
  140. }