Add support for streaming with Twitter API v2
authorHarmon <Harmon758@gmail.com>
Sun, 20 Feb 2022 06:18:00 +0000 (00:18 -0600)
committerHarmon <Harmon758@gmail.com>
Sun, 20 Feb 2022 06:18:00 +0000 (00:18 -0600)
Refactor Client and Stream to inherit from new BaseClient and BaseStream classes and add StreamingClient, StreamResponse, and StreamRule

docs/conf.py
docs/index.rst
docs/stream.rst
docs/streaming.rst
docs/streamingclient.rst [new file with mode: 0644]
tweepy/__init__.py
tweepy/client.py
tweepy/streaming.py

index 4c534245cd127cfea51e24ce75d483ef89b582b0..c2e6d8d20e866ae17bda743b0ca5d2fae1bc2485 100644 (file)
@@ -38,6 +38,8 @@ hoverxref_auto_ref = True
 hoverxref_domains = ['py']
 hoverxref_intersphinx = ['aiohttp', 'requests', 'requests_oauthlib']
 
+autoclass_content = "both"
+
 intersphinx_mapping = {
   'python': ('https://docs.python.org/3', None),
   'aiohttp': ('https://docs.aiohttp.org/en/stable/', None),
index cd79f5abc1b6a6a9e0ec11aeeebe3ab569245a6a..d3e9d659a53c447e1f82225da8246ee6d98f5b0b 100644 (file)
@@ -18,6 +18,7 @@ Contents:
    client.rst
    models.rst
    stream.rst
+   streamingclient.rst
    asyncstream.rst
    exceptions.rst
    extended_tweets.rst
index d5f2c6a1447f93a45c28071140c06bc5f0680948..73b18ef6cf09d814f117da8dc0cfd25b4c9b7df8 100644 (file)
@@ -2,10 +2,11 @@
 
 .. currentmodule:: tweepy
 
-*******************************************
-:class:`tweepy.Stream` --- Stream Reference
-*******************************************
+*****************************************************
+:class:`tweepy.Stream` --- Twitter API v1.1 Reference
+*****************************************************
 
 .. autoclass:: Stream
    :members:
+   :inherited-members:
    :member-order: bysource
index 3f32d0d76edae90f1590365658683e056e0b2538..95a36410de8db0ea27c733ec1d9a0dadef6ad4ef 100644 (file)
@@ -6,12 +6,6 @@
 Streaming
 *********
 
-:class:`Stream` allows `filtering`_ and `sampling`_ of realtime Tweets using
-Twitter's API.
-
-.. _filtering: https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/overview
-.. _sampling: https://developer.twitter.com/en/docs/twitter-api/v1/tweets/sample-realtime/overview
-
 Streams utilize Streaming HTTP protocol to deliver data through
 an open, streaming API connection. Rather than delivering data in batches
 through repeated requests by your client app, as might be expected from a REST
@@ -21,6 +15,17 @@ results in a low-latency delivery mechanism that can support very high
 throughput. For further information, see
 https://developer.twitter.com/en/docs/tutorials/consuming-streaming-data
 
+:class:`Stream` allows `filtering <v1.1 filtering_>`_ and
+`sampling <v1.1 sampling_>`_ of realtime Tweets using Twitter API v1.1.
+
+:class:`StreamingClient` allows `filtering <v2 filtering_>`_ and
+`sampling <v2 sampling_>`_ of realtime Tweets using Twitter API v2.
+
+.. _v1.1 filtering: https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/overview
+.. _v1.1 sampling: https://developer.twitter.com/en/docs/twitter-api/v1/tweets/sample-realtime/overview
+.. _v2 filtering: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/introduction
+.. _v2 sampling: https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/introduction
+
 Using :class:`Stream`
 =====================
 
@@ -64,35 +69,110 @@ subclassed. For example, to print the IDs of every Tweet received::
     )
     printer.sample()
 
+Using :class:`StreamingClient`
+==============================
+
+To use :class:`StreamingClient`, an instance of it needs to be initialized with
+a Twitter API Bearer Token::
+
+    import tweepy
+
+    streaming_client = tweepy.StreamingClient("Bearer Token here")
+
+Then, :meth:`StreamingClient.sample` can be used to connect to and run a
+sampling stream::
+
+    streaming_client.sample()
+
+Or :meth:`StreamingClient.add_rules` can be used to add rules before using
+:meth:`StreamingClient.filter` to connect to and run a filtered stream::
+
+    streaming_client.add_rules(tweepy.StreamRule("Tweepy"))
+    streaming_client.filter()
+
+:meth:`StreamingClient.get_rules` can be used to retrieve existing rules and
+:meth:`StreamingClient.delete_rules` can be used to delete rules.
+
+To learn how build rules, refer to the Twitter API
+`Building rules for filtered stream`_ documentation.
+
+.. _Building rules for filtered stream: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/build-a-rule
+
+Data received from the stream is passed to :meth:`StreamingClient.on_data`.
+This method handles sending the data to other methods. Tweets recieved are sent
+to :meth:`StreamingClient.on_tweet`, ``includes`` data are sent to
+:meth:`StreamingClient.on_includes`, errors are sent to
+:meth:`StreamingClient.on_errors`, and matching rules are sent to
+:meth:`StreamingClient.on_matching_rules`. A :class:`StreamResponse` instance
+containing all four fields is sent to :meth:`StreamingClient.on_response`. By
+default, only :meth:`StreamingClient.on_response` logs the data received, at
+the ``DEBUG`` :ref:`logging level <python:levels>`.
+
+To customize the processing of the stream data, :class:`StreamingClient` needs to be
+subclassed. For example, to print the IDs of every Tweet received::
+
+    class IDPrinter(tweepy.StreamingClient):
+
+        def on_tweet(self, tweet):
+            print(tweet.id)
+
+
+    printer = IDPrinter("Bearer Token here")
+    printer.sample()
+
 Threading
 =========
-Both :meth:`Stream.filter` and :meth:`Stream.sample` have a ``threaded``
-parameter. When set to ``True``, the stream will run in a separate
-:ref:`thread <python:thread-objects>`, which is returned by the call to either
+:meth:`Stream.filter`, :meth:`Stream.sample`, :meth:`StreamingClient.filter`,
+and :meth:`StreamingClient.sample` all have a ``threaded`` parameter. When set
+to ``True``, the stream will run in a separate
+:ref:`thread <python:thread-objects>`, which is returned by the call to the
 method. For example::
 
     thread = stream.filter(follow=[1072250532645998596], threaded=True)
 
+or::
+
+    thread = streaming_client.sample(threaded=True)
+
 Handling Errors
 ===============
-:class:`Stream` has multiple methods to handle errors during streaming.
-:meth:`Stream.on_closed` is called when the stream is closed by Twitter.
-:meth:`Stream.on_connection_error` is called when the stream encounters a
-connection error. :meth:`Stream.on_request_error` is called when an error is
-encountered while trying to connect to the stream. When these errors are
-encountered and ``max_retries``, which defaults to infinite, hasn't been
-exceeded yet, the :class:`Stream` instance will attempt to reconnect the stream
-after an appropriate amount of time. By default, all three of these methods log
-an error. To customize that handling, they can be overridden in a subclass::
+Both :class:`Stream` and :class:`StreamingClient` have multiple methods to
+handle errors during streaming.
+
+:meth:`Stream.on_closed` / :meth:`StreamingClient.on_closed` is called when the
+stream is closed by Twitter.
+
+:meth:`Stream.on_connection_error` /
+:meth:`StreamingClient.on_connection_error` is called when the stream
+encounters a connection error.
+
+:meth:`Stream.on_request_error` / :meth:`StreamingClient.on_request_error` is
+called when an error is encountered while trying to connect to the stream.
+
+When these errors are encountered and ``max_retries``, which defaults to
+infinite, hasn't been exceeded yet, the :class:`Stream` /
+:class:`StreamingClient` instance will attempt to reconnect the stream after an
+appropriate amount of time. By default, both versions of all three of these
+methods log an error. To customize that handling, they can be overridden in a
+subclass::
 
     class ConnectionTester(tweepy.Stream):
 
         def on_connection_error(self):
             self.disconnect()
 
-:meth:`Stream.on_request_error` is also passed the HTTP status code that was
-encountered. The HTTP status codes reference for the Twitter API can be found
-at https://developer.twitter.com/en/support/twitter-api/error-troubleshooting.
+::
+
+    class ConnectionTester(tweepy.StreamingClient):
+
+        def on_connection_error(self):
+            self.disconnect()
+
+:meth:`Stream.on_request_error` / :meth:`StreamingClient.on_request_error` is
+also passed the HTTP status code that was encountered. The HTTP status codes
+reference for the Twitter API can be found at
+https://developer.twitter.com/en/support/twitter-api/error-troubleshooting.
 
-:meth:`Stream.on_exception` is called when an unhandled exception occurs. This
-is fatal to the stream, and by default, an exception is logged.
+:meth:`Stream.on_exception` / :meth:`StreamingClient.on_exception` is called
+when an unhandled exception occurs. This is fatal to the stream, and by
+default, an exception is logged.
diff --git a/docs/streamingclient.rst b/docs/streamingclient.rst
new file mode 100644 (file)
index 0000000..048f96b
--- /dev/null
@@ -0,0 +1,27 @@
+.. _streamingclient_reference:
+
+.. currentmodule:: tweepy
+
+************************************************************
+:class:`tweepy.StreamingClient` --- Twitter API v2 Reference
+************************************************************
+
+.. autoclass:: StreamingClient
+   :members:
+   :inherited-members:
+   :member-order: bysource
+
+``StreamResponse``
+==================
+.. autoclass:: StreamResponse
+
+   The :obj:`StreamResponse` returned by :meth:`StreamingClient.on_response` is
+   a :class:`collections.namedtuple`, with ``data``, ``includes``, ``errors``,
+   and ``matching_rules`` fields, corresponding with the fields in responses
+   from Twitter's API.
+
+   .. versionadded:: 4.6
+
+``StreamRule``
+==============
+.. autoclass:: StreamRule
index 10fe9991e5faf6d6af4eada19831032504f1a874..7c572029bdb6ba9ab9c7afabb5a7a3f55ac3e9b1 100644 (file)
@@ -27,7 +27,9 @@ from tweepy.pagination import Paginator
 from tweepy.place import Place
 from tweepy.poll import Poll
 from tweepy.space import Space
-from tweepy.streaming import Stream
+from tweepy.streaming import (
+    Stream, StreamingClient, StreamResponse, StreamRule
+)
 from tweepy.tweet import ReferencedTweet, Tweet
 from tweepy.user import User
 
index 1af2c74e26eed80d2f93962c91daa223e67e7f00..5a36d7809c3bf4273d58d4c8b8f545b058f05390 100644 (file)
@@ -30,41 +30,7 @@ log = logging.getLogger(__name__)
 Response = namedtuple("Response", ("data", "includes", "errors", "meta"))
 
 
-class Client:
-    """Client( \
-        bearer_token=None, consumer_key=None, consumer_secret=None, \
-        access_token=None, access_token_secret=None, *, return_type=Response, \
-        wait_on_rate_limit=False \
-    )
-
-    Twitter API v2 Client
-
-    .. versionadded:: 4.0
-
-    Parameters
-    ----------
-    bearer_token : Optional[str]
-        Twitter API Bearer Token
-    consumer_key : Optional[str]
-        Twitter API Consumer Key
-    consumer_secret : Optional[str]
-        Twitter API Consumer Secret
-    access_token : Optional[str]
-        Twitter API Access Token
-    access_token_secret : Optional[str]
-        Twitter API Access Token Secret
-    return_type : Type[Union[dict, requests.Response, Response]]
-        Type to return from requests to the API
-    wait_on_rate_limit : bool
-        Whether to wait when rate limit is reached
-
-    Attributes
-    ----------
-    session : requests.Session
-        Requests Session used to make requests to the API
-    user_agent : str
-        User agent used when making requests to the API
-    """
+class BaseClient:
 
     def __init__(
         self, bearer_token=None, consumer_key=None, consumer_secret=None,
@@ -147,25 +113,7 @@ class Client:
 
     def _make_request(self, method, route, params={}, endpoint_parameters=None,
                       json=None, data_type=None, user_auth=False):
-        request_params = {}
-        for param_name, param_value in params.items():
-            if param_name.replace('_', '.') in endpoint_parameters:
-                param_name = param_name.replace('_', '.')
-
-            if isinstance(param_value, list):
-                request_params[param_name] = ','.join(map(str, param_value))
-            elif isinstance(param_value, datetime.datetime):
-                if param_value.tzinfo is not None:
-                    param_value = param_value.astimezone(datetime.timezone.utc)
-                request_params[param_name] = param_value.strftime(
-                    "%Y-%m-%dT%H:%M:%SZ"
-                )
-                # TODO: Constant datetime format string?
-            else:
-                request_params[param_name] = param_value
-
-            if param_name not in endpoint_parameters:
-                log.warn(f"Unexpected parameter: {param_name}")
+        request_params = self._process_params(params, endpoint_parameters)
 
         response = self.request(method, route, params=request_params,
                                 json=json, user_auth=user_auth)
@@ -179,13 +127,25 @@ class Client:
             return response
 
         data = response.get("data")
+        data = self._process_data(data, data_type=data_type)
+
+        includes = response.get("includes", {})
+        includes = self._process_includes(includes)
+
+        errors = response.get("errors", [])
+        meta = response.get("meta", {})
+
+        return Response(data, includes, errors, meta)
+
+    def _process_data(self, data, data_type=None):
         if data_type is not None:
             if isinstance(data, list):
                 data = [data_type(result) for result in data]
             elif data is not None:
                 data = data_type(data)
+        return data
 
-        includes = response.get("includes", {})
+    def _process_includes(self, includes):
         if "media" in includes:
             includes["media"] = [Media(media) for media in includes["media"]]
         if "places" in includes:
@@ -196,11 +156,66 @@ class Client:
             includes["tweets"] = [Tweet(tweet) for tweet in includes["tweets"]]
         if "users" in includes:
             includes["users"] = [User(user) for user in includes["users"]]
+        return includes
 
-        errors = response.get("errors", [])
-        meta = response.get("meta", {})
+    def _process_params(self, params, endpoint_parameters):
+        request_params = {}
+        for param_name, param_value in params.items():
+            if param_name.replace('_', '.') in endpoint_parameters:
+                param_name = param_name.replace('_', '.')
 
-        return Response(data, includes, errors, meta)
+            if isinstance(param_value, list):
+                request_params[param_name] = ','.join(map(str, param_value))
+            elif isinstance(param_value, datetime.datetime):
+                if param_value.tzinfo is not None:
+                    param_value = param_value.astimezone(datetime.timezone.utc)
+                request_params[param_name] = param_value.strftime(
+                    "%Y-%m-%dT%H:%M:%SZ"
+                )
+                # TODO: Constant datetime format string?
+            else:
+                request_params[param_name] = param_value
+
+            if param_name not in endpoint_parameters:
+                log.warn(f"Unexpected parameter: {param_name}")
+        return request_params
+
+
+class Client(BaseClient):
+    """Client( \
+        bearer_token=None, consumer_key=None, consumer_secret=None, \
+        access_token=None, access_token_secret=None, *, return_type=Response, \
+        wait_on_rate_limit=False \
+    )
+
+    Twitter API v2 Client
+
+    .. versionadded:: 4.0
+
+    Parameters
+    ----------
+    bearer_token : Optional[str]
+        Twitter API Bearer Token
+    consumer_key : Optional[str]
+        Twitter API Consumer Key
+    consumer_secret : Optional[str]
+        Twitter API Consumer Secret
+    access_token : Optional[str]
+        Twitter API Access Token
+    access_token_secret : Optional[str]
+        Twitter API Access Token Secret
+    return_type : Type[Union[dict, requests.Response, Response]]
+        Type to return from requests to the API
+    wait_on_rate_limit : bool
+        Whether to wait when rate limit is reached
+
+    Attributes
+    ----------
+    session : requests.Session
+        Requests Session used to make requests to the API
+    user_agent : str
+        User agent used when making requests to the API
+    """
 
     # Hide replies
 
index 1bbc2f9027c3719980000d839ca430090dcc7e2d..6c338cf41387febe4dde0d5ba186013d621ab465 100644 (file)
@@ -4,6 +4,7 @@
 
 # Appengine users: https://developers.google.com/appengine/docs/python/sockets/#making_httplib_use_sockets
 
+from collections import namedtuple
 import json
 import logging
 from math import inf
@@ -11,68 +12,29 @@ from platform import python_version
 import ssl
 from threading import Thread
 from time import sleep
+from typing import NamedTuple
 
 import requests
 from requests_oauthlib import OAuth1
 import urllib3
 
 import tweepy
+from tweepy.client import BaseClient, Response
 from tweepy.errors import TweepyException
 from tweepy.models import Status
+from tweepy.tweet import Tweet
 
 log = logging.getLogger(__name__)
 
+StreamResponse = namedtuple(
+    "StreamResponse", ("data", "includes", "errors", "matching_rules")
+)
 
-class Stream:
-    """Filter and sample realtime Tweets
 
-    Parameters
-    ----------
-    consumer_key : str
-        Twitter API Consumer Key
-    consumer_secret : str
-        Twitter API Consumer Secret
-    access_token: str
-        Twitter API Access Token
-    access_token_secret : str
-        Twitter API Access Token Secret
-    chunk_size : int
-        The default socket.read size. Default to 512, less than half the size
-        of a Tweet so that it reads Tweets with the minimal latency of 2 reads
-        per Tweet. Values higher than ~1kb will increase latency by waiting for
-        more data to arrive but may also increase throughput by doing fewer
-        socket read calls.
-    daemon : bool
-        Whether or not to use a daemon thread when using a thread to run the
-        stream
-    max_retries : int
-        Max number of times to retry connecting the stream
-    proxy : Optional[str]
-        URL of the proxy to use when connecting to the stream
-    verify : Union[bool, str]
-        Either a boolean, in which case it controls whether to verify the
-        server’s TLS certificate, or a string, in which case it must be a path
-        to a CA bundle to use.
-
-    Attributes
-    ----------
-    running : bool
-        Whether there's currently a stream running
-    session : :class:`requests.Session`
-        Requests Session used to connect to the stream
-    thread : Optional[:class:`threading.Thread`]
-        Thread used to run the stream
-    user_agent : str
-        User agent used when connecting to the stream
-    """
+class BaseStream:
 
-    def __init__(self, consumer_key, consumer_secret, access_token,
-                 access_token_secret, *, chunk_size=512, daemon=False,
-                 max_retries=inf, proxy=None, verify=True):
-        self.consumer_key = consumer_key
-        self.consumer_secret = consumer_secret
-        self.access_token = access_token
-        self.access_token_secret = access_token_secret
+    def __init__(self, *, chunk_size=512, daemon=False, max_retries=inf,
+                 proxy=None, verify=True):
         self.chunk_size = chunk_size
         self.daemon = daemon
         self.max_retries = max_retries
@@ -88,11 +50,14 @@ class Stream:
             f"Tweepy/{tweepy.__version__}"
         )
 
-    def _connect(self, method, endpoint, params=None, headers=None, body=None):
+    def _connect(self, method, url, auth=None, params=None, headers=None,
+                 body=None):
         self.running = True
 
         error_count = 0
         # https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/guides/connecting
+        # https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/handling-disconnections
+        # https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/integrate/handling-disconnections
         stall_timeout = 90
         network_error_wait = network_error_wait_step = 0.25
         network_error_wait_max = 16
@@ -100,13 +65,8 @@ class Stream:
         http_error_wait_max = 320
         http_420_error_wait_start = 60
 
-        auth = OAuth1(self.consumer_key, self.consumer_secret,
-                      self.access_token, self.access_token_secret)
-
         self.session.headers["User-Agent"] = self.user_agent
 
-        url = f"https://stream.twitter.com/1.1/{endpoint}.json"
-
         try:
             while self.running and error_count <= self.max_retries:
                 try:
@@ -185,6 +145,121 @@ class Stream:
         self.thread.start()
         return self.thread
 
+    def disconnect(self):
+        """Disconnect the stream"""
+        self.running = False
+
+    def on_closed(self, response):
+        """This is called when the stream has been closed by Twitter.
+
+        Parameters
+        ----------
+        response : requests.Response
+            The Response from Twitter
+        """
+        log.error("Stream connection closed by Twitter")
+
+    def on_connect(self):
+        """This is called after successfully connecting to the streaming API.
+        """
+        log.info("Stream connected")
+
+    def on_connection_error(self):
+        """This is called when the stream connection errors or times out."""
+        log.error("Stream connection has errored or timed out")
+
+    def on_disconnect(self):
+        """This is called when the stream has disconnected."""
+        log.info("Stream disconnected")
+
+    def on_exception(self, exception):
+        """This is called when an unhandled exception occurs.
+
+        Parameters
+        ----------
+        exception : Exception
+            The unhandled exception
+        """
+        log.exception("Stream encountered an exception")
+
+    def on_keep_alive(self):
+        """This is called when a keep-alive signal is received."""
+        log.debug("Received keep-alive signal")
+
+    def on_request_error(self, status_code):
+        """This is called when a non-200 HTTP status code is encountered.
+
+        Parameters
+        ----------
+        status_code : int
+            The HTTP status code encountered
+        """
+        log.error("Stream encountered HTTP error: %d", status_code)
+
+
+class Stream(BaseStream):
+    """Filter and sample realtime Tweets with Twitter API v1.1
+
+    Parameters
+    ----------
+    consumer_key : str
+        Twitter API Consumer Key
+    consumer_secret : str
+        Twitter API Consumer Secret
+    access_token: str
+        Twitter API Access Token
+    access_token_secret : str
+        Twitter API Access Token Secret
+    chunk_size : int
+        The default socket.read size. Default to 512, less than half the size
+        of a Tweet so that it reads Tweets with the minimal latency of 2 reads
+        per Tweet. Values higher than ~1kb will increase latency by waiting for
+        more data to arrive but may also increase throughput by doing fewer
+        socket read calls.
+    daemon : bool
+        Whether or not to use a daemon thread when using a thread to run the
+        stream
+    max_retries : int
+        Max number of times to retry connecting the stream
+    proxy : Optional[str]
+        URL of the proxy to use when connecting to the stream
+    verify : Union[bool, str]
+        Either a boolean, in which case it controls whether to verify the
+        server’s TLS certificate, or a string, in which case it must be a path
+        to a CA bundle to use.
+
+    Attributes
+    ----------
+    running : bool
+        Whether there's currently a stream running
+    session : :class:`requests.Session`
+        Requests Session used to connect to the stream
+    thread : Optional[:class:`threading.Thread`]
+        Thread used to run the stream
+    user_agent : str
+        User agent used when connecting to the stream
+    """
+
+    def __init__(self, consumer_key, consumer_secret, access_token,
+                 access_token_secret, **kwargs):
+        """__init__( \
+            consumer_key, consumer_secret, access_token, access_token_secret, \
+            chunk_size=512, daemon=False, max_retries=inf, proxy=None, \
+            verify=True \
+        )
+        """
+        self.consumer_key = consumer_key
+        self.consumer_secret = consumer_secret
+        self.access_token = access_token
+        self.access_token_secret = access_token_secret
+        super().__init__(**kwargs)
+
+    def _connect(self, method, endpoint, **kwargs):
+        auth = OAuth1(self.consumer_key, self.consumer_secret,
+                      self.access_token, self.access_token_secret)
+        url = f"https://stream.twitter.com/1.1/{endpoint}.json"
+        super()._connect(method, url, auth=auth **kwargs)
+
     def filter(self, *, follow=None, track=None, locations=None,
                filter_level=None, languages=None, stall_warnings=False,
                threaded=False):
@@ -313,57 +388,6 @@ class Stream:
         else:
             self._connect(method, endpoint, params=params)
 
-    def disconnect(self):
-        """Disconnect the stream"""
-        self.running = False
-
-    def on_closed(self, response):
-        """This is called when the stream has been closed by Twitter.
-
-        Parameters
-        ----------
-        response : requests.Response
-            The Response from Twitter
-        """
-        log.error("Stream connection closed by Twitter")
-
-    def on_connect(self):
-        """This is called after successfully connecting to the streaming API.
-        """
-        log.info("Stream connected")
-
-    def on_connection_error(self):
-        """This is called when the stream connection errors or times out."""
-        log.error("Stream connection has errored or timed out")
-
-    def on_disconnect(self):
-        """This is called when the stream has disconnected."""
-        log.info("Stream disconnected")
-
-    def on_exception(self, exception):
-        """This is called when an unhandled exception occurs.
-
-        Parameters
-        ----------
-        exception : Exception
-            The unhandled exception
-        """
-        log.exception("Stream encountered an exception")
-
-    def on_keep_alive(self):
-        """This is called when a keep-alive signal is received."""
-        log.debug("Received keep-alive signal")
-
-    def on_request_error(self, status_code):
-        """This is called when a non-200 HTTP status code is encountered.
-
-        Parameters
-        ----------
-        status_code : int
-            The HTTP status code encountered
-        """
-        log.error("Stream encountered HTTP error: %d", status_code)
-
     def on_data(self, raw_data):
         """This is called when raw data is received from the stream.
         This method handles sending the data to other methods based on the
@@ -483,3 +507,463 @@ class Stream:
             The stall warning
         """
         log.warning("Received stall warning: %s", warning)
+
+
+class StreamingClient(BaseClient, BaseStream):
+    """Filter and sample realtime Tweets with Twitter API v2
+
+    .. versionadded:: 4.6
+
+    Parameters
+    ----------
+    bearer_token : str
+        Twitter API Bearer Token
+    return_type : Type[Union[dict, requests.Response, Response]]
+        Type to return from requests to the API
+    wait_on_rate_limit : bool
+        Whether to wait when rate limit is reached
+    chunk_size : int
+        The default socket.read size. Default to 512, less than half the size
+        of a Tweet so that it reads Tweets with the minimal latency of 2 reads
+        per Tweet. Values higher than ~1kb will increase latency by waiting for
+        more data to arrive but may also increase throughput by doing fewer
+        socket read calls.
+    daemon : bool
+        Whether or not to use a daemon thread when using a thread to run the
+        stream
+    max_retries : int
+        Max number of times to retry connecting the stream
+    proxy : Optional[str]
+        URL of the proxy to use when connecting to the stream
+    verify : Union[bool, str]
+        Either a boolean, in which case it controls whether to verify the
+        server’s TLS certificate, or a string, in which case it must be a path
+        to a CA bundle to use.
+
+    Attributes
+    ----------
+    running : bool
+        Whether there's currently a stream running
+    session : :class:`requests.Session`
+        Requests Session used to connect to the stream
+    thread : Optional[:class:`threading.Thread`]
+        Thread used to run the stream
+    user_agent : str
+        User agent used when connecting to the stream
+    """
+
+    def __init__(self, bearer_token, *, return_type=Response,
+                 wait_on_rate_limit=False, **kwargs):
+        """__init__( \
+            bearer_token, *, return_type=Response, wait_on_rate_limit=False, \
+            chunk_size=512, daemon=False, max_retries=inf, proxy=None, \
+            verify=True \
+        )
+        """
+        BaseClient.__init__(self, bearer_token, return_type=return_type,
+                            wait_on_rate_limit=wait_on_rate_limit)
+        BaseStream.__init__(self, **kwargs)
+
+    def _connect(self, method, endpoint, **kwargs):
+        self.session.headers["Authorization"] = f"Bearer {self.bearer_token}"
+        url = f"https://api.twitter.com/2/tweets/{endpoint}/stream"
+        super()._connect(method, url, **kwargs)
+
+    def _process_data(self, data, data_type=None):
+        if data_type is StreamRule:
+            if isinstance(data, list):
+                rules = []
+                for rule in data:
+                    if "tag" in rule:
+                        rules.append(StreamRule(
+                            value=rule["value"], id=rule["id"], tag=rule["tag"]
+                        ))
+                    else:
+                        rules.append(StreamRule(value=rule["value"],
+                                                id=rule["id"]))
+                return rules
+            elif data is not None:
+                if "tag" in data:
+                    return StreamRule(value=data["value"], id=data["id"],
+                                      tag=data["tag"])
+                else:
+                    return StreamRule(value=data["value"], id=data["id"])
+        else:
+            super()._process_data(data, data_type=data_type)
+
+    def add_rules(self, add, **params):
+        """add_rules(add, *, dry_run)
+
+        Add rules to filtered stream.
+
+        Parameters
+        ----------
+        add : Union[List[StreamRule], StreamRule]
+            Specifies the operation you want to perform on the rules.
+        dry_run : bool
+            Set to true to test a the syntax of your rule without submitting
+            it. This is useful if you want to check the syntax of a rule before
+            removing one or more of your existing rules.
+
+        Returns
+        -------
+        Union[dict, requests.Response, Response]
+
+        References
+        ----------
+        https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/post-tweets-search-stream-rules
+        """
+        json = {"add": []}
+        if isinstance(add, StreamRule):
+            add = (add,)
+        for rule in add:
+            if rule.tag is not None:
+                json["add"].append({"value": rule.value, "tag": rule.tag})
+            else:
+                json["add"].append({"value": rule.value})
+
+        return self._make_request(
+            "POST", f"/2/tweets/search/stream/rules", params=params,
+            endpoint_parameters=("dry_run",), json=json, data_type=StreamRule
+        )
+
+    def delete_rules(self, ids, **params):
+        """delete_rules(ids, *, dry_run)
+
+        Delete rules from filtered stream.
+
+        Parameters
+        ----------
+        ids : Union[int, str, List[Union[int, str, StreamRule]], StreamRule]
+            Array of rule IDs, each one representing a rule already active in
+            your stream. IDs must be submitted as strings.
+        dry_run : bool
+            Set to true to test a the syntax of your rule without submitting
+            it. This is useful if you want to check the syntax of a rule before
+            removing one or more of your existing rules.
+
+        Returns
+        -------
+        Union[dict, requests.Response, Response]
+
+        References
+        ----------
+        https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/post-tweets-search-stream-rules
+        """
+        json = {"delete": {"ids": []}}
+        if isinstance(ids, (int, str, StreamRule)):
+            ids = (ids,)
+        for id in ids:
+            if isinstance(id, StreamRule):
+                json["delete"]["ids"].append(str(StreamRule.id))
+            else:
+                json["delete"]["ids"].append(str(id))
+
+        return self._make_request(
+            "POST", f"/2/tweets/search/stream/rules", params=params,
+            endpoint_parameters=("dry_run",), json=json, data_type=StreamRule
+        )
+
+    def filter(self, *, threaded=False, **params):
+        """filter( \
+            *, backfill_minutes=None, expansions=None, media_fields=None, \
+            place_fields=None, poll_fields=None, tweet_fields=None, \
+            user_fields=None, threaded=False \
+        )
+
+        Streams Tweets in real-time based on a specific set of filter rules.
+
+        If you are using the academic research product track, you can connect
+        up to two `redundant connections <filter redundant connections_>`_ to
+        maximize your streaming up-time.
+
+        The Tweets returned by this endpoint count towards the Project-level
+        `Tweet cap`_.
+
+        Parameters
+        ----------
+        backfill_minutes : Optional[int]
+            By passing this parameter, you can request up to five (5) minutes
+            worth of streaming data that you might have missed during a
+            disconnection to be delivered to you upon reconnection. The
+            backfilled Tweets will automatically flow through the reconnected
+            stream, with older Tweets generally being delivered before any
+            newly matching Tweets. You must include a whole number between 1
+            and 5 as the value to this parameter.
+
+            This feature will deliver duplicate Tweets, meaning that if you
+            were disconnected for 90 seconds, and you requested two minutes of
+            backfill, you will receive 30 seconds worth of duplicate Tweets.
+            Due to this, you should make sure your system is tolerant of
+            duplicate data.
+
+            This feature is currently only available to the Academic Research
+            product track.
+        expansions : Union[List[str], str]
+            :ref:`expansions_parameter`
+        media_fields : Union[List[str], str]
+            :ref:`media_fields_parameter`
+        place_fields : Union[List[str], str]
+            :ref:`place_fields_parameter`
+        poll_fields : Union[List[str], str]
+            :ref:`poll_fields_parameter`
+        tweet_fields : Union[List[str], str]
+            :ref:`tweet_fields_parameter`
+        user_fields : Union[List[str], str]
+            :ref:`user_fields_parameter`
+        threaded : bool
+            Whether or not to use a thread to run the stream
+
+        Returns
+        -------
+        Optional[threading.Thread]
+            The thread if ``threaded`` is set to ``True``, else ``None``
+
+        References
+        ----------
+        https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/get-tweets-search-stream
+
+        .. _filter redundant connections: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/recovery-and-redundancy-features
+        .. _Tweet cap: https://developer.twitter.com/en/docs/twitter-api/tweet-caps
+        """
+        if self.running:
+            raise TweepyException("Stream is already connected")
+
+        method = "GET"
+        endpoint = "search"
+
+        params = self._process_params(
+            params, endpoint_parameters=(
+                "backfill_minutes", "expansions", "media.fields",
+                "place.fields", "poll.fields", "tweet.fields", "user.fields"
+            )
+        )
+
+        if threaded:
+            return self._threaded_connect(method, endpoint, params=params)
+        else:
+            self._connect(method, endpoint, params=params)
+
+    def get_rules(self, **params):
+        """get_rules(*, ids)
+
+        Return a list of rules currently active on the streaming endpoint,
+        either as a list or individually.
+
+        Parameters
+        ----------
+        ids : Union[List[str], str]
+            Comma-separated list of rule IDs. If omitted, all rules are
+            returned.
+
+        Returns
+        -------
+        Union[dict, requests.Response, Response]
+
+        References
+        ----------
+        https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/get-tweets-search-stream-rules
+        """
+        return self._make_request(
+            "GET", f"/2/tweets/search/stream/rules", params=params,
+            endpoint_parameters=("ids",), data_type=StreamRule
+        )
+
+    def sample(self, *, threaded=False, **params):
+        """sample( \
+            *, backfill_minutes=None, expansions=None, media_fields=None, \
+            place_fields=None, poll_fields=None, tweet_fields=None, \
+            user_fields=None, threaded=False \
+        )
+
+        Streams about 1% of all Tweets in real-time.
+
+        If you are using the academic research product track, you can connect
+        up to two `redundant connections <sample redundant connections_>`_ to
+        maximize your streaming up-time.
+
+        Parameters
+        ----------
+        backfill_minutes : Optional[int]
+            By passing this parameter, you can request up to five (5) minutes
+            worth of streaming data that you might have missed during a
+            disconnection to be delivered to you upon reconnection. The
+            backfilled Tweets will automatically flow through the reconnected
+            stream, with older Tweets generally being delivered before any
+            newly matching Tweets. You must include a whole number between 1
+            and 5 as the value to this parameter.
+
+            This feature will deliver duplicate Tweets, meaning that if you
+            were disconnected for 90 seconds, and you requested two minutes of
+            backfill, you will receive 30 seconds worth of duplicate Tweets.
+            Due to this, you should make sure your system is tolerant of
+            duplicate data.
+
+            This feature is currently only available to the Academic Research
+            product track.
+        expansions : Union[List[str], str]
+            :ref:`expansions_parameter`
+        media_fields : Union[List[str], str]
+            :ref:`media_fields_parameter`
+        place_fields : Union[List[str], str]
+            :ref:`place_fields_parameter`
+        poll_fields : Union[List[str], str]
+            :ref:`poll_fields_parameter`
+        tweet_fields : Union[List[str], str]
+            :ref:`tweet_fields_parameter`
+        user_fields : Union[List[str], str]
+            :ref:`user_fields_parameter`
+        threaded : bool
+            Whether or not to use a thread to run the stream
+
+        Returns
+        -------
+        Optional[threading.Thread]
+            The thread if ``threaded`` is set to ``True``, else ``None``
+
+        References
+        ----------
+        https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/api-reference/get-tweets-sample-stream
+
+        .. _sample redundant connections: https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/integrate/recovery-and-redundancy-features
+        """
+        if self.running:
+            raise TweepyException("Stream is already connected")
+
+        method = "GET"
+        endpoint = "sample"
+
+        params = self._process_params(
+            params, endpoint_parameters=(
+                "backfill_minutes", "expansions", "media.fields",
+                "place.fields", "poll.fields", "tweet.fields", "user.fields"
+            )
+        )
+
+        if threaded:
+            return self._threaded_connect(method, endpoint, params=params)
+        else:
+            self._connect(method, endpoint, params=params)
+
+    def on_data(self, raw_data):
+        """This is called when raw data is received from the stream.
+        This method handles sending the data to other methods.
+
+        Parameters
+        ----------
+        raw_data : JSON
+            The raw data from the stream
+
+        References
+        ----------
+        https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/consuming-streaming-data
+        """
+        data = json.loads(raw_data)
+
+        tweet = None
+        includes = {}
+        errors = []
+        matching_rules = []
+
+        if "data" in data:
+            tweet = Tweet(data["data"])
+            self.on_tweet(tweet)
+        if "includes" in data:
+            includes = self._process_includes(data["includes"])
+            self.on_includes(includes)
+        if "errors" in data:
+            errors = data["errors"]
+            self.on_errors(errors)
+        if "matching_rules" in data:
+            matching_rules = [
+                StreamRule(id=rule["id"], tag=rule["tag"])
+                for rule in data["matching_rules"]
+            ]
+            self.on_matching_rules(matching_rules)
+
+        self.on_response(
+            StreamResponse(tweet, includes, errors, matching_rules)
+        )
+
+    def on_tweet(self, tweet):
+        """This is called when a Tweet is received.
+
+        Parameters
+        ----------
+        status : Tweet
+            The Tweet received
+        """
+        pass
+
+    def on_includes(self, includes):
+        """This is called when includes are received.
+
+        Parameters
+        ----------
+        includes : dict
+            The includes received
+        """
+        pass
+
+    def on_errors(self, errors):
+        """This is called when errors are received.
+
+        Parameters
+        ----------
+        errors : dict
+            The errors received
+        """
+        log.error("Received errors: %s", errors)
+
+    def on_matching_rules(self, matching_rules):
+        """This is called when matching rules are received.
+        
+        Parameters
+        ----------
+        matching_rules : List[StreamRule]
+            The matching rules received
+        """
+        pass
+
+    def on_response(self, response):
+        """This is called when a response is received.
+
+        Parameters
+        ----------
+        response : StreamResponse
+            The response received
+        """
+        log.debug("Received response: %s", response)
+
+
+class StreamRule(NamedTuple):
+    """Rule for filtered stream
+
+    .. versionadded:: 4.6
+
+    Parameters
+    ----------
+    value : Optional[str]
+        The rule text. If you are using a `Standard Project`_ at the Basic
+        `access level`_, you can use the basic set of `operators`_, can submit
+        up to 25 concurrent rules, and can submit rules up to 512 characters
+        long. If you are using an `Academic Research Project`_ at the Basic
+        access level, you can use all available operators, can submit up to
+        1,000 concurrent rules, and can submit rules up to 1,024 characters
+        long.
+    tag : Optional[str]
+        The tag label. This is a free-form text you can use to identify the
+        rules that matched a specific Tweet in the streaming response. Tags can
+        be the same across rules.
+    id : Optional[str]
+        Unique identifier of this rule. This is returned as a string.
+
+
+    .. _Standard Project: https://developer.twitter.com/en/docs/projects
+    .. _access level: https://developer.twitter.com/en/products/twitter-api/early-access/guide#na_1
+    .. _operators: https://developer.twitter.com/en/docs/twitter-api/tweets/search/integrate/build-a-query
+    .. _Academic Research Project: https://developer.twitter.com/en/docs/projects
+    """
+    value: str = None
+    tag: str = None
+    id: str = None