max_parallel transport option
authorJeremy Harris <jgh146exb@wizmail.org>
Thu, 8 Oct 2015 22:59:27 +0000 (23:59 +0100)
committerJeremy Harris <jgh146exb@wizmail.org>
Tue, 13 Oct 2015 00:29:55 +0000 (01:29 +0100)
14 files changed:
doc/doc-docbook/spec.xfpt
doc/doc-txt/NewStuff
src/src/deliver.c
src/src/globals.c
src/src/structs.h
src/src/transport.c
test/confs/0288
test/confs/0611 [new file with mode: 0644]
test/log/0288
test/log/0611 [new file with mode: 0644]
test/scripts/0000-Basic/0288
test/scripts/0000-Basic/0611 [new file with mode: 0644]
test/stdout/0288
test/stdout/0390

index ed3533a..f9a8efa 100644 (file)
@@ -1575,7 +1575,7 @@ If a host is unreachable for a period of time, a number of messages may be
 waiting for it by the time it recovers, and sending them in a single SMTP
 connection is clearly beneficial. Whenever a delivery to a remote host is
 deferred,
-.cindex "hints database"
+.cindex "hints database" "deferred deliveries"
 Exim makes a note in its hints database, and whenever a successful
 SMTP delivery has happened, it looks to see if any other messages are waiting
 for the same host. If any are found, they are sent over the same SMTP
@@ -20539,6 +20539,32 @@ transport, the &[initgroups()]& function is called when running the transport
 to ensure that any additional groups associated with the uid are set up.
 
 
+.new
+.option max_parallel transports integer&!! unset
+.cindex limit "transport parallelism"
+.cindex transport "parallel processes"
+.cindex transport "concurrency limit"
+.cindex "delivery" "parallelism for transport"
+If this option is set and expands to an integer greater than zero
+it limits the number of concurrent runs of the transport.
+The control does not apply to shadow transports.
+
+.cindex "hints database" "transport concurrency control"
+Exim implements this control by means of a hints database in which a record is
+incremented whenever a transport process is beaing created. The record
+is decremented and possibly removed when the process terminates.
+Obviously there is scope for
+records to get left lying around if there is a system or program crash. To
+guard against this, Exim ignores any records that are more than six hours old.
+
+If you use this option, you should also arrange to delete the
+relevant hints database whenever your system reboots. The names of the files
+start with &_misc_& and they are kept in the &_spool/db_& directory. There
+may be one or two files, depending on the type of DBM in use. The same files
+are used for ETRN and smtp transport serialization.
+.wen
+
+
 .option message_size_limit transports string&!! 0
 .cindex "limit" "message size per transport"
 .cindex "size" "of message, limit"
@@ -22436,6 +22462,10 @@ If two messages arrive at almost the same time, and both are routed to a pipe
 delivery, the two pipe transports may be run concurrently. You must ensure that
 any pipe commands you set up are robust against this happening. If the commands
 write to a file, the &%exim_lock%& utility might be of use.
+.new
+Alternatively the &%max_parallel%& option could be used with a value
+of "1" to enforce serialization.
+.wen
 
 
 
@@ -23618,6 +23648,10 @@ start with &_misc_& and they are kept in the &_spool/db_& directory. There
 may be one or two files, depending on the type of DBM in use. The same files
 are used for ETRN serialization.
 
+.new
+See also the &%max_parallel%& generic transport option.
+.wen
+
 
 .option size_addition smtp integer 1024
 .cindex "SMTP" "SIZE"
@@ -36265,6 +36299,9 @@ Serializing ETRN runs (when &%smtp_etrn_serialize%& is set)
 .next
 Serializing delivery to a specific host (when &%serialize_hosts%& is set in an
 &(smtp)& transport)
+.next
+Limiting the concurrency of specific transports (when &%max_parallel%& is set
+in a transport)
 .endlist
 
 
index e7d42d4..a7185a4 100644 (file)
@@ -15,6 +15,8 @@ Version 4.87
  2. New $callout_address variable records the address used for a spam=,
     malware= or verify= callout.
 
+ 3. Transports now take a "max_parallel" option, to limit concurrency.
+
 
 Version 4.86
 ------------
index a1d16ec..6a3df89 100644 (file)
@@ -1945,9 +1945,6 @@ if (  !shadowing
     }
   }
 
-/*XXX prefer to do max_parallel check before we fork. Are we allowed to defer
-this late (we could be a shadow tpt)? */
-
 /* Create the pipe for inter-process communication. */
 
 if (pipe(pfd) != 0)
@@ -2317,6 +2314,22 @@ if (addr->special_action == SPECIAL_WARN && addr->transport->warn_message)
 
 
 
+
+/* Put the chain of addrs on the defer list.  Retry will happen
+on the next queue run, earlier if triggered by a new message.
+Loop for the next set of addresses. */
+
+static void
+deferlist_chain(address_item * addr)
+{
+address_item * next;
+for (next = addr; next->next; next = next->next) ;
+next->next = addr_defer;
+addr_defer = addr;
+}
+
+
+
 /*************************************************
 *              Do local deliveries               *
 *************************************************/
@@ -2348,6 +2361,7 @@ while (addr_local)
   int logflags = LOG_MAIN;
   int logchar = dont_deliver? '*' : '=';
   transport_instance *tp;
+  uschar * serialize_key = NULL;
 
   /* Pick the first undelivered address off the chain */
 
@@ -2483,7 +2497,7 @@ while (addr_local)
         last = next;
         batch_count++;
         }
-      else anchor = &(next->next);      /* Skip the address */
+      else anchor = &next->next;        /* Skip the address */
       }
     }
 
@@ -2614,6 +2628,40 @@ while (addr_local)
 
   if (!addr) continue;
 
+  /* If the transport is limited for parallellism, enforce that here.
+  We use a hints DB entry, incremented here and decremented after
+  the transport (and any shadow transport) completes. */
+
+  if (tp->max_parallel)
+    {
+    int_eximarith_t max_parallel =
+      expand_string_integer(tp->max_parallel, TRUE);
+    if (expand_string_message)
+      {
+      logflags |= LOG_PANIC;
+      log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option "
+            "in %s transport (%s): %s", tp->name, addr->address,
+            expand_string_message);
+      for (addr2 = addr->next; addr; addr = addr2, addr2 = addr2->next)
+       post_process_one(addr, DEFER, logflags, DTYPE_TRANSPORT, 0);
+      continue;
+      }
+    if (  max_parallel > 0
+       && !enq_start(
+           serialize_key = string_sprintf("tpt-serialize-%s", tp->name),
+           (unsigned) max_parallel)
+       )
+      {
+      DEBUG(D_transport)
+       debug_printf("skipping tpt %s because parallelism limit %u reached\n",
+                   tp->name, (unsigned) max_parallel);
+
+      deferlist_chain(addr);
+      continue;
+      }
+    }
+
+
   /* So, finally, we do have some addresses that can be passed to the
   transport. Before doing so, set up variables that are relevant to a
   single delivery. */
@@ -2719,6 +2767,10 @@ while (addr_local)
 
   deliver_set_expansions(NULL);
 
+  /* If the transport was parallelism-limited, decrement the hints DB record. */
+
+  if (serialize_key) enq_end(serialize_key);
+
   /* Now we can process the results of the real transport. We must take each
   address off the chain first, because post_process_one() puts it on another
   chain. */
@@ -3730,7 +3782,14 @@ while (parcount > max)
       "remote delivery process count got out of step");
     parcount = 0;
     }
-  else remote_post_process(doneaddr, LOG_MAIN, NULL, fallback);
+  else
+    {
+    transport_instance * tp = doneaddr->transport;
+    if (tp->max_parallel)
+      enq_end(string_sprintf("tpt-serialize-%s", tp->name));
+
+    remote_post_process(doneaddr, LOG_MAIN, NULL, fallback);
+    }
   }
 }
 
@@ -3853,6 +3912,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
   address_item *last = addr;
   address_item *next;
   uschar * panicmsg;
+  uschar * serialize_key = NULL;
 
   /* Pull the first address right off the list. */
 
@@ -4027,6 +4087,34 @@ for (delivery_count = 0; addr_remote; delivery_count++)
     return FALSE;
     }
 
+  /* If the transport is limited for parallellism, enforce that here.
+  The hints DB entry is decremented in par_reduce(), when we reap the
+  transport process. */
+
+  if (tp->max_parallel)
+    {
+    int_eximarith_t max_parallel =
+      expand_string_integer(tp->max_parallel, TRUE);
+    if (expand_string_message)
+      {
+      panicmsg = expand_string_message;
+      goto panic_continue;
+      }
+    if (  max_parallel > 0
+       && !enq_start(
+           serialize_key = string_sprintf("tpt-serialize-%s", tp->name),
+           (unsigned) max_parallel)
+       )
+      {
+      DEBUG(D_transport)
+       debug_printf("skipping tpt %s because parallelism limit %u reached\n",
+                   tp->name, (unsigned) max_parallel);
+
+      deferlist_chain(addr);
+      continue;
+      }
+    }
+
   /* Set up the expansion variables for this set of addresses */
 
   deliver_set_expansions(addr);
@@ -4055,7 +4143,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
       {
       panicmsg = string_sprintf("Failed to expand return path \"%s\": %s",
        tp->return_path, expand_string_message);
-      goto panic_continue;
+      goto enq_continue;
       }
     }
 
@@ -4066,7 +4154,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
   if (!findugid(addr, tp, &uid, &gid, &use_initgroups))
     {
     panicmsg = NULL;
-    goto panic_continue;
+    goto enq_continue;
     }
 
   /* If this transport has a setup function, call it now so that it gets
@@ -4104,11 +4192,11 @@ for (delivery_count = 0; addr_remote; delivery_count++)
     if (!ok)
       {
       DEBUG(D_deliver) debug_printf("not suitable for continue_transport\n");
-      next = addr;
+      if (serialize_key) enq_end(serialize_key);
 
       if (addr->fallback_hosts && !fallback)
         {
-        for (;; next = next->next)
+       for (next = addr; ; next = next->next)
           {
           next->host_list = next->fallback_hosts;
           DEBUG(D_deliver) debug_printf("%s queued for fallback host(s)\n", next->address);
@@ -4119,11 +4207,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
         }
 
       else
-        {
-        while (next->next) next = next->next;
-        next->next = addr_defer;
-        addr_defer = addr;
-        }
+       deferlist_chain(addr);
 
       continue;
       }
@@ -4185,7 +4269,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
   if (!pipe_done)
     {
     panicmsg = string_sprintf("unable to create pipe: %s", strerror(errno));
-    goto panic_continue;
+    goto enq_continue;
     }
 
   /* Find a free slot in the pardata list. Must do this after the possible
@@ -4203,7 +4287,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
     (void)close(pfd[pipe_write]);
     (void)close(pfd[pipe_read]);
     panicmsg = US"Unexpectedly no free subprocess slot";
-    goto panic_continue;
+    goto enq_continue;
     }
 
   /* Now fork a subprocess to do the remote delivery, but before doing so,
@@ -4532,7 +4616,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
     (void)close(pfd[pipe_read]);
     panicmsg = string_sprintf("fork failed for remote delivery to %s: %s",
         addr->domain, strerror(errno));
-    goto panic_continue;
+    goto enq_continue;
     }
 
   /* Fork succeeded; increment the count, and remember relevant data for
@@ -4567,6 +4651,8 @@ for (delivery_count = 0; addr_remote; delivery_count++)
 
   continue;
 
+enq_continue:
+  if (serialize_key) enq_end(serialize_key);
 panic_continue:
   remote_post_process(addr, LOG_MAIN|LOG_PANIC, panicmsg, fallback);
   continue;
@@ -7448,7 +7534,7 @@ if (!addr_defer)
 #ifdef EXPERIMENTAL_EVENT
   (void) event_raise(event_action, US"msg:complete", NULL);
 #endif
-}
+  }
 
 /* If there are deferred addresses, we are keeping this message because it is
 not yet completed. Lose any temporary files that were catching output from
index 55a101f..20e578e 100644 (file)
@@ -1424,6 +1424,7 @@ transport_instance  transport_defaults = {
     NULL,                     /* remove_headers */
     NULL,                     /* return_path */
     NULL,                     /* debug_string */
+    NULL,                     /* max_parallel */
     NULL,                     /* message_size_limit */
     NULL,                     /* headers_rewrite */
     NULL,                     /* rewrite_rules */
index c36d08c..713702e 100644 (file)
@@ -171,6 +171,7 @@ typedef struct transport_instance {
   uschar *remove_headers;         /* Remove these headers */
   uschar *return_path;            /* Overriding (rewriting) return path */
   uschar *debug_string;           /* Debugging output */
+  uschar *max_parallel;           /* Number of concurrent instances */
   uschar *message_size_limit;     /* Biggest message this transport handles */
   uschar *headers_rewrite;        /* Rules for rewriting headers */
   rewrite_rule *rewrite_rules;    /* Parsed rewriting rules */
index a6ad3ed..c258bfd 100644 (file)
@@ -84,6 +84,8 @@ optionlist optionlist_transports[] = {
                  (void *)offsetof(transport_instance, home_dir) },
   { "initgroups",       opt_bool|opt_public,
                  (void *)offsetof(transport_instance, initgroups) },
+  { "max_parallel",     opt_stringptr|opt_public,
+                 (void *)offsetof(transport_instance, max_parallel) },
   { "message_size_limit", opt_stringptr|opt_public,
                  (void *)offsetof(transport_instance, message_size_limit) },
   { "rcpt_include_affixes", opt_bool|opt_public,
index 270ffb2..1d41d08 100644 (file)
@@ -1,4 +1,5 @@
 # Exim test configuration 0288
+# serialize_hosts option on smtp transport
 
 exim_path = EXIM_PATH
 host_lookup_order = bydns
diff --git a/test/confs/0611 b/test/confs/0611
new file mode 100644 (file)
index 0000000..b1bff27
--- /dev/null
@@ -0,0 +1,68 @@
+# Exim test configuration 0611
+# max_parallel on transport
+
+SERVER=
+
+exim_path = EXIM_PATH
+host_lookup_order = bydns
+primary_hostname = myhost.test.ex
+spool_directory = DIR/spool
+log_file_path = DIR/spool/log/%slog
+gecos_pattern = ""
+gecos_name = CALLER_NAME
+
+# ----- Main settings -----
+
+qualify_domain = test.ex
+queue_run_in_order
+log_selector = +received_recipients
+
+acl_smtp_rcpt = accept ${if eq {SERVER}{server} {delay = 2s}}
+
+# ----- Routers -----
+
+begin routers
+
+server:
+  condition = ${if eq {SERVER}{server} {yes}{no}}
+  driver = redirect
+  data = :blackhole:
+
+rmt_client:
+  local_parts =        a:b:c
+  driver =     manualroute
+  route_list = * 127.0.0.1
+  self =       send
+  transport =  smtp
+
+lcl_client:
+  local_parts = x:y:z
+  driver =     accept
+  transport =  pipe
+
+# ----- Transports -----
+
+begin transports
+
+smtp:
+  driver =     smtp
+  port =       PORT_D
+  max_rcpt =   1
+  connection_max_messages = 1
+  max_parallel = 2
+
+pipe:
+  driver =     pipe
+  command =    "sleep 2; cat > /dev/null"
+  use_shell =  true
+  max_parallel = 1
+
+# ----- Retry -----
+
+
+begin retry
+
+* * F,1h,10m
+
+
+# End
index 6e08260..5ca2885 100644 (file)
@@ -3,3 +3,7 @@
 1999-03-02 09:44:33 10HmaX-0005vi-00 == b@test.ex R=all T=smtp defer (-53): connection limit reached for all hosts
 1999-03-02 09:44:33 10HmaX-0005vi-00 => a@test.ex R=all T=smtp H=127.0.0.1 [127.0.0.1] C="250 OK"
 1999-03-02 09:44:33 End queue run: pid=pppp
+1999-03-02 09:44:33 Start queue run: pid=pppp
+1999-03-02 09:44:33 10HmaX-0005vi-00 => b@test.ex R=all T=smtp H=127.0.0.1 [127.0.0.1] C="250 OK"
+1999-03-02 09:44:33 10HmaX-0005vi-00 Completed
+1999-03-02 09:44:33 End queue run: pid=pppp
diff --git a/test/log/0611 b/test/log/0611
new file mode 100644 (file)
index 0000000..39c10bc
--- /dev/null
@@ -0,0 +1,27 @@
+1999-03-02 09:44:33 10HmaX-0005vi-00 <= CALLER@test.ex U=CALLER P=local S=sss for a b c
+1999-03-02 09:44:33 exim x.yz daemon started: pid=pppp, no queue runs, listening for SMTP on port 1225
+1999-03-02 09:44:33 Start queue run: pid=pppp
+1999-03-02 09:44:33 10HmaY-0005vi-00 <= CALLER@test.ex H=localhost (myhost.test.ex) [127.0.0.1] P=esmtp S=sss id=E10HmaX-0005vi-00@myhost.test.ex for a@test.ex
+1999-03-02 09:44:33 10HmaY-0005vi-00 => :blackhole: <a@test.ex> R=server
+1999-03-02 09:44:33 10HmaY-0005vi-00 Completed
+1999-03-02 09:44:33 10HmaX-0005vi-00 => a@test.ex R=rmt_client T=smtp H=127.0.0.1 [127.0.0.1] C="250 OK id=10HmaY-0005vi-00"
+1999-03-02 09:44:33 10HmaZ-0005vi-00 <= CALLER@test.ex H=localhost (myhost.test.ex) [127.0.0.1] P=esmtp S=sss id=E10HmaX-0005vi-00@myhost.test.ex for b@test.ex
+1999-03-02 09:44:33 10HmaZ-0005vi-00 => :blackhole: <b@test.ex> R=server
+1999-03-02 09:44:33 10HmaZ-0005vi-00 Completed
+1999-03-02 09:44:33 10HmaX-0005vi-00 => b@test.ex R=rmt_client T=smtp H=127.0.0.1 [127.0.0.1] C="250 OK id=10HmaZ-0005vi-00"
+1999-03-02 09:44:33 End queue run: pid=pppp
+1999-03-02 09:44:33 Start queue run: pid=pppp
+1999-03-02 09:44:33 10HmbA-0005vi-00 <= CALLER@test.ex H=localhost (myhost.test.ex) [127.0.0.1] P=esmtp S=sss id=E10HmaX-0005vi-00@myhost.test.ex for c@test.ex
+1999-03-02 09:44:33 10HmbA-0005vi-00 => :blackhole: <c@test.ex> R=server
+1999-03-02 09:44:33 10HmbA-0005vi-00 Completed
+1999-03-02 09:44:33 10HmaX-0005vi-00 => c@test.ex R=rmt_client T=smtp H=127.0.0.1 [127.0.0.1] C="250 OK id=10HmbA-0005vi-00"
+1999-03-02 09:44:33 10HmaX-0005vi-00 Completed
+1999-03-02 09:44:33 End queue run: pid=pppp
+1999-03-02 09:44:33 10HmbB-0005vi-00 <= CALLER@test.ex U=CALLER P=local S=sss for y
+1999-03-02 09:44:33 10HmbC-0005vi-00 <= CALLER@test.ex U=CALLER P=local S=sss for z
+1999-03-02 09:44:33 10HmbB-0005vi-00 => y <y@test.ex> R=lcl_client T=pipe
+1999-03-02 09:44:33 10HmbB-0005vi-00 Completed
+1999-03-02 09:44:33 Start queue run: pid=pppp
+1999-03-02 09:44:33 10HmbC-0005vi-00 => z <z@test.ex> R=lcl_client T=pipe
+1999-03-02 09:44:33 10HmbC-0005vi-00 Completed
+1999-03-02 09:44:33 End queue run: pid=pppp
index 7b867ce..ed381da 100644 (file)
@@ -1,9 +1,12 @@
 # serialize_hosts
 need_ipv4
 #
+# preload the spool
 exim -odq a b 
 .
 ****
+#
+# a slow server as a test target
 server PORT_S
 220 ESMTP
 EHLO
@@ -21,6 +24,32 @@ DATA
 QUIT
 250 OK
 ****
+#
+# First message should go; second does not wait for 1st complete
+# on same conn due to connection_max_messages, then is deferred
+# as second transport run aborted by serialize_hosts.
+exim -q
+****
+#
+# a server as a test target
+server PORT_S
+220 ESMTP
+EHLO
+250-OK
+250 HELP
+MAIL FROM:
+250 Sender OK
+RCPT TO:
+250 Recipient OK
+DATA
+354 Send data
+.
+250 OK
+QUIT
+250 OK
+****
+#
+# Remaining message on queue should go immediately; no delay
+# associated with retry rules
 exim -q
 ****
-no_msglog_check
diff --git a/test/scripts/0000-Basic/0611 b/test/scripts/0000-Basic/0611
new file mode 100644 (file)
index 0000000..c352841
--- /dev/null
@@ -0,0 +1,43 @@
+# max_parallel on transport
+need_ipv4
+#
+# Remote transport:
+# preload the spool
+exim -odq a b c
+.
+****
+#
+# a slow server as a test target
+exim -DSERVER=server -bd -oX PORT_D
+****
+#
+# First and second messages should go, as separate conns due to
+# connection_max_messages, third is deferred
+# as third transport run denied by max_parallel
+exim -q
+****
+#
+#
+# Remaining message on queue should go immediately; no delay
+# associated with retry rules
+exim -q
+****
+killdaemon
+#
+########
+#
+#
+# Local transport:
+# Only one message should go as the transport takes a long
+# time and we set max_parallel=1 to serialize it
+exim y
+****
+exim z
+****
+#
+#
+sleep 3
+#
+# Remaining message on queue should go immediately; no delay
+# associated with retry rules
+exim -q
index 8f36a02..3eab9df 100644 (file)
@@ -25,3 +25,27 @@ Date: Tue, 2 Mar 1999 09:44:33 +0000
 QUIT
 250 OK
 End of script
+Listening on port 1224 ... 
+Connection request from [127.0.0.1]
+220 ESMTP
+EHLO myhost.test.ex
+250-OK
+250 HELP
+MAIL FROM:<CALLER@test.ex>
+250 Sender OK
+RCPT TO:<b@test.ex>
+250 Recipient OK
+DATA
+354 Send data
+Received: from CALLER by myhost.test.ex with local (Exim x.yz)
+       (envelope-from <CALLER@test.ex>)
+       id 10HmaX-0005vi-00; Tue, 2 Mar 1999 09:44:33 +0000
+Message-Id: <E10HmaX-0005vi-00@myhost.test.ex>
+From: CALLER_NAME <CALLER@test.ex>
+Date: Tue, 2 Mar 1999 09:44:33 +0000
+
+.
+250 OK
+QUIT
+250 OK
+End of script
index a80e34c..ca6f90c 100644 (file)
@@ -52,6 +52,7 @@ headers_remove =
 headers_rewrite = 
 home_directory = 
 no_initgroups
+max_parallel = 
 message_size_limit = 
 no_rcpt_include_affixes
 retry_use_local_part
@@ -106,6 +107,7 @@ headers_remove =
 headers_rewrite = 
 home_directory = 
 no_initgroups
+max_parallel = 
 message_size_limit = 
 no_rcpt_include_affixes
 retry_use_local_part