Streaming
*********
-:class:`Stream` allows `filtering`_ and `sampling`_ of realtime Tweets using
-Twitter's API.
-
-.. _filtering: https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/overview
-.. _sampling: https://developer.twitter.com/en/docs/twitter-api/v1/tweets/sample-realtime/overview
-
Streams utilize Streaming HTTP protocol to deliver data through
an open, streaming API connection. Rather than delivering data in batches
through repeated requests by your client app, as might be expected from a REST
throughput. For further information, see
https://developer.twitter.com/en/docs/tutorials/consuming-streaming-data
+:class:`Stream` allows `filtering <v1.1 filtering_>`_ and
+`sampling <v1.1 sampling_>`_ of realtime Tweets using Twitter API v1.1.
+
+:class:`StreamingClient` allows `filtering <v2 filtering_>`_ and
+`sampling <v2 sampling_>`_ of realtime Tweets using Twitter API v2.
+
+.. _v1.1 filtering: https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/overview
+.. _v1.1 sampling: https://developer.twitter.com/en/docs/twitter-api/v1/tweets/sample-realtime/overview
+.. _v2 filtering: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/introduction
+.. _v2 sampling: https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/introduction
+
Using :class:`Stream`
=====================
)
printer.sample()
+Using :class:`StreamingClient`
+==============================
+
+To use :class:`StreamingClient`, an instance of it needs to be initialized with
+a Twitter API Bearer Token::
+
+ import tweepy
+
+ streaming_client = tweepy.StreamingClient("Bearer Token here")
+
+Then, :meth:`StreamingClient.sample` can be used to connect to and run a
+sampling stream::
+
+ streaming_client.sample()
+
+Or :meth:`StreamingClient.add_rules` can be used to add rules before using
+:meth:`StreamingClient.filter` to connect to and run a filtered stream::
+
+ streaming_client.add_rules(tweepy.StreamRule("Tweepy"))
+ streaming_client.filter()
+
+:meth:`StreamingClient.get_rules` can be used to retrieve existing rules and
+:meth:`StreamingClient.delete_rules` can be used to delete rules.
+
+To learn how to build rules, refer to the Twitter API
+`Building rules for filtered stream`_ documentation.
+
+.. _Building rules for filtered stream: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/build-a-rule
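As a rough illustration of what the rule methods send, the sketch below builds the JSON bodies for adding and deleting rules without making any request. ``StreamRule`` here is a local stand-in with the same three fields as the class added in this change, and ``build_add_payload`` / ``build_delete_payload`` are hypothetical helper names, not part of Tweepy:

```python
from typing import List, NamedTuple, Optional, Union


class StreamRule(NamedTuple):
    """Local stand-in with the same fields as the StreamRule in this change."""
    value: Optional[str] = None
    tag: Optional[str] = None
    id: Optional[str] = None


def build_add_payload(rules: Union[StreamRule, List[StreamRule]]) -> dict:
    """Hypothetical helper: JSON body for adding one or more rules."""
    if isinstance(rules, StreamRule):
        rules = [rules]
    payload = {"add": []}
    for rule in rules:
        entry = {"value": rule.value}
        # The tag is optional, so it is only sent when one was given.
        if rule.tag is not None:
            entry["tag"] = rule.tag
        payload["add"].append(entry)
    return payload


def build_delete_payload(ids) -> dict:
    """Hypothetical helper: JSON body for deleting rules by ID."""
    if isinstance(ids, (int, str, StreamRule)):
        ids = [ids]
    # Rule IDs are always submitted as strings.
    return {"delete": {"ids": [
        str(item.id) if isinstance(item, StreamRule) else str(item)
        for item in ids
    ]}}


add_body = build_add_payload(StreamRule("Tweepy", tag="library"))
delete_body = build_delete_payload([StreamRule(id="123"), 456])
```

Both bodies go to the same rules endpoint, which is why adding and deleting take the same ``dry_run`` parameter.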
+
+Data received from the stream is passed to :meth:`StreamingClient.on_data`.
+This method handles sending the data to other methods. Tweets received are sent
+to :meth:`StreamingClient.on_tweet`, ``includes`` data are sent to
+:meth:`StreamingClient.on_includes`, errors are sent to
+:meth:`StreamingClient.on_errors`, and matching rules are sent to
+:meth:`StreamingClient.on_matching_rules`. A :class:`StreamResponse` instance
+containing all four fields is sent to :meth:`StreamingClient.on_response`. By
+default, only :meth:`StreamingClient.on_response` logs the data received, at
+the ``DEBUG`` :ref:`logging level <python:levels>`.
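The dispatch described above can be sketched without a network connection. ``DispatchSketch`` is a hypothetical stand-in, not the library class: it only records which handlers ran, and it passes plain dictionaries where the library passes model objects:

```python
import json


class DispatchSketch:
    """Stand-in that records which handlers a payload triggers."""

    def __init__(self):
        self.calls = []

    def on_data(self, raw_data):
        payload = json.loads(raw_data)
        # Each top-level key is routed to its own handler when present.
        if "data" in payload:
            self.on_tweet(payload["data"])
        if "includes" in payload:
            self.on_includes(payload["includes"])
        if "errors" in payload:
            self.on_errors(payload["errors"])
        if "matching_rules" in payload:
            self.on_matching_rules(payload["matching_rules"])
        # The combined response handler runs for every payload.
        self.on_response(payload)

    def on_tweet(self, tweet):
        self.calls.append("on_tweet")

    def on_includes(self, includes):
        self.calls.append("on_includes")

    def on_errors(self, errors):
        self.calls.append("on_errors")

    def on_matching_rules(self, matching_rules):
        self.calls.append("on_matching_rules")

    def on_response(self, response):
        self.calls.append("on_response")


sketch = DispatchSketch()
sketch.on_data(
    '{"data": {"id": "1", "text": "hi"},'
    ' "matching_rules": [{"id": "9", "tag": "demo"}]}'
)
print(sketch.calls)
```

A payload without ``includes`` or ``errors`` skips those handlers, so a subclass only needs to override the ones it cares about.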
+
+To customize the processing of the stream data, :class:`StreamingClient` needs
+to be subclassed. For example, to print the ID of every Tweet received::
+
+ class IDPrinter(tweepy.StreamingClient):
+
+ def on_tweet(self, tweet):
+ print(tweet.id)
+
+
+ printer = IDPrinter("Bearer Token here")
+ printer.sample()
+
Threading
=========
-Both :meth:`Stream.filter` and :meth:`Stream.sample` have a ``threaded``
-parameter. When set to ``True``, the stream will run in a separate
-:ref:`thread <python:thread-objects>`, which is returned by the call to either
+:meth:`Stream.filter`, :meth:`Stream.sample`, :meth:`StreamingClient.filter`,
+and :meth:`StreamingClient.sample` all have a ``threaded`` parameter. When set
+to ``True``, the stream will run in a separate
+:ref:`thread <python:thread-objects>`, which is returned by the call to the
method. For example::
thread = stream.filter(follow=[1072250532645998596], threaded=True)
+or::
+
+ thread = streaming_client.sample(threaded=True)
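The pattern behind ``threaded=True`` can be sketched with the standard library alone. ``StreamSketch`` is a hypothetical stand-in whose loop merely counts ticks instead of reading from a connection; it is meant to show the control flow, not the library's implementation:

```python
import threading
import time


class StreamSketch:
    """Stand-in showing the threaded / blocking split."""

    def __init__(self, daemon=False):
        self.daemon = daemon
        self.running = False
        self.thread = None
        self.ticks = 0

    def _run(self):
        # Placeholder for the blocking read loop of a real stream.
        while self.running:
            self.ticks += 1
            time.sleep(0.01)

    def sample(self, threaded=False):
        self.running = True
        if threaded:
            # Run the loop in the background and hand the thread back.
            self.thread = threading.Thread(
                target=self._run, daemon=self.daemon
            )
            self.thread.start()
            return self.thread
        # Otherwise block the caller until disconnect() is called.
        self._run()

    def disconnect(self):
        self.running = False


stream = StreamSketch(daemon=True)
thread = stream.sample(threaded=True)
time.sleep(0.05)           # the caller stays free to do other work
stream.disconnect()        # ask the loop to stop
thread.join(timeout=1)     # wait for the background thread to finish
```

Keeping the returned thread lets the caller ``join`` it after disconnecting, which is useful for a clean shutdown.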
+
Handling Errors
===============
-:class:`Stream` has multiple methods to handle errors during streaming.
-:meth:`Stream.on_closed` is called when the stream is closed by Twitter.
-:meth:`Stream.on_connection_error` is called when the stream encounters a
-connection error. :meth:`Stream.on_request_error` is called when an error is
-encountered while trying to connect to the stream. When these errors are
-encountered and ``max_retries``, which defaults to infinite, hasn't been
-exceeded yet, the :class:`Stream` instance will attempt to reconnect the stream
-after an appropriate amount of time. By default, all three of these methods log
-an error. To customize that handling, they can be overridden in a subclass::
+Both :class:`Stream` and :class:`StreamingClient` have multiple methods to
+handle errors during streaming.
+
+:meth:`Stream.on_closed` / :meth:`StreamingClient.on_closed` is called when the
+stream is closed by Twitter.
+
+:meth:`Stream.on_connection_error` /
+:meth:`StreamingClient.on_connection_error` is called when the stream
+encounters a connection error.
+
+:meth:`Stream.on_request_error` / :meth:`StreamingClient.on_request_error` is
+called when an error is encountered while trying to connect to the stream.
+
+When these errors are encountered and ``max_retries``, which defaults to
+infinite, hasn't been exceeded yet, the :class:`Stream` /
+:class:`StreamingClient` instance will attempt to reconnect the stream after an
+appropriate amount of time. By default, both versions of all three of these
+methods log an error. To customize that handling, they can be overridden in a
+subclass::
class ConnectionTester(tweepy.Stream):
def on_connection_error(self):
self.disconnect()
-:meth:`Stream.on_request_error` is also passed the HTTP status code that was
-encountered. The HTTP status codes reference for the Twitter API can be found
-at https://developer.twitter.com/en/support/twitter-api/error-troubleshooting.
+::
+
+ class ConnectionTester(tweepy.StreamingClient):
+
+ def on_connection_error(self):
+ self.disconnect()
+
+:meth:`Stream.on_request_error` / :meth:`StreamingClient.on_request_error` is
+also passed the HTTP status code that was encountered. The HTTP status codes
+reference for the Twitter API can be found at
+https://developer.twitter.com/en/support/twitter-api/error-troubleshooting.
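To give a feel for what "an appropriate amount of time" could look like, here is a small sketch of reconnection waits. The caps and the 420 starting value are the constants visible in ``_connect`` in this change; the HTTP starting wait of 5 seconds and the choice of linear growth for network errors versus doubling for HTTP errors are assumptions, since those details are not shown here:

```python
NETWORK_STEP = 0.25    # network_error_wait_step in this change
NETWORK_MAX = 16       # network_error_wait_max in this change
HTTP_START = 5         # assumption: starting value not shown in this change
HTTP_MAX = 320         # http_error_wait_max in this change
HTTP_420_START = 60    # http_420_error_wait_start in this change


def network_waits(attempts):
    """Assumed linear backoff: grow by a fixed step up to a cap."""
    wait = NETWORK_STEP
    waits = []
    for _ in range(attempts):
        waits.append(wait)
        wait = min(wait + NETWORK_STEP, NETWORK_MAX)
    return waits


def http_waits(attempts, status_code):
    """Assumed exponential backoff: double each time up to a cap."""
    wait = HTTP_START
    waits = []
    for _ in range(attempts):
        # Status 420 starts from a longer minimum wait.
        if status_code == 420:
            wait = max(wait, HTTP_420_START)
        waits.append(wait)
        wait = min(wait * 2, HTTP_MAX)
    return waits


print(network_waits(3))
print(http_waits(4, 503))
print(http_waits(4, 420))
```

An ``on_request_error`` override can use the status code it receives to decide whether waiting is worthwhile at all, for example by disconnecting on an authentication failure.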
-:meth:`Stream.on_exception` is called when an unhandled exception occurs. This
-is fatal to the stream, and by default, an exception is logged.
+:meth:`Stream.on_exception` / :meth:`StreamingClient.on_exception` is called
+when an unhandled exception occurs. This is fatal to the stream, and by
+default, an exception is logged.
Response = namedtuple("Response", ("data", "includes", "errors", "meta"))
-class Client:
- """Client( \
- bearer_token=None, consumer_key=None, consumer_secret=None, \
- access_token=None, access_token_secret=None, *, return_type=Response, \
- wait_on_rate_limit=False \
- )
-
- Twitter API v2 Client
-
- .. versionadded:: 4.0
-
- Parameters
- ----------
- bearer_token : Optional[str]
- Twitter API Bearer Token
- consumer_key : Optional[str]
- Twitter API Consumer Key
- consumer_secret : Optional[str]
- Twitter API Consumer Secret
- access_token : Optional[str]
- Twitter API Access Token
- access_token_secret : Optional[str]
- Twitter API Access Token Secret
- return_type : Type[Union[dict, requests.Response, Response]]
- Type to return from requests to the API
- wait_on_rate_limit : bool
- Whether to wait when rate limit is reached
-
- Attributes
- ----------
- session : requests.Session
- Requests Session used to make requests to the API
- user_agent : str
- User agent used when making requests to the API
- """
+class BaseClient:
def __init__(
self, bearer_token=None, consumer_key=None, consumer_secret=None,
def _make_request(self, method, route, params={}, endpoint_parameters=None,
json=None, data_type=None, user_auth=False):
- request_params = {}
- for param_name, param_value in params.items():
- if param_name.replace('_', '.') in endpoint_parameters:
- param_name = param_name.replace('_', '.')
-
- if isinstance(param_value, list):
- request_params[param_name] = ','.join(map(str, param_value))
- elif isinstance(param_value, datetime.datetime):
- if param_value.tzinfo is not None:
- param_value = param_value.astimezone(datetime.timezone.utc)
- request_params[param_name] = param_value.strftime(
- "%Y-%m-%dT%H:%M:%SZ"
- )
- # TODO: Constant datetime format string?
- else:
- request_params[param_name] = param_value
-
- if param_name not in endpoint_parameters:
- log.warn(f"Unexpected parameter: {param_name}")
+ request_params = self._process_params(params, endpoint_parameters)
response = self.request(method, route, params=request_params,
json=json, user_auth=user_auth)
return response
data = response.get("data")
+ data = self._process_data(data, data_type=data_type)
+
+ includes = response.get("includes", {})
+ includes = self._process_includes(includes)
+
+ errors = response.get("errors", [])
+ meta = response.get("meta", {})
+
+ return Response(data, includes, errors, meta)
+
+ def _process_data(self, data, data_type=None):
if data_type is not None:
if isinstance(data, list):
data = [data_type(result) for result in data]
elif data is not None:
data = data_type(data)
+ return data
- includes = response.get("includes", {})
+ def _process_includes(self, includes):
if "media" in includes:
includes["media"] = [Media(media) for media in includes["media"]]
if "places" in includes:
includes["tweets"] = [Tweet(tweet) for tweet in includes["tweets"]]
if "users" in includes:
includes["users"] = [User(user) for user in includes["users"]]
+ return includes
- errors = response.get("errors", [])
- meta = response.get("meta", {})
+ def _process_params(self, params, endpoint_parameters):
+ request_params = {}
+ for param_name, param_value in params.items():
+ if param_name.replace('_', '.') in endpoint_parameters:
+ param_name = param_name.replace('_', '.')
- return Response(data, includes, errors, meta)
+ if isinstance(param_value, list):
+ request_params[param_name] = ','.join(map(str, param_value))
+ elif isinstance(param_value, datetime.datetime):
+ if param_value.tzinfo is not None:
+ param_value = param_value.astimezone(datetime.timezone.utc)
+ request_params[param_name] = param_value.strftime(
+ "%Y-%m-%dT%H:%M:%SZ"
+ )
+ # TODO: Constant datetime format string?
+ else:
+ request_params[param_name] = param_value
+
+ if param_name not in endpoint_parameters:
+ log.warning(f"Unexpected parameter: {param_name}")
+ return request_params
+
+
+class Client(BaseClient):
+ """Client( \
+ bearer_token=None, consumer_key=None, consumer_secret=None, \
+ access_token=None, access_token_secret=None, *, return_type=Response, \
+ wait_on_rate_limit=False \
+ )
+
+ Twitter API v2 Client
+
+ .. versionadded:: 4.0
+
+ Parameters
+ ----------
+ bearer_token : Optional[str]
+ Twitter API Bearer Token
+ consumer_key : Optional[str]
+ Twitter API Consumer Key
+ consumer_secret : Optional[str]
+ Twitter API Consumer Secret
+ access_token : Optional[str]
+ Twitter API Access Token
+ access_token_secret : Optional[str]
+ Twitter API Access Token Secret
+ return_type : Type[Union[dict, requests.Response, Response]]
+ Type to return from requests to the API
+ wait_on_rate_limit : bool
+ Whether to wait when rate limit is reached
+
+ Attributes
+ ----------
+ session : requests.Session
+ Requests Session used to make requests to the API
+ user_agent : str
+ User agent used when making requests to the API
+ """
# Hide replies
# Appengine users: https://developers.google.com/appengine/docs/python/sockets/#making_httplib_use_sockets
+from collections import namedtuple
import json
import logging
from math import inf
import ssl
from threading import Thread
from time import sleep
+from typing import NamedTuple
import requests
from requests_oauthlib import OAuth1
import urllib3
import tweepy
+from tweepy.client import BaseClient, Response
from tweepy.errors import TweepyException
from tweepy.models import Status
+from tweepy.tweet import Tweet
log = logging.getLogger(__name__)
+StreamResponse = namedtuple(
+ "StreamResponse", ("data", "includes", "errors", "matching_rules")
+)
-class Stream:
- """Filter and sample realtime Tweets
- Parameters
- ----------
- consumer_key : str
- Twitter API Consumer Key
- consumer_secret : str
- Twitter API Consumer Secret
- access_token: str
- Twitter API Access Token
- access_token_secret : str
- Twitter API Access Token Secret
- chunk_size : int
- The default socket.read size. Default to 512, less than half the size
- of a Tweet so that it reads Tweets with the minimal latency of 2 reads
- per Tweet. Values higher than ~1kb will increase latency by waiting for
- more data to arrive but may also increase throughput by doing fewer
- socket read calls.
- daemon : bool
- Whether or not to use a daemon thread when using a thread to run the
- stream
- max_retries : int
- Max number of times to retry connecting the stream
- proxy : Optional[str]
- URL of the proxy to use when connecting to the stream
- verify : Union[bool, str]
- Either a boolean, in which case it controls whether to verify the
- server’s TLS certificate, or a string, in which case it must be a path
- to a CA bundle to use.
-
- Attributes
- ----------
- running : bool
- Whether there's currently a stream running
- session : :class:`requests.Session`
- Requests Session used to connect to the stream
- thread : Optional[:class:`threading.Thread`]
- Thread used to run the stream
- user_agent : str
- User agent used when connecting to the stream
- """
+class BaseStream:
- def __init__(self, consumer_key, consumer_secret, access_token,
- access_token_secret, *, chunk_size=512, daemon=False,
- max_retries=inf, proxy=None, verify=True):
- self.consumer_key = consumer_key
- self.consumer_secret = consumer_secret
- self.access_token = access_token
- self.access_token_secret = access_token_secret
+ def __init__(self, *, chunk_size=512, daemon=False, max_retries=inf,
+ proxy=None, verify=True):
self.chunk_size = chunk_size
self.daemon = daemon
self.max_retries = max_retries
f"Tweepy/{tweepy.__version__}"
)
- def _connect(self, method, endpoint, params=None, headers=None, body=None):
+ def _connect(self, method, url, auth=None, params=None, headers=None,
+ body=None):
self.running = True
error_count = 0
# https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/guides/connecting
+ # https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/handling-disconnections
+ # https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/integrate/handling-disconnections
stall_timeout = 90
network_error_wait = network_error_wait_step = 0.25
network_error_wait_max = 16
http_error_wait_max = 320
http_420_error_wait_start = 60
- auth = OAuth1(self.consumer_key, self.consumer_secret,
- self.access_token, self.access_token_secret)
-
self.session.headers["User-Agent"] = self.user_agent
- url = f"https://stream.twitter.com/1.1/{endpoint}.json"
-
try:
while self.running and error_count <= self.max_retries:
try:
self.thread.start()
return self.thread
+ def disconnect(self):
+ """Disconnect the stream"""
+ self.running = False
+
+ def on_closed(self, response):
+ """This is called when the stream has been closed by Twitter.
+
+ Parameters
+ ----------
+ response : requests.Response
+ The Response from Twitter
+ """
+ log.error("Stream connection closed by Twitter")
+
+ def on_connect(self):
+ """This is called after successfully connecting to the streaming API.
+ """
+ log.info("Stream connected")
+
+ def on_connection_error(self):
+ """This is called when the stream connection errors or times out."""
+ log.error("Stream connection has errored or timed out")
+
+ def on_disconnect(self):
+ """This is called when the stream has disconnected."""
+ log.info("Stream disconnected")
+
+ def on_exception(self, exception):
+ """This is called when an unhandled exception occurs.
+
+ Parameters
+ ----------
+ exception : Exception
+ The unhandled exception
+ """
+ log.exception("Stream encountered an exception")
+
+ def on_keep_alive(self):
+ """This is called when a keep-alive signal is received."""
+ log.debug("Received keep-alive signal")
+
+ def on_request_error(self, status_code):
+ """This is called when a non-200 HTTP status code is encountered.
+
+ Parameters
+ ----------
+ status_code : int
+ The HTTP status code encountered
+ """
+ log.error("Stream encountered HTTP error: %d", status_code)
+
+
+class Stream(BaseStream):
+ """Filter and sample realtime Tweets with Twitter API v1.1
+
+ Parameters
+ ----------
+ consumer_key : str
+ Twitter API Consumer Key
+ consumer_secret : str
+ Twitter API Consumer Secret
+ access_token: str
+ Twitter API Access Token
+ access_token_secret : str
+ Twitter API Access Token Secret
+ chunk_size : int
+ The default socket.read size. Defaults to 512, less than half the size
+ of a Tweet so that it reads Tweets with the minimal latency of 2 reads
+ per Tweet. Values higher than ~1kb will increase latency by waiting for
+ more data to arrive but may also increase throughput by doing fewer
+ socket read calls.
+ daemon : bool
+ Whether or not to use a daemon thread when using a thread to run the
+ stream
+ max_retries : int
+ Max number of times to retry connecting the stream
+ proxy : Optional[str]
+ URL of the proxy to use when connecting to the stream
+ verify : Union[bool, str]
+ Either a boolean, in which case it controls whether to verify the
+ server’s TLS certificate, or a string, in which case it must be a path
+ to a CA bundle to use.
+
+ Attributes
+ ----------
+ running : bool
+ Whether there's currently a stream running
+ session : :class:`requests.Session`
+ Requests Session used to connect to the stream
+ thread : Optional[:class:`threading.Thread`]
+ Thread used to run the stream
+ user_agent : str
+ User agent used when connecting to the stream
+ """
+
+ def __init__(self, consumer_key, consumer_secret, access_token,
+ access_token_secret, **kwargs):
+ """__init__( \
+ consumer_key, consumer_secret, access_token, access_token_secret, \
+ chunk_size=512, daemon=False, max_retries=inf, proxy=None, \
+ verify=True \
+ )
+ """
+ self.consumer_key = consumer_key
+ self.consumer_secret = consumer_secret
+ self.access_token = access_token
+ self.access_token_secret = access_token_secret
+ super().__init__(**kwargs)
+
+ def _connect(self, method, endpoint, **kwargs):
+ auth = OAuth1(self.consumer_key, self.consumer_secret,
+ self.access_token, self.access_token_secret)
+ url = f"https://stream.twitter.com/1.1/{endpoint}.json"
+ super()._connect(method, url, auth=auth, **kwargs)
+
def filter(self, *, follow=None, track=None, locations=None,
filter_level=None, languages=None, stall_warnings=False,
threaded=False):
else:
self._connect(method, endpoint, params=params)
- def disconnect(self):
- """Disconnect the stream"""
- self.running = False
-
- def on_closed(self, response):
- """This is called when the stream has been closed by Twitter.
-
- Parameters
- ----------
- response : requests.Response
- The Response from Twitter
- """
- log.error("Stream connection closed by Twitter")
-
- def on_connect(self):
- """This is called after successfully connecting to the streaming API.
- """
- log.info("Stream connected")
-
- def on_connection_error(self):
- """This is called when the stream connection errors or times out."""
- log.error("Stream connection has errored or timed out")
-
- def on_disconnect(self):
- """This is called when the stream has disconnected."""
- log.info("Stream disconnected")
-
- def on_exception(self, exception):
- """This is called when an unhandled exception occurs.
-
- Parameters
- ----------
- exception : Exception
- The unhandled exception
- """
- log.exception("Stream encountered an exception")
-
- def on_keep_alive(self):
- """This is called when a keep-alive signal is received."""
- log.debug("Received keep-alive signal")
-
- def on_request_error(self, status_code):
- """This is called when a non-200 HTTP status code is encountered.
-
- Parameters
- ----------
- status_code : int
- The HTTP status code encountered
- """
- log.error("Stream encountered HTTP error: %d", status_code)
-
def on_data(self, raw_data):
"""This is called when raw data is received from the stream.
This method handles sending the data to other methods based on the
The stall warning
"""
log.warning("Received stall warning: %s", warning)
+
+
+class StreamingClient(BaseClient, BaseStream):
+ """Filter and sample realtime Tweets with Twitter API v2
+
+ .. versionadded:: 4.6
+
+ Parameters
+ ----------
+ bearer_token : str
+ Twitter API Bearer Token
+ return_type : Type[Union[dict, requests.Response, Response]]
+ Type to return from requests to the API
+ wait_on_rate_limit : bool
+ Whether to wait when rate limit is reached
+ chunk_size : int
+ The default socket.read size. Defaults to 512, less than half the size
+ of a Tweet so that it reads Tweets with the minimal latency of 2 reads
+ per Tweet. Values higher than ~1kb will increase latency by waiting for
+ more data to arrive but may also increase throughput by doing fewer
+ socket read calls.
+ daemon : bool
+ Whether or not to use a daemon thread when using a thread to run the
+ stream
+ max_retries : int
+ Max number of times to retry connecting the stream
+ proxy : Optional[str]
+ URL of the proxy to use when connecting to the stream
+ verify : Union[bool, str]
+ Either a boolean, in which case it controls whether to verify the
+ server’s TLS certificate, or a string, in which case it must be a path
+ to a CA bundle to use.
+
+ Attributes
+ ----------
+ running : bool
+ Whether there's currently a stream running
+ session : :class:`requests.Session`
+ Requests Session used to connect to the stream
+ thread : Optional[:class:`threading.Thread`]
+ Thread used to run the stream
+ user_agent : str
+ User agent used when connecting to the stream
+ """
+
+ def __init__(self, bearer_token, *, return_type=Response,
+ wait_on_rate_limit=False, **kwargs):
+ """__init__( \
+ bearer_token, *, return_type=Response, wait_on_rate_limit=False, \
+ chunk_size=512, daemon=False, max_retries=inf, proxy=None, \
+ verify=True \
+ )
+ """
+ BaseClient.__init__(self, bearer_token, return_type=return_type,
+ wait_on_rate_limit=wait_on_rate_limit)
+ BaseStream.__init__(self, **kwargs)
+
+ def _connect(self, method, endpoint, **kwargs):
+ self.session.headers["Authorization"] = f"Bearer {self.bearer_token}"
+ url = f"https://api.twitter.com/2/tweets/{endpoint}/stream"
+ super()._connect(method, url, **kwargs)
+
+ def _process_data(self, data, data_type=None):
+ if data_type is StreamRule:
+ if isinstance(data, list):
+ rules = []
+ for rule in data:
+ if "tag" in rule:
+ rules.append(StreamRule(
+ value=rule["value"], id=rule["id"], tag=rule["tag"]
+ ))
+ else:
+ rules.append(StreamRule(value=rule["value"],
+ id=rule["id"]))
+ return rules
+ elif data is not None:
+ if "tag" in data:
+ return StreamRule(value=data["value"], id=data["id"],
+ tag=data["tag"])
+ else:
+ return StreamRule(value=data["value"], id=data["id"])
+ else:
+ return super()._process_data(data, data_type=data_type)
+
+ def add_rules(self, add, **params):
+ """add_rules(add, *, dry_run)
+
+ Add rules to filtered stream.
+
+ Parameters
+ ----------
+ add : Union[List[StreamRule], StreamRule]
+ Specifies the operation you want to perform on the rules.
+ dry_run : bool
+ Set to true to test the syntax of your rule without submitting
+ it. This is useful if you want to check the syntax of a rule before
+ removing one or more of your existing rules.
+
+ Returns
+ -------
+ Union[dict, requests.Response, Response]
+
+ References
+ ----------
+ https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/post-tweets-search-stream-rules
+ """
+ json = {"add": []}
+ if isinstance(add, StreamRule):
+ add = (add,)
+ for rule in add:
+ if rule.tag is not None:
+ json["add"].append({"value": rule.value, "tag": rule.tag})
+ else:
+ json["add"].append({"value": rule.value})
+
+ return self._make_request(
+ "POST", "/2/tweets/search/stream/rules", params=params,
+ endpoint_parameters=("dry_run",), json=json, data_type=StreamRule
+ )
+
+ def delete_rules(self, ids, **params):
+ """delete_rules(ids, *, dry_run)
+
+ Delete rules from filtered stream.
+
+ Parameters
+ ----------
+ ids : Union[int, str, List[Union[int, str, StreamRule]], StreamRule]
+ Array of rule IDs, each one representing a rule already active in
+ your stream. IDs must be submitted as strings.
+ dry_run : bool
+ Set to true to test the syntax of your rule without submitting
+ it. This is useful if you want to check the syntax of a rule before
+ removing one or more of your existing rules.
+
+ Returns
+ -------
+ Union[dict, requests.Response, Response]
+
+ References
+ ----------
+ https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/post-tweets-search-stream-rules
+ """
+ json = {"delete": {"ids": []}}
+ if isinstance(ids, (int, str, StreamRule)):
+ ids = (ids,)
+ for id in ids:
+ if isinstance(id, StreamRule):
+ json["delete"]["ids"].append(str(id.id))
+ else:
+ json["delete"]["ids"].append(str(id))
+
+ return self._make_request(
+ "POST", "/2/tweets/search/stream/rules", params=params,
+ endpoint_parameters=("dry_run",), json=json, data_type=StreamRule
+ )
+
+ def filter(self, *, threaded=False, **params):
+ """filter( \
+ *, backfill_minutes=None, expansions=None, media_fields=None, \
+ place_fields=None, poll_fields=None, tweet_fields=None, \
+ user_fields=None, threaded=False \
+ )
+
+ Streams Tweets in real-time based on a specific set of filter rules.
+
+ If you are using the academic research product track, you can connect
+ up to two `redundant connections <filter redundant connections_>`_ to
+ maximize your streaming up-time.
+
+ The Tweets returned by this endpoint count towards the Project-level
+ `Tweet cap`_.
+
+ Parameters
+ ----------
+ backfill_minutes : Optional[int]
+ By passing this parameter, you can request up to five (5) minutes
+ worth of streaming data that you might have missed during a
+ disconnection to be delivered to you upon reconnection. The
+ backfilled Tweets will automatically flow through the reconnected
+ stream, with older Tweets generally being delivered before any
+ newly matching Tweets. You must include a whole number between 1
+ and 5 as the value to this parameter.
+
+ This feature will deliver duplicate Tweets, meaning that if you
+ were disconnected for 90 seconds, and you requested two minutes of
+ backfill, you will receive 30 seconds worth of duplicate Tweets.
+ Due to this, you should make sure your system is tolerant of
+ duplicate data.
+
+ This feature is currently only available to the Academic Research
+ product track.
+ expansions : Union[List[str], str]
+ :ref:`expansions_parameter`
+ media_fields : Union[List[str], str]
+ :ref:`media_fields_parameter`
+ place_fields : Union[List[str], str]
+ :ref:`place_fields_parameter`
+ poll_fields : Union[List[str], str]
+ :ref:`poll_fields_parameter`
+ tweet_fields : Union[List[str], str]
+ :ref:`tweet_fields_parameter`
+ user_fields : Union[List[str], str]
+ :ref:`user_fields_parameter`
+ threaded : bool
+ Whether or not to use a thread to run the stream
+
+ Returns
+ -------
+ Optional[threading.Thread]
+ The thread if ``threaded`` is set to ``True``, else ``None``
+
+ References
+ ----------
+ https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/get-tweets-search-stream
+
+ .. _filter redundant connections: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/recovery-and-redundancy-features
+ .. _Tweet cap: https://developer.twitter.com/en/docs/twitter-api/tweet-caps
+ """
+ if self.running:
+ raise TweepyException("Stream is already connected")
+
+ method = "GET"
+ endpoint = "search"
+
+ params = self._process_params(
+ params, endpoint_parameters=(
+ "backfill_minutes", "expansions", "media.fields",
+ "place.fields", "poll.fields", "tweet.fields", "user.fields"
+ )
+ )
+
+ if threaded:
+ return self._threaded_connect(method, endpoint, params=params)
+ else:
+ self._connect(method, endpoint, params=params)
+
+ def get_rules(self, **params):
+ """get_rules(*, ids)
+
+ Return a list of rules currently active on the streaming endpoint,
+ either as a list or individually.
+
+ Parameters
+ ----------
+ ids : Union[List[str], str]
+ Comma-separated list of rule IDs. If omitted, all rules are
+ returned.
+
+ Returns
+ -------
+ Union[dict, requests.Response, Response]
+
+ References
+ ----------
+ https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/get-tweets-search-stream-rules
+ """
+ return self._make_request(
+ "GET", "/2/tweets/search/stream/rules", params=params,
+ endpoint_parameters=("ids",), data_type=StreamRule
+ )
+
+ def sample(self, *, threaded=False, **params):
+ """sample( \
+ *, backfill_minutes=None, expansions=None, media_fields=None, \
+ place_fields=None, poll_fields=None, tweet_fields=None, \
+ user_fields=None, threaded=False \
+ )
+
+ Streams about 1% of all Tweets in real-time.
+
+ If you are using the academic research product track, you can connect
+ up to two `redundant connections <sample redundant connections_>`_ to
+ maximize your streaming up-time.
+
+ Parameters
+ ----------
+ backfill_minutes : Optional[int]
+ By passing this parameter, you can request up to five (5) minutes
+ worth of streaming data that you might have missed during a
+ disconnection to be delivered to you upon reconnection. The
+ backfilled Tweets will automatically flow through the reconnected
+ stream, with older Tweets generally being delivered before any
+ newly matching Tweets. You must include a whole number between 1
+ and 5 as the value to this parameter.
+
+ This feature will deliver duplicate Tweets, meaning that if you
+ were disconnected for 90 seconds, and you requested two minutes of
+ backfill, you will receive 30 seconds worth of duplicate Tweets.
+ Due to this, you should make sure your system is tolerant of
+ duplicate data.
+
+ This feature is currently only available to the Academic Research
+ product track.
+ expansions : Union[List[str], str]
+ :ref:`expansions_parameter`
+ media_fields : Union[List[str], str]
+ :ref:`media_fields_parameter`
+ place_fields : Union[List[str], str]
+ :ref:`place_fields_parameter`
+ poll_fields : Union[List[str], str]
+ :ref:`poll_fields_parameter`
+ tweet_fields : Union[List[str], str]
+ :ref:`tweet_fields_parameter`
+ user_fields : Union[List[str], str]
+ :ref:`user_fields_parameter`
+ threaded : bool
+ Whether or not to use a thread to run the stream
+
+ Returns
+ -------
+ Optional[threading.Thread]
+ The thread if ``threaded`` is set to ``True``, else ``None``
+
+ References
+ ----------
+ https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/api-reference/get-tweets-sample-stream
+
+ .. _sample redundant connections: https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/integrate/recovery-and-redundancy-features
+ """
+ if self.running:
+ raise TweepyException("Stream is already connected")
+
+ method = "GET"
+ endpoint = "sample"
+
+ params = self._process_params(
+ params, endpoint_parameters=(
+ "backfill_minutes", "expansions", "media.fields",
+ "place.fields", "poll.fields", "tweet.fields", "user.fields"
+ )
+ )
+
+ if threaded:
+ return self._threaded_connect(method, endpoint, params=params)
+ else:
+ self._connect(method, endpoint, params=params)
+
+ def on_data(self, raw_data):
+ """This is called when raw data is received from the stream.
+ This method handles sending the data to other methods.
+
+ Parameters
+ ----------
+ raw_data : JSON
+ The raw data from the stream
+
+ References
+ ----------
+ https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/consuming-streaming-data
+ """
+ data = json.loads(raw_data)
+
+ tweet = None
+ includes = {}
+ errors = []
+ matching_rules = []
+
+ if "data" in data:
+ tweet = Tweet(data["data"])
+ self.on_tweet(tweet)
+ if "includes" in data:
+ includes = self._process_includes(data["includes"])
+ self.on_includes(includes)
+ if "errors" in data:
+ errors = data["errors"]
+ self.on_errors(errors)
+ if "matching_rules" in data:
+ matching_rules = [
+ StreamRule(id=rule["id"], tag=rule["tag"])
+ for rule in data["matching_rules"]
+ ]
+ self.on_matching_rules(matching_rules)
+
+ self.on_response(
+ StreamResponse(tweet, includes, errors, matching_rules)
+ )
+
+ def on_tweet(self, tweet):
+ """This is called when a Tweet is received.
+
+ Parameters
+ ----------
+ tweet : Tweet
+ The Tweet received
+ """
+ pass
+
+ def on_includes(self, includes):
+ """This is called when includes are received.
+
+ Parameters
+ ----------
+ includes : dict
+ The includes received
+ """
+ pass
+
+ def on_errors(self, errors):
+ """This is called when errors are received.
+
+ Parameters
+ ----------
+ errors : List[dict]
+ The errors received
+ """
+ log.error("Received errors: %s", errors)
+
+ def on_matching_rules(self, matching_rules):
+ """This is called when matching rules are received.
+
+ Parameters
+ ----------
+ matching_rules : List[StreamRule]
+ The matching rules received
+ """
+ pass
+
+ def on_response(self, response):
+ """This is called when a response is received.
+
+ Parameters
+ ----------
+ response : StreamResponse
+ The response received
+ """
+ log.debug("Received response: %s", response)
+
+
+class StreamRule(NamedTuple):
+ """Rule for filtered stream
+
+ .. versionadded:: 4.6
+
+ Parameters
+ ----------
+ value : Optional[str]
+ The rule text. If you are using a `Standard Project`_ at the Basic
+ `access level`_, you can use the basic set of `operators`_, can submit
+ up to 25 concurrent rules, and can submit rules up to 512 characters
+ long. If you are using an `Academic Research Project`_ at the Basic
+ access level, you can use all available operators, can submit up to
+ 1,000 concurrent rules, and can submit rules up to 1,024 characters
+ long.
+ tag : Optional[str]
+ The tag label. This is a free-form text you can use to identify the
+ rules that matched a specific Tweet in the streaming response. Tags can
+ be the same across rules.
+ id : Optional[str]
+ Unique identifier of this rule. This is returned as a string.
+
+
+ .. _Standard Project: https://developer.twitter.com/en/docs/projects
+ .. _access level: https://developer.twitter.com/en/products/twitter-api/early-access/guide#na_1
+ .. _operators: https://developer.twitter.com/en/docs/twitter-api/tweets/search/integrate/build-a-query
+ .. _Academic Research Project: https://developer.twitter.com/en/docs/projects
+ """
+ value: str = None
+ tag: str = None
+ id: str = None