)
async def _connect(
- self, method, url, params=None, headers=None, body=None, oauth_1=False
+ self, method, url, params=None, headers=None, body=None,
+ oauth_client=None
):
error_count = 0
# https://developer.twitter.com/en/docs/twitter-api/v1/tweets/filter-realtime/guides/connecting
try:
while error_count <= self.max_retries:
try:
- if oauth_1:
- oauth_client = OAuthClient(
- self.consumer_key, self.consumer_secret,
- self.access_token, self.access_token_secret
- )
+ if oauth_client:
url, headers, body = oauth_client.sign(
url, http_method=method, headers=headers, body=body
)
async def _connect(
self, method, endpoint, params={}, headers=None, body=None
):
+ oauth_client = OAuthClient(self.consumer_key, self.consumer_secret,
+ self.access_token, self.access_token_secret)
url = f"https://stream.twitter.com/1.1/{endpoint}.json"
url = str(URL(url).with_query(sorted(params.items())))
await super()._connect(
- method, url, headers=headers, body=body, oauth_1=True
+ method, url, headers=headers, body=body, oauth_client=oauth_client
)
def filter(self, *, follow=None, track=None, locations=None,