From 86244c1a82a1852d04f3695b03201363f5d5eafd Mon Sep 17 00:00:00 2001 From: Harmon Date: Sun, 20 Feb 2022 00:18:00 -0600 Subject: [PATCH] Add support for streaming with Twitter API v2 Refactor Client and Stream to inherit from new BaseClient and BaseStream classes and add StreamingClient, StreamResponse, and StreamRule --- docs/conf.py | 2 + docs/index.rst | 1 + docs/stream.rst | 7 +- docs/streaming.rst | 126 +++++-- docs/streamingclient.rst | 27 ++ tweepy/__init__.py | 4 +- tweepy/client.py | 131 ++++---- tweepy/streaming.py | 694 +++++++++++++++++++++++++++++++++------ 8 files changed, 802 insertions(+), 190 deletions(-) create mode 100644 docs/streamingclient.rst diff --git a/docs/conf.py b/docs/conf.py index 4c53424..c2e6d8d 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -38,6 +38,8 @@ hoverxref_auto_ref = True hoverxref_domains = ['py'] hoverxref_intersphinx = ['aiohttp', 'requests', 'requests_oauthlib'] +autoclass_content = "both" + intersphinx_mapping = { 'python': ('https://docs.python.org/3', None), 'aiohttp': ('https://docs.aiohttp.org/en/stable/', None), diff --git a/docs/index.rst b/docs/index.rst index cd79f5a..d3e9d65 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -18,6 +18,7 @@ Contents: client.rst models.rst stream.rst + streamingclient.rst asyncstream.rst exceptions.rst extended_tweets.rst diff --git a/docs/stream.rst b/docs/stream.rst index d5f2c6a..73b18ef 100644 --- a/docs/stream.rst +++ b/docs/stream.rst @@ -2,10 +2,11 @@ .. currentmodule:: tweepy -******************************************* -:class:`tweepy.Stream` --- Stream Reference -******************************************* +***************************************************** +:class:`tweepy.Stream` --- Twitter API v1.1 Reference +***************************************************** .. autoclass:: Stream :members: + :inherited-members: :member-order: bysource diff --git a/docs/streaming.rst b/docs/streaming.rst index 3f32d0d..95a3641 100644 --- a/docs/streaming.rst +++ b/docs/streaming.rst @@ -6,12 +6,6 @@ Streaming ********* -:class:`Stream` allows `filtering`_ and `sampling`_ of realtime Tweets using -Twitter's API. - -.. _filtering: https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/overview -.. _sampling: https://developer.twitter.com/en/docs/twitter-api/v1/tweets/sample-realtime/overview - Streams utilize Streaming HTTP protocol to deliver data through an open, streaming API connection. Rather than delivering data in batches through repeated requests by your client app, as might be expected from a REST @@ -21,6 +15,17 @@ results in a low-latency delivery mechanism that can support very high throughput. For further information, see https://developer.twitter.com/en/docs/tutorials/consuming-streaming-data +:class:`Stream` allows `filtering `_ and +`sampling `_ of realtime Tweets using Twitter API v1.1. + +:class:`StreamingClient` allows `filtering `_ and +`sampling `_ of realtime Tweets using Twitter API v2. + +.. _v1.1 filtering: https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/overview +.. _v1.1 sampling: https://developer.twitter.com/en/docs/twitter-api/v1/tweets/sample-realtime/overview +.. _v2 filtering: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/introduction +.. _v2 sampling: https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/introduction + Using :class:`Stream` ===================== @@ -64,35 +69,110 @@ subclassed. For example, to print the IDs of every Tweet received:: ) printer.sample() +Using :class:`StreamingClient` +============================== + +To use :class:`StreamingClient`, an instance of it needs to be initialized with +a Twitter API Bearer Token:: + + import tweepy + + streaming_client = tweepy.StreamingClient("Bearer Token here") + +Then, :meth:`StreamingClient.sample` can be used to connect to and run a +sampling stream:: + + streaming_client.sample() + +Or :meth:`StreamingClient.add_rules` can be used to add rules before using +:meth:`StreamingClient.filter` to connect to and run a filtered stream:: + + streaming_client.add_rules(tweepy.StreamRule("Tweepy")) + streaming_client.filter() + +:meth:`StreamingClient.get_rules` can be used to retrieve existing rules and +:meth:`StreamingClient.delete_rules` can be used to delete rules. + +To learn how build rules, refer to the Twitter API +`Building rules for filtered stream`_ documentation. + +.. _Building rules for filtered stream: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/build-a-rule + +Data received from the stream is passed to :meth:`StreamingClient.on_data`. +This method handles sending the data to other methods. Tweets recieved are sent +to :meth:`StreamingClient.on_tweet`, ``includes`` data are sent to +:meth:`StreamingClient.on_includes`, errors are sent to +:meth:`StreamingClient.on_errors`, and matching rules are sent to +:meth:`StreamingClient.on_matching_rules`. A :class:`StreamResponse` instance +containing all four fields is sent to :meth:`StreamingClient.on_response`. By +default, only :meth:`StreamingClient.on_response` logs the data received, at +the ``DEBUG`` :ref:`logging level `. + +To customize the processing of the stream data, :class:`StreamingClient` needs to be +subclassed. For example, to print the IDs of every Tweet received:: + + class IDPrinter(tweepy.StreamingClient): + + def on_tweet(self, tweet): + print(tweet.id) + + + printer = IDPrinter("Bearer Token here") + printer.sample() + Threading ========= -Both :meth:`Stream.filter` and :meth:`Stream.sample` have a ``threaded`` -parameter. When set to ``True``, the stream will run in a separate -:ref:`thread `, which is returned by the call to either +:meth:`Stream.filter`, :meth:`Stream.sample`, :meth:`StreamingClient.filter`, +and :meth:`StreamingClient.sample` all have a ``threaded`` parameter. When set +to ``True``, the stream will run in a separate +:ref:`thread `, which is returned by the call to the method. For example:: thread = stream.filter(follow=[1072250532645998596], threaded=True) +or:: + + thread = streaming_client.sample(threaded=True) + Handling Errors =============== -:class:`Stream` has multiple methods to handle errors during streaming. -:meth:`Stream.on_closed` is called when the stream is closed by Twitter. -:meth:`Stream.on_connection_error` is called when the stream encounters a -connection error. :meth:`Stream.on_request_error` is called when an error is -encountered while trying to connect to the stream. When these errors are -encountered and ``max_retries``, which defaults to infinite, hasn't been -exceeded yet, the :class:`Stream` instance will attempt to reconnect the stream -after an appropriate amount of time. By default, all three of these methods log -an error. To customize that handling, they can be overridden in a subclass:: +Both :class:`Stream` and :class:`StreamingClient` have multiple methods to +handle errors during streaming. + +:meth:`Stream.on_closed` / :meth:`StreamingClient.on_closed` is called when the +stream is closed by Twitter. + +:meth:`Stream.on_connection_error` / +:meth:`StreamingClient.on_connection_error` is called when the stream +encounters a connection error. + +:meth:`Stream.on_request_error` / :meth:`StreamingClient.on_request_error` is +called when an error is encountered while trying to connect to the stream. + +When these errors are encountered and ``max_retries``, which defaults to +infinite, hasn't been exceeded yet, the :class:`Stream` / +:class:`StreamingClient` instance will attempt to reconnect the stream after an +appropriate amount of time. By default, both versions of all three of these +methods log an error. To customize that handling, they can be overridden in a +subclass:: class ConnectionTester(tweepy.Stream): def on_connection_error(self): self.disconnect() -:meth:`Stream.on_request_error` is also passed the HTTP status code that was -encountered. The HTTP status codes reference for the Twitter API can be found -at https://developer.twitter.com/en/support/twitter-api/error-troubleshooting. +:: + + class ConnectionTester(tweepy.StreamingClient): + + def on_connection_error(self): + self.disconnect() + +:meth:`Stream.on_request_error` / :meth:`StreamingClient.on_request_error` is +also passed the HTTP status code that was encountered. The HTTP status codes +reference for the Twitter API can be found at +https://developer.twitter.com/en/support/twitter-api/error-troubleshooting. -:meth:`Stream.on_exception` is called when an unhandled exception occurs. This -is fatal to the stream, and by default, an exception is logged. +:meth:`Stream.on_exception` / :meth:`StreamingClient.on_exception` is called +when an unhandled exception occurs. This is fatal to the stream, and by +default, an exception is logged. diff --git a/docs/streamingclient.rst b/docs/streamingclient.rst new file mode 100644 index 0000000..048f96b --- /dev/null +++ b/docs/streamingclient.rst @@ -0,0 +1,27 @@ +.. _streamingclient_reference: + +.. currentmodule:: tweepy + +************************************************************ +:class:`tweepy.StreamingClient` --- Twitter API v2 Reference +************************************************************ + +.. autoclass:: StreamingClient + :members: + :inherited-members: + :member-order: bysource + +``StreamResponse`` +================== +.. autoclass:: StreamResponse + + The :obj:`StreamResponse` returned by :meth:`StreamingClient.on_response` is + a :class:`collections.namedtuple`, with ``data``, ``includes``, ``errors``, + and ``matching_rules`` fields, corresponding with the fields in responses + from Twitter's API. + + .. versionadded:: 4.6 + +``StreamRule`` +============== +.. autoclass:: StreamRule diff --git a/tweepy/__init__.py b/tweepy/__init__.py index 10fe999..7c57202 100644 --- a/tweepy/__init__.py +++ b/tweepy/__init__.py @@ -27,7 +27,9 @@ from tweepy.pagination import Paginator from tweepy.place import Place from tweepy.poll import Poll from tweepy.space import Space -from tweepy.streaming import Stream +from tweepy.streaming import ( + Stream, StreamingClient, StreamResponse, StreamRule +) from tweepy.tweet import ReferencedTweet, Tweet from tweepy.user import User diff --git a/tweepy/client.py b/tweepy/client.py index 1af2c74..5a36d78 100644 --- a/tweepy/client.py +++ b/tweepy/client.py @@ -30,41 +30,7 @@ log = logging.getLogger(__name__) Response = namedtuple("Response", ("data", "includes", "errors", "meta")) -class Client: - """Client( \ - bearer_token=None, consumer_key=None, consumer_secret=None, \ - access_token=None, access_token_secret=None, *, return_type=Response, \ - wait_on_rate_limit=False \ - ) - - Twitter API v2 Client - - .. versionadded:: 4.0 - - Parameters - ---------- - bearer_token : Optional[str] - Twitter API Bearer Token - consumer_key : Optional[str] - Twitter API Consumer Key - consumer_secret : Optional[str] - Twitter API Consumer Secret - access_token : Optional[str] - Twitter API Access Token - access_token_secret : Optional[str] - Twitter API Access Token Secret - return_type : Type[Union[dict, requests.Response, Response]] - Type to return from requests to the API - wait_on_rate_limit : bool - Whether to wait when rate limit is reached - - Attributes - ---------- - session : requests.Session - Requests Session used to make requests to the API - user_agent : str - User agent used when making requests to the API - """ +class BaseClient: def __init__( self, bearer_token=None, consumer_key=None, consumer_secret=None, @@ -147,25 +113,7 @@ class Client: def _make_request(self, method, route, params={}, endpoint_parameters=None, json=None, data_type=None, user_auth=False): - request_params = {} - for param_name, param_value in params.items(): - if param_name.replace('_', '.') in endpoint_parameters: - param_name = param_name.replace('_', '.') - - if isinstance(param_value, list): - request_params[param_name] = ','.join(map(str, param_value)) - elif isinstance(param_value, datetime.datetime): - if param_value.tzinfo is not None: - param_value = param_value.astimezone(datetime.timezone.utc) - request_params[param_name] = param_value.strftime( - "%Y-%m-%dT%H:%M:%SZ" - ) - # TODO: Constant datetime format string? - else: - request_params[param_name] = param_value - - if param_name not in endpoint_parameters: - log.warn(f"Unexpected parameter: {param_name}") + request_params = self._process_params(params, endpoint_parameters) response = self.request(method, route, params=request_params, json=json, user_auth=user_auth) @@ -179,13 +127,25 @@ class Client: return response data = response.get("data") + data = self._process_data(data, data_type=data_type) + + includes = response.get("includes", {}) + includes = self._process_includes(includes) + + errors = response.get("errors", []) + meta = response.get("meta", {}) + + return Response(data, includes, errors, meta) + + def _process_data(self, data, data_type=None): if data_type is not None: if isinstance(data, list): data = [data_type(result) for result in data] elif data is not None: data = data_type(data) + return data - includes = response.get("includes", {}) + def _process_includes(self, includes): if "media" in includes: includes["media"] = [Media(media) for media in includes["media"]] if "places" in includes: @@ -196,11 +156,66 @@ class Client: includes["tweets"] = [Tweet(tweet) for tweet in includes["tweets"]] if "users" in includes: includes["users"] = [User(user) for user in includes["users"]] + return includes - errors = response.get("errors", []) - meta = response.get("meta", {}) + def _process_params(self, params, endpoint_parameters): + request_params = {} + for param_name, param_value in params.items(): + if param_name.replace('_', '.') in endpoint_parameters: + param_name = param_name.replace('_', '.') - return Response(data, includes, errors, meta) + if isinstance(param_value, list): + request_params[param_name] = ','.join(map(str, param_value)) + elif isinstance(param_value, datetime.datetime): + if param_value.tzinfo is not None: + param_value = param_value.astimezone(datetime.timezone.utc) + request_params[param_name] = param_value.strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + # TODO: Constant datetime format string? + else: + request_params[param_name] = param_value + + if param_name not in endpoint_parameters: + log.warn(f"Unexpected parameter: {param_name}") + return request_params + + +class Client(BaseClient): + """Client( \ + bearer_token=None, consumer_key=None, consumer_secret=None, \ + access_token=None, access_token_secret=None, *, return_type=Response, \ + wait_on_rate_limit=False \ + ) + + Twitter API v2 Client + + .. versionadded:: 4.0 + + Parameters + ---------- + bearer_token : Optional[str] + Twitter API Bearer Token + consumer_key : Optional[str] + Twitter API Consumer Key + consumer_secret : Optional[str] + Twitter API Consumer Secret + access_token : Optional[str] + Twitter API Access Token + access_token_secret : Optional[str] + Twitter API Access Token Secret + return_type : Type[Union[dict, requests.Response, Response]] + Type to return from requests to the API + wait_on_rate_limit : bool + Whether to wait when rate limit is reached + + Attributes + ---------- + session : requests.Session + Requests Session used to make requests to the API + user_agent : str + User agent used when making requests to the API + """ # Hide replies diff --git a/tweepy/streaming.py b/tweepy/streaming.py index 1bbc2f9..6c338cf 100644 --- a/tweepy/streaming.py +++ b/tweepy/streaming.py @@ -4,6 +4,7 @@ # Appengine users: https://developers.google.com/appengine/docs/python/sockets/#making_httplib_use_sockets +from collections import namedtuple import json import logging from math import inf @@ -11,68 +12,29 @@ from platform import python_version import ssl from threading import Thread from time import sleep +from typing import NamedTuple import requests from requests_oauthlib import OAuth1 import urllib3 import tweepy +from tweepy.client import BaseClient, Response from tweepy.errors import TweepyException from tweepy.models import Status +from tweepy.tweet import Tweet log = logging.getLogger(__name__) +StreamResponse = namedtuple( + "StreamResponse", ("data", "includes", "errors", "matching_rules") +) -class Stream: - """Filter and sample realtime Tweets - Parameters - ---------- - consumer_key : str - Twitter API Consumer Key - consumer_secret : str - Twitter API Consumer Secret - access_token: str - Twitter API Access Token - access_token_secret : str - Twitter API Access Token Secret - chunk_size : int - The default socket.read size. Default to 512, less than half the size - of a Tweet so that it reads Tweets with the minimal latency of 2 reads - per Tweet. Values higher than ~1kb will increase latency by waiting for - more data to arrive but may also increase throughput by doing fewer - socket read calls. - daemon : bool - Whether or not to use a daemon thread when using a thread to run the - stream - max_retries : int - Max number of times to retry connecting the stream - proxy : Optional[str] - URL of the proxy to use when connecting to the stream - verify : Union[bool, str] - Either a boolean, in which case it controls whether to verify the - server’s TLS certificate, or a string, in which case it must be a path - to a CA bundle to use. - - Attributes - ---------- - running : bool - Whether there's currently a stream running - session : :class:`requests.Session` - Requests Session used to connect to the stream - thread : Optional[:class:`threading.Thread`] - Thread used to run the stream - user_agent : str - User agent used when connecting to the stream - """ +class BaseStream: - def __init__(self, consumer_key, consumer_secret, access_token, - access_token_secret, *, chunk_size=512, daemon=False, - max_retries=inf, proxy=None, verify=True): - self.consumer_key = consumer_key - self.consumer_secret = consumer_secret - self.access_token = access_token - self.access_token_secret = access_token_secret + def __init__(self, *, chunk_size=512, daemon=False, max_retries=inf, + proxy=None, verify=True): self.chunk_size = chunk_size self.daemon = daemon self.max_retries = max_retries @@ -88,11 +50,14 @@ class Stream: f"Tweepy/{tweepy.__version__}" ) - def _connect(self, method, endpoint, params=None, headers=None, body=None): + def _connect(self, method, url, auth=None, params=None, headers=None, + body=None): self.running = True error_count = 0 # https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/guides/connecting + # https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/handling-disconnections + # https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/integrate/handling-disconnections stall_timeout = 90 network_error_wait = network_error_wait_step = 0.25 network_error_wait_max = 16 @@ -100,13 +65,8 @@ class Stream: http_error_wait_max = 320 http_420_error_wait_start = 60 - auth = OAuth1(self.consumer_key, self.consumer_secret, - self.access_token, self.access_token_secret) - self.session.headers["User-Agent"] = self.user_agent - url = f"https://stream.twitter.com/1.1/{endpoint}.json" - try: while self.running and error_count <= self.max_retries: try: @@ -185,6 +145,121 @@ class Stream: self.thread.start() return self.thread + def disconnect(self): + """Disconnect the stream""" + self.running = False + + def on_closed(self, response): + """This is called when the stream has been closed by Twitter. + + Parameters + ---------- + response : requests.Response + The Response from Twitter + """ + log.error("Stream connection closed by Twitter") + + def on_connect(self): + """This is called after successfully connecting to the streaming API. + """ + log.info("Stream connected") + + def on_connection_error(self): + """This is called when the stream connection errors or times out.""" + log.error("Stream connection has errored or timed out") + + def on_disconnect(self): + """This is called when the stream has disconnected.""" + log.info("Stream disconnected") + + def on_exception(self, exception): + """This is called when an unhandled exception occurs. + + Parameters + ---------- + exception : Exception + The unhandled exception + """ + log.exception("Stream encountered an exception") + + def on_keep_alive(self): + """This is called when a keep-alive signal is received.""" + log.debug("Received keep-alive signal") + + def on_request_error(self, status_code): + """This is called when a non-200 HTTP status code is encountered. + + Parameters + ---------- + status_code : int + The HTTP status code encountered + """ + log.error("Stream encountered HTTP error: %d", status_code) + + +class Stream(BaseStream): + """Filter and sample realtime Tweets with Twitter API v1.1 + + Parameters + ---------- + consumer_key : str + Twitter API Consumer Key + consumer_secret : str + Twitter API Consumer Secret + access_token: str + Twitter API Access Token + access_token_secret : str + Twitter API Access Token Secret + chunk_size : int + The default socket.read size. Default to 512, less than half the size + of a Tweet so that it reads Tweets with the minimal latency of 2 reads + per Tweet. Values higher than ~1kb will increase latency by waiting for + more data to arrive but may also increase throughput by doing fewer + socket read calls. + daemon : bool + Whether or not to use a daemon thread when using a thread to run the + stream + max_retries : int + Max number of times to retry connecting the stream + proxy : Optional[str] + URL of the proxy to use when connecting to the stream + verify : Union[bool, str] + Either a boolean, in which case it controls whether to verify the + server’s TLS certificate, or a string, in which case it must be a path + to a CA bundle to use. + + Attributes + ---------- + running : bool + Whether there's currently a stream running + session : :class:`requests.Session` + Requests Session used to connect to the stream + thread : Optional[:class:`threading.Thread`] + Thread used to run the stream + user_agent : str + User agent used when connecting to the stream + """ + + def __init__(self, consumer_key, consumer_secret, access_token, + access_token_secret, **kwargs): + """__init__( \ + consumer_key, consumer_secret, access_token, access_token_secret, \ + chunk_size=512, daemon=False, max_retries=inf, proxy=None, \ + verify=True \ + ) + """ + self.consumer_key = consumer_key + self.consumer_secret = consumer_secret + self.access_token = access_token + self.access_token_secret = access_token_secret + super().__init__(**kwargs) + + def _connect(self, method, endpoint, **kwargs): + auth = OAuth1(self.consumer_key, self.consumer_secret, + self.access_token, self.access_token_secret) + url = f"https://stream.twitter.com/1.1/{endpoint}.json" + super()._connect(method, url, auth=auth **kwargs) + def filter(self, *, follow=None, track=None, locations=None, filter_level=None, languages=None, stall_warnings=False, threaded=False): @@ -313,57 +388,6 @@ class Stream: else: self._connect(method, endpoint, params=params) - def disconnect(self): - """Disconnect the stream""" - self.running = False - - def on_closed(self, response): - """This is called when the stream has been closed by Twitter. - - Parameters - ---------- - response : requests.Response - The Response from Twitter - """ - log.error("Stream connection closed by Twitter") - - def on_connect(self): - """This is called after successfully connecting to the streaming API. - """ - log.info("Stream connected") - - def on_connection_error(self): - """This is called when the stream connection errors or times out.""" - log.error("Stream connection has errored or timed out") - - def on_disconnect(self): - """This is called when the stream has disconnected.""" - log.info("Stream disconnected") - - def on_exception(self, exception): - """This is called when an unhandled exception occurs. - - Parameters - ---------- - exception : Exception - The unhandled exception - """ - log.exception("Stream encountered an exception") - - def on_keep_alive(self): - """This is called when a keep-alive signal is received.""" - log.debug("Received keep-alive signal") - - def on_request_error(self, status_code): - """This is called when a non-200 HTTP status code is encountered. - - Parameters - ---------- - status_code : int - The HTTP status code encountered - """ - log.error("Stream encountered HTTP error: %d", status_code) - def on_data(self, raw_data): """This is called when raw data is received from the stream. This method handles sending the data to other methods based on the @@ -483,3 +507,463 @@ class Stream: The stall warning """ log.warning("Received stall warning: %s", warning) + + +class StreamingClient(BaseClient, BaseStream): + """Filter and sample realtime Tweets with Twitter API v2 + + .. versionadded:: 4.6 + + Parameters + ---------- + bearer_token : str + Twitter API Bearer Token + return_type : Type[Union[dict, requests.Response, Response]] + Type to return from requests to the API + wait_on_rate_limit : bool + Whether to wait when rate limit is reached + chunk_size : int + The default socket.read size. Default to 512, less than half the size + of a Tweet so that it reads Tweets with the minimal latency of 2 reads + per Tweet. Values higher than ~1kb will increase latency by waiting for + more data to arrive but may also increase throughput by doing fewer + socket read calls. + daemon : bool + Whether or not to use a daemon thread when using a thread to run the + stream + max_retries : int + Max number of times to retry connecting the stream + proxy : Optional[str] + URL of the proxy to use when connecting to the stream + verify : Union[bool, str] + Either a boolean, in which case it controls whether to verify the + server’s TLS certificate, or a string, in which case it must be a path + to a CA bundle to use. + + Attributes + ---------- + running : bool + Whether there's currently a stream running + session : :class:`requests.Session` + Requests Session used to connect to the stream + thread : Optional[:class:`threading.Thread`] + Thread used to run the stream + user_agent : str + User agent used when connecting to the stream + """ + + def __init__(self, bearer_token, *, return_type=Response, + wait_on_rate_limit=False, **kwargs): + """__init__( \ + bearer_token, *, return_type=Response, wait_on_rate_limit=False, \ + chunk_size=512, daemon=False, max_retries=inf, proxy=None, \ + verify=True \ + ) + """ + BaseClient.__init__(self, bearer_token, return_type=return_type, + wait_on_rate_limit=wait_on_rate_limit) + BaseStream.__init__(self, **kwargs) + + def _connect(self, method, endpoint, **kwargs): + self.session.headers["Authorization"] = f"Bearer {self.bearer_token}" + url = f"https://api.twitter.com/2/tweets/{endpoint}/stream" + super()._connect(method, url, **kwargs) + + def _process_data(self, data, data_type=None): + if data_type is StreamRule: + if isinstance(data, list): + rules = [] + for rule in data: + if "tag" in rule: + rules.append(StreamRule( + value=rule["value"], id=rule["id"], tag=rule["tag"] + )) + else: + rules.append(StreamRule(value=rule["value"], + id=rule["id"])) + return rules + elif data is not None: + if "tag" in data: + return StreamRule(value=data["value"], id=data["id"], + tag=data["tag"]) + else: + return StreamRule(value=data["value"], id=data["id"]) + else: + super()._process_data(data, data_type=data_type) + + def add_rules(self, add, **params): + """add_rules(add, *, dry_run) + + Add rules to filtered stream. + + Parameters + ---------- + add : Union[List[StreamRule], StreamRule] + Specifies the operation you want to perform on the rules. + dry_run : bool + Set to true to test a the syntax of your rule without submitting + it. This is useful if you want to check the syntax of a rule before + removing one or more of your existing rules. + + Returns + ------- + Union[dict, requests.Response, Response] + + References + ---------- + https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/post-tweets-search-stream-rules + """ + json = {"add": []} + if isinstance(add, StreamRule): + add = (add,) + for rule in add: + if rule.tag is not None: + json["add"].append({"value": rule.value, "tag": rule.tag}) + else: + json["add"].append({"value": rule.value}) + + return self._make_request( + "POST", f"/2/tweets/search/stream/rules", params=params, + endpoint_parameters=("dry_run",), json=json, data_type=StreamRule + ) + + def delete_rules(self, ids, **params): + """delete_rules(ids, *, dry_run) + + Delete rules from filtered stream. + + Parameters + ---------- + ids : Union[int, str, List[Union[int, str, StreamRule]], StreamRule] + Array of rule IDs, each one representing a rule already active in + your stream. IDs must be submitted as strings. + dry_run : bool + Set to true to test a the syntax of your rule without submitting + it. This is useful if you want to check the syntax of a rule before + removing one or more of your existing rules. + + Returns + ------- + Union[dict, requests.Response, Response] + + References + ---------- + https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/post-tweets-search-stream-rules + """ + json = {"delete": {"ids": []}} + if isinstance(ids, (int, str, StreamRule)): + ids = (ids,) + for id in ids: + if isinstance(id, StreamRule): + json["delete"]["ids"].append(str(StreamRule.id)) + else: + json["delete"]["ids"].append(str(id)) + + return self._make_request( + "POST", f"/2/tweets/search/stream/rules", params=params, + endpoint_parameters=("dry_run",), json=json, data_type=StreamRule + ) + + def filter(self, *, threaded=False, **params): + """filter( \ + *, backfill_minutes=None, expansions=None, media_fields=None, \ + place_fields=None, poll_fields=None, tweet_fields=None, \ + user_fields=None, threaded=False \ + ) + + Streams Tweets in real-time based on a specific set of filter rules. + + If you are using the academic research product track, you can connect + up to two `redundant connections `_ to + maximize your streaming up-time. + + The Tweets returned by this endpoint count towards the Project-level + `Tweet cap`_. + + Parameters + ---------- + backfill_minutes : Optional[int] + By passing this parameter, you can request up to five (5) minutes + worth of streaming data that you might have missed during a + disconnection to be delivered to you upon reconnection. The + backfilled Tweets will automatically flow through the reconnected + stream, with older Tweets generally being delivered before any + newly matching Tweets. You must include a whole number between 1 + and 5 as the value to this parameter. + + This feature will deliver duplicate Tweets, meaning that if you + were disconnected for 90 seconds, and you requested two minutes of + backfill, you will receive 30 seconds worth of duplicate Tweets. + Due to this, you should make sure your system is tolerant of + duplicate data. + + This feature is currently only available to the Academic Research + product track. + expansions : Union[List[str], str] + :ref:`expansions_parameter` + media_fields : Union[List[str], str] + :ref:`media_fields_parameter` + place_fields : Union[List[str], str] + :ref:`place_fields_parameter` + poll_fields : Union[List[str], str] + :ref:`poll_fields_parameter` + tweet_fields : Union[List[str], str] + :ref:`tweet_fields_parameter` + user_fields : Union[List[str], str] + :ref:`user_fields_parameter` + threaded : bool + Whether or not to use a thread to run the stream + + Returns + ------- + Optional[threading.Thread] + The thread if ``threaded`` is set to ``True``, else ``None`` + + References + ---------- + https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/get-tweets-search-stream + + .. _filter redundant connections: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/recovery-and-redundancy-features + .. _Tweet cap: https://developer.twitter.com/en/docs/twitter-api/tweet-caps + """ + if self.running: + raise TweepyException("Stream is already connected") + + method = "GET" + endpoint = "search" + + params = self._process_params( + params, endpoint_parameters=( + "backfill_minutes", "expansions", "media.fields", + "place.fields", "poll.fields", "tweet.fields", "user.fields" + ) + ) + + if threaded: + return self._threaded_connect(method, endpoint, params=params) + else: + self._connect(method, endpoint, params=params) + + def get_rules(self, **params): + """get_rules(*, ids) + + Return a list of rules currently active on the streaming endpoint, + either as a list or individually. + + Parameters + ---------- + ids : Union[List[str], str] + Comma-separated list of rule IDs. If omitted, all rules are + returned. + + Returns + ------- + Union[dict, requests.Response, Response] + + References + ---------- + https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/get-tweets-search-stream-rules + """ + return self._make_request( + "GET", f"/2/tweets/search/stream/rules", params=params, + endpoint_parameters=("ids",), data_type=StreamRule + ) + + def sample(self, *, threaded=False, **params): + """sample( \ + *, backfill_minutes=None, expansions=None, media_fields=None, \ + place_fields=None, poll_fields=None, tweet_fields=None, \ + user_fields=None, threaded=False \ + ) + + Streams about 1% of all Tweets in real-time. + + If you are using the academic research product track, you can connect + up to two `redundant connections `_ to + maximize your streaming up-time. + + Parameters + ---------- + backfill_minutes : Optional[int] + By passing this parameter, you can request up to five (5) minutes + worth of streaming data that you might have missed during a + disconnection to be delivered to you upon reconnection. The + backfilled Tweets will automatically flow through the reconnected + stream, with older Tweets generally being delivered before any + newly matching Tweets. You must include a whole number between 1 + and 5 as the value to this parameter. + + This feature will deliver duplicate Tweets, meaning that if you + were disconnected for 90 seconds, and you requested two minutes of + backfill, you will receive 30 seconds worth of duplicate Tweets. + Due to this, you should make sure your system is tolerant of + duplicate data. + + This feature is currently only available to the Academic Research + product track. + expansions : Union[List[str], str] + :ref:`expansions_parameter` + media_fields : Union[List[str], str] + :ref:`media_fields_parameter` + place_fields : Union[List[str], str] + :ref:`place_fields_parameter` + poll_fields : Union[List[str], str] + :ref:`poll_fields_parameter` + tweet_fields : Union[List[str], str] + :ref:`tweet_fields_parameter` + user_fields : Union[List[str], str] + :ref:`user_fields_parameter` + threaded : bool + Whether or not to use a thread to run the stream + + Returns + ------- + Optional[threading.Thread] + The thread if ``threaded`` is set to ``True``, else ``None`` + + References + ---------- + https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/api-reference/get-tweets-sample-stream + + .. _sample redundant connections: https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/integrate/recovery-and-redundancy-features + """ + if self.running: + raise TweepyException("Stream is already connected") + + method = "GET" + endpoint = "sample" + + params = self._process_params( + params, endpoint_parameters=( + "backfill_minutes", "expansions", "media.fields", + "place.fields", "poll.fields", "tweet.fields", "user.fields" + ) + ) + + if threaded: + return self._threaded_connect(method, endpoint, params=params) + else: + self._connect(method, endpoint, params=params) + + def on_data(self, raw_data): + """This is called when raw data is received from the stream. + This method handles sending the data to other methods. + + Parameters + ---------- + raw_data : JSON + The raw data from the stream + + References + ---------- + https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/consuming-streaming-data + """ + data = json.loads(raw_data) + + tweet = None + includes = {} + errors = [] + matching_rules = [] + + if "data" in data: + tweet = Tweet(data["data"]) + self.on_tweet(tweet) + if "includes" in data: + includes = self._process_includes(data["includes"]) + self.on_includes(includes) + if "errors" in data: + errors = data["errors"] + self.on_errors(errors) + if "matching_rules" in data: + matching_rules = [ + StreamRule(id=rule["id"], tag=rule["tag"]) + for rule in data["matching_rules"] + ] + self.on_matching_rules(matching_rules) + + self.on_response( + StreamResponse(tweet, includes, errors, matching_rules) + ) + + def on_tweet(self, tweet): + """This is called when a Tweet is received. + + Parameters + ---------- + status : Tweet + The Tweet received + """ + pass + + def on_includes(self, includes): + """This is called when includes are received. + + Parameters + ---------- + includes : dict + The includes received + """ + pass + + def on_errors(self, errors): + """This is called when errors are received. + + Parameters + ---------- + errors : dict + The errors received + """ + log.error("Received errors: %s", errors) + + def on_matching_rules(self, matching_rules): + """This is called when matching rules are received. + + Parameters + ---------- + matching_rules : List[StreamRule] + The matching rules received + """ + pass + + def on_response(self, response): + """This is called when a response is received. + + Parameters + ---------- + response : StreamResponse + The response received + """ + log.debug("Received response: %s", response) + + +class StreamRule(NamedTuple): + """Rule for filtered stream + + .. versionadded:: 4.6 + + Parameters + ---------- + value : Optional[str] + The rule text. If you are using a `Standard Project`_ at the Basic + `access level`_, you can use the basic set of `operators`_, can submit + up to 25 concurrent rules, and can submit rules up to 512 characters + long. If you are using an `Academic Research Project`_ at the Basic + access level, you can use all available operators, can submit up to + 1,000 concurrent rules, and can submit rules up to 1,024 characters + long. + tag : Optional[str] + The tag label. This is a free-form text you can use to identify the + rules that matched a specific Tweet in the streaming response. Tags can + be the same across rules. + id : Optional[str] + Unique identifier of this rule. This is returned as a string. + + + .. _Standard Project: https://developer.twitter.com/en/docs/projects + .. _access level: https://developer.twitter.com/en/products/twitter-api/early-access/guide#na_1 + .. _operators: https://developer.twitter.com/en/docs/twitter-api/tweets/search/integrate/build-a-query + .. _Academic Research Project: https://developer.twitter.com/en/docs/projects + """ + value: str = None + tag: str = None + id: str = None -- 2.25.1