X-Git-Url: https://vcs.fsf.org/?p=exim.git;a=blobdiff_plain;f=src%2Fsrc%2Fqueue.c;h=53dc6e026c6ce6d4ceb9b364fa71c99206d83738;hp=60bf2ce77c39b67f2ef40a6670ea7a8a32a01127;hb=ecf0514306dd9c0baf94c43932cf77bd25fd8df0;hpb=32dfdf8baa8ccf091a0d5d4d75e8627424898756 diff --git a/src/src/queue.c b/src/src/queue.c index 60bf2ce77..53dc6e026 100644 --- a/src/src/queue.c +++ b/src/src/queue.c @@ -2,7 +2,7 @@ * Exim - an Internet mail transport agent * *************************************************/ -/* Copyright (c) University of Cambridge 1995 - 2015 */ +/* Copyright (c) University of Cambridge 1995 - 2018 */ /* See the file NOTICE for conditions of use and distribution. */ /* Functions that operate on the input queue. */ @@ -12,39 +12,6 @@ -/* Routines with knowledge of spool layout */ - -#ifndef COMPILE_UTILITY -static void -spool_pname_buf(uschar * buf, int len) -{ -snprintf(CS buf, len, "%s/%s/input", spool_directory, queue_name); -} - -uschar * -spool_dname(const uschar * purpose, uschar * subdir) -{ -return string_sprintf("%s/%s/%s/%s", - spool_directory, queue_name, purpose, subdir); -} -#endif - -uschar * -spool_sname(const uschar * purpose, uschar * subdir) -{ -return string_sprintf("%s%s%s%s%s", - queue_name, *queue_name ? "/" : "", - purpose, - *subdir ? "/" : "", subdir); -} - -uschar * -spool_fname(const uschar * purpose, const uschar * subdir, const uschar * fname, - const uschar * suffix) -{ -return string_sprintf("%s/%s/%s/%s/%s%s", - spool_directory, queue_name, purpose, subdir, fname, suffix); -} @@ -80,7 +47,11 @@ queue_filename *first = NULL; queue_filename **append = &first; while (a && b) - if (Ustrcmp(a->text, b->text) < 0) + { + int d; + if ((d = Ustrncmp(a->text, b->text, 6)) == 0) + d = Ustrcmp(a->text + 14, b->text + 14); + if (d < 0) { *append = a; append= &a->next; @@ -92,6 +63,7 @@ while (a && b) append= &b->next; b = b->next; } + } *append = a ? a : b; return first; @@ -138,13 +110,14 @@ Arguments: subdirs vector to store list of subdirchars subcount pointer to int in which to store count of subdirs randomize TRUE if the order of the list is to be unpredictable + pcount If not NULL, fill in with count of files and do not return list Returns: pointer to a chain of queue name items */ static queue_filename * queue_get_spool_list(int subdiroffset, uschar *subdirs, int *subcount, - BOOL randomize) + BOOL randomize, unsigned * pcount) { int i; int flags = 0; @@ -162,7 +135,9 @@ according to the bits of the flags variable. Get a collection of bits from the current time. Use the bottom 16 and just keep re-using them if necessary. When not randomizing, initialize the sublists for the bottom-up merge sort. */ -if (randomize) +if (pcount) + *pcount = 0; +else if (randomize) resetflags = time(NULL) & 0xFFFF; else for (i = 0; i < LOG2_MAXNODES; i++) @@ -232,61 +207,63 @@ for (; i <= *subcount; i++) if (len == SPOOL_NAME_LENGTH && Ustrcmp(name + SPOOL_NAME_LENGTH - 2, "-H") == 0) - { - queue_filename *next = - store_get(sizeof(queue_filename) + Ustrlen(name)); - Ustrcpy(next->text, name); - next->dir_uschar = subdirchar; - - /* Handle the creation of a randomized list. The first item becomes both - the top and bottom of the list. Subsequent items are inserted either at - the top or the bottom, randomly. This is, I argue, faster than doing a - sort by allocating a random number to each item, and it also saves having - to store the number with each item. */ - - if (randomize) - if (!yield) - { - next->next = NULL; - yield = last = next; - } - else - { - if (flags == 0) - flags = resetflags; - if ((flags & 1) == 0) - { - next->next = yield; - yield = next; - } - else - { - next->next = NULL; - last->next = next; - last = next; - } - flags = flags >> 1; - } + if (pcount) + (*pcount)++; + else + { + queue_filename *next = + store_get(sizeof(queue_filename) + Ustrlen(name), is_tainted(name)); + Ustrcpy(next->text, name); + next->dir_uschar = subdirchar; + + /* Handle the creation of a randomized list. The first item becomes both + the top and bottom of the list. Subsequent items are inserted either at + the top or the bottom, randomly. This is, I argue, faster than doing a + sort by allocating a random number to each item, and it also saves having + to store the number with each item. */ + + if (randomize) + if (!yield) + { + next->next = NULL; + yield = last = next; + } + else + { + if (flags == 0) + flags = resetflags; + if ((flags & 1) == 0) + { + next->next = yield; + yield = next; + } + else + { + next->next = NULL; + last->next = next; + last = next; + } + flags = flags >> 1; + } - /* Otherwise do a bottom-up merge sort based on the name. */ + /* Otherwise do a bottom-up merge sort based on the name. */ - else - { - int j; - next->next = NULL; - for (j = 0; j < LOG2_MAXNODES; j++) - if (root[j]) - { - next = merge_queue_lists(next, root[j]); - root[j] = (j == LOG2_MAXNODES - 1)? next : NULL; - } - else - { - root[j] = next; - break; - } - } - } + else + { + next->next = NULL; + for (int j = 0; j < LOG2_MAXNODES; j++) + if (root[j]) + { + next = merge_queue_lists(next, root[j]); + root[j] = j == LOG2_MAXNODES - 1 ? next : NULL; + } + else + { + root[j] = next; + break; + } + } + } } /* Finished with this directory */ @@ -323,7 +300,7 @@ for (; i <= *subcount; i++) /* When using a bottom-up merge sort, do the final merging of the sublists. Then pass back the final list of file items. */ -if (!randomize) +if (!pcount && !randomize) for (i = 0; i < LOG2_MAXNODES; ++i) yield = merge_queue_lists(yield, root[i]); @@ -368,14 +345,18 @@ Returns: nothing void queue_run(uschar *start_id, uschar *stop_id, BOOL recurse) { -BOOL force_delivery = queue_run_force || deliver_selectstring != NULL || +BOOL force_delivery = f.queue_run_force || deliver_selectstring != NULL || deliver_selectstring_sender != NULL; const pcre *selectstring_regex = NULL; const pcre *selectstring_regex_sender = NULL; uschar *log_detail = NULL; int subcount = 0; -int i; uschar subdirs[64]; +pid_t qpid[4] = {0}; /* Parallelism factor for q2stage 1st phase */ + +#ifdef MEASURE_TIMING +report_time_since(×tamp_startup, US"queue_run start"); +#endif /* Cancel any specific queue domains. Turn off the flag that causes SMTP deliveries not to happen, unless doing a 2-stage queue run, when the SMTP flag @@ -385,10 +366,10 @@ on TCP/IP channels have queue_run_pid set, but not queue_running. */ queue_domains = NULL; queue_smtp_domains = NULL; -queue_smtp = queue_2stage; +f.queue_smtp = f.queue_2stage; queue_run_pid = getpid(); -queue_running = TRUE; +f.queue_running = TRUE; /* Log the true start of a queue run, and fancy options */ @@ -397,11 +378,11 @@ if (!recurse) uschar extras[8]; uschar *p = extras; - if (queue_2stage) *p++ = 'q'; - if (queue_run_first_delivery) *p++ = 'i'; - if (queue_run_force) *p++ = 'f'; - if (deliver_force_thaw) *p++ = 'f'; - if (queue_run_local) *p++ = 'l'; + if (f.queue_2stage) *p++ = 'q'; + if (f.queue_run_first_delivery) *p++ = 'i'; + if (f.queue_run_force) *p++ = 'f'; + if (f.deliver_force_thaw) *p++ = 'f'; + if (f.queue_run_local) *p++ = 'l'; *p = 0; p = big_buffer; @@ -411,11 +392,11 @@ if (!recurse) p += sprintf(CS p, " -q%s", extras); if (deliver_selectstring) - p += sprintf(CS p, " -R%s %s", deliver_selectstring_regex? "r" : "", + p += sprintf(CS p, " -R%s %s", f.deliver_selectstring_regex? "r" : "", deliver_selectstring); if (deliver_selectstring_sender) - p += sprintf(CS p, " -S%s %s", deliver_selectstring_sender_regex? "r" : "", + p += sprintf(CS p, " -S%s %s", f.deliver_selectstring_sender_regex? "r" : "", deliver_selectstring_sender); log_detail = string_copy(big_buffer); @@ -428,10 +409,10 @@ if (!recurse) /* If deliver_selectstring is a regex, compile it. */ -if (deliver_selectstring && deliver_selectstring_regex) +if (deliver_selectstring && f.deliver_selectstring_regex) selectstring_regex = regex_must_compile(deliver_selectstring, TRUE, FALSE); -if (deliver_selectstring_sender && deliver_selectstring_sender_regex) +if (deliver_selectstring_sender && f.deliver_selectstring_sender_regex) selectstring_regex_sender = regex_must_compile(deliver_selectstring_sender, TRUE, FALSE); @@ -450,12 +431,11 @@ subsequent iterations. When the first argument of queue_get_spool_list() is -1 (for queue_run_in_ order), it scans all directories and makes a single message list. */ -for (i = (queue_run_in_order? -1 : 0); - i <= (queue_run_in_order? -1 : subcount); +for (int i = queue_run_in_order ? -1 : 0; + i <= (queue_run_in_order ? -1 : subcount); i++) { - queue_filename *f; - void *reset_point1 = store_get(0); + rmark reset_point1 = store_mark(); DEBUG(D_queue_run) { @@ -467,9 +447,9 @@ for (i = (queue_run_in_order? -1 : 0); debug_printf("queue running subdirectory '%c'\n", subdirs[i]); } - for (f = queue_get_spool_list(i, subdirs, &subcount, !queue_run_in_order); - f; - f = f->next) + for (queue_filename * fq = queue_get_spool_list(i, subdirs, &subcount, + !queue_run_in_order, NULL); + fq; fq = fq->next) { pid_t pid; int status; @@ -480,7 +460,7 @@ for (i = (queue_run_in_order? -1 : 0); /* Unless deliveries are forced, if deliver_queue_load_max is non-negative, check that the load average is low enough to permit deliveries. */ - if (!queue_run_force && deliver_queue_load_max >= 0) + if (!f.queue_run_force && deliver_queue_load_max >= 0) if ((load_average = os_getloadavg()) > deliver_queue_load_max) { log_write(L_queue_run, LOG_MAIN, "Abandon queue run: %s (load %.2f, max %.2f)", @@ -495,18 +475,41 @@ for (i = (queue_run_in_order? -1 : 0); (double)load_average/1000.0, (double)deliver_queue_load_max/1000.0); + /* If initial of a 2-phase run, maintain a set of child procs + to get disk parallelism */ + + if (f.queue_2stage && !queue_run_in_order) + { + int i; + if (qpid[f.running_in_test_harness ? 0 : nelem(qpid) - 1]) + { + DEBUG(D_queue_run) debug_printf("q2stage waiting for child %d\n", (int)qpid[0]); + waitpid(qpid[0], NULL, 0); + DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[0]); + if (f.running_in_test_harness) i = 0; + else for (i = 0; i < nelem(qpid) - 1; i++) qpid[i] = qpid[i+1]; + qpid[i] = 0; + } + else + for (i = 0; qpid[i]; ) i++; + DEBUG(D_queue_run) debug_printf("q2stage forking\n"); + if ((qpid[i] = fork())) + continue; /* parent loops around */ + DEBUG(D_queue_run) debug_printf("q2stage child\n"); + } + /* Skip this message unless it's within the ID limits */ - if (stop_id && Ustrncmp(f->text, stop_id, MESSAGE_ID_LENGTH) > 0) - continue; - if (start_id && Ustrncmp(f->text, start_id, MESSAGE_ID_LENGTH) < 0) - continue; + if (stop_id && Ustrncmp(fq->text, stop_id, MESSAGE_ID_LENGTH) > 0) + goto go_around; + if (start_id && Ustrncmp(fq->text, start_id, MESSAGE_ID_LENGTH) < 0) + goto go_around; /* Check that the message still exists */ - message_subdir[0] = f->dir_uschar; - if (Ustat(spool_fname(US"input", message_subdir, f->text, US""), &statbuf) < 0) - continue; + message_subdir[0] = fq->dir_uschar; + if (Ustat(spool_fname(US"input", message_subdir, fq->text, US""), &statbuf) < 0) + goto go_around; /* There are some tests that require the reading of the header file. Ensure the store used is scavenged afterwards so that this process doesn't keep @@ -515,25 +518,25 @@ for (i = (queue_run_in_order? -1 : 0); message when many are not going to be delivered. */ if (deliver_selectstring || deliver_selectstring_sender || - queue_run_first_delivery) + f.queue_run_first_delivery) { BOOL wanted = TRUE; - BOOL orig_dont_deliver = dont_deliver; - void *reset_point2 = store_get(0); + BOOL orig_dont_deliver = f.dont_deliver; + rmark reset_point2 = store_mark(); /* Restore the original setting of dont_deliver after reading the header, so that a setting for a particular message doesn't force it for any that follow. If the message is chosen for delivery, the header is read again in the deliver_message() function, in a subprocess. */ - if (spool_read_header(f->text, FALSE, TRUE) != spool_read_OK) continue; - dont_deliver = orig_dont_deliver; + if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) goto go_around; + f.dont_deliver = orig_dont_deliver; /* Now decide if we want to deliver this message. As we have read the header file, we might as well do the freeze test now, and save forking another process. */ - if (deliver_freeze && !deliver_force_thaw) + if (f.deliver_freeze && !f.deliver_force_thaw) { log_write(L_skip_delivery, LOG_MAIN, "Message is frozen"); wanted = FALSE; @@ -541,9 +544,9 @@ for (i = (queue_run_in_order? -1 : 0); /* Check first_delivery in the case when there are no message logs. */ - else if (queue_run_first_delivery && !deliver_firsttime) + else if (f.queue_run_first_delivery && !f.deliver_firsttime) { - DEBUG(D_queue_run) debug_printf("%s: not first delivery\n", f->text); + DEBUG(D_queue_run) debug_printf("%s: not first delivery\n", fq->text); wanted = FALSE; } @@ -554,7 +557,7 @@ for (i = (queue_run_in_order? -1 : 0); /* Sender matching */ else if ( deliver_selectstring_sender - && !(deliver_selectstring_sender_regex + && !(f.deliver_selectstring_sender_regex ? (pcre_exec(selectstring_regex_sender, NULL, CS sender_address, Ustrlen(sender_address), 0, PCRE_EOPT, NULL, 0) >= 0) @@ -563,7 +566,7 @@ for (i = (queue_run_in_order? -1 : 0); ) ) { DEBUG(D_queue_run) debug_printf("%s: sender address did not match %s\n", - f->text, deliver_selectstring_sender); + fq->text, deliver_selectstring_sender); wanted = FALSE; } @@ -575,7 +578,7 @@ for (i = (queue_run_in_order? -1 : 0); for (i = 0; i < recipients_count; i++) { uschar *address = recipients_list[i].address; - if ( (deliver_selectstring_regex + if ( (f.deliver_selectstring_regex ? (pcre_exec(selectstring_regex, NULL, CS address, Ustrlen(address), 0, PCRE_EOPT, NULL, 0) >= 0) : (strstric(address, deliver_selectstring, FALSE) != NULL) @@ -589,18 +592,16 @@ for (i = (queue_run_in_order? -1 : 0); { DEBUG(D_queue_run) debug_printf("%s: no recipient address matched %s\n", - f->text, deliver_selectstring); + fq->text, deliver_selectstring); wanted = FALSE; } } /* Recover store used when reading the header */ - received_protocol = NULL; - sender_address = sender_ident = NULL; - authenticated_id = authenticated_sender = NULL; + spool_clear_header_globals(); store_reset(reset_point2); - if (!wanted) continue; /* With next message */ + if (!wanted) goto go_around; /* With next message */ } /* OK, got a message we want to deliver. Create a pipe which will @@ -639,15 +640,19 @@ for (i = (queue_run_in_order? -1 : 0); /* Now deliver the message; get the id by cutting the -H off the file name. The return of the process is zero if a delivery was attempted. */ - set_process_info("running queue: %s", f->text); - f->text[SPOOL_NAME_LENGTH-2] = 0; + set_process_info("running queue: %s", fq->text); + fq->text[SPOOL_NAME_LENGTH-2] = 0; +#ifdef MEASURE_TIMING + report_time_since(×tamp_startup, US"queue msg selected"); +#endif + if ((pid = fork()) == 0) { int rc; - if (running_in_test_harness) millisleep(100); + testharness_pause_ms(100); (void)close(pfd[pipe_read]); - rc = deliver_message(f->text, force_delivery, FALSE); - _exit(rc == DELIVER_NOT_ATTEMPTED); + rc = deliver_message(fq->text, force_delivery, FALSE); + exim_underbar_exit(rc == DELIVER_NOT_ATTEMPTED); } if (pid < 0) log_write(0, LOG_MAIN|LOG_PANIC_DIE, "fork of delivery process from " @@ -657,20 +662,20 @@ for (i = (queue_run_in_order? -1 : 0); then wait for the first level process to terminate. */ (void)close(pfd[pipe_write]); - set_process_info("running queue: waiting for %s (%d)", f->text, pid); + set_process_info("running queue: waiting for %s (%d)", fq->text, pid); while (wait(&status) != pid); /* A zero return means a delivery was attempted; turn off the force flag for any subsequent calls unless queue_force is set. */ - if ((status & 0xffff) == 0) force_delivery = queue_run_force; + if (!(status & 0xffff)) force_delivery = f.queue_run_force; /* If the process crashed, tell somebody */ - else if ((status & 0x00ff) != 0) + else if (status & 0x00ff) log_write(0, LOG_MAIN|LOG_PANIC, "queue run: process %d crashed with signal %d while delivering %s", - (int)pid, status & 0x00ff, f->text); + (int)pid, status & 0x00ff, fq->text); /* Before continuing, wait till the pipe gets closed at the far end. This tells us that any children created by the delivery to re-use any SMTP @@ -679,30 +684,44 @@ for (i = (queue_run_in_order? -1 : 0); set_process_info("running queue: waiting for children of %d", pid); if ((status = read(pfd[pipe_read], buffer, sizeof(buffer))) != 0) - log_write(0, LOG_MAIN|LOG_PANIC, "queue run: %s on pipe", - status > 0 ? "unexpected data" : "error"); + log_write(0, LOG_MAIN|LOG_PANIC, status > 0 ? + "queue run: unexpected data on pipe" : "queue run: error on pipe: %s", + strerror(errno)); (void)close(pfd[pipe_read]); set_process_info("running queue"); + /* If initial of a 2-phase run, we are a child - so just exit */ + if (f.queue_2stage && !queue_run_in_order) + exim_exit(EXIT_SUCCESS, US"2-phase child"); + /* If we are in the test harness, and this is not the first of a 2-stage queue run, update fudged queue times. */ - if (running_in_test_harness && !queue_2stage) + if (f.running_in_test_harness && !f.queue_2stage) { - uschar *fqtnext = Ustrchr(fudged_queue_times, '/'); - if (fqtnext != NULL) fudged_queue_times = fqtnext + 1; + uschar * fqtnext = Ustrchr(fudged_queue_times, '/'); + if (fqtnext) fudged_queue_times = fqtnext + 1; } + + + continue; + + go_around: + /* If initial of a 2-phase run, we are a child - so just exit */ + if (f.queue_2stage && !queue_run_in_order) + exim_exit(EXIT_SUCCESS, US"2-phase child"); } /* End loop for list of messages */ + tree_nonrecipients = NULL; store_reset(reset_point1); /* Scavenge list of messages */ /* If this was the first time through for random order processing, and sub-directories have been found, randomize their order if necessary. */ if (i == 0 && subcount > 1 && !queue_run_in_order) - { - int j, r; - for (j = 1; j <= subcount; j++) + for (int j = 1; j <= subcount; j++) + { + int r; if ((r = random_number(100)) >= 50) { int k = (r % subcount) + 1; @@ -710,15 +729,28 @@ for (i = (queue_run_in_order? -1 : 0); subdirs[j] = subdirs[k]; subdirs[k] = x; } - } + } } /* End loop for multiple directories */ /* If queue_2stage is true, we do it all again, with the 2stage flag turned off. */ -if (queue_2stage) +if (f.queue_2stage) { - queue_2stage = FALSE; + + /* wait for last children */ + for (int i = 0; i < nelem(qpid); i++) + if (qpid[i]) + { + DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[i]); + waitpid(qpid[i], NULL, 0); + } + else break; + +#ifdef MEASURE_TIMING + report_time_since(×tamp_startup, US"queue_run 1st phase done"); +#endif + f.queue_2stage = FALSE; queue_run(start_id, stop_id, TRUE); } @@ -742,26 +774,38 @@ if (!recurse) /* Called as a result of -bpc Arguments: none -Returns: nothing +Returns: count */ -void +unsigned queue_count(void) { int subcount; -int count = 0; -queue_filename *f = NULL; +unsigned count = 0; uschar subdirs[64]; -f = queue_get_spool_list( - -1, /* entire queue */ - subdirs, /* for holding sub list */ - &subcount, /* for subcount */ - FALSE); /* not random */ -for (; f != NULL; f = f->next) count++; -fprintf(stdout, "%d\n", count); + +(void) queue_get_spool_list(-1, /* entire queue */ + subdirs, /* for holding sub list */ + &subcount, /* for subcount */ + FALSE, /* not random */ + &count); /* just get the count */ +return count; } +#define QUEUE_SIZE_AGE 60 /* update rate for queue_size */ + +unsigned +queue_count_cached(void) +{ +time_t now; +if ((now = time(NULL)) >= queue_size_next) + { + queue_size = queue_count(); + queue_size_next = now + (f.running_in_test_harness ? 3 : QUEUE_SIZE_AGE); + } +return queue_size; +} /************************************************ * List extra deliveries * @@ -777,11 +821,12 @@ Argument: points to the tree node Returns: nothing */ -static void queue_list_extras(tree_node *p) +static void +queue_list_extras(tree_node *p) { -if (p->left != NULL) queue_list_extras(p->left); +if (p->left) queue_list_extras(p->left); if (!p->data.val) printf(" +D %s\n", p->name); -if (p->right != NULL) queue_list_extras(p->right); +if (p->right) queue_list_extras(p->right); } @@ -812,11 +857,10 @@ Returns: nothing void queue_list(int option, uschar **list, int count) { -int i; int subcount; int now = (int)time(NULL); -void *reset_point; -queue_filename *f = NULL; +rmark reset_point; +queue_filename * qf = NULL; uschar subdirs[64]; /* If given a list of messages, build a chain containing their ids. */ @@ -824,14 +868,14 @@ uschar subdirs[64]; if (count > 0) { queue_filename *last = NULL; - for (i = 0; i < count; i++) + for (int i = 0; i < count; i++) { queue_filename *next = - store_get(sizeof(queue_filename) + Ustrlen(list[i]) + 2); + store_get(sizeof(queue_filename) + Ustrlen(list[i]) + 2, is_tainted(list[i])); sprintf(CS next->text, "%s-H", list[i]); next->dir_uschar = '*'; next->next = NULL; - if (i == 0) f = next; else last->next = next; + if (i == 0) qf = next; else last->next = next; last = next; } } @@ -839,37 +883,42 @@ if (count > 0) /* Otherwise get a list of the entire queue, in order if necessary. */ else - f = queue_get_spool_list( + qf = queue_get_spool_list( -1, /* entire queue */ subdirs, /* for holding sub list */ &subcount, /* for subcount */ - option >= 8); /* randomize if required */ + option >= 8, /* randomize if required */ + NULL); /* don't just count */ if (option >= 8) option -= 8; /* Now scan the chain and print information, resetting store used each time. */ -for (reset_point = store_get(0); f; f = f->next) +for (; + qf && (reset_point = store_mark()); + spool_clear_header_globals(), store_reset(reset_point), qf = qf->next + ) { int rc, save_errno; int size = 0; BOOL env_read; message_size = 0; - message_subdir[0] = f->dir_uschar; - rc = spool_read_header(f->text, FALSE, count <= 0); - if (rc == spool_read_notopen && errno == ENOENT && count <= 0) goto next; + message_subdir[0] = qf->dir_uschar; + rc = spool_read_header(qf->text, FALSE, count <= 0); + if (rc == spool_read_notopen && errno == ENOENT && count <= 0) + continue; save_errno = errno; env_read = (rc == spool_read_OK || rc == spool_read_hdrerror); if (env_read) { - int ptr; + int i, ptr; FILE *jread; struct stat statbuf; - uschar * fname = spool_fname(US"input", message_subdir, f->text, US""); + uschar * fname = spool_fname(US"input", message_subdir, qf->text, US""); ptr = Ustrlen(fname)-1; fname[ptr] = 'D'; @@ -904,12 +953,12 @@ for (reset_point = store_get(0); f; f = f->next) } fprintf(stdout, "%s ", string_format_size(size, big_buffer)); - for (i = 0; i < 16; i++) fputc(f->text[i], stdout); + for (int i = 0; i < 16; i++) fputc(qf->text[i], stdout); if (env_read && sender_address) { printf(" <%s>", sender_address); - if (sender_set_untrusted) printf(" (%s)", originator_login); + if (f.sender_set_untrusted) printf(" (%s)", originator_login); } if (rc != spool_read_OK) @@ -918,7 +967,7 @@ for (reset_point = store_get(0); f; f = f->next) if (save_errno == ERRNO_SPOOLFORMAT) { struct stat statbuf; - uschar * fname = spool_fname(US"input", message_subdir, f->text, US""); + uschar * fname = spool_fname(US"input", message_subdir, qf->text, US""); if (Ustat(fname, &statbuf) == 0) printf("*** spool format error: size=" OFF_T_FMT " ***", @@ -929,39 +978,29 @@ for (reset_point = store_get(0); f; f = f->next) if (rc != spool_read_hdrerror) { printf("\n\n"); - goto next; + continue; } } - if (deliver_freeze) printf(" *** frozen ***"); + if (f.deliver_freeze) printf(" *** frozen ***"); printf("\n"); if (recipients_list) { - for (i = 0; i < recipients_count; i++) + for (int i = 0; i < recipients_count; i++) { tree_node *delivered = tree_search(tree_nonrecipients, recipients_list[i].address); if (!delivered || option != 1) - printf(" %s %s\n", (delivered != NULL)? "D":" ", - recipients_list[i].address); + printf(" %s %s\n", + delivered ? "D" : " ", recipients_list[i].address); if (delivered) delivered->data.val = TRUE; } if (option == 2 && tree_nonrecipients) queue_list_extras(tree_nonrecipients); printf("\n"); } - -next: - received_protocol = NULL; - sender_fullhost = sender_helo_name = - sender_rcvhost = sender_host_address = sender_address = sender_ident = NULL; - sender_host_authenticated = authenticated_sender = authenticated_id = NULL; - interface_address = NULL; - acl_var_m = NULL; - - store_reset(reset_point); } } @@ -988,7 +1027,6 @@ Returns: FALSE if there was any problem BOOL queue_action(uschar *id, int action, uschar **argv, int argc, int recipients_arg) { -int i, j; BOOL yield = TRUE; BOOL removed = FALSE; struct passwd *pw; @@ -1007,10 +1045,10 @@ done. Only admin users may read the spool files. */ if (action >= MSG_SHOW_BODY) { - int fd, i, rc; + int fd, rc; uschar *subdirectory, *suffix; - if (!admin_user) + if (!f.admin_user) { printf("Permission denied\n"); return FALSE; @@ -1038,9 +1076,9 @@ if (action >= MSG_SHOW_BODY) suffix = US""; } - for (i = 0; i < 2; i++) + for (int i = 0; i < 2; i++) { - message_subdir[0] = split_spool_directory == (i == 0) ? id[5] : 0; + set_subdir_str(message_subdir, id, i); if ((fd = Uopen(spool_fname(subdirectory, message_subdir, id, suffix), O_RDONLY, 0)) >= 0) break; @@ -1072,7 +1110,7 @@ if ((deliver_datafile = spool_open_datafile(id)) < 0) { yield = FALSE; printf("Spool data file for %s does not exist\n", id); - if (action != MSG_REMOVE || !admin_user) return FALSE; + if (action != MSG_REMOVE || !f.admin_user) return FALSE; printf("Continuing, to ensure all files removed\n"); } else @@ -1097,7 +1135,7 @@ if (spool_read_header(spoolname, TRUE, FALSE) != spool_read_OK) printf("Spool read error for %s: %s\n", spoolname, strerror(errno)); else printf("Spool format error for %s\n", spoolname); - if (action != MSG_REMOVE || !admin_user) + if (action != MSG_REMOVE || !f.admin_user) { (void)close(deliver_datafile); deliver_datafile = -1; @@ -1111,7 +1149,7 @@ message. Only admin users may freeze/thaw, add/cancel recipients, or otherwise mess about, but the original sender is permitted to remove a message. That's why we leave this check until after the headers are read. */ -if (!admin_user && (action != MSG_REMOVE || real_uid != originator_uid)) +if (!f.admin_user && (action != MSG_REMOVE || real_uid != originator_uid)) { printf("Permission denied\n"); (void)close(deliver_datafile); @@ -1133,25 +1171,25 @@ switch(action) { case MSG_SHOW_COPY: { - transport_ctx tctx = {0}; + transport_ctx tctx = {{0}}; deliver_in_buffer = store_malloc(DELIVER_IN_BUFFER_SIZE); deliver_out_buffer = store_malloc(DELIVER_OUT_BUFFER_SIZE); tctx.u.fd = 1; - transport_write_message(&tctx, 0); + (void) transport_write_message(&tctx, 0); break; } case MSG_FREEZE: - if (deliver_freeze) + if (f.deliver_freeze) { yield = FALSE; printf("is already frozen\n"); } else { - deliver_freeze = TRUE; - deliver_manual_thaw = FALSE; + f.deliver_freeze = TRUE; + f.deliver_manual_thaw = FALSE; deliver_frozen_at = time(NULL); if (spool_write_header(id, SW_MODIFYING, &errmsg) >= 0) { @@ -1168,15 +1206,15 @@ switch(action) case MSG_THAW: - if (!deliver_freeze) + if (!f.deliver_freeze) { yield = FALSE; printf("is not frozen\n"); } else { - deliver_freeze = FALSE; - deliver_manual_thaw = TRUE; + f.deliver_freeze = FALSE; + f.deliver_manual_thaw = TRUE; if (spool_write_header(id, SW_MODIFYING, &errmsg) >= 0) { printf("is no longer frozen\n"); @@ -1205,7 +1243,7 @@ switch(action) suffix[2] = 0; message_subdir[0] = id[5]; - for (j = 0; j < 2; message_subdir[0] = 0, j++) + for (int j = 0; j < 2; message_subdir[0] = 0, j++) { uschar * fname = spool_fname(US"msglog", message_subdir, id, US""); @@ -1225,7 +1263,7 @@ switch(action) DEBUG(D_any) debug_printf(" (ok)\n"); } - for (i = 0; i < 3; i++) + for (int i = 0; i < 3; i++) { uschar * fname; @@ -1257,6 +1295,38 @@ switch(action) else printf("has been removed or did not exist\n"); if (removed) { +#ifndef DISABLE_EVENT + if (event_action) for (int i = 0; i < recipients_count; i++) + { + tree_node *delivered = + tree_search(tree_nonrecipients, recipients_list[i].address); + if (!delivered) + { + uschar * save_local = deliver_localpart; + const uschar * save_domain = deliver_domain; + uschar * addr = recipients_list[i].address, * errmsg = NULL; + int start, end, dom; + + if (!parse_extract_address(addr, &errmsg, &start, &end, &dom, TRUE)) + log_write(0, LOG_MAIN|LOG_PANIC, + "failed to parse address '%.100s'\n: %s", addr, errmsg); + else + { + deliver_localpart = + string_copyn(addr+start, dom ? (dom-1) - start : end - start); + deliver_domain = dom + ? CUS string_copyn(addr+dom, end - dom) : CUS""; + + event_raise(event_action, US"msg:fail:internal", + string_sprintf("message removed by %s", username)); + + deliver_localpart = save_local; + deliver_domain = save_domain; + } + } + } + (void) event_raise(event_action, US"msg:complete", NULL); +#endif log_write(0, LOG_MAIN, "removed by %s", username); log_write(0, LOG_MAIN, "Completed"); } @@ -1264,15 +1334,22 @@ switch(action) } + case MSG_SETQUEUE: + /* The global "queue_name_dest" is used as destination, "queue_name" + as source */ + + spool_move_message(id, message_subdir, US"", US""); + break; + + case MSG_MARK_ALL_DELIVERED: - for (i = 0; i < recipients_count; i++) - { + for (int i = 0; i < recipients_count; i++) tree_add_nonrecipient(recipients_list[i].address); - } + if (spool_write_header(id, SW_MODIFYING, &errmsg) >= 0) { printf("has been modified\n"); - for (i = 0; i < recipients_count; i++) + for (int i = 0; i < recipients_count; i++) log_write(0, LOG_MAIN, "address <%s> marked delivered by %s", recipients_list[i].address, username); } @@ -1343,6 +1420,7 @@ switch(action) } else if (action == MSG_MARK_DELIVERED) { + int i; for (i = 0; i < recipients_count; i++) if (Ustrcmp(recipients_list[i].address, recipient) == 0) break; if (i >= recipients_count) @@ -1409,37 +1487,71 @@ Returns: nothing void queue_check_only(void) { -BOOL *set; int sep = 0; struct stat statbuf; -const uschar *s; -uschar *ss, *name; -uschar buffer[1024]; +const uschar * s = queue_only_file; +uschar * ss; + +if (s) + while ((ss = string_nextinlist(&s, &sep, NULL, 0))) + if (Ustrncmp(ss, "smtp", 4) == 0) + { + ss += 4; + if (Ustat(ss, &statbuf) == 0) + { + f.queue_smtp = TRUE; + DEBUG(D_receive) debug_printf("queue_smtp set because %s exists\n", ss); + } + } + else + if (Ustat(ss, &statbuf) == 0) + { + queue_only = TRUE; + DEBUG(D_receive) debug_printf("queue_only set because %s exists\n", ss); + } +} + -if (queue_only_file == NULL) return; -s = queue_only_file; -while ((ss = string_nextinlist(&s, &sep, buffer, sizeof(buffer))) != NULL) +/******************************************************************************/ +/******************************************************************************/ + +#ifdef EXPERIMENTAL_QUEUE_RAMP +void +queue_notify_daemon(const uschar * msgid) +{ +uschar buf[MESSAGE_ID_LENGTH + 2]; +int fd; + +DEBUG(D_queue_run) debug_printf("%s: %s\n", __FUNCTION__, msgid); + +buf[0] = NOTIFY_MSG_QRUN; +memcpy(buf+1, msgid, MESSAGE_ID_LENGTH+1); + +if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) >= 0) { - if (Ustrncmp(ss, "smtp", 4) == 0) - { - name = US"queue_smtp"; - set = &queue_smtp; - ss += 4; - } - else - { - name = US"queue_only"; - set = &queue_only; - } + struct sockaddr_un sa_un = {.sun_family = AF_UNIX}; + int slen; + +#ifdef EXIM_HAVE_ABSTRACT_UNIX_SOCKETS + int len = offsetof(struct sockaddr_un, sun_path) + 1 + + snprintf(sa_un.sun_path+1, sizeof(sa_un.sun_path)-1, "%s", + NOTIFIER_SOCKET_NAME); + sa_un.sun_path[0] = 0; +#else + int len = offsetof(struct sockaddr_un, sun_path) + + snprintf(sa_un.sun_path, sizeof(sa_un.sun_path), "%s/%s", + spool_directory, NOTIFIER_SOCKET_NAME); +#endif - if (Ustat(ss, &statbuf) == 0) - { - *set = TRUE; - DEBUG(D_receive) debug_printf("%s set because %s exists\n", name, ss); - } + if (sendto(fd, buf, sizeof(buf), 0, (struct sockaddr *)&sa_un, len) < 0) + DEBUG(D_queue_run) + debug_printf("%s: sendto %s\n", __FUNCTION__, strerror(errno)); + close(fd); } +else DEBUG(D_queue_run) debug_printf(" socket: %s\n", strerror(errno)); } +#endif #endif /*!COMPILE_UTILITY*/