def _connect(self, method, endpoint, params=None, headers=None, body=None):
self.running = True
- if self.session is None:
- self.session = requests.Session()
- self.session.headers["User-Agent"] = self.user_agent
-
- url = f"https://stream.twitter.com/1.1/{endpoint}.json"
-
- auth = OAuth1(self.consumer_key, self.consumer_secret,
- self.access_token, self.access_token_secret)
-
error_count = 0
# https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/guides/connecting
stall_timeout = 90
http_error_wait_max = 320
http_420_error_wait_start = 60
+ auth = OAuth1(self.consumer_key, self.consumer_secret,
+ self.access_token, self.access_token_secret)
+
+ if self.session is None:
+ self.session = requests.Session()
+ self.session.headers["User-Agent"] = self.user_agent
+
+ url = f"https://stream.twitter.com/1.1/{endpoint}.json"
+
try:
while self.running and error_count <= self.max_retries:
try:
timeout=stall_timeout, stream=True, auth=auth,
verify=self.verify, proxies=self.proxies
) as resp:
- if resp.status_code != 200:
- self.on_request_error(resp.status_code)
- if not self.running:
- break
- error_count += 1
- if resp.status_code == 420:
- http_error_wait = max(
- http_420_error_wait_start, http_error_wait
- )
- sleep(http_error_wait)
- http_error_wait = min(http_error_wait * 2,
- http_error_wait_max)
- else:
+ if resp.status_code == 200:
error_count = 0
http_error_wait = http_error_wait_start
network_error_wait = network_error_wait_step
+
self.on_connect()
if not self.running:
break
if resp.raw.closed:
self.on_closed(resp)
+ else:
+ self.on_request_error(resp.status_code)
+ if not self.running:
+ break
+
+ error_count += 1
+
+ if resp.status_code == 420:
+ if http_error_wait < http_420_error_wait_start:
+ http_error_wait = http_420_error_wait_start
+
+ sleep(http_error_wait)
+
+ http_error_wait *= 2
+ if http_error_wait > http_error_wait_max:
+ http_error_wait = http_error_wait_max
except (requests.ConnectionError, requests.Timeout,
ssl.SSLError, urllib3.exceptions.ReadTimeoutError,
urllib3.exceptions.ProtocolError) as exc:
if isinstance(exc, ssl.SSLError):
if not (exc.args and "timed out" in str(exc.args[0])):
raise
+
self.on_connection_error()
if not self.running:
break
+
sleep(network_error_wait)
- network_error_wait = min(
- network_error_wait + network_error_wait_step,
- network_error_wait_max
- )
+
+ network_error_wait += network_error_wait_step
+ if network_error_wait > network_error_wait_max:
+ network_error_wait = network_error_wait_max
except Exception as exc:
self.on_exception(exc)
finally: