When trying spooled messages, account for the local-interface in grouping for a conne...
[exim.git] / src / src / transport.c
index fea9146469b1632d344934fe40d12f3c8f870edf..7b3d4690808dadde3b71faa6b6777eada5dd5859 100644 (file)
@@ -1625,13 +1625,24 @@ Arguments:
                        as set by the caller transport
   new_message_id     set to the message id of a waiting message
   more               set TRUE if there are yet more messages waiting
+  oicf_func          function to call to validate if it is ok to send
+                     to this message_id from the current instance.
+  oicf_data          opaque data for oicf_func
 
 Returns:             TRUE if new_message_id set; FALSE otherwise
 */
 
+typedef struct msgq_s msgq_t;
+
+typedef struct msgq_s
+{
+    uschar  message_id [MESSAGE_ID_LENGTH + 1];
+    BOOL    bKeep;
+} msgq_t;
+
 BOOL
 transport_check_waiting(const uschar *transport_name, const uschar *hostname,
-  int local_message_max, uschar *new_message_id, BOOL *more)
+  int local_message_max, uschar *new_message_id, BOOL *more, oicf oicf_func, void *oicf_data)
 {
 dbdata_wait *host_record;
 int host_length, path_len;
@@ -1639,6 +1650,16 @@ open_db dbblock;
 open_db *dbm_file;
 uschar buffer[256];
 
+msgq_t      *msgq = NULL;
+int         msgq_count = 0;
+int         msgq_actual = 0;
+int         i;
+BOOL        bFound = FALSE;
+uschar      spool_dir [PATH_MAX];
+uschar      spool_file [PATH_MAX];
+struct stat statbuf;
+BOOL        bContinuation = FALSE;
+
 *more = FALSE;
 
 DEBUG(D_transport)
@@ -1691,58 +1712,107 @@ until one is found for which a spool file actually exists. If the record gets
 emptied, delete it and continue with any continuation records that may exist.
 */
 
-host_length = host_record->count * MESSAGE_ID_LENGTH;
+/* For Bug 1141, I refactored this major portion of the routine, it is risky
+but the 1 off will remain without it.  This code now allows me to SKIP over
+a message I do not want to send out on this run.  */
 
-/* Loop to handle continuation host records in the database */
+sprintf(CS spool_dir, "%s/input/", spool_directory);
 
-for (;;)
+host_length = host_record->count * MESSAGE_ID_LENGTH;
+
+while (1)
   {
-  BOOL found = FALSE;
+  /* create an array to read entire message queue into memory for processing  */
 
-  sprintf(CS buffer, "%s/input/", spool_directory);
-  path_len = Ustrlen(buffer);
+  msgq = (msgq_t*) malloc(sizeof(msgq_t) * host_record->count);
+  msgq_count = host_record->count;
+  msgq_actual = msgq_count;
 
-  for (host_length -= MESSAGE_ID_LENGTH; host_length >= 0;
-       host_length -= MESSAGE_ID_LENGTH)
+  for (i = 0; i < host_record->count; ++i)
     {
-    struct stat statbuf;
-    Ustrncpy(new_message_id, host_record->text + host_length,
+    msgq[i].bKeep = TRUE;
+
+    Ustrncpy(msgq[i].message_id, host_record->text + (i * MESSAGE_ID_LENGTH),
       MESSAGE_ID_LENGTH);
-    new_message_id[MESSAGE_ID_LENGTH] = 0;
+    msgq[i].message_id[MESSAGE_ID_LENGTH] = 0;
+    }
+
+  /* first thing remove current message id if it exists */
 
+  for (i = 0; i < msgq_count; ++i)
+    if (Ustrcmp(msgq[i].message_id, message_id) == 0)
+      {
+      msgq[i].bKeep = FALSE;
+      break;
+      }
+
+  /* now find the next acceptable message_id */
+
+  bFound = FALSE;
+
+  for (i = msgq_count - 1; i >= 0; --i) if (msgq[i].bKeep)
+    {
     if (split_spool_directory)
-      sprintf(CS(buffer + path_len), "%c/%s-D", new_message_id[5], new_message_id);
+       sprintf(CS spool_file, "%s%c/%s-D",
+                     spool_dir, new_message_id[5], msgq[i].message_id);
     else
-      sprintf(CS(buffer + path_len), "%s-D", new_message_id);
+       sprintf(CS spool_file, "%s%s-D", spool_dir, msgq[i].message_id);
 
-    /* The listed message may be the one we are currently processing. If
-    so, we want to remove it from the list without doing anything else.
-    If not, do a stat to see if it is an existing message. If it is, break
-    the loop to handle it. No need to bother about locks; as this is all
-    "hint" processing, it won't matter if it doesn't exist by the time exim
-    actually tries to deliver it. */
-
-    if (Ustrcmp(new_message_id, message_id) != 0 &&
-        Ustat(buffer, &statbuf) == 0)
+    if (Ustat(spool_file, &statbuf) != 0)
+      msgq[i].bKeep = FALSE;
+    else if (!oicf_func || oicf_func(msgq[i].message_id, oicf_data))
       {
-      found = TRUE;
+      Ustrcpy(new_message_id, msgq[i].message_id);
+      msgq[i].bKeep = FALSE;
+      bFound = TRUE;
       break;
       }
     }
 
-  /* If we have removed all the message ids from the record delete the record.
-  If there is a continuation record, fetch it and remove it from the file,
-  as it will be rewritten as the main record. Repeat in the case of an
-  empty continuation. */
+  /* re-count */
+  for (msgq_actual = 0, i = 0; i < msgq_count; ++i)
+    if (msgq[i].bKeep)
+      msgq_actual++;
+
+  /* reassemble the host record, based on removed message ids, from in
+   * memory queue.
+   */
+
+  if (msgq_actual <= 0)
+    {
+    host_length = 0;
+    host_record->count = 0;
+    }
+  else
+    {
+    host_length = msgq_actual * MESSAGE_ID_LENGTH;
+    host_record->count = msgq_actual;
+
+    if (msgq_actual < msgq_count)
+      {
+      int new_count;
+      for (new_count = 0, i = 0; i < msgq_count; ++i)
+       if (msgq[i].bKeep)
+         Ustrncpy(&host_record->text[new_count++ * MESSAGE_ID_LENGTH],
+           msgq[i].message_id, MESSAGE_ID_LENGTH);
+
+      host_record->text[new_count * MESSAGE_ID_LENGTH] = 0;
+      }
+    }
+
+/* Jeremy: check for a continuation record, this code I do not know how to
+test but the code should work */
+
+  bContinuation = FALSE;
 
   while (host_length <= 0)
     {
     int i;
-    dbdata_wait *newr = NULL;
+    dbdata_wait * newr = NULL;
 
     /* Search for a continuation */
 
-    for (i = host_record->sequence - 1; i >= 0 && newr == NULL; i--)
+    for (i = host_record->sequence - 1; i >= 0 && !newr; i--)
       {
       sprintf(CS buffer, "%.200s:%d", hostname, i);
       newr = dbfn_read(dbm_file, buffer);
@@ -1750,7 +1820,7 @@ for (;;)
 
     /* If no continuation, delete the current and break the loop */
 
-    if (newr == NULL)
+    if (!newr)
       {
       dbfn_delete(dbm_file, hostname);
       break;
@@ -1761,11 +1831,12 @@ for (;;)
     dbfn_delete(dbm_file, buffer);
     host_record = newr;
     host_length = host_record->count * MESSAGE_ID_LENGTH;
-    }
 
-  /* If we found an existing message, break the continuation loop. */
+    bContinuation = TRUE;
+    }
 
-  if (found) break;
+  if (bFound)
+    break;
 
   /* If host_length <= 0 we have emptied a record and not found a good message,
   and there are no continuation records. Otherwise there is a continuation
@@ -1777,6 +1848,26 @@ for (;;)
     DEBUG(D_transport) debug_printf("waiting messages already delivered\n");
     return FALSE;
     }
+
+  /* we were not able to find an acceptable message, nor was there a
+   * continuation record.  So bug out, outer logic will clean this up.
+   */
+
+  if (!bContinuation)
+    {
+    Ustrcpy (new_message_id, message_id); 
+    dbfn_close(dbm_file);
+    return FALSE;
+    }
+  }            /* we need to process a continuation record */
+
+/* clean up in memory queue */
+if (msgq)
+  {
+  free (msgq);
+  msgq = NULL;
+  msgq_count = 0;
+  msgq_actual = 0;
   }
 
 /* Control gets here when an existing message has been encountered; its
@@ -1786,7 +1877,19 @@ record if required, close the database, and return TRUE. */
 
 if (host_length > 0)
   {
+  uschar  msg [MESSAGE_ID_LENGTH + 1];
+  int i;
+
   host_record->count = host_length/MESSAGE_ID_LENGTH;
+
+  /* rebuild the host_record->text */
+
+  for (i = 0; i < host_record->count; ++i)
+    {
+    Ustrncpy(msg, host_record->text + (i*MESSAGE_ID_LENGTH), MESSAGE_ID_LENGTH);
+    msg[MESSAGE_ID_LENGTH] = 0;
+    }
+
   dbfn_write(dbm_file, hostname, host_record, (int)sizeof(dbdata_wait) + host_length);
   *more = TRUE;
   }
@@ -1795,8 +1898,6 @@ dbfn_close(dbm_file);
 return TRUE;
 }
 
-
-
 /*************************************************
 *    Deliver waiting message down same socket    *
 *************************************************/