)
async def _connect(
- self, method, url, params=None, headers=None, body=None
+ self, method, url, params=None, headers=None, body=None, oauth_1=False
):
error_count = 0
# https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/guides/connecting
try:
while error_count <= self.max_retries:
try:
+ if oauth_1:
+ oauth_client = OAuthClient(
+ self.consumer_key, self.consumer_secret,
+ self.access_token, self.access_token_secret
+ )
+ url, headers, body = oauth_client.sign(
+ url, http_method=method, headers=headers, body=body
+ )
async with self.session.request(
method, url, params=params, headers=headers, data=body,
proxy=self.proxy
async def _connect(
self, method, endpoint, params={}, headers=None, body=None
):
- oauth_client = OAuthClient(self.consumer_key, self.consumer_secret,
- self.access_token, self.access_token_secret)
url = f"https://stream.twitter.com/1.1/{endpoint}.json"
url = str(URL(url).with_query(sorted(params.items())))
- url, headers, body = oauth_client.sign(
- url, http_method=method, headers=headers, body=body
+ await super()._connect(
+ method, url, headers=headers, body=body, oauth_1=True
)
- await super()._connect(method, url, headers=headers, body=body)
def filter(self, *, follow=None, track=None, locations=None,
filter_level=None, languages=None, stall_warnings=False):