log = logging.getLogger(__name__)
-class StreamListener:
-
- def on_connect(self):
- """Called once connected to streaming server.
-
- This will be invoked once a successful response
- is received from the server. Allows the listener
- to perform some work prior to entering the read loop.
- """
- log.info("Stream connected")
-
- def on_connection_error(self):
- """Called when stream connection errors or times out"""
- log.error("Stream connection has errored or timed out")
-
- def on_exception(self, exception):
- """Called when an unhandled exception occurs."""
- log.exception("Stream encountered an exception")
-
- def on_keep_alive(self):
- """Called when a keep-alive arrived"""
- log.debug("Received keep-alive signal")
-
- def on_request_error(self, status_code):
- """Called when a non-200 status code is returned"""
- log.error("Stream encountered HTTP error: %d", status_code)
- return False
-
- def on_data(self, raw_data):
- """Called when raw data is received from connection.
-
- Override this method if you wish to manually handle
- the stream data. Return False to stop stream and close connection.
- """
- data = json.loads(raw_data)
-
- if 'in_reply_to_status_id' in data:
- status = Status.parse(None, data)
- return self.on_status(status)
- if 'delete' in data:
- delete = data['delete']['status']
- return self.on_delete(delete['id'], delete['user_id'])
- if 'disconnect' in data:
- return self.on_disconnect(data['disconnect'])
- if 'limit' in data:
- return self.on_limit(data['limit']['track'])
- if 'scrub_geo' in data:
- return self.on_scrub_geo(data['scrub_geo'])
- if 'status_withheld' in data:
- return self.on_status_withheld(data['status_withheld'])
- if 'user_withheld' in data:
- return self.on_user_withheld(data['user_withheld'])
- if 'warning' in data:
- return self.on_warning(data['warning'])
-
- log.error("Unknown message type: %s", raw_data)
-
- def on_status(self, status):
- """Called when a new status arrives"""
- log.debug("Received status: %d", status.id)
-
- def on_delete(self, status_id, user_id):
- """Called when a delete notice arrives for a status"""
- log.debug("Received status deletion notice: %d", status_id)
-
- def on_disconnect(self, notice):
- """Called when twitter sends a disconnect notice
-
- Disconnect codes are listed here:
- https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/streaming-message-types
- """
- log.warning("Received disconnect message: %s", notice)
-
- def on_limit(self, track):
- """Called when a limitation notice arrives"""
- log.debug("Received limit notice: %d", track)
-
- def on_scrub_geo(self, notice):
- """Called when a location deletion notice arrives"""
- log.debug("Received location deletion notice: %s", notice)
-
- def on_status_withheld(self, notice):
- """Called when a status withheld content notice arrives"""
- log.debug("Received status withheld content notice: %s", notice)
-
- def on_user_withheld(self, notice):
- """Called when a user withheld content notice arrives"""
- log.debug("Received user withheld content notice: %s", notice)
-
- def on_warning(self, notice):
- """Called when a disconnection warning message arrives"""
- log.warning("Received stall warning: %s", notice)
-
-
class Stream:
def __init__(self, consumer_key, consumer_secret, access_token,
- access_token_secret, listener, *, chunk_size=512,
- daemon=False, max_retries=inf, proxy=None, verify=True):
+ access_token_secret, *, chunk_size=512, daemon=False,
+ max_retries=inf, proxy=None, verify=True):
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
self.access_token = access_token
self.access_token_secret = access_token_secret
- self.listener = listener
# The default socket.read size. Default to less than half the size of
# a tweet so that it reads tweets with the minimal latency of 2 reads
# per tweet. Values higher than ~1kb will increase latency by waiting
verify=self.verify, proxies=self.proxies
) as resp:
if resp.status_code != 200:
- if self.listener.on_request_error(resp.status_code) is False:
+ if self.on_request_error(resp.status_code) is False:
break
error_count += 1
if resp.status_code == 420:
error_count = 0
http_error_wait = http_error_wait_start
network_error_wait = network_error_wait_step
- self.listener.on_connect()
+ self.on_connect()
for line in resp.iter_lines(
chunk_size=self.chunk_size
if not self.running:
break
if not line:
- self.listener.on_keep_alive()
- elif self.listener.on_data(line) is False:
+ self.on_keep_alive()
+ elif self.on_data(line) is False:
self.running = False
break
if isinstance(exc, ssl.SSLError):
if not (exc.args and 'timed out' in str(exc.args[0])):
raise
- if self.listener.on_connection_error() is False:
+ if self.on_connection_error() is False:
break
if self.running is False:
break
network_error_wait_max
)
except Exception as exc:
- self.listener.on_exception(exc)
+ self.on_exception(exc)
raise
finally:
self.session.close()
def on_closed(self, resp):
""" Called when the response has been closed by Twitter """
pass
+
+ def on_connect(self):
+ """Called once connected to streaming server.
+
+ This will be invoked once a successful response
+ is received from the server.
+ """
+ log.info("Stream connected")
+
+ def on_connection_error(self):
+ """Called when stream connection errors or times out"""
+ log.error("Stream connection has errored or timed out")
+
+ def on_exception(self, exception):
+ """Called when an unhandled exception occurs."""
+ log.exception("Stream encountered an exception")
+
+ def on_keep_alive(self):
+ """Called when a keep-alive arrived"""
+ log.debug("Received keep-alive signal")
+
+ def on_request_error(self, status_code):
+ """Called when a non-200 status code is returned"""
+ log.error("Stream encountered HTTP error: %d", status_code)
+ return False
+
+ def on_data(self, raw_data):
+ """Called when raw data is received from connection.
+
+ Override this method if you wish to manually handle
+ the stream data. Return False to stop stream and close connection.
+ """
+ data = json.loads(raw_data)
+
+ if 'in_reply_to_status_id' in data:
+ status = Status.parse(None, data)
+ return self.on_status(status)
+ if 'delete' in data:
+ delete = data['delete']['status']
+ return self.on_delete(delete['id'], delete['user_id'])
+ if 'disconnect' in data:
+ return self.on_disconnect(data['disconnect'])
+ if 'limit' in data:
+ return self.on_limit(data['limit']['track'])
+ if 'scrub_geo' in data:
+ return self.on_scrub_geo(data['scrub_geo'])
+ if 'status_withheld' in data:
+ return self.on_status_withheld(data['status_withheld'])
+ if 'user_withheld' in data:
+ return self.on_user_withheld(data['user_withheld'])
+ if 'warning' in data:
+ return self.on_warning(data['warning'])
+
+ log.error("Unknown message type: %s", raw_data)
+
+ def on_status(self, status):
+ """Called when a new status arrives"""
+ log.debug("Received status: %d", status.id)
+
+ def on_delete(self, status_id, user_id):
+ """Called when a delete notice arrives for a status"""
+ log.debug("Received status deletion notice: %d", status_id)
+
+ def on_disconnect(self, notice):
+ """Called when twitter sends a disconnect notice
+
+ Disconnect codes are listed here:
+ https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/streaming-message-types
+ """
+ log.warning("Received disconnect message: %s", notice)
+
+ def on_limit(self, track):
+ """Called when a limitation notice arrives"""
+ log.debug("Received limit notice: %d", track)
+
+ def on_scrub_geo(self, notice):
+ """Called when a location deletion notice arrives"""
+ log.debug("Received location deletion notice: %s", notice)
+
+ def on_status_withheld(self, notice):
+ """Called when a status withheld content notice arrives"""
+ log.debug("Received status withheld content notice: %s", notice)
+
+ def on_user_withheld(self, notice):
+ """Called when a user withheld content notice arrives"""
+ log.debug("Received user withheld content notice: %s", notice)
+
+ def on_warning(self, notice):
+ """Called when a disconnection warning message arrives"""
+ log.warning("Received stall warning: %s", notice)