Log deferred deliveries for transport max_parallel
authorJeremy Harris <jgh146exb@wizmail.org>
Tue, 13 Oct 2015 10:43:34 +0000 (11:43 +0100)
committerJeremy Harris <jgh146exb@wizmail.org>
Tue, 13 Oct 2015 10:43:34 +0000 (11:43 +0100)
src/src/deliver.c
src/src/macros.h
test/log/0611

index 6a3df89..87f9cfb 100644 (file)
@@ -2315,17 +2315,47 @@ if (addr->special_action == SPECIAL_WARN && addr->transport->warn_message)
 
 
 
-/* Put the chain of addrs on the defer list.  Retry will happen
-on the next queue run, earlier if triggered by a new message.
-Loop for the next set of addresses. */
+/* Check transport for the given concurrency limit.  Return TRUE if over
+the limit (or an expansion failure), else FALSE and if there was a limit,
+the key for the hints database used for the concurrency count. */
 
-static void
-deferlist_chain(address_item * addr)
+static BOOL
+tpt_parallel_check(transport_instance * tp, address_item * addr, uschar ** key)
 {
-address_item * next;
-for (next = addr; next->next; next = next->next) ;
-next->next = addr_defer;
-addr_defer = addr;
+unsigned max_parallel;
+
+if (!tp->max_parallel) return FALSE;
+
+max_parallel = (unsigned) expand_string_integer(tp->max_parallel, TRUE);
+if (expand_string_message)
+  {
+  log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option "
+       "in %s transport (%s): %s", tp->name, addr->address,
+       expand_string_message);
+  return TRUE;
+  }
+
+if (max_parallel > 0)
+  {
+  uschar * serialize_key = string_sprintf("tpt-serialize-%s", tp->name);
+  if (!enq_start(serialize_key, max_parallel))
+    {
+    address_item * next;
+    DEBUG(D_transport)
+      debug_printf("skipping tpt %s because concurrency limit %u reached\n",
+                 tp->name, max_parallel);
+    do
+      {
+      next = addr->next;
+      addr->message = US"concurrency limit reached for transport";
+      addr->basic_errno = ERRNO_TRETRY;
+      post_process_one(addr, DEFER, LOG_MAIN, DTYPE_TRANSPORT, 0);
+      } while ((addr = next));
+    return TRUE;
+    }
+  *key = serialize_key;
+  }
+return FALSE;
 }
 
 
@@ -2632,33 +2662,18 @@ while (addr_local)
   We use a hints DB entry, incremented here and decremented after
   the transport (and any shadow transport) completes. */
 
-  if (tp->max_parallel)
+  if (tpt_parallel_check(tp, addr, &serialize_key))
     {
-    int_eximarith_t max_parallel =
-      expand_string_integer(tp->max_parallel, TRUE);
     if (expand_string_message)
       {
       logflags |= LOG_PANIC;
-      log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option "
-            "in %s transport (%s): %s", tp->name, addr->address,
-            expand_string_message);
-      for (addr2 = addr->next; addr; addr = addr2, addr2 = addr2->next)
+      do
+       {
+       addr = addr->next;
        post_process_one(addr, DEFER, logflags, DTYPE_TRANSPORT, 0);
-      continue;
-      }
-    if (  max_parallel > 0
-       && !enq_start(
-           serialize_key = string_sprintf("tpt-serialize-%s", tp->name),
-           (unsigned) max_parallel)
-       )
-      {
-      DEBUG(D_transport)
-       debug_printf("skipping tpt %s because parallelism limit %u reached\n",
-                   tp->name, (unsigned) max_parallel);
-
-      deferlist_chain(addr);
-      continue;
+       } while ((addr = addr2));
       }
+    continue;                  /* Loop for the next set of addresses. */
     }
 
 
@@ -4091,29 +4106,11 @@ for (delivery_count = 0; addr_remote; delivery_count++)
   The hints DB entry is decremented in par_reduce(), when we reap the
   transport process. */
 
-  if (tp->max_parallel)
-    {
-    int_eximarith_t max_parallel =
-      expand_string_integer(tp->max_parallel, TRUE);
-    if (expand_string_message)
-      {
-      panicmsg = expand_string_message;
+  if (tpt_parallel_check(tp, addr, &serialize_key))
+    if ((panicmsg = expand_string_message))
       goto panic_continue;
-      }
-    if (  max_parallel > 0
-       && !enq_start(
-           serialize_key = string_sprintf("tpt-serialize-%s", tp->name),
-           (unsigned) max_parallel)
-       )
-      {
-      DEBUG(D_transport)
-       debug_printf("skipping tpt %s because parallelism limit %u reached\n",
-                   tp->name, (unsigned) max_parallel);
-
-      deferlist_chain(addr);
-      continue;
-      }
-    }
+    else
+      continue;                        /* Loop for the next set of addresses. */
 
   /* Set up the expansion variables for this set of addresses */
 
@@ -4207,7 +4204,11 @@ for (delivery_count = 0; addr_remote; delivery_count++)
         }
 
       else
-       deferlist_chain(addr);
+       {
+       while (next->next) next = next->next;
+       next->next = addr_defer;
+       addr_defer = addr;
+       }
 
       continue;
       }
@@ -6674,7 +6675,6 @@ if (addr_local)
 so just queue them all. */
 
 if (queue_run_local)
-  {
   while (addr_remote)
     {
     address_item *addr = addr_remote;
@@ -6684,7 +6684,6 @@ if (queue_run_local)
     addr->message = US"remote deliveries suppressed";
     (void)post_process_one(addr, DEFER, LOG_MAIN, DTYPE_TRANSPORT, 0);
     }
-  }
 
 /* Handle remote deliveries */
 
index 0ce24f8..5a35a9b 100644 (file)
@@ -545,6 +545,7 @@ to conflict with system errno values. */
 #define ERRNO_HRETRY         (-53)   /* Not time for any remote host */
 #define ERRNO_LOCAL_ONLY     (-54)   /* Local-only delivery */
 #define ERRNO_QUEUE_DOMAIN   (-55)   /* Domain in queue_domains */
+#define ERRNO_TRETRY         (-56)   /* Transport concurrency limit */
 
 /* Special actions to take after failure or deferment. */
 
index 39c10bc..f158a20 100644 (file)
@@ -1,6 +1,7 @@
 1999-03-02 09:44:33 10HmaX-0005vi-00 <= CALLER@test.ex U=CALLER P=local S=sss for a b c
 1999-03-02 09:44:33 exim x.yz daemon started: pid=pppp, no queue runs, listening for SMTP on port 1225
 1999-03-02 09:44:33 Start queue run: pid=pppp
+1999-03-02 09:44:33 10HmaX-0005vi-00 == c@test.ex R=rmt_client T=smtp defer (-56): concurrency limit reached for transport
 1999-03-02 09:44:33 10HmaY-0005vi-00 <= CALLER@test.ex H=localhost (myhost.test.ex) [127.0.0.1] P=esmtp S=sss id=E10HmaX-0005vi-00@myhost.test.ex for a@test.ex
 1999-03-02 09:44:33 10HmaY-0005vi-00 => :blackhole: <a@test.ex> R=server
 1999-03-02 09:44:33 10HmaY-0005vi-00 Completed
@@ -19,6 +20,7 @@
 1999-03-02 09:44:33 End queue run: pid=pppp
 1999-03-02 09:44:33 10HmbB-0005vi-00 <= CALLER@test.ex U=CALLER P=local S=sss for y
 1999-03-02 09:44:33 10HmbC-0005vi-00 <= CALLER@test.ex U=CALLER P=local S=sss for z
+1999-03-02 09:44:33 10HmbC-0005vi-00 == z@test.ex R=lcl_client T=pipe defer (-56): concurrency limit reached for transport
 1999-03-02 09:44:33 10HmbB-0005vi-00 => y <y@test.ex> R=lcl_client T=pipe
 1999-03-02 09:44:33 10HmbB-0005vi-00 Completed
 1999-03-02 09:44:33 Start queue run: pid=pppp