Added timeout to stream connection to prevent deadlock on disconnect()
authorJosh Roesslein <jroesslein@gmail.com>
Sat, 8 Aug 2009 07:03:00 +0000 (02:03 -0500)
committerJosh Roesslein <jroesslein@gmail.com>
Sat, 8 Aug 2009 07:03:00 +0000 (02:03 -0500)
tweepy/streaming.py

index b2c849dbbbffd213ccc32c60492ea33c10c0bd49..fd2b1118e6519bb406788ef50003d9cd87e6a133 100644 (file)
@@ -3,7 +3,9 @@
 # See LICENSE
 
 import httplib
+from socket import timeout
 from threading import Thread
+from time import sleep
 
 from . auth import BasicAuthHandler
 from . parsers import parse_status
@@ -17,20 +19,39 @@ except ImportError:
 
 class Stream(object):
 
-  def __init__(self, username, password, callback, host='stream.twitter.com', buffer_size=1500):
+  def __init__(self, username, password, callback, host='stream.twitter.com', timeout=2.0, buffer_size=1500):
     self.host = host
     self.auth = BasicAuthHandler(username, password)
     self.running = False
+    self.timeout = timeout
     self.buffer_size = buffer_size
     self.callback = callback
+    self.api = API()
 
   def _run(self):
-    api = API()
-    conn = httplib.HTTPConnection(self.host, timeout=5)
+    # setup
     headers = {}
     self.auth.apply_auth(None, None, headers, None)
-    conn.request('POST', self.url, headers=headers)
-    resp = conn.getresponse()
+
+    # enter loop
+    while self.running:
+      try:
+        conn = httplib.HTTPConnection(self.host, timeout=self.timeout)
+        conn.request('POST', self.url, headers=headers)
+        resp = conn.getresponse()
+        if resp.status != 200:
+          # TODO: better handle failures
+          sleep(5.0)
+          continue
+        self._read_loop(resp)
+      except timeout:
+        conn.close()
+        continue
+
+    # cleanup
+    conn.close()
+
+  def _read_loop(self, resp):
     data = ''
     while self.running:
       if resp.isclosed():
@@ -38,7 +59,7 @@ class Stream(object):
 
       # read length
       length = ''
-      while resp.isclosed() is False:
+      while True:
         c = resp.read(1)
         if c == '\n':
           break
@@ -54,15 +75,12 @@ class Stream(object):
 
       # turn json data into status object
       if 'in_reply_to_status_id' in data:
-        status = parse_status(data, api)
+        status = parse_status(data, self.api)
         self.callback(status)
 
       # TODO: we should probably also parse delete/track messages
       # and pass to a callback
 
-    conn.close()
-    self.running = False
-
   def spritzer(self):
     if self.running:
       raise TweepError('Stream object already connected!')