Merge StreamListener into Stream
authorHarmon <Harmon758@gmail.com>
Sun, 24 Jan 2021 13:43:50 +0000 (07:43 -0600)
committerHarmon <Harmon758@gmail.com>
Sun, 24 Jan 2021 13:43:50 +0000 (07:43 -0600)
tweepy/__init__.py
tweepy/streaming.py

index b7d0a485fad300f8053356977af51457da98e603..3f94ac464f1641a1132855d188336cc29072e619 100644 (file)
@@ -15,7 +15,7 @@ from tweepy.cache import Cache, FileCache, MemoryCache
 from tweepy.cursor import Cursor
 from tweepy.error import RateLimitError, TweepError
 from tweepy.models import DirectMessage, Friendship, ModelFactory, SavedSearch, SearchResults, Status, User
-from tweepy.streaming import Stream, StreamListener
+from tweepy.streaming import Stream
 
 # Global, unauthenticated instance of API
 api = API()
index 48dd2c8a11ecff89961d9746ad9ad2bdb59a4ab4..357300196c58bcb9312f3073f159a0d5fba65bb4 100644 (file)
@@ -21,110 +21,15 @@ from tweepy.models import Status
 log = logging.getLogger(__name__)
 
 
-class StreamListener:
-
-    def on_connect(self):
-        """Called once connected to streaming server.
-
-        This will be invoked once a successful response
-        is received from the server. Allows the listener
-        to perform some work prior to entering the read loop.
-        """
-        log.info("Stream connected")
-
-    def on_connection_error(self):
-        """Called when stream connection errors or times out"""
-        log.error("Stream connection has errored or timed out")
-
-    def on_exception(self, exception):
-        """Called when an unhandled exception occurs."""
-        log.exception("Stream encountered an exception")
-
-    def on_keep_alive(self):
-        """Called when a keep-alive arrived"""
-        log.debug("Received keep-alive signal")
-
-    def on_request_error(self, status_code):
-        """Called when a non-200 status code is returned"""
-        log.error("Stream encountered HTTP error: %d", status_code)
-        return False
-
-    def on_data(self, raw_data):
-        """Called when raw data is received from connection.
-
-        Override this method if you wish to manually handle
-        the stream data. Return False to stop stream and close connection.
-        """
-        data = json.loads(raw_data)
-
-        if 'in_reply_to_status_id' in data:
-            status = Status.parse(None, data)
-            return self.on_status(status)
-        if 'delete' in data:
-            delete = data['delete']['status']
-            return self.on_delete(delete['id'], delete['user_id'])
-        if 'disconnect' in data:
-            return self.on_disconnect(data['disconnect'])
-        if 'limit' in data:
-            return self.on_limit(data['limit']['track'])
-        if 'scrub_geo' in data:
-            return self.on_scrub_geo(data['scrub_geo'])
-        if 'status_withheld' in data:
-            return self.on_status_withheld(data['status_withheld'])
-        if 'user_withheld' in data:
-            return self.on_user_withheld(data['user_withheld'])
-        if 'warning' in data:
-            return self.on_warning(data['warning'])
-
-        log.error("Unknown message type: %s", raw_data)
-
-    def on_status(self, status):
-        """Called when a new status arrives"""
-        log.debug("Received status: %d", status.id)
-
-    def on_delete(self, status_id, user_id):
-        """Called when a delete notice arrives for a status"""
-        log.debug("Received status deletion notice: %d", status_id)
-
-    def on_disconnect(self, notice):
-        """Called when twitter sends a disconnect notice
-
-        Disconnect codes are listed here:
-        https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/streaming-message-types
-        """
-        log.warning("Received disconnect message: %s", notice)
-
-    def on_limit(self, track):
-        """Called when a limitation notice arrives"""
-        log.debug("Received limit notice: %d", track)
-
-    def on_scrub_geo(self, notice):
-        """Called when a location deletion notice arrives"""
-        log.debug("Received location deletion notice: %s", notice)
-
-    def on_status_withheld(self, notice):
-        """Called when a status withheld content notice arrives"""
-        log.debug("Received status withheld content notice: %s", notice)
-
-    def on_user_withheld(self, notice):
-        """Called when a user withheld content notice arrives"""
-        log.debug("Received user withheld content notice: %s", notice)
-
-    def on_warning(self, notice):
-        """Called when a disconnection warning message arrives"""
-        log.warning("Received stall warning: %s", notice)
-
-
 class Stream:
 
     def __init__(self, consumer_key, consumer_secret, access_token,
-                 access_token_secret, listener, *, chunk_size=512,
-                 daemon=False, max_retries=inf, proxy=None, verify=True):
+                 access_token_secret, *, chunk_size=512, daemon=False,
+                 max_retries=inf, proxy=None, verify=True):
         self.consumer_key = consumer_key
         self.consumer_secret = consumer_secret
         self.access_token = access_token
         self.access_token_secret = access_token_secret
-        self.listener = listener
         # The default socket.read size. Default to less than half the size of
         # a tweet so that it reads tweets with the minimal latency of 2 reads
         # per tweet. Values higher than ~1kb will increase latency by waiting
@@ -169,7 +74,7 @@ class Stream:
                         verify=self.verify, proxies=self.proxies
                     ) as resp:
                         if resp.status_code != 200:
-                            if self.listener.on_request_error(resp.status_code) is False:
+                            if self.on_request_error(resp.status_code) is False:
                                 break
                             error_count += 1
                             if resp.status_code == 420:
@@ -183,7 +88,7 @@ class Stream:
                             error_count = 0
                             http_error_wait = http_error_wait_start
                             network_error_wait = network_error_wait_step
-                            self.listener.on_connect()
+                            self.on_connect()
 
                             for line in resp.iter_lines(
                                 chunk_size=self.chunk_size
@@ -191,8 +96,8 @@ class Stream:
                                 if not self.running:
                                     break
                                 if not line:
-                                    self.listener.on_keep_alive()
-                                elif self.listener.on_data(line) is False:
+                                    self.on_keep_alive()
+                                elif self.on_data(line) is False:
                                     self.running = False
                                     break
 
@@ -207,7 +112,7 @@ class Stream:
                     if isinstance(exc, ssl.SSLError):
                         if not (exc.args and 'timed out' in str(exc.args[0])):
                             raise
-                    if self.listener.on_connection_error() is False:
+                    if self.on_connection_error() is False:
                         break
                     if self.running is False:
                         break
@@ -217,7 +122,7 @@ class Stream:
                         network_error_wait_max
                     )
         except Exception as exc:
-            self.listener.on_exception(exc)
+            self.on_exception(exc)
             raise
         finally:
             self.session.close()
@@ -282,3 +187,93 @@ class Stream:
     def on_closed(self, resp):
         """ Called when the response has been closed by Twitter """
         pass
+
+    def on_connect(self):
+        """Called once connected to streaming server.
+
+        This will be invoked once a successful response
+        is received from the server.
+        """
+        log.info("Stream connected")
+
+    def on_connection_error(self):
+        """Called when stream connection errors or times out"""
+        log.error("Stream connection has errored or timed out")
+
+    def on_exception(self, exception):
+        """Called when an unhandled exception occurs."""
+        log.exception("Stream encountered an exception")
+
+    def on_keep_alive(self):
+        """Called when a keep-alive arrived"""
+        log.debug("Received keep-alive signal")
+
+    def on_request_error(self, status_code):
+        """Called when a non-200 status code is returned"""
+        log.error("Stream encountered HTTP error: %d", status_code)
+        return False
+
+    def on_data(self, raw_data):
+        """Called when raw data is received from connection.
+
+        Override this method if you wish to manually handle
+        the stream data. Return False to stop stream and close connection.
+        """
+        data = json.loads(raw_data)
+
+        if 'in_reply_to_status_id' in data:
+            status = Status.parse(None, data)
+            return self.on_status(status)
+        if 'delete' in data:
+            delete = data['delete']['status']
+            return self.on_delete(delete['id'], delete['user_id'])
+        if 'disconnect' in data:
+            return self.on_disconnect(data['disconnect'])
+        if 'limit' in data:
+            return self.on_limit(data['limit']['track'])
+        if 'scrub_geo' in data:
+            return self.on_scrub_geo(data['scrub_geo'])
+        if 'status_withheld' in data:
+            return self.on_status_withheld(data['status_withheld'])
+        if 'user_withheld' in data:
+            return self.on_user_withheld(data['user_withheld'])
+        if 'warning' in data:
+            return self.on_warning(data['warning'])
+
+        log.error("Unknown message type: %s", raw_data)
+
+    def on_status(self, status):
+        """Called when a new status arrives"""
+        log.debug("Received status: %d", status.id)
+
+    def on_delete(self, status_id, user_id):
+        """Called when a delete notice arrives for a status"""
+        log.debug("Received status deletion notice: %d", status_id)
+
+    def on_disconnect(self, notice):
+        """Called when twitter sends a disconnect notice
+
+        Disconnect codes are listed here:
+        https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/streaming-message-types
+        """
+        log.warning("Received disconnect message: %s", notice)
+
+    def on_limit(self, track):
+        """Called when a limitation notice arrives"""
+        log.debug("Received limit notice: %d", track)
+
+    def on_scrub_geo(self, notice):
+        """Called when a location deletion notice arrives"""
+        log.debug("Received location deletion notice: %s", notice)
+
+    def on_status_withheld(self, notice):
+        """Called when a status withheld content notice arrives"""
+        log.debug("Received status withheld content notice: %s", notice)
+
+    def on_user_withheld(self, notice):
+        """Called when a user withheld content notice arrives"""
+        log.debug("Received user withheld content notice: %s", notice)
+
+    def on_warning(self, notice):
+        """Called when a disconnection warning message arrives"""
+        log.warning("Received stall warning: %s", notice)