123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920 |
- #include "rsync.h"
- #include "ifuncs.h"
- #define SELECT_TIMEOUT 60
- extern int bwlimit;
- extern size_t bwlimit_writemax;
- extern int io_timeout;
- extern int am_server;
- extern int am_daemon;
- extern int am_sender;
- extern int am_generator;
- extern int inc_recurse;
- extern int io_error;
- extern int eol_nulls;
- extern int flist_eof;
- extern int list_only;
- extern int read_batch;
- extern int compat_flags;
- extern int protect_args;
- extern int checksum_seed;
- extern int protocol_version;
- extern int remove_source_files;
- extern int preserve_hard_links;
- extern struct stats stats;
- extern struct file_list *cur_flist;
- #ifdef ICONV_OPTION
- extern int filesfrom_convert;
- extern iconv_t ic_send, ic_recv;
- #endif
- int csum_length = SHORT_SUM_LENGTH;
- int allowed_lull = 0;
- int ignore_timeout = 0;
- int batch_fd = -1;
- int msgdone_cnt = 0;
- int kluge_around_eof = 0;
- int msg_fd_in = -1;
- int msg_fd_out = -1;
- int sock_f_in = -1;
- int sock_f_out = -1;
- static int iobuf_f_in = -1;
- static char *iobuf_in;
- static size_t iobuf_in_siz;
- static size_t iobuf_in_ndx;
- static size_t iobuf_in_remaining;
- static int iobuf_f_out = -1;
- static char *iobuf_out;
- static int iobuf_out_cnt;
- int flist_forward_from = -1;
- static int io_multiplexing_out;
- static int io_multiplexing_in;
- static time_t last_io_in;
- static time_t last_io_out;
- static int no_flush;
- static int write_batch_monitor_in = -1;
- static int write_batch_monitor_out = -1;
- static int io_filesfrom_f_in = -1;
- static int io_filesfrom_f_out = -1;
- static xbuf ff_buf = EMPTY_XBUF;
- static char ff_lastchar;
- #ifdef ICONV_OPTION
- static xbuf iconv_buf = EMPTY_XBUF;
- #endif
- static int defer_forwarding_messages = 0, keep_defer_forwarding = 0;
- static int select_timeout = SELECT_TIMEOUT;
- static int active_filecnt = 0;
- static OFF_T active_bytecnt = 0;
- static int first_message = 1;
- static char int_byte_extra[64] = {
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
- 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 5, 6,
- };
- #define REMOTE_OPTION_ERROR "rsync: on remote machine: -"
- #define REMOTE_OPTION_ERROR2 ": unknown option"
- enum festatus { FES_SUCCESS, FES_REDO, FES_NO_SEND };
- static void check_timeout(void)
- {
- time_t t, chk;
- if (!io_timeout || ignore_timeout)
- return;
- t = time(NULL);
- if (!last_io_in)
- last_io_in = t;
- chk = MAX(last_io_out, last_io_in);
- if (t - chk >= io_timeout) {
- if (am_server || am_daemon)
- exit_cleanup(RERR_TIMEOUT);
- rprintf(FERROR, "[%s] io timeout after %d seconds -- exiting\n",
- who_am_i(), (int)(t-chk));
- exit_cleanup(RERR_TIMEOUT);
- }
- }
- static void readfd(int fd, char *buffer, size_t N);
- static void writefd(int fd, const char *buf, size_t len);
- static void writefd_unbuffered(int fd, const char *buf, size_t len);
- static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len, int convert);
- static flist_ndx_list redo_list, hlink_list;
- struct msg_list_item {
- struct msg_list_item *next;
- char convert;
- char buf[1];
- };
- struct msg_list {
- struct msg_list_item *head, *tail;
- };
- static struct msg_list msg_queue;
- static void got_flist_entry_status(enum festatus status, const char *buf)
- {
- int ndx = IVAL(buf, 0);
- struct file_list *flist = flist_for_ndx(ndx, "got_flist_entry_status");
- if (remove_source_files) {
- active_filecnt--;
- active_bytecnt -= F_LENGTH(flist->files[ndx - flist->ndx_start]);
- }
- if (inc_recurse)
- flist->in_progress--;
- switch (status) {
- case FES_SUCCESS:
- if (remove_source_files)
- send_msg(MSG_SUCCESS, buf, 4, 0);
-
- case FES_NO_SEND:
- #ifdef SUPPORT_HARD_LINKS
- if (preserve_hard_links) {
- struct file_struct *file = flist->files[ndx - flist->ndx_start];
- if (F_IS_HLINKED(file)) {
- if (status == FES_NO_SEND)
- flist_ndx_push(&hlink_list, -2);
- flist_ndx_push(&hlink_list, ndx);
- flist->in_progress++;
- }
- }
- #endif
- break;
- case FES_REDO:
- if (read_batch) {
- if (inc_recurse)
- flist->in_progress++;
- break;
- }
- if (inc_recurse)
- flist->to_redo++;
- flist_ndx_push(&redo_list, ndx);
- break;
- }
- }
- void io_set_sock_fds(int f_in, int f_out)
- {
- sock_f_in = f_in;
- sock_f_out = f_out;
- }
- void set_io_timeout(int secs)
- {
- io_timeout = secs;
- allowed_lull = (io_timeout + 1) / 2;
- if (!io_timeout || allowed_lull > SELECT_TIMEOUT)
- select_timeout = SELECT_TIMEOUT;
- else
- select_timeout = allowed_lull;
- if (read_batch)
- allowed_lull = 0;
- }
- void set_msg_fd_in(int fd)
- {
- msg_fd_in = fd;
- }
- void set_msg_fd_out(int fd)
- {
- msg_fd_out = fd;
- set_nonblocking(msg_fd_out);
- }
- static void msg_list_add(struct msg_list *lst, int code, const char *buf, int len, int convert)
- {
- struct msg_list_item *m;
- int sz = len + 4 + sizeof m[0] - 1;
- if (!(m = (struct msg_list_item *)new_array(char, sz)))
- out_of_memory("msg_list_add");
- m->next = NULL;
- m->convert = convert;
- SIVAL(m->buf, 0, ((code+MPLEX_BASE)<<24) | len);
- memcpy(m->buf + 4, buf, len);
- if (lst->tail)
- lst->tail->next = m;
- else
- lst->head = m;
- lst->tail = m;
- }
- static inline int flush_a_msg(int fd)
- {
- struct msg_list_item *m = msg_queue.head;
- int len = IVAL(m->buf, 0) & 0xFFFFFF;
- int tag = *((uchar*)m->buf+3) - MPLEX_BASE;
- if (!(msg_queue.head = m->next))
- msg_queue.tail = NULL;
- defer_forwarding_messages++;
- mplex_write(fd, tag, m->buf + 4, len, m->convert);
- defer_forwarding_messages--;
- free(m);
- return len;
- }
- static void msg_flush(void)
- {
- if (am_generator) {
- while (msg_queue.head && io_multiplexing_out)
- stats.total_written += flush_a_msg(sock_f_out) + 4;
- } else {
- while (msg_queue.head)
- (void)flush_a_msg(msg_fd_out);
- }
- }
- static void check_for_d_option_error(const char *msg)
- {
- static char rsync263_opts[] = "BCDHIKLPRSTWabceghlnopqrtuvxz";
- char *colon;
- int saw_d = 0;
- if (*msg != 'r'
- || strncmp(msg, REMOTE_OPTION_ERROR, sizeof REMOTE_OPTION_ERROR - 1) != 0)
- return;
- msg += sizeof REMOTE_OPTION_ERROR - 1;
- if (*msg == '-' || (colon = strchr(msg, ':')) == NULL
- || strncmp(colon, REMOTE_OPTION_ERROR2, sizeof REMOTE_OPTION_ERROR2 - 1) != 0)
- return;
- for ( ; *msg != ':'; msg++) {
- if (*msg == 'd')
- saw_d = 1;
- else if (*msg == 'e')
- break;
- else if (strchr(rsync263_opts, *msg) == NULL)
- return;
- }
- if (saw_d) {
- rprintf(FWARNING,
- "*** Try using \"--old-d\" if remote rsync is <= 2.6.3 ***\n");
- }
- }
- static void read_msg_fd(void)
- {
- char buf[2048];
- size_t n;
- struct file_list *flist;
- int fd = msg_fd_in;
- int tag, len;
-
- no_flush++;
- msg_fd_in = -1;
- defer_forwarding_messages++;
- readfd(fd, buf, 4);
- tag = IVAL(buf, 0);
- len = tag & 0xFFFFFF;
- tag = (tag >> 24) - MPLEX_BASE;
- switch (tag) {
- case MSG_DONE:
- if (len < 0 || len > 1 || !am_generator) {
- invalid_msg:
- rprintf(FERROR, "invalid message %d:%d [%s%s]\n",
- tag, len, who_am_i(),
- inc_recurse ? "/inc" : "");
- exit_cleanup(RERR_STREAMIO);
- }
- if (len) {
- readfd(fd, buf, len);
- stats.total_read = read_varlong(fd, 3);
- }
- msgdone_cnt++;
- break;
- case MSG_REDO:
- if (len != 4 || !am_generator)
- goto invalid_msg;
- readfd(fd, buf, 4);
- got_flist_entry_status(FES_REDO, buf);
- break;
- case MSG_FLIST:
- if (len != 4 || !am_generator || !inc_recurse)
- goto invalid_msg;
- readfd(fd, buf, 4);
-
- assert(iobuf_in != NULL);
- assert(iobuf_f_in == fd);
- if (verbose > 3) {
- rprintf(FINFO, "[%s] receiving flist for dir %d\n",
- who_am_i(), IVAL(buf,0));
- }
- flist = recv_file_list(fd);
- flist->parent_ndx = IVAL(buf,0);
- #ifdef SUPPORT_HARD_LINKS
- if (preserve_hard_links)
- match_hard_links(flist);
- #endif
- break;
- case MSG_FLIST_EOF:
- if (len != 0 || !am_generator || !inc_recurse)
- goto invalid_msg;
- flist_eof = 1;
- break;
- case MSG_IO_ERROR:
- if (len != 4)
- goto invalid_msg;
- readfd(fd, buf, len);
- io_error |= IVAL(buf, 0);
- break;
- case MSG_DELETED:
- if (len >= (int)sizeof buf || !am_generator)
- goto invalid_msg;
- readfd(fd, buf, len);
- send_msg(MSG_DELETED, buf, len, 1);
- break;
- case MSG_SUCCESS:
- if (len != 4 || !am_generator)
- goto invalid_msg;
- readfd(fd, buf, 4);
- got_flist_entry_status(FES_SUCCESS, buf);
- break;
- case MSG_NO_SEND:
- if (len != 4 || !am_generator)
- goto invalid_msg;
- readfd(fd, buf, 4);
- got_flist_entry_status(FES_NO_SEND, buf);
- break;
- case MSG_ERROR_SOCKET:
- case MSG_ERROR_UTF8:
- case MSG_CLIENT:
- if (!am_generator)
- goto invalid_msg;
- if (tag == MSG_ERROR_SOCKET)
- io_end_multiplex_out();
-
- case MSG_INFO:
- case MSG_ERROR:
- case MSG_ERROR_XFER:
- case MSG_WARNING:
- case MSG_LOG:
- while (len) {
- n = len;
- if (n >= sizeof buf)
- n = sizeof buf - 1;
- readfd(fd, buf, n);
- rwrite((enum logcode)tag, buf, n, !am_generator);
- len -= n;
- }
- break;
- default:
- rprintf(FERROR, "unknown message %d:%d [%s]\n",
- tag, len, who_am_i());
- exit_cleanup(RERR_STREAMIO);
- }
- no_flush--;
- msg_fd_in = fd;
- if (!--defer_forwarding_messages && !no_flush)
- msg_flush();
- }
- void increment_active_files(int ndx, int itemizing, enum logcode code)
- {
- while (1) {
-
- int limit = active_bytecnt >= 128*1024 ? 10 : 50;
- if (active_filecnt < limit)
- break;
- check_for_finished_files(itemizing, code, 0);
- if (active_filecnt < limit)
- break;
- if (iobuf_out_cnt)
- io_flush(NORMAL_FLUSH);
- else
- read_msg_fd();
- }
- active_filecnt++;
- active_bytecnt += F_LENGTH(cur_flist->files[ndx - cur_flist->ndx_start]);
- }
- static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len, int convert)
- {
- char buffer[BIGPATHBUFLEN];
- size_t n = len;
- #ifdef ICONV_OPTION
-
- if (convert && ic_send != (iconv_t)-1) {
- xbuf outbuf, inbuf;
- INIT_XBUF(outbuf, buffer + 4, 0, sizeof buffer - 4);
- INIT_XBUF(inbuf, (char*)buf, len, -1);
- iconvbufs(ic_send, &inbuf, &outbuf,
- ICB_INCLUDE_BAD | ICB_INCLUDE_INCOMPLETE);
- if (inbuf.len > 0) {
- rprintf(FERROR, "overflowed conversion buffer in mplex_write");
- exit_cleanup(RERR_UNSUPPORTED);
- }
- n = len = outbuf.len;
- } else
- #endif
- if (n > 1024 - 4)
- n = 0;
- else
- memcpy(buffer + 4, buf, n);
- SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
- keep_defer_forwarding++;
- writefd_unbuffered(fd, buffer, n+4);
- keep_defer_forwarding--;
- if (len > n)
- writefd_unbuffered(fd, buf+n, len-n);
- if (!--defer_forwarding_messages && !no_flush)
- msg_flush();
- }
- int send_msg(enum msgcode code, const char *buf, int len, int convert)
- {
- if (msg_fd_out < 0) {
- if (!defer_forwarding_messages)
- return io_multiplex_write(code, buf, len, convert);
- if (!io_multiplexing_out)
- return 0;
- msg_list_add(&msg_queue, code, buf, len, convert);
- return 1;
- }
- if (flist_forward_from >= 0)
- msg_list_add(&msg_queue, code, buf, len, convert);
- else
- mplex_write(msg_fd_out, code, buf, len, convert);
- return 1;
- }
- void send_msg_int(enum msgcode code, int num)
- {
- char numbuf[4];
- SIVAL(numbuf, 0, num);
- send_msg(code, numbuf, 4, 0);
- }
- void wait_for_receiver(void)
- {
- if (io_flush(NORMAL_FLUSH))
- return;
- read_msg_fd();
- }
- int get_redo_num(void)
- {
- return flist_ndx_pop(&redo_list);
- }
- int get_hlink_num(void)
- {
- return flist_ndx_pop(&hlink_list);
- }
- void io_set_filesfrom_fds(int f_in, int f_out)
- {
- io_filesfrom_f_in = f_in;
- io_filesfrom_f_out = f_out;
- alloc_xbuf(&ff_buf, 2048);
- #ifdef ICONV_OPTION
- if (protect_args)
- alloc_xbuf(&iconv_buf, 1024);
- #endif
- }
- static void whine_about_eof(int fd)
- {
- if (kluge_around_eof && fd == sock_f_in) {
- int i;
- if (kluge_around_eof > 0)
- exit_cleanup(0);
-
- for (i = 10*1000/20; i--; )
- msleep(20);
- }
- rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
- "(%.0f bytes received so far) [%s]\n",
- (double)stats.total_read, who_am_i());
- exit_cleanup(RERR_STREAMIO);
- }
- static int read_timeout(int fd, char *buf, size_t len)
- {
- int n, cnt = 0;
- io_flush(FULL_FLUSH);
- while (cnt == 0) {
-
- fd_set r_fds, w_fds;
- struct timeval tv;
- int maxfd = fd;
- int count;
- FD_ZERO(&r_fds);
- FD_ZERO(&w_fds);
- FD_SET(fd, &r_fds);
- if (io_filesfrom_f_out >= 0) {
- int new_fd;
- if (ff_buf.len == 0) {
- if (io_filesfrom_f_in >= 0) {
- FD_SET(io_filesfrom_f_in, &r_fds);
- new_fd = io_filesfrom_f_in;
- } else {
- io_filesfrom_f_out = -1;
- new_fd = -1;
- }
- } else {
- FD_SET(io_filesfrom_f_out, &w_fds);
- new_fd = io_filesfrom_f_out;
- }
- if (new_fd > maxfd)
- maxfd = new_fd;
- }
- tv.tv_sec = select_timeout;
- tv.tv_usec = 0;
- errno = 0;
- count = select(maxfd + 1, &r_fds, &w_fds, NULL, &tv);
- if (count <= 0) {
- if (errno == EBADF) {
- defer_forwarding_messages = 0;
- exit_cleanup(RERR_SOCKETIO);
- }
- check_timeout();
- continue;
- }
- if (io_filesfrom_f_out >= 0) {
- if (ff_buf.len) {
- if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
- int l = write(io_filesfrom_f_out,
- ff_buf.buf + ff_buf.pos,
- ff_buf.len);
- if (l > 0) {
- if (!(ff_buf.len -= l))
- ff_buf.pos = 0;
- else
- ff_buf.pos += l;
- } else if (errno != EINTR) {
-
- io_filesfrom_f_out = -1;
- }
- }
- } else if (io_filesfrom_f_in >= 0) {
- if (FD_ISSET(io_filesfrom_f_in, &r_fds)) {
- #ifdef ICONV_OPTION
- xbuf *ibuf = filesfrom_convert ? &iconv_buf : &ff_buf;
- #else
- xbuf *ibuf = &ff_buf;
- #endif
- int l = read(io_filesfrom_f_in, ibuf->buf, ibuf->size);
- if (l <= 0) {
- if (l == 0 || errno != EINTR) {
-
- memcpy(ff_buf.buf, "\0\0", 2);
- ff_buf.len = ff_lastchar? 2 : 1;
- ff_buf.pos = 0;
- io_filesfrom_f_in = -1;
- }
- } else {
- #ifdef ICONV_OPTION
- if (filesfrom_convert) {
- iconv_buf.pos = 0;
- iconv_buf.len = l;
- iconvbufs(ic_send, &iconv_buf, &ff_buf,
- ICB_EXPAND_OUT|ICB_INCLUDE_BAD|ICB_INCLUDE_INCOMPLETE);
- l = ff_buf.len;
- }
- #endif
- if (!eol_nulls) {
- char *s = ff_buf.buf + l;
-
- while (s-- > ff_buf.buf) {
- if (*s == '\n' || *s == '\r')
- *s = '\0';
- }
- }
- if (!ff_lastchar) {
-
- while (l && ff_buf.buf[ff_buf.pos] == '\0')
- ff_buf.pos++, l--;
- }
- if (!l)
- ff_buf.pos = 0;
- else {
- char *f = ff_buf.buf + ff_buf.pos;
- char *t = f;
- char *eob = f + l;
-
- while (f != eob) {
- if (!(*t++ = *f++)) {
- while (f != eob && !*f)
- f++, l--;
- }
- }
- ff_lastchar = f[-1];
- }
- ff_buf.len = l;
- }
- }
- }
- }
- if (!FD_ISSET(fd, &r_fds))
- continue;
- n = read(fd, buf, len);
- if (n <= 0) {
- if (n == 0)
- whine_about_eof(fd);
- if (errno == EINTR || errno == EWOULDBLOCK
- || errno == EAGAIN)
- continue;
-
- if (fd == sock_f_in) {
- io_end_multiplex_out();
- rsyserr(FERROR_SOCKET, errno, "read error");
- } else
- rsyserr(FERROR, errno, "read error");
- exit_cleanup(RERR_STREAMIO);
- }
- buf += n;
- len -= n;
- cnt += n;
- if (fd == sock_f_in && io_timeout)
- last_io_in = time(NULL);
- }
- return cnt;
- }
- int read_line(int fd, char *buf, size_t bufsiz, int flags)
- {
- char ch, *s, *eob;
- int cnt;
- #ifdef ICONV_OPTION
- if (flags & RL_CONVERT && iconv_buf.size < bufsiz)
- realloc_xbuf(&iconv_buf, bufsiz + 1024);
- #endif
- start:
- #ifdef ICONV_OPTION
- s = flags & RL_CONVERT ? iconv_buf.buf : buf;
- #else
- s = buf;
- #endif
- eob = s + bufsiz - 1;
- while (1) {
- cnt = read(fd, &ch, 1);
- if (cnt < 0 && (errno == EWOULDBLOCK
- || errno == EINTR || errno == EAGAIN)) {
- struct timeval tv;
- fd_set r_fds, e_fds;
- FD_ZERO(&r_fds);
- FD_SET(fd, &r_fds);
- FD_ZERO(&e_fds);
- FD_SET(fd, &e_fds);
- tv.tv_sec = select_timeout;
- tv.tv_usec = 0;
- if (!select(fd+1, &r_fds, NULL, &e_fds, &tv))
- check_timeout();
-
- continue;
- }
- if (cnt != 1)
- break;
- if (flags & RL_EOL_NULLS ? ch == '\0' : (ch == '\r' || ch == '\n')) {
-
- if (flags & RL_DUMP_COMMENTS && s == buf)
- continue;
- break;
- }
- if (s < eob)
- *s++ = ch;
- }
- *s = '\0';
- if (flags & RL_DUMP_COMMENTS && (*buf == '#' || *buf == ';'))
- goto start;
- #ifdef ICONV_OPTION
- if (flags & RL_CONVERT) {
- xbuf outbuf;
- INIT_XBUF(outbuf, buf, 0, bufsiz);
- iconv_buf.pos = 0;
- iconv_buf.len = s - iconv_buf.buf;
- iconvbufs(ic_recv, &iconv_buf, &outbuf,
- ICB_INCLUDE_BAD | ICB_INCLUDE_INCOMPLETE);
- outbuf.buf[outbuf.len] = '\0';
- return outbuf.len;
- }
- #endif
- return s - buf;
- }
- void read_args(int f_in, char *mod_name, char *buf, size_t bufsiz, int rl_nulls,
- char ***argv_p, int *argc_p, char **request_p)
- {
- int maxargs = MAX_ARGS;
- int dot_pos = 0;
- int argc = 0;
- char **argv, *p;
- int rl_flags = (rl_nulls ? RL_EOL_NULLS : 0);
- #ifdef ICONV_OPTION
- rl_flags |= (protect_args && ic_recv != (iconv_t)-1 ? RL_CONVERT : 0);
- #endif
- if (!(argv = new_array(char *, maxargs)))
- out_of_memory("read_args");
- if (mod_name && !protect_args)
- argv[argc++] = "rsyncd";
- while (1) {
- if (read_line(f_in, buf, bufsiz, rl_flags) == 0)
- break;
- if (argc == maxargs-1) {
- maxargs += MAX_ARGS;
- if (!(argv = realloc_array(argv, char *, maxargs)))
- out_of_memory("read_args");
- }
- if (dot_pos) {
- if (request_p) {
- *request_p = strdup(buf);
- request_p = NULL;
- }
- if (mod_name)
- glob_expand_module(mod_name, buf, &argv, &argc, &maxargs);
- else
- glob_expand(buf, &argv, &argc, &maxargs);
- } else {
- if (!(p = strdup(buf)))
- out_of_memory("read_args");
- argv[argc++] = p;
- if (*p == '.' && p[1] == '\0')
- dot_pos = argc;
- }
- }
- argv[argc] = NULL;
- glob_expand(NULL, NULL, NULL, NULL);
- *argc_p = argc;
- *argv_p = argv;
- }
- int io_start_buffering_out(int f_out)
- {
- if (iobuf_out) {
- assert(f_out == iobuf_f_out);
- return 0;
- }
- if (!(iobuf_out = new_array(char, IO_BUFFER_SIZE)))
- out_of_memory("io_start_buffering_out");
- iobuf_out_cnt = 0;
- iobuf_f_out = f_out;
- return 1;
- }
- int io_start_buffering_in(int f_in)
- {
- if (iobuf_in) {
- assert(f_in == iobuf_f_in);
- return 0;
- }
- iobuf_in_siz = 2 * IO_BUFFER_SIZE;
- if (!(iobuf_in = new_array(char, iobuf_in_siz)))
- out_of_memory("io_start_buffering_in");
- iobuf_f_in = f_in;
- return 1;
- }
- void io_end_buffering_in(void)
- {
- if (!iobuf_in)
- return;
- free(iobuf_in);
- iobuf_in = NULL;
- iobuf_in_ndx = 0;
- iobuf_in_remaining = 0;
- iobuf_f_in = -1;
- }
- void io_end_buffering_out(void)
- {
- if (!iobuf_out)
- return;
- io_flush(FULL_FLUSH);
- free(iobuf_out);
- iobuf_out = NULL;
- iobuf_f_out = -1;
- }
- void maybe_flush_socket(int important)
- {
- if (iobuf_out && iobuf_out_cnt
- && (important || time(NULL) - last_io_out >= 5))
- io_flush(NORMAL_FLUSH);
- }
- void maybe_send_keepalive(void)
- {
- if (time(NULL) - last_io_out >= allowed_lull) {
- if (!iobuf_out || !iobuf_out_cnt) {
- if (protocol_version < 29)
- send_msg(MSG_DATA, "", 0, 0);
- else if (protocol_version >= 30)
- send_msg(MSG_NOOP, "", 0, 0);
- else {
- write_int(sock_f_out, cur_flist->used);
- write_shortint(sock_f_out, ITEM_IS_NEW);
- }
- }
- if (iobuf_out)
- io_flush(NORMAL_FLUSH);
- }
- }
- void start_flist_forward(int f_in)
- {
- assert(iobuf_out != NULL);
- assert(iobuf_f_out == msg_fd_out);
- flist_forward_from = f_in;
- defer_forwarding_messages++;
- }
- void stop_flist_forward(void)
- {
- flist_forward_from = -1;
- defer_forwarding_messages--;
- io_flush(FULL_FLUSH);
- }
- static void read_loop(int fd, char *buf, size_t len)
- {
- while (len) {
- int n = read_timeout(fd, buf, len);
- buf += n;
- len -= n;
- }
- }
- static int readfd_unbuffered(int fd, char *buf, size_t len)
- {
- size_t msg_bytes;
- int tag, cnt = 0;
- char line[BIGPATHBUFLEN];
- if (!iobuf_in || fd != iobuf_f_in)
- return read_timeout(fd, buf, len);
- if (!io_multiplexing_in && iobuf_in_remaining == 0) {
- iobuf_in_remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
- iobuf_in_ndx = 0;
- }
- while (cnt == 0) {
- if (iobuf_in_remaining) {
- len = MIN(len, iobuf_in_remaining);
- memcpy(buf, iobuf_in + iobuf_in_ndx, len);
- iobuf_in_ndx += len;
- iobuf_in_remaining -= len;
- cnt = len;
- break;
- }
- read_loop(fd, line, 4);
- tag = IVAL(line, 0);
- msg_bytes = tag & 0xFFFFFF;
- tag = (tag >> 24) - MPLEX_BASE;
- switch (tag) {
- case MSG_DATA:
- if (msg_bytes > iobuf_in_siz) {
- if (!(iobuf_in = realloc_array(iobuf_in, char,
- msg_bytes)))
- out_of_memory("readfd_unbuffered");
- iobuf_in_siz = msg_bytes;
- }
- read_loop(fd, iobuf_in, msg_bytes);
- iobuf_in_remaining = msg_bytes;
- iobuf_in_ndx = 0;
- break;
- case MSG_NOOP:
- if (msg_bytes != 0)
- goto invalid_msg;
- if (am_sender)
- maybe_send_keepalive();
- break;
- case MSG_IO_ERROR:
- if (msg_bytes != 4)
- goto invalid_msg;
- read_loop(fd, line, msg_bytes);
- send_msg_int(MSG_IO_ERROR, IVAL(line, 0));
- io_error |= IVAL(line, 0);
- break;
- case MSG_DELETED:
- if (msg_bytes >= sizeof line)
- goto overflow;
- #ifdef ICONV_OPTION
- if (ic_recv != (iconv_t)-1) {
- xbuf outbuf, inbuf;
- char ibuf[512];
- int add_null = 0;
- INIT_CONST_XBUF(outbuf, line);
- INIT_XBUF(inbuf, ibuf, 0, -1);
- while (msg_bytes) {
- inbuf.len = msg_bytes > sizeof ibuf
- ? sizeof ibuf : msg_bytes;
- read_loop(fd, inbuf.buf, inbuf.len);
- if (!(msg_bytes -= inbuf.len)
- && !ibuf[inbuf.len-1])
- inbuf.len--, add_null = 1;
- if (iconvbufs(ic_send, &inbuf, &outbuf,
- ICB_INCLUDE_BAD | ICB_INCLUDE_INCOMPLETE) < 0)
- goto overflow;
- }
- if (add_null) {
- if (outbuf.len == outbuf.size)
- goto overflow;
- outbuf.buf[outbuf.len++] = '\0';
- }
- msg_bytes = outbuf.len;
- } else
- #endif
- read_loop(fd, line, msg_bytes);
-
- if (msg_bytes > 0 && !line[msg_bytes-1])
- log_delete(line, S_IFDIR);
- else {
- line[msg_bytes] = '\0';
- log_delete(line, S_IFREG);
- }
- break;
- case MSG_SUCCESS:
- if (msg_bytes != 4) {
- invalid_msg:
- rprintf(FERROR, "invalid multi-message %d:%ld [%s]\n",
- tag, (long)msg_bytes, who_am_i());
- exit_cleanup(RERR_STREAMIO);
- }
- read_loop(fd, line, msg_bytes);
- successful_send(IVAL(line, 0));
- break;
- case MSG_NO_SEND:
- if (msg_bytes != 4)
- goto invalid_msg;
- read_loop(fd, line, msg_bytes);
- send_msg_int(MSG_NO_SEND, IVAL(line, 0));
- break;
- case MSG_INFO:
- case MSG_ERROR:
- case MSG_ERROR_XFER:
- case MSG_WARNING:
- if (msg_bytes >= sizeof line) {
- overflow:
- rprintf(FERROR,
- "multiplexing overflow %d:%ld [%s]\n",
- tag, (long)msg_bytes, who_am_i());
- exit_cleanup(RERR_STREAMIO);
- }
- read_loop(fd, line, msg_bytes);
- rwrite((enum logcode)tag, line, msg_bytes, 1);
- if (first_message) {
- if (list_only && !am_sender && tag == 1) {
- line[msg_bytes] = '\0';
- check_for_d_option_error(line);
- }
- first_message = 0;
- }
- break;
- default:
- rprintf(FERROR, "unexpected tag %d [%s]\n",
- tag, who_am_i());
- exit_cleanup(RERR_STREAMIO);
- }
- }
- if (iobuf_in_remaining == 0)
- io_flush(NORMAL_FLUSH);
- return cnt;
- }
- static void readfd(int fd, char *buffer, size_t N)
- {
- int cnt;
- size_t total = 0;
- while (total < N) {
- cnt = readfd_unbuffered(fd, buffer + total, N-total);
- total += cnt;
- }
- if (fd == write_batch_monitor_in) {
- if ((size_t)write(batch_fd, buffer, total) != total)
- exit_cleanup(RERR_FILEIO);
- }
- if (fd == flist_forward_from)
- writefd(iobuf_f_out, buffer, total);
- if (fd == sock_f_in)
- stats.total_read += total;
- }
- unsigned short read_shortint(int f)
- {
- char b[2];
- readfd(f, b, 2);
- return (UVAL(b, 1) << 8) + UVAL(b, 0);
- }
- int32 read_int(int f)
- {
- char b[4];
- int32 num;
- readfd(f, b, 4);
- num = IVAL(b, 0);
- #if SIZEOF_INT32 > 4
- if (num & (int32)0x80000000)
- num |= ~(int32)0xffffffff;
- #endif
- return num;
- }
- int32 read_varint(int f)
- {
- union {
- char b[5];
- int32 x;
- } u;
- uchar ch;
- int extra;
- u.x = 0;
- readfd(f, (char*)&ch, 1);
- extra = int_byte_extra[ch / 4];
- if (extra) {
- uchar bit = ((uchar)1<<(8-extra));
- if (extra >= (int)sizeof u.b) {
- rprintf(FERROR, "Overflow in read_varint()\n");
- exit_cleanup(RERR_STREAMIO);
- }
- readfd(f, u.b, extra);
- u.b[extra] = ch & (bit-1);
- } else
- u.b[0] = ch;
- #if CAREFUL_ALIGNMENT
- u.x = IVAL(u.b,0);
- #endif
- #if SIZEOF_INT32 > 4
- if (u.x & (int32)0x80000000)
- u.x |= ~(int32)0xffffffff;
- #endif
- return u.x;
- }
- int64 read_varlong(int f, uchar min_bytes)
- {
- union {
- char b[9];
- int64 x;
- } u;
- char b2[8];
- int extra;
- #if SIZEOF_INT64 < 8
- memset(u.b, 0, 8);
- #else
- u.x = 0;
- #endif
- readfd(f, b2, min_bytes);
- memcpy(u.b, b2+1, min_bytes-1);
- extra = int_byte_extra[CVAL(b2, 0) / 4];
- if (extra) {
- uchar bit = ((uchar)1<<(8-extra));
- if (min_bytes + extra > (int)sizeof u.b) {
- rprintf(FERROR, "Overflow in read_varlong()\n");
- exit_cleanup(RERR_STREAMIO);
- }
- readfd(f, u.b + min_bytes - 1, extra);
- u.b[min_bytes + extra - 1] = CVAL(b2, 0) & (bit-1);
- #if SIZEOF_INT64 < 8
- if (min_bytes + extra > 5 || u.b[4] || CVAL(u.b,3) & 0x80) {
- rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
- exit_cleanup(RERR_UNSUPPORTED);
- }
- #endif
- } else
- u.b[min_bytes + extra - 1] = CVAL(b2, 0);
- #if SIZEOF_INT64 < 8
- u.x = IVAL(u.b,0);
- #elif CAREFUL_ALIGNMENT
- u.x = IVAL(u.b,0) | (((int64)IVAL(u.b,4))<<32);
- #endif
- return u.x;
- }
- int64 read_longint(int f)
- {
- #if SIZEOF_INT64 >= 8
- char b[9];
- #endif
- int32 num = read_int(f);
- if (num != (int32)0xffffffff)
- return num;
- #if SIZEOF_INT64 < 8
- rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
- exit_cleanup(RERR_UNSUPPORTED);
- #else
- readfd(f, b, 8);
- return IVAL(b,0) | (((int64)IVAL(b,4))<<32);
- #endif
- }
- void read_buf(int f, char *buf, size_t len)
- {
- readfd(f,buf,len);
- }
- void read_sbuf(int f, char *buf, size_t len)
- {
- readfd(f, buf, len);
- buf[len] = '\0';
- }
- uchar read_byte(int f)
- {
- uchar c;
- readfd(f, (char *)&c, 1);
- return c;
- }
- int read_vstring(int f, char *buf, int bufsize)
- {
- int len = read_byte(f);
- if (len & 0x80)
- len = (len & ~0x80) * 0x100 + read_byte(f);
- if (len >= bufsize) {
- rprintf(FERROR, "over-long vstring received (%d > %d)\n",
- len, bufsize - 1);
- return -1;
- }
- if (len)
- readfd(f, buf, len);
- buf[len] = '\0';
- return len;
- }
- void read_sum_head(int f, struct sum_struct *sum)
- {
- int32 max_blength = protocol_version < 30 ? OLD_MAX_BLOCK_SIZE : MAX_BLOCK_SIZE;
- sum->count = read_int(f);
- if (sum->count < 0) {
- rprintf(FERROR, "Invalid checksum count %ld [%s]\n",
- (long)sum->count, who_am_i());
- exit_cleanup(RERR_PROTOCOL);
- }
- sum->blength = read_int(f);
- if (sum->blength < 0 || sum->blength > max_blength) {
- rprintf(FERROR, "Invalid block length %ld [%s]\n",
- (long)sum->blength, who_am_i());
- exit_cleanup(RERR_PROTOCOL);
- }
- sum->s2length = protocol_version < 27 ? csum_length : (int)read_int(f);
- if (sum->s2length < 0 || sum->s2length > MAX_DIGEST_LEN) {
- rprintf(FERROR, "Invalid checksum length %d [%s]\n",
- sum->s2length, who_am_i());
- exit_cleanup(RERR_PROTOCOL);
- }
- sum->remainder = read_int(f);
- if (sum->remainder < 0 || sum->remainder > sum->blength) {
- rprintf(FERROR, "Invalid remainder length %ld [%s]\n",
- (long)sum->remainder, who_am_i());
- exit_cleanup(RERR_PROTOCOL);
- }
- }
- void write_sum_head(int f, struct sum_struct *sum)
- {
- static struct sum_struct null_sum;
- if (sum == NULL)
- sum = &null_sum;
- write_int(f, sum->count);
- write_int(f, sum->blength);
- if (protocol_version >= 27)
- write_int(f, sum->s2length);
- write_int(f, sum->remainder);
- }
- static void sleep_for_bwlimit(int bytes_written)
- {
- static struct timeval prior_tv;
- static long total_written = 0;
- struct timeval tv, start_tv;
- long elapsed_usec, sleep_usec;
- #define ONE_SEC 1000000L
- if (!bwlimit_writemax)
- return;
- total_written += bytes_written;
- gettimeofday(&start_tv, NULL);
- if (prior_tv.tv_sec) {
- elapsed_usec = (start_tv.tv_sec - prior_tv.tv_sec) * ONE_SEC
- + (start_tv.tv_usec - prior_tv.tv_usec);
- total_written -= (int64)elapsed_usec * bwlimit / (ONE_SEC/1024);
- if (total_written < 0)
- total_written = 0;
- }
- sleep_usec = total_written * (ONE_SEC/1024) / bwlimit;
- if (sleep_usec < ONE_SEC / 10) {
- prior_tv = start_tv;
- return;
- }
- tv.tv_sec = sleep_usec / ONE_SEC;
- tv.tv_usec = sleep_usec % ONE_SEC;
- select(0, NULL, NULL, NULL, &tv);
- gettimeofday(&prior_tv, NULL);
- elapsed_usec = (prior_tv.tv_sec - start_tv.tv_sec) * ONE_SEC
- + (prior_tv.tv_usec - start_tv.tv_usec);
- total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
- }
- static const char *what_fd_is(int fd)
- {
- static char buf[20];
- if (fd == sock_f_out)
- return "socket";
- else if (fd == msg_fd_out)
- return "message fd";
- else if (fd == batch_fd)
- return "batch file";
- else {
- snprintf(buf, sizeof buf, "fd %d", fd);
- return buf;
- }
- }
- static void writefd_unbuffered(int fd, const char *buf, size_t len)
- {
- size_t n, total = 0;
- fd_set w_fds, r_fds, e_fds;
- int maxfd, count, cnt, using_r_fds;
- int defer_inc = 0;
- struct timeval tv;
- if (no_flush++)
- defer_forwarding_messages++, defer_inc++;
- while (total < len) {
- FD_ZERO(&w_fds);
- FD_SET(fd, &w_fds);
- FD_ZERO(&e_fds);
- FD_SET(fd, &e_fds);
- maxfd = fd;
- if (msg_fd_in >= 0) {
- FD_ZERO(&r_fds);
- FD_SET(msg_fd_in, &r_fds);
- if (msg_fd_in > maxfd)
- maxfd = msg_fd_in;
- using_r_fds = 1;
- } else
- using_r_fds = 0;
- tv.tv_sec = select_timeout;
- tv.tv_usec = 0;
- errno = 0;
- count = select(maxfd + 1, using_r_fds ? &r_fds : NULL,
- &w_fds, &e_fds, &tv);
- if (count <= 0) {
- if (count < 0 && errno == EBADF)
- exit_cleanup(RERR_SOCKETIO);
- check_timeout();
- continue;
- }
-
- if (using_r_fds && FD_ISSET(msg_fd_in, &r_fds))
- read_msg_fd();
- if (!FD_ISSET(fd, &w_fds))
- continue;
- n = len - total;
- if (bwlimit_writemax && n > bwlimit_writemax)
- n = bwlimit_writemax;
- cnt = write(fd, buf + total, n);
- if (cnt <= 0) {
- if (cnt < 0) {
- if (errno == EINTR)
- continue;
- if (errno == EWOULDBLOCK || errno == EAGAIN) {
- msleep(1);
- continue;
- }
- }
-
- if (fd == sock_f_out)
- io_end_multiplex_out();
-
- if (am_server && fd == msg_fd_out)
- exit_cleanup(RERR_STREAMIO);
- rsyserr(FERROR, errno,
- "writefd_unbuffered failed to write %ld bytes to %s [%s]",
- (long)len, what_fd_is(fd), who_am_i());
-
- while (!am_server && fd == sock_f_out && io_multiplexing_in) {
- char buf[1024];
- set_io_timeout(30);
- ignore_timeout = 0;
- readfd_unbuffered(sock_f_in, buf, sizeof buf);
- }
- exit_cleanup(RERR_STREAMIO);
- }
- total += cnt;
- defer_forwarding_messages++, defer_inc++;
- if (fd == sock_f_out) {
- if (io_timeout || am_generator)
- last_io_out = time(NULL);
- sleep_for_bwlimit(cnt);
- }
- }
- no_flush--;
- if (keep_defer_forwarding)
- defer_inc--;
- if (!(defer_forwarding_messages -= defer_inc) && !no_flush)
- msg_flush();
- }
- int io_flush(int flush_it_all)
- {
- int flushed_something = 0;
- if (no_flush)
- return 0;
- if (iobuf_out_cnt) {
- if (io_multiplexing_out)
- mplex_write(sock_f_out, MSG_DATA, iobuf_out, iobuf_out_cnt, 0);
- else
- writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt);
- iobuf_out_cnt = 0;
- flushed_something = 1;
- }
- if (flush_it_all && !defer_forwarding_messages && msg_queue.head) {
- msg_flush();
- flushed_something = 1;
- }
- return flushed_something;
- }
- static void writefd(int fd, const char *buf, size_t len)
- {
- if (fd == sock_f_out)
- stats.total_written += len;
- if (fd == write_batch_monitor_out)
- writefd_unbuffered(batch_fd, buf, len);
- if (!iobuf_out || fd != iobuf_f_out) {
- writefd_unbuffered(fd, buf, len);
- return;
- }
- while (len) {
- int n = MIN((int)len, IO_BUFFER_SIZE - iobuf_out_cnt);
- if (n > 0) {
- memcpy(iobuf_out+iobuf_out_cnt, buf, n);
- buf += n;
- len -= n;
- iobuf_out_cnt += n;
- }
- if (iobuf_out_cnt == IO_BUFFER_SIZE)
- io_flush(NORMAL_FLUSH);
- }
- }
- void write_shortint(int f, unsigned short x)
- {
- char b[2];
- b[0] = (char)x;
- b[1] = (char)(x >> 8);
- writefd(f, b, 2);
- }
- void write_int(int f, int32 x)
- {
- char b[4];
- SIVAL(b, 0, x);
- writefd(f, b, 4);
- }
- void write_varint(int f, int32 x)
- {
- char b[5];
- uchar bit;
- int cnt = 4;
- SIVAL(b, 1, x);
- while (cnt > 1 && b[cnt] == 0)
- cnt--;
- bit = ((uchar)1<<(7-cnt+1));
- if (CVAL(b, cnt) >= bit) {
- cnt++;
- *b = ~(bit-1);
- } else if (cnt > 1)
- *b = b[cnt] | ~(bit*2-1);
- else
- *b = b[cnt];
- writefd(f, b, cnt);
- }
- void write_varlong(int f, int64 x, uchar min_bytes)
- {
- char b[9];
- uchar bit;
- int cnt = 8;
- SIVAL(b, 1, x);
- #if SIZEOF_INT64 >= 8
- SIVAL(b, 5, x >> 32);
- #else
- if (x <= 0x7FFFFFFF && x >= 0)
- memset(b + 5, 0, 4);
- else {
- rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
- exit_cleanup(RERR_UNSUPPORTED);
- }
- #endif
- while (cnt > min_bytes && b[cnt] == 0)
- cnt--;
- bit = ((uchar)1<<(7-cnt+min_bytes));
- if (CVAL(b, cnt) >= bit) {
- cnt++;
- *b = ~(bit-1);
- } else if (cnt > min_bytes)
- *b = b[cnt] | ~(bit*2-1);
- else
- *b = b[cnt];
- writefd(f, b, cnt);
- }
- void write_longint(int f, int64 x)
- {
- char b[12], * const s = b+4;
- SIVAL(s, 0, x);
- if (x <= 0x7FFFFFFF && x >= 0) {
- writefd(f, s, 4);
- return;
- }
- #if SIZEOF_INT64 < 8
- rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
- exit_cleanup(RERR_UNSUPPORTED);
- #else
- memset(b, 0xFF, 4);
- SIVAL(s, 4, x >> 32);
- writefd(f, b, 12);
- #endif
- }
- void write_buf(int f, const char *buf, size_t len)
- {
- writefd(f,buf,len);
- }
- void write_sbuf(int f, const char *buf)
- {
- writefd(f, buf, strlen(buf));
- }
- void write_byte(int f, uchar c)
- {
- writefd(f, (char *)&c, 1);
- }
- void write_vstring(int f, const char *str, int len)
- {
- uchar lenbuf[3], *lb = lenbuf;
- if (len > 0x7F) {
- if (len > 0x7FFF) {
- rprintf(FERROR,
- "attempting to send over-long vstring (%d > %d)\n",
- len, 0x7FFF);
- exit_cleanup(RERR_PROTOCOL);
- }
- *lb++ = len / 0x100 + 0x80;
- }
- *lb = len;
- writefd(f, (char*)lenbuf, lb - lenbuf + 1);
- if (len)
- writefd(f, str, len);
- }
- void write_ndx(int f, int32 ndx)
- {
- static int32 prev_positive = -1, prev_negative = 1;
- int32 diff, cnt = 0;
- char b[6];
- if (protocol_version < 30 || read_batch) {
- write_int(f, ndx);
- return;
- }
-
- if (ndx >= 0) {
- diff = ndx - prev_positive;
- prev_positive = ndx;
- } else if (ndx == NDX_DONE) {
- *b = 0;
- writefd(f, b, 1);
- return;
- } else {
- b[cnt++] = (char)0xFF;
- ndx = -ndx;
- diff = ndx - prev_negative;
- prev_negative = ndx;
- }
-
- if (diff < 0xFE && diff > 0)
- b[cnt++] = (char)diff;
- else if (diff < 0 || diff > 0x7FFF) {
- b[cnt++] = (char)0xFE;
- b[cnt++] = (char)((ndx >> 24) | 0x80);
- b[cnt++] = (char)ndx;
- b[cnt++] = (char)(ndx >> 8);
- b[cnt++] = (char)(ndx >> 16);
- } else {
- b[cnt++] = (char)0xFE;
- b[cnt++] = (char)(diff >> 8);
- b[cnt++] = (char)diff;
- }
- writefd(f, b, cnt);
- }
- int32 read_ndx(int f)
- {
- static int32 prev_positive = -1, prev_negative = 1;
- int32 *prev_ptr, num;
- char b[4];
- if (protocol_version < 30)
- return read_int(f);
- readfd(f, b, 1);
- if (CVAL(b, 0) == 0xFF) {
- readfd(f, b, 1);
- prev_ptr = &prev_negative;
- } else if (CVAL(b, 0) == 0)
- return NDX_DONE;
- else
- prev_ptr = &prev_positive;
- if (CVAL(b, 0) == 0xFE) {
- readfd(f, b, 2);
- if (CVAL(b, 0) & 0x80) {
- b[3] = CVAL(b, 0) & ~0x80;
- b[0] = b[1];
- readfd(f, b+1, 2);
- num = IVAL(b, 0);
- } else
- num = (UVAL(b,0)<<8) + UVAL(b,1) + *prev_ptr;
- } else
- num = UVAL(b, 0) + *prev_ptr;
- *prev_ptr = num;
- if (prev_ptr == &prev_negative)
- num = -num;
- return num;
- }
- int read_line_old(int f, char *buf, size_t bufsiz)
- {
- bufsiz--;
- while (bufsiz > 0) {
- buf[0] = 0;
- read_buf(f, buf, 1);
- if (buf[0] == 0)
- return 0;
- if (buf[0] == '\n')
- break;
- if (buf[0] != '\r') {
- buf++;
- bufsiz--;
- }
- }
- *buf = '\0';
- return bufsiz > 0;
- }
- void io_printf(int fd, const char *format, ...)
- {
- va_list ap;
- char buf[BIGPATHBUFLEN];
- int len;
- va_start(ap, format);
- len = vsnprintf(buf, sizeof buf, format, ap);
- va_end(ap);
- if (len < 0)
- exit_cleanup(RERR_STREAMIO);
- if (len > (int)sizeof buf) {
- rprintf(FERROR, "io_printf() was too long for the buffer.\n");
- exit_cleanup(RERR_STREAMIO);
- }
- write_sbuf(fd, buf);
- }
- void io_start_multiplex_out(void)
- {
- io_flush(NORMAL_FLUSH);
- io_start_buffering_out(sock_f_out);
- io_multiplexing_out = 1;
- }
- void io_start_multiplex_in(void)
- {
- io_flush(NORMAL_FLUSH);
- io_start_buffering_in(sock_f_in);
- io_multiplexing_in = 1;
- }
- int io_multiplex_write(enum msgcode code, const char *buf, size_t len, int convert)
- {
- if (!io_multiplexing_out)
- return 0;
- io_flush(NORMAL_FLUSH);
- stats.total_written += (len+4);
- mplex_write(sock_f_out, code, buf, len, convert);
- return 1;
- }
- void io_end_multiplex_in(void)
- {
- io_multiplexing_in = 0;
- io_end_buffering_in();
- }
- void io_end_multiplex_out(void)
- {
- io_multiplexing_out = 0;
- io_end_buffering_out();
- }
- void start_write_batch(int fd)
- {
-
- write_int(batch_fd, protocol_version);
- if (protocol_version >= 30)
- write_byte(batch_fd, compat_flags);
- write_int(batch_fd, checksum_seed);
- if (am_sender)
- write_batch_monitor_out = fd;
- else
- write_batch_monitor_in = fd;
- }
- void stop_write_batch(void)
- {
- write_batch_monitor_out = -1;
- write_batch_monitor_in = -1;
- }
|