From: Josh Roesslein Date: Sat, 8 Aug 2009 07:03:00 +0000 (-0500) Subject: Added timeout to stream connection to prevent deadlock on disconnect() X-Git-Url: https://vcs.fsf.org/?a=commitdiff_plain;h=d4f265a93145e329619633f788cb46dd139387de;p=tweepy.git Added timeout to stream connection to prevent deadlock on disconnect() --- diff --git a/tweepy/streaming.py b/tweepy/streaming.py index b2c849d..fd2b111 100644 --- a/tweepy/streaming.py +++ b/tweepy/streaming.py @@ -3,7 +3,9 @@ # See LICENSE import httplib +from socket import timeout from threading import Thread +from time import sleep from . auth import BasicAuthHandler from . parsers import parse_status @@ -17,20 +19,39 @@ except ImportError: class Stream(object): - def __init__(self, username, password, callback, host='stream.twitter.com', buffer_size=1500): + def __init__(self, username, password, callback, host='stream.twitter.com', timeout=2.0, buffer_size=1500): self.host = host self.auth = BasicAuthHandler(username, password) self.running = False + self.timeout = timeout self.buffer_size = buffer_size self.callback = callback + self.api = API() def _run(self): - api = API() - conn = httplib.HTTPConnection(self.host, timeout=5) + # setup headers = {} self.auth.apply_auth(None, None, headers, None) - conn.request('POST', self.url, headers=headers) - resp = conn.getresponse() + + # enter loop + while self.running: + try: + conn = httplib.HTTPConnection(self.host, timeout=self.timeout) + conn.request('POST', self.url, headers=headers) + resp = conn.getresponse() + if resp.status != 200: + # TODO: better handle failures + sleep(5.0) + continue + self._read_loop(resp) + except timeout: + conn.close() + continue + + # cleanup + conn.close() + + def _read_loop(self, resp): data = '' while self.running: if resp.isclosed(): @@ -38,7 +59,7 @@ class Stream(object): # read length length = '' - while resp.isclosed() is False: + while True: c = resp.read(1) if c == '\n': break @@ -54,15 +75,12 @@ class Stream(object): # turn json data into status object if 'in_reply_to_status_id' in data: - status = parse_status(data, api) + status = parse_status(data, self.api) self.callback(status) # TODO: we should probably also parse delete/track messages # and pass to a callback - conn.close() - self.running = False - def spritzer(self): if self.running: raise TweepError('Stream object already connected!')