From 9051ba64bc0610c9e0027e53f6e32a72de67d1e2 Mon Sep 17 00:00:00 2001 From: Harmon Date: Wed, 18 May 2022 19:53:35 -0500 Subject: [PATCH] Add asynchronous.AsyncStreamingClient --- docs/asyncstream.rst | 1 + docs/asyncstreamingclient.rst | 12 + docs/index.rst | 3 + docs/streamingclient.rst | 16 - docs/streamresponse.rst | 15 + docs/streamrule.rst | 7 + tweepy/asynchronous/__init__.py | 2 +- tweepy/asynchronous/streaming.py | 690 +++++++++++++++++++++++++------ 8 files changed, 612 insertions(+), 134 deletions(-) create mode 100644 docs/asyncstreamingclient.rst create mode 100644 docs/streamresponse.rst create mode 100644 docs/streamrule.rst diff --git a/docs/asyncstream.rst b/docs/asyncstream.rst index da4123a..f77b1d8 100644 --- a/docs/asyncstream.rst +++ b/docs/asyncstream.rst @@ -8,4 +8,5 @@ .. autoclass:: AsyncStream :members: + :inherited-members: :member-order: bysource diff --git a/docs/asyncstreamingclient.rst b/docs/asyncstreamingclient.rst new file mode 100644 index 0000000..a4834ed --- /dev/null +++ b/docs/asyncstreamingclient.rst @@ -0,0 +1,12 @@ +.. _asyncstreamingclient_reference: + +.. currentmodule:: tweepy.asynchronous + +***************************** +:class:`AsyncStreamingClient` +***************************** + +.. autoclass:: AsyncStreamingClient + :members: + :inherited-members: + :member-order: bysource diff --git a/docs/index.rst b/docs/index.rst index 41465f9..2ac23a3 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -38,7 +38,10 @@ Contents: client.rst response.rst streamingclient.rst + streamresponse.rst + streamrule.rst asyncclient.rst + asyncstreamingclient.rst exceptions.rst expansions_and_fields.rst v2_models.rst diff --git a/docs/streamingclient.rst b/docs/streamingclient.rst index 9a95772..3069447 100644 --- a/docs/streamingclient.rst +++ b/docs/streamingclient.rst @@ -10,19 +10,3 @@ :members: :inherited-members: :member-order: bysource - -``StreamResponse`` -================== -.. autoclass:: StreamResponse - :class-doc-from: class - - The :obj:`StreamResponse` returned by :meth:`StreamingClient.on_response` is - a :class:`collections.namedtuple`, with ``data``, ``includes``, ``errors``, - and ``matching_rules`` fields, corresponding with the fields in responses - from Twitter's API. - - .. versionadded:: 4.6 - -``StreamRule`` -============== -.. autoclass:: StreamRule diff --git a/docs/streamresponse.rst b/docs/streamresponse.rst new file mode 100644 index 0000000..9e8d8ff --- /dev/null +++ b/docs/streamresponse.rst @@ -0,0 +1,15 @@ +.. _streamresponse: + +.. currentmodule:: tweepy + +``StreamResponse`` +================== +.. autoclass:: StreamResponse + :class-doc-from: class + + The :obj:`StreamResponse` returned by :meth:`StreamingClient.on_response` is + a :class:`collections.namedtuple`, with ``data``, ``includes``, ``errors``, + and ``matching_rules`` fields, corresponding with the fields in responses + from Twitter's API. + + .. versionadded:: 4.6 diff --git a/docs/streamrule.rst b/docs/streamrule.rst new file mode 100644 index 0000000..ea65ed5 --- /dev/null +++ b/docs/streamrule.rst @@ -0,0 +1,7 @@ +.. _streamrule: + +.. currentmodule:: tweepy + +``StreamRule`` +============== +.. 
autoclass:: StreamRule diff --git a/tweepy/asynchronous/__init__.py b/tweepy/asynchronous/__init__.py index ff28002..d5dc1a8 100644 --- a/tweepy/asynchronous/__init__.py +++ b/tweepy/asynchronous/__init__.py @@ -19,5 +19,5 @@ except ModuleNotFoundError: "installed" ) -from tweepy.asynchronous.streaming import AsyncStream +from tweepy.asynchronous.streaming import AsyncStream, AsyncStreamingClient from tweepy.asynchronous.client import AsyncClient diff --git a/tweepy/asynchronous/streaming.py b/tweepy/asynchronous/streaming.py index 3bf1723..21241e3 100644 --- a/tweepy/asynchronous/streaming.py +++ b/tweepy/asynchronous/streaming.py @@ -13,48 +13,19 @@ from oauthlib.oauth1 import Client as OAuthClient from yarl import URL import tweepy +from tweepy.asynchronous.client import AsyncBaseClient +from tweepy.client import Response from tweepy.errors import TweepyException from tweepy.models import Status +from tweepy.streaming import StreamResponse, StreamRule +from tweepy.tweet import Tweet log = logging.getLogger(__name__) -class AsyncStream: - """Stream realtime Tweets asynchronously with Twitter API v1.1 - - .. versionadded:: 4.0 - - Parameters - ---------- - consumer_key: str - Twitter API Consumer Key - consumer_secret: str - Twitter API Consumer Secret - access_token: str - Twitter API Access Token - access_token_secret: str - Twitter API Access Token Secret - max_retries: int | None - Number of times to attempt to (re)connect the stream. - proxy: str | None - Proxy URL +class AsyncBaseStream: - Attributes - ---------- - session : aiohttp.ClientSession | None - Aiohttp client session used to connect to the API - task : asyncio.Task | None - The task running the stream - user_agent : str - User agent used when connecting to the API - """ - - def __init__(self, consumer_key, consumer_secret, access_token, - access_token_secret, *, max_retries=inf, proxy=None): - self.consumer_key = consumer_key - self.consumer_secret = consumer_secret - self.access_token = access_token - self.access_token_secret = access_token_secret + def __init__(self, *, max_retries=inf, proxy=None): self.max_retries = max_retries self.proxy = proxy @@ -66,10 +37,13 @@ class AsyncStream: f"Tweepy/{tweepy.__version__}" ) - async def _connect(self, method, endpoint, params={}, headers=None, - body=None): + async def _connect( + self, method, url, params=None, headers=None, body=None + ): error_count = 0 # https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/guides/connecting + # https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/handling-disconnections + # https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/integrate/handling-disconnections stall_timeout = 90 network_error_wait = network_error_wait_step = 0.25 network_error_wait_max = 16 @@ -77,27 +51,18 @@ class AsyncStream: http_error_wait_max = 320 http_420_error_wait_start = 60 - oauth_client = OAuthClient(self.consumer_key, self.consumer_secret, - self.access_token, self.access_token_secret) - if self.session is None or self.session.closed: self.session = aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(sock_read=stall_timeout) ) self.session.headers["User-Agent"] = self.user_agent - url = f"https://stream.twitter.com/1.1/{endpoint}.json" - url = str(URL(url).with_query(sorted(params.items()))) - try: while error_count <= self.max_retries: - request_url, request_headers, request_body = oauth_client.sign( - url, method, body, headers - ) try: async with self.session.request( - method, 
request_url, headers=request_headers, - data=request_body, proxy=self.proxy + method, url, params=params, headers=headers, data=body, + proxy=self.proxy ) as resp: if resp.status == 200: error_count = 0 @@ -146,6 +111,131 @@ class AsyncStream: await self.session.close() await self.on_disconnect() + def disconnect(self): + """Disconnect the stream""" + if self.task is not None: + self.task.cancel() + + async def on_closed(self, resp): + """|coroutine| + + This is called when the stream has been closed by Twitter. + + Parameters + ---------- + response : aiohttp.ClientResponse + The response from Twitter + """ + log.error("Stream connection closed by Twitter") + + async def on_connect(self): + """|coroutine| + + This is called after successfully connecting to the streaming API. + """ + log.info("Stream connected") + + async def on_connection_error(self): + """|coroutine| + + This is called when the stream connection errors or times out. + """ + log.error("Stream connection has errored or timed out") + + async def on_disconnect(self): + """|coroutine| + + This is called when the stream has disconnected. + """ + log.info("Stream disconnected") + + async def on_exception(self, exception): + """|coroutine| + + This is called when an unhandled exception occurs. + + Parameters + ---------- + exception : Exception + The unhandled exception + """ + log.exception("Stream encountered an exception") + + async def on_keep_alive(self): + """|coroutine| + + This is called when a keep-alive signal is received. + """ + log.debug("Received keep-alive signal") + + async def on_request_error(self, status_code): + """|coroutine| + + This is called when a non-200 HTTP status code is encountered. + + Parameters + ---------- + status_code : int + The HTTP status code encountered + """ + log.error("Stream encountered HTTP Error: %d", status_code) + + +class AsyncStream(AsyncBaseStream): + """Stream realtime Tweets asynchronously with Twitter API v1.1 + + .. versionadded:: 4.0 + + Parameters + ---------- + consumer_key: str + Twitter API Consumer Key + consumer_secret: str + Twitter API Consumer Secret + access_token: str + Twitter API Access Token + access_token_secret: str + Twitter API Access Token Secret + max_retries: int | None + Number of times to attempt to (re)connect the stream. 
+ proxy: str | None + Proxy URL + + Attributes + ---------- + session : aiohttp.ClientSession | None + Aiohttp client session used to connect to the API + task : asyncio.Task | None + The task running the stream + user_agent : str + User agent used when connecting to the API + """ + + def __init__(self, consumer_key, consumer_secret, access_token, + access_token_secret, **kwargs): + """__init__( \ + consumer_key, consumer_secret, access_token, access_token_secret, \ + *, max_retries=inf, proxy=None \ + ) + """ + self.consumer_key = consumer_key + self.consumer_secret = consumer_secret + self.access_token = access_token + self.access_token_secret = access_token_secret + super().__init__(**kwargs) + + async def _connect( + self, method, endpoint, params={}, headers=None, body=None + ): + oauth_client = OAuthClient(self.consumer_key, self.consumer_secret, + self.access_token, self.access_token_secret) + url = f"https://stream.twitter.com/1.1/{endpoint}.json" + url = str(URL(url).with_query(sorted(params.items()))) + url, headers, body = oauth_client.sign( + url, http_method=method, headers=headers, body=body + ) + await super()._connect(method, url, headers=headers, body=body) + def filter(self, *, follow=None, track=None, locations=None, filter_level=None, languages=None, stall_warnings=False): """Filter realtime Tweets @@ -286,75 +376,6 @@ class AsyncStream: # Use name parameter when support for Python 3.7 is dropped return self.task - def disconnect(self): - """Disconnect the stream""" - if self.task is not None: - self.task.cancel() - - async def on_closed(self, resp): - """|coroutine| - - This is called when the stream has been closed by Twitter. - - Parameters - ---------- - response : aiohttp.ClientResponse - The response from Twitter - """ - log.error("Stream connection closed by Twitter") - - async def on_connect(self): - """|coroutine| - - This is called after successfully connecting to the streaming API. - """ - log.info("Stream connected") - - async def on_connection_error(self): - """|coroutine| - - This is called when the stream connection errors or times out. - """ - log.error("Stream connection has errored or timed out") - - async def on_disconnect(self): - """|coroutine| - - This is called when the stream has disconnected. - """ - log.info("Stream disconnected") - - async def on_exception(self, exception): - """|coroutine| - - This is called when an unhandled exception occurs. - - Parameters - ---------- - exception : Exception - The unhandled exception - """ - log.exception("Stream encountered an exception") - - async def on_keep_alive(self): - """|coroutine| - - This is called when a keep-alive signal is received. - """ - log.debug("Received keep-alive signal") - - async def on_request_error(self, status_code): - """|coroutine| - - This is called when a non-200 HTTP status code is encountered. - - Parameters - ---------- - status_code : int - The HTTP status code encountered - """ - log.error("Stream encountered HTTP Error: %d", status_code) - async def on_data(self, raw_data): """|coroutine| @@ -492,3 +513,438 @@ class AsyncStream: The stall warning """ log.warning("Received stall warning: %s", notice) + + +class AsyncStreamingClient(AsyncBaseClient, AsyncBaseStream): + """Stream realtime Tweets asynchronously with Twitter API v2 + + .. 
versionadded:: 4.10 + + Parameters + ---------- + bearer_token : str + Twitter API Bearer Token + return_type : type[dict | requests.Response | Response] + Type to return from requests to the API + wait_on_rate_limit : bool + Whether to wait when rate limit is reached + max_retries: int | None + Number of times to attempt to (re)connect the stream. + proxy : str | None + URL of the proxy to use when connecting to the stream + + Attributes + ---------- + session : aiohttp.ClientSession | None + Aiohttp client session used to connect to the API + task : asyncio.Task | None + The task running the stream + user_agent : str + User agent used when connecting to the API + """ + + def __init__(self, bearer_token, *, return_type=Response, + wait_on_rate_limit=False, **kwargs): + """__init__( \ + bearer_token, *, return_type=Response, wait_on_rate_limit=False, \ + max_retries=inf, proxy=None \ + ) + """ + AsyncBaseClient.__init__(self, bearer_token, return_type=return_type, + wait_on_rate_limit=wait_on_rate_limit) + AsyncBaseStream.__init__(self, **kwargs) + + async def _connect(self, method, endpoint, **kwargs): + url = f"https://api.twitter.com/2/tweets/{endpoint}/stream" + headers = {"Authorization": f"Bearer {self.bearer_token}"} + await super()._connect(method, url, headers=headers, **kwargs) + + def _process_data(self, data, data_type=None): + if data_type is StreamRule: + if isinstance(data, list): + rules = [] + for rule in data: + if "tag" in rule: + rules.append(StreamRule( + value=rule["value"], id=rule["id"], tag=rule["tag"] + )) + else: + rules.append(StreamRule(value=rule["value"], + id=rule["id"])) + return rules + elif data is not None: + if "tag" in data: + return StreamRule(value=data["value"], id=data["id"], + tag=data["tag"]) + else: + return StreamRule(value=data["value"], id=data["id"]) + else: + super()._process_data(data, data_type=data_type) + + async def add_rules(self, add, **params): + """add_rules(add, *, dry_run) + + |coroutine| + + Add rules to filtered stream. + + Parameters + ---------- + add : list[StreamRule] | StreamRule + Specifies the operation you want to perform on the rules. + dry_run : bool + Set to true to test a the syntax of your rule without submitting + it. This is useful if you want to check the syntax of a rule before + removing one or more of your existing rules. + + Returns + ------- + dict | requests.Response | Response + + References + ---------- + https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/post-tweets-search-stream-rules + """ + json = {"add": []} + if isinstance(add, StreamRule): + add = (add,) + for rule in add: + if rule.tag is not None: + json["add"].append({"value": rule.value, "tag": rule.tag}) + else: + json["add"].append({"value": rule.value}) + + return await self._make_request( + "POST", f"/2/tweets/search/stream/rules", params=params, + endpoint_parameters=("dry_run",), json=json, data_type=StreamRule + ) + + async def delete_rules(self, ids, **params): + """delete_rules(ids, *, dry_run) + + |coroutine| + + Delete rules from filtered stream. + + Parameters + ---------- + ids : int | str | list[int | str | StreamRule] | StreamRule + Array of rule IDs, each one representing a rule already active in + your stream. IDs must be submitted as strings. + dry_run : bool + Set to true to test a the syntax of your rule without submitting + it. This is useful if you want to check the syntax of a rule before + removing one or more of your existing rules. 
+ + Returns + ------- + dict | requests.Response | Response + + References + ---------- + https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/post-tweets-search-stream-rules + """ + json = {"delete": {"ids": []}} + if isinstance(ids, (int, str, StreamRule)): + ids = (ids,) + for id in ids: + if isinstance(id, StreamRule): + json["delete"]["ids"].append(str(StreamRule.id)) + else: + json["delete"]["ids"].append(str(id)) + + return await self._make_request( + "POST", f"/2/tweets/search/stream/rules", params=params, + endpoint_parameters=("dry_run",), json=json, data_type=StreamRule + ) + + def filter(self, **params): + """filter( \ + *, backfill_minutes=None, expansions=None, media_fields=None, \ + place_fields=None, poll_fields=None, tweet_fields=None, \ + user_fields=None \ + ) + + Streams Tweets in real-time based on a specific set of filter rules. + + If you are using the academic research product track, you can connect + up to two `redundant connections `_ to + maximize your streaming up-time. + + The Tweets returned by this endpoint count towards the Project-level + `Tweet cap`_. + + Parameters + ---------- + backfill_minutes : int | None + By passing this parameter, you can request up to five (5) minutes + worth of streaming data that you might have missed during a + disconnection to be delivered to you upon reconnection. The + backfilled Tweets will automatically flow through the reconnected + stream, with older Tweets generally being delivered before any + newly matching Tweets. You must include a whole number between 1 + and 5 as the value to this parameter. + + This feature will deliver duplicate Tweets, meaning that if you + were disconnected for 90 seconds, and you requested two minutes of + backfill, you will receive 30 seconds worth of duplicate Tweets. + Due to this, you should make sure your system is tolerant of + duplicate data. + + This feature is currently only available to the Academic Research + product track. + expansions : list[str] | str + :ref:`expansions_parameter` + media_fields : list[str] | str + :ref:`media_fields_parameter` + place_fields : list[str] | str + :ref:`place_fields_parameter` + poll_fields : list[str] | str + :ref:`poll_fields_parameter` + tweet_fields : list[str] | str + :ref:`tweet_fields_parameter` + user_fields : list[str] | str + :ref:`user_fields_parameter` + + Raises + ------ + TweepyException + When the stream is already connected + + Returns + ------- + asyncio.Task + The task running the stream + + References + ---------- + https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/get-tweets-search-stream + + .. _filter redundant connections: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/recovery-and-redundancy-features + .. 
_Tweet cap: https://developer.twitter.com/en/docs/twitter-api/tweet-caps + """ + if self.task is not None and not self.task.done(): + raise TweepyException("Stream is already connected") + + endpoint = "search" + + params = self._process_params( + params, endpoint_parameters=( + "backfill_minutes", "expansions", "media.fields", + "place.fields", "poll.fields", "tweet.fields", "user.fields" + ) + ) + + self.task = asyncio.create_task( + self._connect("GET", endpoint, params=params) + ) + # Use name parameter when support for Python 3.7 is dropped + return self.task + + async def get_rules(self, **params): + """get_rules(*, ids) + + |coroutine| + + Return a list of rules currently active on the streaming endpoint, + either as a list or individually. + + Parameters + ---------- + ids : list[str] | str + Comma-separated list of rule IDs. If omitted, all rules are + returned. + + Returns + ------- + dict | requests.Response | Response + + References + ---------- + https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/get-tweets-search-stream-rules + """ + return await self._make_request( + "GET", f"/2/tweets/search/stream/rules", params=params, + endpoint_parameters=("ids",), data_type=StreamRule + ) + + def sample(self, **params): + """sample( \ + *, backfill_minutes=None, expansions=None, media_fields=None, \ + place_fields=None, poll_fields=None, tweet_fields=None, \ + user_fields=None \ + ) + + Streams about 1% of all Tweets in real-time. + + If you are using the academic research product track, you can connect + up to two `redundant connections `_ to + maximize your streaming up-time. + + Parameters + ---------- + backfill_minutes : int | None + By passing this parameter, you can request up to five (5) minutes + worth of streaming data that you might have missed during a + disconnection to be delivered to you upon reconnection. The + backfilled Tweets will automatically flow through the reconnected + stream, with older Tweets generally being delivered before any + newly matching Tweets. You must include a whole number between 1 + and 5 as the value to this parameter. + + This feature will deliver duplicate Tweets, meaning that if you + were disconnected for 90 seconds, and you requested two minutes of + backfill, you will receive 30 seconds worth of duplicate Tweets. + Due to this, you should make sure your system is tolerant of + duplicate data. + + This feature is currently only available to the Academic Research + product track. + expansions : list[str] | str + :ref:`expansions_parameter` + media_fields : list[str] | str + :ref:`media_fields_parameter` + place_fields : list[str] | str + :ref:`place_fields_parameter` + poll_fields : list[str] | str + :ref:`poll_fields_parameter` + tweet_fields : list[str] | str + :ref:`tweet_fields_parameter` + user_fields : list[str] | str + :ref:`user_fields_parameter` + + Raises + ------ + TweepyException + When the stream is already connected + + Returns + ------- + asyncio.Task + The task running the stream + + References + ---------- + https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/api-reference/get-tweets-sample-stream + + .. 
_sample redundant connections: https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/integrate/recovery-and-redundancy-features + """ + if self.task is not None and not self.task.done(): + raise TweepyException("Stream is already connected") + + endpoint = "sample" + + params = self._process_params( + params, endpoint_parameters=( + "backfill_minutes", "expansions", "media.fields", + "place.fields", "poll.fields", "tweet.fields", "user.fields" + ) + ) + + self.task = asyncio.create_task( + self._connect("GET", endpoint, params=params) + ) + # Use name parameter when support for Python 3.7 is dropped + return self.task + + async def on_data(self, raw_data): + """|coroutine| + + This is called when raw data is received from the stream. + This method handles sending the data to other methods. + + Parameters + ---------- + raw_data : JSON + The raw data from the stream + + References + ---------- + https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/consuming-streaming-data + """ + data = json.loads(raw_data) + + tweet = None + includes = {} + errors = [] + matching_rules = [] + + if "data" in data: + tweet = Tweet(data["data"]) + await self.on_tweet(tweet) + if "includes" in data: + includes = self._process_includes(data["includes"]) + await self.on_includes(includes) + if "errors" in data: + errors = data["errors"] + await self.on_errors(errors) + if "matching_rules" in data: + matching_rules = [ + StreamRule(id=rule["id"], tag=rule["tag"]) + for rule in data["matching_rules"] + ] + await self.on_matching_rules(matching_rules) + + await self.on_response( + StreamResponse(tweet, includes, errors, matching_rules) + ) + + async def on_tweet(self, tweet): + """|coroutine| + + This is called when a Tweet is received. + + Parameters + ---------- + tweet : Tweet + The Tweet received + """ + pass + + async def on_includes(self, includes): + """|coroutine| + + This is called when includes are received. + + Parameters + ---------- + includes : dict + The includes received + """ + pass + + async def on_errors(self, errors): + """|coroutine| + + This is called when errors are received. + + Parameters + ---------- + errors : dict + The errors received + """ + log.error("Received errors: %s", errors) + + async def on_matching_rules(self, matching_rules): + """|coroutine| + + This is called when matching rules are received. + + Parameters + ---------- + matching_rules : list[StreamRule] + The matching rules received + """ + pass + + async def on_response(self, response): + """|coroutine| + + This is called when a response is received. + + Parameters + ---------- + response : StreamResponse + The response received + """ + log.debug("Received response: %s", response) -- 2.25.1
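
Not part of the patch itself: a minimal usage sketch of the new AsyncStreamingClient, assuming this patch is applied. The bearer-token placeholder and the TweetPrinter subclass are illustrative names, not part of the change.

    import asyncio

    from tweepy import StreamRule
    from tweepy.asynchronous import AsyncStreamingClient

    BEARER_TOKEN = "..."  # assumption: a valid Twitter API v2 Bearer Token


    class TweetPrinter(AsyncStreamingClient):
        """Illustrative subclass overriding the coroutine callbacks added here."""

        async def on_tweet(self, tweet):
            # Called once per Tweet matching an active filter rule
            print(tweet.id, tweet.text)

        async def on_connection_error(self):
            # The base stream logs and retries up to max_retries times
            print("Connection error, reconnecting...")


    async def main():
        client = TweetPrinter(BEARER_TOKEN, max_retries=5)

        # Register a rule before connecting; add_rules accepts a single
        # StreamRule or a list of them.
        await client.add_rules(StreamRule(value="tweepy lang:en", tag="tweepy"))

        # filter() is not a coroutine: it schedules the stream as an
        # asyncio.Task on the running loop and returns that Task, so the
        # caller decides whether to await or cancel it.
        task = client.filter(tweet_fields=["created_at"])
        await task


    if __name__ == "__main__":
        asyncio.run(main())

As with the synchronous StreamingClient, filter() and sample() return the asyncio.Task running the stream, and disconnect() cancels it.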
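
Likewise a sketch rather than code from the patch: rule housekeeping with the new coroutine methods get_rules() and delete_rules(). The reset_rules() helper and the BEARER_TOKEN placeholder are hypothetical.

    import asyncio

    from tweepy.asynchronous import AsyncStreamingClient

    BEARER_TOKEN = "..."  # assumption: a valid Twitter API v2 Bearer Token


    async def reset_rules(client):
        # With the default return_type=Response, get_rules() returns a
        # Response whose .data is a list of StreamRule objects, or None
        # when no rules are active.
        response = await client.get_rules()
        if response.data:
            # delete_rules() accepts rule IDs (or StreamRule objects);
            # passing the IDs directly keeps the request explicit.
            await client.delete_rules([rule.id for rule in response.data])


    async def main():
        client = AsyncStreamingClient(BEARER_TOKEN)
        await reset_rules(client)


    asyncio.run(main())

Because the rules come back as StreamRule namedtuples on response.data, their ids can be fed straight back into delete_rules() before a fresh set is added with add_rules().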