Add asynchronous.AsyncStreamingClient
author Harmon <Harmon758@gmail.com>
Thu, 19 May 2022 00:53:35 +0000 (19:53 -0500)
committer Harmon <Harmon758@gmail.com>
Thu, 19 May 2022 00:53:35 +0000 (19:53 -0500)
docs/asyncstream.rst
docs/asyncstreamingclient.rst [new file with mode: 0644]
docs/index.rst
docs/streamingclient.rst
docs/streamresponse.rst [new file with mode: 0644]
docs/streamrule.rst [new file with mode: 0644]
tweepy/asynchronous/__init__.py
tweepy/asynchronous/streaming.py

index da4123a9d410010c43c0983613a1ccb76bd3eb16..f77b1d8630fbfba4962d1bfea3a637bfd20e0d8c 100644 (file)
@@ -8,4 +8,5 @@
 
 .. autoclass:: AsyncStream
    :members:
+   :inherited-members:
    :member-order: bysource
diff --git a/docs/asyncstreamingclient.rst b/docs/asyncstreamingclient.rst
new file mode 100644 (file)
index 0000000..a4834ed
--- /dev/null
@@ -0,0 +1,12 @@
+.. _asyncstreamingclient_reference:
+
+.. currentmodule:: tweepy.asynchronous
+
+*****************************
+:class:`AsyncStreamingClient`
+*****************************
+
+.. autoclass:: AsyncStreamingClient
+   :members:
+   :inherited-members:
+   :member-order: bysource
index 41465f98ff9f87fe82b56c8f04e704a507ae99d5..2ac23a3eb8f5e4be832ed6c36ece9d46afaeb9c6 100644 (file)
@@ -38,7 +38,10 @@ Contents:
    client.rst
    response.rst
    streamingclient.rst
+   streamresponse.rst
+   streamrule.rst
    asyncclient.rst
+   asyncstreamingclient.rst
    exceptions.rst
    expansions_and_fields.rst
    v2_models.rst
index 9a9577266f7562a08541b5bebc5bcfe5bdec1622..3069447599fd957429d14758bbe776e768b53d84 100644 (file)
    :members:
    :inherited-members:
    :member-order: bysource
-
-``StreamResponse``
-==================
-.. autoclass:: StreamResponse
-   :class-doc-from: class
-
-   The :obj:`StreamResponse` returned by :meth:`StreamingClient.on_response` is
-   a :class:`collections.namedtuple`, with ``data``, ``includes``, ``errors``,
-   and ``matching_rules`` fields, corresponding with the fields in responses
-   from Twitter's API.
-
-   .. versionadded:: 4.6
-
-``StreamRule``
-==============
-.. autoclass:: StreamRule
diff --git a/docs/streamresponse.rst b/docs/streamresponse.rst
new file mode 100644 (file)
index 0000000..9e8d8ff
--- /dev/null
@@ -0,0 +1,15 @@
+.. _streamresponse:
+
+.. currentmodule:: tweepy
+
+``StreamResponse``
+==================
+.. autoclass:: StreamResponse
+   :class-doc-from: class
+
+   The :obj:`StreamResponse` returned by :meth:`StreamingClient.on_response` is
+   a :class:`collections.namedtuple`, with ``data``, ``includes``, ``errors``,
+   and ``matching_rules`` fields, corresponding with the fields in responses
+   from Twitter's API.
+
+   .. versionadded:: 4.6
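
A minimal sketch of consuming this namedtuple in an ``on_response`` override (the subclass name is illustrative):

    import tweepy

    class ResponsePrinter(tweepy.StreamingClient):
        def on_response(self, response):
            # StreamResponse is a namedtuple; its fields are accessible by name
            print(response.data, response.includes, response.matching_rules)
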
diff --git a/docs/streamrule.rst b/docs/streamrule.rst
new file mode 100644 (file)
index 0000000..ea65ed5
--- /dev/null
@@ -0,0 +1,7 @@
+.. _streamrule:
+
+.. currentmodule:: tweepy
+
+``StreamRule``
+==============
+.. autoclass:: StreamRule
index ff2800245b4aed502762f2d0c308e18e6af5e4bf..d5dc1a8d0418b33bbb197c0c34a375e296633e57 100644 (file)
@@ -19,5 +19,5 @@ except ModuleNotFoundError:
         "installed"
     )
 
-from tweepy.asynchronous.streaming import AsyncStream
+from tweepy.asynchronous.streaming import AsyncStream, AsyncStreamingClient
 from tweepy.asynchronous.client import AsyncClient
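
With this export in place, the v2 streaming client can be imported directly from the asynchronous subpackage:

    from tweepy.asynchronous import AsyncStream, AsyncStreamingClient
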
index 3bf172359cd6199ba15f4940e3d353f0c2d7096b..21241e3d6a51605cd788fb6238fc84462980eafb 100644 (file)
@@ -13,48 +13,19 @@ from oauthlib.oauth1 import Client as OAuthClient
 from yarl import URL
 
 import tweepy
+from tweepy.asynchronous.client import AsyncBaseClient
+from tweepy.client import Response
 from tweepy.errors import TweepyException
 from tweepy.models import Status
+from tweepy.streaming import StreamResponse, StreamRule
+from tweepy.tweet import Tweet
 
 log = logging.getLogger(__name__)
 
 
-class AsyncStream:
-    """Stream realtime Tweets asynchronously with Twitter API v1.1
-
-    .. versionadded:: 4.0
-
-    Parameters
-    ----------
-    consumer_key: str
-        Twitter API Consumer Key
-    consumer_secret: str
-        Twitter API Consumer Secret
-    access_token: str
-        Twitter API Access Token
-    access_token_secret: str
-        Twitter API Access Token Secret
-    max_retries: int | None
-        Number of times to attempt to (re)connect the stream.
-    proxy: str | None
-        Proxy URL
+class AsyncBaseStream:
 
-    Attributes
-    ----------
-    session : aiohttp.ClientSession | None
-        Aiohttp client session used to connect to the API
-    task : asyncio.Task | None
-        The task running the stream
-    user_agent : str
-        User agent used when connecting to the API
-    """
-
-    def __init__(self, consumer_key, consumer_secret, access_token,
-                 access_token_secret, *, max_retries=inf, proxy=None):
-        self.consumer_key = consumer_key
-        self.consumer_secret = consumer_secret
-        self.access_token = access_token
-        self.access_token_secret = access_token_secret
+    def __init__(self, *, max_retries=inf, proxy=None):
         self.max_retries = max_retries
         self.proxy = proxy
 
@@ -66,10 +37,13 @@ class AsyncStream:
             f"Tweepy/{tweepy.__version__}"
         )
 
-    async def _connect(self, method, endpoint, params={}, headers=None,
-                       body=None):
+    async def _connect(
+        self, method, url, params=None, headers=None, body=None
+    ):
         error_count = 0
         # https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/guides/connecting
+        # https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/handling-disconnections
+        # https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/integrate/handling-disconnections
         stall_timeout = 90
         network_error_wait = network_error_wait_step = 0.25
         network_error_wait_max = 16
@@ -77,27 +51,18 @@ class AsyncStream:
         http_error_wait_max = 320
         http_420_error_wait_start = 60
 
-        oauth_client = OAuthClient(self.consumer_key, self.consumer_secret,
-                                   self.access_token, self.access_token_secret)
-
         if self.session is None or self.session.closed:
             self.session = aiohttp.ClientSession(
                 timeout=aiohttp.ClientTimeout(sock_read=stall_timeout)
             )
         self.session.headers["User-Agent"] = self.user_agent
 
-        url = f"https://stream.twitter.com/1.1/{endpoint}.json"
-        url = str(URL(url).with_query(sorted(params.items())))
-
         try:
             while error_count <= self.max_retries:
-                request_url, request_headers, request_body = oauth_client.sign(
-                    url, method, body, headers
-                )
                 try:
                     async with self.session.request(
-                        method, request_url, headers=request_headers,
-                        data=request_body, proxy=self.proxy
+                        method, url, params=params, headers=headers, data=body,
+                        proxy=self.proxy
                     ) as resp:
                         if resp.status == 200:
                             error_count = 0
@@ -146,6 +111,131 @@ class AsyncStream:
             await self.session.close()
             await self.on_disconnect()
 
+    def disconnect(self):
+        """Disconnect the stream"""
+        if self.task is not None:
+            self.task.cancel()
+
+    async def on_closed(self, resp):
+        """|coroutine|
+
+        This is called when the stream has been closed by Twitter.
+
+        Parameters
+        ----------
+        resp : aiohttp.ClientResponse
+            The response from Twitter
+        """
+        log.error("Stream connection closed by Twitter")
+
+    async def on_connect(self):
+        """|coroutine|
+
+        This is called after successfully connecting to the streaming API.
+        """
+        log.info("Stream connected")
+
+    async def on_connection_error(self):
+        """|coroutine|
+
+        This is called when the stream connection errors or times out.
+        """
+        log.error("Stream connection has errored or timed out")
+
+    async def on_disconnect(self):
+        """|coroutine|
+
+        This is called when the stream has disconnected.
+        """
+        log.info("Stream disconnected")
+
+    async def on_exception(self, exception):
+        """|coroutine|
+
+        This is called when an unhandled exception occurs.
+
+        Parameters
+        ----------
+        exception : Exception
+            The unhandled exception
+        """
+        log.exception("Stream encountered an exception")
+
+    async def on_keep_alive(self):
+        """|coroutine|
+
+        This is called when a keep-alive signal is received.
+        """
+        log.debug("Received keep-alive signal")
+
+    async def on_request_error(self, status_code):
+        """|coroutine|
+
+        This is called when a non-200 HTTP status code is encountered.
+
+        Parameters
+        ----------
+        status_code : int
+            The HTTP status code encountered
+        """
+        log.error("Stream encountered HTTP Error: %d", status_code)
+
+
+class AsyncStream(AsyncBaseStream):
+    """Stream realtime Tweets asynchronously with Twitter API v1.1
+
+    .. versionadded:: 4.0
+
+    Parameters
+    ----------
+    consumer_key: str
+        Twitter API Consumer Key
+    consumer_secret: str
+        Twitter API Consumer Secret
+    access_token: str
+        Twitter API Access Token
+    access_token_secret: str
+        Twitter API Access Token Secret
+    max_retries: int | None
+        Number of times to attempt to (re)connect the stream.
+    proxy: str | None
+        Proxy URL
+
+    Attributes
+    ----------
+    session : aiohttp.ClientSession | None
+        Aiohttp client session used to connect to the API
+    task : asyncio.Task | None
+        The task running the stream
+    user_agent : str
+        User agent used when connecting to the API
+    """
+
+    def __init__(self, consumer_key, consumer_secret, access_token,
+                 access_token_secret, **kwargs):
+        """__init__( \
+            consumer_key, consumer_secret, access_token, access_token_secret, \
+            *, max_retries=inf, proxy=None \
+        )
+        """
+        self.consumer_key = consumer_key
+        self.consumer_secret = consumer_secret
+        self.access_token = access_token
+        self.access_token_secret = access_token_secret
+        super().__init__(**kwargs)
+
+    async def _connect(
+        self, method, endpoint, params={}, headers=None, body=None
+    ):
+        oauth_client = OAuthClient(self.consumer_key, self.consumer_secret,
+                                   self.access_token, self.access_token_secret)
+        url = f"https://stream.twitter.com/1.1/{endpoint}.json"
+        url = str(URL(url).with_query(sorted(params.items())))
+        url, headers, body = oauth_client.sign(
+            url, http_method=method, headers=headers, body=body
+        )
+        await super()._connect(method, url, headers=headers, body=body)
+
     def filter(self, *, follow=None, track=None, locations=None,
                filter_level=None, languages=None, stall_warnings=False):
         """Filter realtime Tweets
@@ -286,75 +376,6 @@ class AsyncStream:
         # Use name parameter when support for Python 3.7 is dropped
         return self.task
 
-    def disconnect(self):
-        """Disconnect the stream"""
-        if self.task is not None:
-            self.task.cancel()
-
-    async def on_closed(self, resp):
-        """|coroutine|
-
-        This is called when the stream has been closed by Twitter.
-
-        Parameters
-        ----------
-        response : aiohttp.ClientResponse
-            The response from Twitter
-        """
-        log.error("Stream connection closed by Twitter")
-
-    async def on_connect(self):
-        """|coroutine|
-
-        This is called after successfully connecting to the streaming API.
-        """
-        log.info("Stream connected")
-
-    async def on_connection_error(self):
-        """|coroutine|
-
-        This is called when the stream connection errors or times out.
-        """
-        log.error("Stream connection has errored or timed out")
-
-    async def on_disconnect(self):
-        """|coroutine|
-
-        This is called when the stream has disconnected.
-        """
-        log.info("Stream disconnected")
-
-    async def on_exception(self, exception):
-        """|coroutine|
-
-        This is called when an unhandled exception occurs.
-
-        Parameters
-        ----------
-        exception : Exception
-            The unhandled exception
-        """
-        log.exception("Stream encountered an exception")
-
-    async def on_keep_alive(self):
-        """|coroutine|
-
-        This is called when a keep-alive signal is received.
-        """
-        log.debug("Received keep-alive signal")
-
-    async def on_request_error(self, status_code):
-        """|coroutine|
-
-        This is called when a non-200 HTTP status code is encountered.
-
-        Parameters
-        ----------
-        status_code : int
-            The HTTP status code encountered
-        """
-        log.error("Stream encountered HTTP Error: %d", status_code)
-
     async def on_data(self, raw_data):
         """|coroutine|
 
@@ -492,3 +513,438 @@ class AsyncStream:
             The stall warning
         """
         log.warning("Received stall warning: %s", notice)
+
+
+class AsyncStreamingClient(AsyncBaseClient, AsyncBaseStream):
+    """Stream realtime Tweets asynchronously with Twitter API v2
+
+    .. versionadded:: 4.10
+
+    Parameters
+    ----------
+    bearer_token : str
+        Twitter API Bearer Token
+    return_type : type[dict | requests.Response | Response]
+        Type to return from requests to the API
+    wait_on_rate_limit : bool
+        Whether to wait when rate limit is reached
+    max_retries: int | None
+        Number of times to attempt to (re)connect the stream.
+    proxy : str | None
+        URL of the proxy to use when connecting to the stream
+
+    Attributes
+    ----------
+    session : aiohttp.ClientSession | None
+        Aiohttp client session used to connect to the API
+    task : asyncio.Task | None
+        The task running the stream
+    user_agent : str
+        User agent used when connecting to the API
+    """
+
+    def __init__(self, bearer_token, *, return_type=Response,
+                 wait_on_rate_limit=False, **kwargs):
+        """__init__( \
+            bearer_token, *, return_type=Response, wait_on_rate_limit=False, \
+            max_retries=inf, proxy=None \
+        )
+        """
+        AsyncBaseClient.__init__(self, bearer_token, return_type=return_type,
+                                 wait_on_rate_limit=wait_on_rate_limit)
+        AsyncBaseStream.__init__(self, **kwargs)
+
+    async def _connect(self, method, endpoint, **kwargs):
+        url = f"https://api.twitter.com/2/tweets/{endpoint}/stream"
+        headers = {"Authorization": f"Bearer {self.bearer_token}"}
+        await super()._connect(method, url, headers=headers, **kwargs)
+
+    def _process_data(self, data, data_type=None):
+        if data_type is StreamRule:
+            if isinstance(data, list):
+                rules = []
+                for rule in data:
+                    if "tag" in rule:
+                        rules.append(StreamRule(
+                            value=rule["value"], id=rule["id"], tag=rule["tag"]
+                        ))
+                    else:
+                        rules.append(StreamRule(value=rule["value"],
+                                                id=rule["id"]))
+                return rules
+            elif data is not None:
+                if "tag" in data:
+                    return StreamRule(value=data["value"], id=data["id"],
+                                      tag=data["tag"])
+                else:
+                    return StreamRule(value=data["value"], id=data["id"])
+        else:
+            return super()._process_data(data, data_type=data_type)
+
+    async def add_rules(self, add, **params):
+        """add_rules(add, *, dry_run)
+
+        |coroutine|
+
+        Add rules to filtered stream.
+
+        Parameters
+        ----------
+        add : list[StreamRule] | StreamRule
+            The rule or rules to add to your stream.
+        dry_run : bool
+            Set to true to test the syntax of your rule without submitting
+            it. This is useful if you want to check the syntax of a rule
+            before adding it to your stream.
+
+        Returns
+        -------
+        dict | requests.Response | Response
+
+        References
+        ----------
+        https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/post-tweets-search-stream-rules
+        """
+        json = {"add": []}
+        if isinstance(add, StreamRule):
+            add = (add,)
+        for rule in add:
+            if rule.tag is not None:
+                json["add"].append({"value": rule.value, "tag": rule.tag})
+            else:
+                json["add"].append({"value": rule.value})
+
+        return await self._make_request(
+            "POST", f"/2/tweets/search/stream/rules", params=params,
+            endpoint_parameters=("dry_run",), json=json, data_type=StreamRule
+        )
+
+    async def delete_rules(self, ids, **params):
+        """delete_rules(ids, *, dry_run)
+
+        |coroutine|
+
+        Delete rules from filtered stream.
+
+        Parameters
+        ----------
+        ids : int | str | list[int | str | StreamRule] | StreamRule
+            Array of rule IDs, each one representing a rule already active in
+            your stream. IDs must be submitted as strings.
+        dry_run : bool
+            Set to true to test the syntax of your rule without submitting
+            it. This is useful if you want to check the syntax of a rule before
+            removing one or more of your existing rules.
+
+        Returns
+        -------
+        dict | requests.Response | Response
+
+        References
+        ----------
+        https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/post-tweets-search-stream-rules
+        """
+        json = {"delete": {"ids": []}}
+        if isinstance(ids, (int, str, StreamRule)):
+            ids = (ids,)
+        for id in ids:
+            if isinstance(id, StreamRule):
+                json["delete"]["ids"].append(str(StreamRule.id))
+            else:
+                json["delete"]["ids"].append(str(id))
+
+        return await self._make_request(
+            "POST", f"/2/tweets/search/stream/rules", params=params,
+            endpoint_parameters=("dry_run",), json=json, data_type=StreamRule
+        )
+
+    def filter(self, **params):
+        """filter( \
+            *, backfill_minutes=None, expansions=None, media_fields=None, \
+            place_fields=None, poll_fields=None, tweet_fields=None, \
+            user_fields=None \
+        )
+
+        Streams Tweets in real-time based on a specific set of filter rules.
+
+        If you are using the academic research product track, you can connect
+        up to two `redundant connections <filter redundant connections_>`_ to
+        maximize your streaming up-time.
+
+        The Tweets returned by this endpoint count towards the Project-level
+        `Tweet cap`_.
+
+        Parameters
+        ----------
+        backfill_minutes : int | None
+            By passing this parameter, you can request up to five (5) minutes
+            worth of streaming data that you might have missed during a
+            disconnection to be delivered to you upon reconnection. The
+            backfilled Tweets will automatically flow through the reconnected
+            stream, with older Tweets generally being delivered before any
+            newly matching Tweets. You must include a whole number between 1
+            and 5 as the value to this parameter.
+
+            This feature will deliver duplicate Tweets, meaning that if you
+            were disconnected for 90 seconds, and you requested two minutes of
+            backfill, you will receive 30 seconds worth of duplicate Tweets.
+            Due to this, you should make sure your system is tolerant of
+            duplicate data.
+
+            This feature is currently only available to the Academic Research
+            product track.
+        expansions : list[str] | str
+            :ref:`expansions_parameter`
+        media_fields : list[str] | str
+            :ref:`media_fields_parameter`
+        place_fields : list[str] | str
+            :ref:`place_fields_parameter`
+        poll_fields : list[str] | str
+            :ref:`poll_fields_parameter`
+        tweet_fields : list[str] | str
+            :ref:`tweet_fields_parameter`
+        user_fields : list[str] | str
+            :ref:`user_fields_parameter`
+
+        Raises
+        ------
+        TweepyException
+            When the stream is already connected
+
+        Returns
+        -------
+        asyncio.Task
+            The task running the stream
+
+        References
+        ----------
+        https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/get-tweets-search-stream
+
+        .. _filter redundant connections: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/recovery-and-redundancy-features
+        .. _Tweet cap: https://developer.twitter.com/en/docs/twitter-api/tweet-caps
+        """
+        if self.task is not None and not self.task.done():
+            raise TweepyException("Stream is already connected")
+
+        endpoint = "search"
+
+        params = self._process_params(
+            params, endpoint_parameters=(
+                "backfill_minutes", "expansions", "media.fields",
+                "place.fields", "poll.fields", "tweet.fields", "user.fields"
+            )
+        )
+
+        self.task = asyncio.create_task(
+            self._connect("GET", endpoint, params=params)
+        )
+        # Use name parameter when support for Python 3.7 is dropped
+        return self.task
+
+    async def get_rules(self, **params):
+        """get_rules(*, ids)
+
+        |coroutine|
+
+        Return rules currently active on the streaming endpoint, either all
+        of them or a subset specified by ID.
+
+        Parameters
+        ----------
+        ids : list[str] | str
+            Comma-separated list of rule IDs. If omitted, all rules are
+            returned.
+
+        Returns
+        -------
+        dict | requests.Response | Response
+
+        References
+        ----------
+        https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/get-tweets-search-stream-rules
+        """
+        return await self._make_request(
+            "GET", f"/2/tweets/search/stream/rules", params=params,
+            endpoint_parameters=("ids",), data_type=StreamRule
+        )
+
+    def sample(self, **params):
+        """sample( \
+            *, backfill_minutes=None, expansions=None, media_fields=None, \
+            place_fields=None, poll_fields=None, tweet_fields=None, \
+            user_fields=None \
+        )
+
+        Streams about 1% of all Tweets in real-time.
+
+        If you are using the academic research product track, you can connect
+        up to two `redundant connections <sample redundant connections_>`_ to
+        maximize your streaming up-time.
+
+        Parameters
+        ----------
+        backfill_minutes : int | None
+            By passing this parameter, you can request up to five (5) minutes
+            worth of streaming data that you might have missed during a
+            disconnection to be delivered to you upon reconnection. The
+            backfilled Tweets will automatically flow through the reconnected
+            stream, with older Tweets generally being delivered before any
+            newly matching Tweets. You must include a whole number between 1
+            and 5 as the value to this parameter.
+
+            This feature will deliver duplicate Tweets, meaning that if you
+            were disconnected for 90 seconds, and you requested two minutes of
+            backfill, you will receive 30 seconds worth of duplicate Tweets.
+            Due to this, you should make sure your system is tolerant of
+            duplicate data.
+
+            This feature is currently only available to the Academic Research
+            product track.
+        expansions : list[str] | str
+            :ref:`expansions_parameter`
+        media_fields : list[str] | str
+            :ref:`media_fields_parameter`
+        place_fields : list[str] | str
+            :ref:`place_fields_parameter`
+        poll_fields : list[str] | str
+            :ref:`poll_fields_parameter`
+        tweet_fields : list[str] | str
+            :ref:`tweet_fields_parameter`
+        user_fields : list[str] | str
+            :ref:`user_fields_parameter`
+
+        Raises
+        ------
+        TweepyException
+            When the stream is already connected
+
+        Returns
+        -------
+        asyncio.Task
+            The task running the stream
+
+        References
+        ----------
+        https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/api-reference/get-tweets-sample-stream
+
+        .. _sample redundant connections: https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/integrate/recovery-and-redundancy-features
+        """
+        if self.task is not None and not self.task.done():
+            raise TweepyException("Stream is already connected")
+
+        endpoint = "sample"
+
+        params = self._process_params(
+            params, endpoint_parameters=(
+                "backfill_minutes", "expansions", "media.fields",
+                "place.fields", "poll.fields", "tweet.fields", "user.fields"
+            )
+        )
+
+        self.task = asyncio.create_task(
+            self._connect("GET", endpoint, params=params)
+        )
+        # Use name parameter when support for Python 3.7 is dropped
+        return self.task
+
+    async def on_data(self, raw_data):
+        """|coroutine|
+
+        This is called when raw data is received from the stream.
+        This method handles sending the data to other methods.
+
+        Parameters
+        ----------
+        raw_data : JSON
+            The raw data from the stream
+
+        References
+        ----------
+        https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/consuming-streaming-data
+        """
+        data = json.loads(raw_data)
+
+        tweet = None
+        includes = {}
+        errors = []
+        matching_rules = []
+
+        if "data" in data:
+            tweet = Tweet(data["data"])
+            await self.on_tweet(tweet)
+        if "includes" in data:
+            includes = self._process_includes(data["includes"])
+            await self.on_includes(includes)
+        if "errors" in data:
+            errors = data["errors"]
+            await self.on_errors(errors)
+        if "matching_rules" in data:
+            matching_rules = [
+                StreamRule(id=rule["id"], tag=rule["tag"])
+                for rule in data["matching_rules"]
+            ]
+            await self.on_matching_rules(matching_rules)
+
+        await self.on_response(
+            StreamResponse(tweet, includes, errors, matching_rules)
+        )
+
+    async def on_tweet(self, tweet):
+        """|coroutine|
+
+        This is called when a Tweet is received.
+
+        Parameters
+        ----------
+        tweet : Tweet
+            The Tweet received
+        """
+        pass
+
+    async def on_includes(self, includes):
+        """|coroutine|
+
+        This is called when includes are received.
+
+        Parameters
+        ----------
+        includes : dict
+            The includes received
+        """
+        pass
+
+    async def on_errors(self, errors):
+        """|coroutine|
+
+        This is called when errors are received.
+
+        Parameters
+        ----------
+        errors : list[dict]
+            The errors received
+        """
+        log.error("Received errors: %s", errors)
+
+    async def on_matching_rules(self, matching_rules):
+        """|coroutine|
+
+        This is called when matching rules are received.
+
+        Parameters
+        ----------
+        matching_rules : list[StreamRule]
+            The matching rules received
+        """
+        pass
+
+    async def on_response(self, response):
+        """|coroutine|
+
+        This is called when a response is received.
+
+        Parameters
+        ----------
+        response : StreamResponse
+            The response received
+        """
+        log.debug("Received response: %s", response)