From c76aabfb48fb0d6844f9b27b68a98c41e3d4e2ed Mon Sep 17 00:00:00 2001 From: Harmon Date: Tue, 26 Jan 2021 09:13:37 -0600 Subject: [PATCH] Improve clarity and coherence of Stream._connect --- tweepy/streaming.py | 59 +++++++++++++++++++++++++-------------------- 1 file changed, 33 insertions(+), 26 deletions(-) diff --git a/tweepy/streaming.py b/tweepy/streaming.py index a80bb4a..834953e 100644 --- a/tweepy/streaming.py +++ b/tweepy/streaming.py @@ -55,15 +55,6 @@ class Stream: def _connect(self, method, endpoint, params=None, headers=None, body=None): self.running = True - if self.session is None: - self.session = requests.Session() - self.session.headers["User-Agent"] = self.user_agent - - url = f"https://stream.twitter.com/1.1/{endpoint}.json" - - auth = OAuth1(self.consumer_key, self.consumer_secret, - self.access_token, self.access_token_secret) - error_count = 0 # https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/guides/connecting stall_timeout = 90 @@ -73,6 +64,15 @@ class Stream: http_error_wait_max = 320 http_420_error_wait_start = 60 + auth = OAuth1(self.consumer_key, self.consumer_secret, + self.access_token, self.access_token_secret) + + if self.session is None: + self.session = requests.Session() + self.session.headers["User-Agent"] = self.user_agent + + url = f"https://stream.twitter.com/1.1/{endpoint}.json" + try: while self.running and error_count <= self.max_retries: try: @@ -81,22 +81,11 @@ class Stream: timeout=stall_timeout, stream=True, auth=auth, verify=self.verify, proxies=self.proxies ) as resp: - if resp.status_code != 200: - self.on_request_error(resp.status_code) - if not self.running: - break - error_count += 1 - if resp.status_code == 420: - http_error_wait = max( - http_420_error_wait_start, http_error_wait - ) - sleep(http_error_wait) - http_error_wait = min(http_error_wait * 2, - http_error_wait_max) - else: + if resp.status_code == 200: error_count = 0 http_error_wait = http_error_wait_start network_error_wait = network_error_wait_step + self.on_connect() if not self.running: break @@ -113,6 +102,22 @@ class Stream: if resp.raw.closed: self.on_closed(resp) + else: + self.on_request_error(resp.status_code) + if not self.running: + break + + error_count += 1 + + if resp.status_code == 420: + if http_error_wait < http_420_error_wait_start: + http_error_wait = http_420_error_wait_start + + sleep(http_error_wait) + + http_error_wait *= 2 + if http_error_wait > http_error_wait_max: + http_error_wait = http_error_wait_max except (requests.ConnectionError, requests.Timeout, ssl.SSLError, urllib3.exceptions.ReadTimeoutError, urllib3.exceptions.ProtocolError) as exc: @@ -122,14 +127,16 @@ class Stream: if isinstance(exc, ssl.SSLError): if not (exc.args and "timed out" in str(exc.args[0])): raise + self.on_connection_error() if not self.running: break + sleep(network_error_wait) - network_error_wait = min( - network_error_wait + network_error_wait_step, - network_error_wait_max - ) + + network_error_wait += network_error_wait_step + if network_error_wait > network_error_wait_max: + network_error_wait = network_error_wait_max except Exception as exc: self.on_exception(exc) finally: -- 2.25.1