123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- #include <unistd.h>
- #include "err.h"
- #include "polling.h"
- #include "msgr.h"
- static void
- msgr_dispose(void *dat, void *extra)
- {
- Msgr *msgr = dat;
- Disposer *dspr = extra;
- /* If this fails, whatever. */
- if (msgr->fd >= 0)
- close(msgr->fd);
- list_dispose(&msgr->inbox);
- if (msgr->wip) {
- msg_dispose(msgr->wip);
- free(msgr->wip);
- }
- if (dspr)
- dspr->dispose(msgr->extra, dspr->extra);
- free(msgr);
- }
- int
- msgrs_init(Msgrs *msgrs, Disposer *dspr, Err *err)
- {
- return map_init(msgrs, dspr, 0, err);
- }
- void
- msgrs_dispose(Msgrs *msgrs)
- {
- Disposer msgr_dspr = {
- .dispose = msgr_dispose,
- .extra = msgrs->disposer
- };
- msgrs->disposer = &msgr_dspr;
- map_dispose(msgrs);
- }
- static void
- dspr_msg_dispose(void *dat, void *extra)
- {
- (void) extra;
- msg_dispose(dat);
- free(dat);
- }
- static Msgr *
- msgr_new(int fd, void *extra, Err *err)
- {
- static Disposer msg_dspr = {
- .dispose = dspr_msg_dispose,
- .extra = NULL
- };
- Msgr *msgr = malloc(sizeof(*msgr));
- if (!msgr) {
- err_std(err);
- return NULL;
- }
- msgr->fd = fd;
- list_init(&msgr->inbox, &msg_dspr);
- msgr->wip = NULL;
- msgr->extra = extra;
- return msgr;
- }
- Msgr *
- msgr_add(Msgrs *msgrs, uint32_t key_len, const void *key,
- int fd, int epoll_fd, void *extra, Err *err)
- {
- Msgr *msgr = msgr_new(fd, extra, err);
- Err tmp_err;
- if (!msgr)
- return NULL;
- if (map_add(msgrs, key_len, key, msgr, err) < 0) {
- msgr_dispose(msgr, NULL);
- return NULL;
- }
- if (epoll_fd >= 0 && fd >= 0 &&
- poll_add(epoll_fd, fd, err) < 0) {
- map_remove(msgrs, key_len, key, (void **) msgr, &tmp_err);
- msgr_dispose(msgr, NULL);
- return NULL;
- }
- return msgr;
- }
- int
- msgr_remove(Msgrs *msgrs, uint32_t key_len, const void *key,
- void **extra, Err *err)
- {
- Msgr *msgr;
- if (map_remove(msgrs, key_len, key, (void **) &msgr, err) < 0)
- return -1;
- if (extra)
- *extra = msgr->extra;
- msgr_dispose(msgr, NULL);
- return 0;
- }
- Msgr *
- msgr_get(Msgrs *msgrs, uint32_t key_len, const void *key)
- {
- Msgr *msgr;
- if (!map_get(msgrs, key_len, key, (void **) &msgr))
- return NULL;
- return msgr;
- }
- int
- msgr_recv(Msgr *msgr, uint32_t max_len, int wait, Err *err)
- {
- if (!msgr->wip) {
- if (!(msgr->wip = malloc(sizeof(*msgr->wip)))) {
- err_std(err);
- return -1;
- }
- msg_recv_init(msgr->wip);
- }
- return msg_recv(msgr->fd, msgr->wip, max_len, wait, err);
- }
- int
- msgr_post(Msgr *msgr, Msg *msg, Err *err)
- {
- return list_add(&msgr->inbox, msgr->inbox.tail, msg, err);
- }
- int
- msgr_send(Msgr *msgr, int epoll_fd, int wait, Err *err)
- {
- Msg *msg;
- if (!msgr->inbox.len)
- return 0;
- msg = *(Msg **) list_peek(&msgr->inbox);
- if (msg_send(msgr->fd, msg, wait, err) < 0)
- return -1;
- if (!msg_tnsfd(msg)) {
- poll_out(epoll_fd, msgr->fd, err);
- return 0;
- }
- /* If this fails, it's no big deal. This msgr_send() will probably get
- called again and when it is, only this function will be called */
- if (poll_out_off(epoll_fd, msgr->fd, err) < 0)
- return 0;
- list_remove(&msgr->inbox, NULL, (void **) &msg, err);
- msg_dispose(msg);
- free(msg);
- return 0;
- }
|