class Stream:
+ """Filter and sample realtime Tweets
+
+ Parameters
+ ----------
+ consumer_key : str
+ Twitter API Consumer Key
+ consumer_secret : str
+ Twitter API Consumer Secret
+ access_token: str
+ Twitter API Access Token
+ access_token_secret : str
+ Twitter API Access Token Secret
+ chunk_size : int
+ The default socket.read size. Default to 512, less than half the size
+ of a Tweet so that it reads Tweets with the minimal latency of 2 reads
+ per Tweet. Values higher than ~1kb will increase latency by waiting for
+ more data to arrive but may also increase throughput by doing fewer
+ socket read calls.
+ daemon : bool
+ Whether or not to use a daemon thread when using a thread to run the
+ stream
+ max_retries : int
+ Max number of times to retry connecting the stream
+ proxy : Optional[str]
+ URL of the proxy to use when connecting to the stream
+ verify : Union[bool, str]
+ Either a boolean, in which case it controls whether to verify the
+ server’s TLS certificate, or a string, in which case it must be a path
+ to a CA bundle to use.
+
+ Attributes
+ ----------
+ running : bool
+ Whether there's currently a stream running
+ session : Optional[:class:`requests.Session`]
+ Requests Session used to connect to the stream
+ thread : Optional[:class:`threading.Thread`]
+ Thread used to run the stream
+ user_agent : str
+ User agent used when connecting to the stream
+ """
def __init__(self, consumer_key, consumer_secret, access_token,
access_token_secret, *, chunk_size=512, daemon=False,
self.consumer_secret = consumer_secret
self.access_token = access_token
self.access_token_secret = access_token_secret
- # The default socket.read size. Default to less than half the size of
- # a tweet so that it reads tweets with the minimal latency of 2 reads
- # per tweet. Values higher than ~1kb will increase latency by waiting
- # for more data to arrive but may also increase throughput by doing
- # fewer socket read calls.
self.chunk_size = chunk_size
self.daemon = daemon
self.max_retries = max_retries
def filter(self, *, follow=None, track=None, locations=None,
filter_level=None, languages=None, stall_warnings=False,
threaded=False):
+ """Filter realtime Tweets
+
+ Parameters
+ ----------
+ follow : Optional[List[Union[int, str]]]
+ User IDs, indicating the users to return statuses for in the stream
+ track : Optional[List[str]]
+ Keywords to track
+ locations : Optional[List[float]]
+ Specifies a set of bounding boxes to track
+ filter_level : Optional[str]
+ Setting this parameter to one of none, low, or medium will set the
+ minimum value of the filter_level Tweet attribute required to be
+ included in the stream. The default value is none, which includes
+ all available Tweets.
+
+ When displaying a stream of Tweets to end users (dashboards or live
+ feeds at a presentation or conference, for example) it is suggested
+ that you set this value to medium.
+ languages : Optional[List[str]]
+ Setting this parameter to a comma-separated list of `BCP 47`_
+ language identifiers corresponding to any of the languages listed
+ on Twitter’s `advanced search`_ page will only return Tweets that
+ have been detected as being written in the specified languages. For
+ example, connecting with language=en will only stream Tweets
+ detected to be in the English language.
+ stall_warnings : bool
+ Specifies whether stall warnings should be delivered
+ threaded : bool
+ Whether or not to use a thread to run the stream
+
+ Raises
+ ------
+ TweepyException
+ When number of location coordinates is not a multiple of 4
+
+ Returns
+ -------
+ Optional[threading.Thread]
+ The thread if ``threaded`` is set to ``True``, else ``None``
+
+ References
+ ----------
+ https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/api-reference/post-statuses-filter
+
+ .. _BCP 47: https://tools.ietf.org/html/bcp47
+ .. _advanced search: https://twitter.com/search-advanced
+ """
if self.running:
raise TweepyException("Stream is already connected")
self._connect(method, endpoint, headers=headers, body=body)
def sample(self, *, languages=None, stall_warnings=False, threaded=False):
+ """Sample realtime Tweets
+
+ Parameters
+ ----------
+ languages : Optional[List[str]]
+ Setting this parameter to a comma-separated list of `BCP 47`_
+ language identifiers corresponding to any of the languages listed
+ on Twitter’s `advanced search`_ page will only return Tweets that
+ have been detected as being written in the specified languages. For
+ example, connecting with language=en will only stream Tweets
+ detected to be in the English language.
+ stall_warnings : bool
+ Specifies whether stall warnings should be delivered
+ threaded : bool
+ Whether or not to use a thread to run the stream
+
+ Returns
+ -------
+ Optional[threading.Thread]
+ The thread if ``threaded`` is set to ``True``, else ``None``
+
+ References
+ ----------
+ https://developer.twitter.com/en/docs/twitter-api/v1/tweets/sample-realtime/api-reference/get-statuses-sample
+ """
if self.running:
raise TweepyException("Stream is already connected")
self._connect(method, endpoint, params=params)
def disconnect(self):
+ """Disconnect the stream"""
self.running = False
def on_closed(self, resp):
- """This is called when the stream has been closed by Twitter."""
+ """This is called when the stream has been closed by Twitter.
+
+ Parameters
+ ----------
+ resp : requests.Response
+ The Response from Twitter
+ """
log.error("Stream connection closed by Twitter")
def on_connect(self):
log.info("Stream disconnected")
def on_exception(self, exception):
- """This is called when an unhandled exception occurs."""
+ """This is called when an unhandled exception occurs.
+
+ Parameters
+ ----------
+ exception : Exception
+ The unhandled exception
+ """
log.exception("Stream encountered an exception")
def on_keep_alive(self):
log.debug("Received keep-alive signal")
def on_request_error(self, status_code):
- """This is called when a non-200 HTTP status code is encountered."""
+ """This is called when a non-200 HTTP status code is encountered.
+
+ Parameters
+ ----------
+ status_code : int
+ The HTTP status code encountered
+ """
log.error("Stream encountered HTTP error: %d", status_code)
def on_data(self, raw_data):
This method handles sending the data to other methods, depending on the
message type.
+ Parameters
+ ----------
+ raw_data : JSON
+ The raw data from the stream
+
+ References
+ ----------
https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/guides/streaming-message-types
"""
data = json.loads(raw_data)
log.error("Received unknown message type: %s", raw_data)
def on_status(self, status):
- """This is called when a status is received."""
+ """This is called when a status is received.
+
+ Parameters
+ ----------
+ status : Status
+ The Status received
+ """
log.debug("Received status: %d", status.id)
def on_delete(self, status_id, user_id):
- """This is called when a status deletion notice is received."""
+ """This is called when a status deletion notice is received.
+
+ Parameters
+ ----------
+ status_id : int
+ The ID of the deleted Tweet
+ user_id : int
+ The ID of the author of the Tweet
+ """
log.debug("Received status deletion notice: %d", status_id)
def on_disconnect_message(self, message):
- """This is called when a disconnect message is received."""
+ """This is called when a disconnect message is received.
+
+ Parameters
+ ----------
+ message : JSON
+ The disconnect message
+ """
log.warning("Received disconnect message: %s", message)
def on_limit(self, track):
- """This is called when a limit notice is received."""
+ """This is called when a limit notice is received.
+
+ Parameters
+ ----------
+ track : int
+ Total count of the number of undelivered Tweets since the
+ connection was opened
+ """
log.debug("Received limit notice: %d", track)
def on_scrub_geo(self, notice):
- """This is called when a location deletion notice is received."""
+ """This is called when a location deletion notice is received.
+
+ Parameters
+ ----------
+ notice : JSON
+ The location deletion notice
+ """
log.debug("Received location deletion notice: %s", notice)
def on_status_withheld(self, notice):
- """This is called when a status withheld content notice is received."""
+ """This is called when a status withheld content notice is received.
+
+ Parameters
+ ----------
+ notice : JSON
+ The status withheld content notice
+ """
log.debug("Received status withheld content notice: %s", notice)
def on_user_withheld(self, notice):
- """This is called when a user withheld content notice is received."""
+ """This is called when a user withheld content notice is received.
+
+ Parameters
+ ----------
+ notice : JSON
+ The user withheld content notice
+ """
log.debug("Received user withheld content notice: %s", notice)
def on_warning(self, warning):
- """This is called when a stall warning message is received."""
+ """This is called when a stall warning message is received.
+
+ Parameters
+ ----------
+ warning : JSON
+ The stall warning
+ """
log.warning("Received stall warning: %s", warning)