self.proxies["https"] = proxy
def _run(self, endpoint, params=None, body=None):
- # Authenticate
if self.session is None:
self.session = requests.Session()
url = f"https://stream.twitter.com/{STREAM_VERSION}/{endpoint}.json"
- # Connect and process the stream
error_counter = 0
-
# https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/guides/connecting
network_error_wait = network_error_wait_step = 0.25
network_error_wait_max = 16
while self.running:
if self.retry_count is not None:
if error_counter > self.retry_count:
- # quit if error count greater than retry count
break
try:
auth = self.auth.apply_auth()