if match:
version = match.group(1)
else:
- raise RuntimeError("Unable to find version string in %s." % (VERSION_FILE,))
+ raise RuntimeError(f"Unable to find version string in {VERSION_FILE}.")
with open("README.md") as readme_file:
long_description = readme_file.read()
# test count
for i in range(0, 20):
- self.cache.store('testkey%i' % i, 'testvalue')
+ self.cache.store(f'testkey{i}', 'testvalue')
self.assertEqual(self.cache.count(), 20, 'Count is wrong')
# test flush
# build api object test using oauth
api = API(auth)
- s = api.update_status('test %i' % random.randint(0, 1000))
+ s = api.update_status(f'test {random.randint(0, 1000)}')
api.destroy_status(s.id)
def testaccesstype(self):
return False
def on_error(self, code):
- print("response: %s" % code)
+ print(f"response: {code}")
return True
def on_status(self, status):
if f is None:
try:
if os.path.getsize(filename) > (max_size * 1024):
- raise TweepError('File is too big, must be less than %skb.'
- % max_size)
+ raise TweepError(f'File is too big, must be less than {max_size}kb.')
except os.error as e:
- raise TweepError('Unable to access file: %s' % e.strerror)
+ raise TweepError(f'Unable to access file: {e.strerror}')
# build the mulitpart-formdata body
fp = open(filename, 'rb')
else:
f.seek(0, 2) # Seek to end of file
if f.tell() > (max_size * 1024):
- raise TweepError('File is too big, must be less than %skb.'
- % max_size)
+ raise TweepError(f'File is too big, must be less than {max_size}kb.')
f.seek(0) # Reset to beginning of file
fp = f
if file_type in ['gif', 'jpeg', 'png', 'webp']:
file_type = 'image/' + file_type
elif file_type not in ['image/gif', 'image/jpeg', 'image/png']:
- raise TweepError('Invalid file type for image: %s' % file_type)
+ raise TweepError(f'Invalid file type for image: {file_type}')
if isinstance(filename, str):
filename = filename.encode('utf-8')
try:
url = self._get_oauth_url('request_token')
if access_type:
- url += '?x_auth_access_type=%s' % access_type
+ url += f'?x_auth_access_type={access_type}'
return self.oauth.fetch_request_token(url)
except Exception as e:
raise TweepError(e)
data = resp.json()
if data.get('token_type') != 'bearer':
raise TweepError('Expected token_type to equal "bearer", '
- 'but got %s instead' % data.get('token_type'))
+ f'but got {data.get("token_type")} instead')
self._bearer_token = data['access_token']
if arg is None:
continue
if k in self.session.params:
- raise TweepError('Multiple values for parameter %s supplied!' % k)
+ raise TweepError(f'Multiple values for parameter {k} supplied!')
self.session.params[k] = convert_to_utf8_str(arg)
try:
value = quote(self.session.params[name])
except KeyError:
- raise TweepError('No parameter value found for path variable: %s' % name)
+ raise TweepError(f'No parameter value found for path variable: {name}')
del self.session.params[name]
self.path = self.path.replace(variable, value)
# Query the cache if one is available
# and this request uses a GET method.
if self.use_cache and self.api.cache and self.method == 'GET':
- cache_result = self.api.cache.get('%s?%s' % (url, urlencode(self.session.params)))
+ cache_result = self.api.cache.get(f'{url}?{urlencode(self.session.params)}')
# if cache result found and not expired, return it
if cache_result:
# must restore api reference
sleep_time = self._reset_time - int(time.time())
if sleep_time > 0:
if self.wait_on_rate_limit_notify:
- log.warning("Rate limit reached. Sleeping for: %d" % sleep_time)
+ log.warning(f"Rate limit reached. Sleeping for: {sleep_time}")
time.sleep(sleep_time + 5) # sleep for few extra sec
# if self.wait_on_rate_limit and self._reset_time is not None and \
# sleep_time = self._reset_time - int(time.time())
# if sleep_time > 0:
# if self.wait_on_rate_limit_notify:
- # log.warning("Rate limit reached. Sleeping for: %d" % sleep_time)
+ # log.warning(f"Rate limit reached. Sleeping for: {sleep_time}")
# time.sleep(sleep_time + 5) # sleep for few extra sec
# Apply authentication
auth=auth,
proxies=self.api.proxy)
except Exception as e:
- raise TweepError('Failed to send request: %s' % e).with_traceback(sys.exc_info()[2])
+ raise TweepError(f'Failed to send request: {e}').with_traceback(sys.exc_info()[2])
rem_calls = resp.headers.get('x-rate-limit-remaining')
error_msg, api_error_code = \
self.parser.parse_error(resp.text)
except Exception:
- error_msg = "Twitter error response: status code = %s" % resp.status_code
+ error_msg = f"Twitter error response: status code = {resp.status_code}"
api_error_code = None
if is_rate_limit_error_message(error_msg):
# Store result into cache if one is available.
if self.use_cache and self.api.cache and self.method == 'GET' and result:
- self.api.cache.store('%s?%s' % (url, urlencode(self.session.params)), result)
+ self.api.cache.store(f'{url}?{urlencode(self.session.params)}', result)
return result
return results
def __repr__(self):
- state = ['%s=%s' % (k, repr(v)) for (k, v) in vars(self).items()]
- return '%s(%s)' % (self.__class__.__name__, ', '.join(state))
+ state = [f'{k}={v!r}' for (k, v) in vars(self).items()]
+ return f'{self.__class__.__name__}({", ".join(state)})'
class Status(Model):
try:
json = json_lib.loads(payload)
except Exception as e:
- raise TweepError('Failed to parse JSON payload: %s' % e)
+ raise TweepError(f'Failed to parse JSON payload: {e}')
if return_cursors and isinstance(json, dict):
if 'next' in json:
return
model = getattr(self.model_factory, method.payload_type)
except AttributeError:
- raise TweepError('No model for this payload type: '
- '%s' % method.payload_type)
+ raise TweepError(f'No model for this payload type: {method.payload_type}')
json = JSONParser.parse(self, method, payload, return_cursors=return_cursors)
if isinstance(json, tuple):
def _run(self):
# Authenticate
- url = "https://%s%s" % (self.host, self.url)
+ url = f"https://{self.host}{self.url}"
# Connect and process the stream
error_counter = 0
self.session.params = {'delimited': 'length'}
if self.running:
raise TweepError('Stream object already connected!')
- self.url = '/%s/user.json' % STREAM_VERSION
+ self.url = f'/{STREAM_VERSION}/user.json'
self.host = 'userstream.twitter.com'
if stall_warnings:
self.session.params['stall_warnings'] = stall_warnings
if len(locations) % 4 != 0:
raise TweepError("Wrong number of locations points, "
"it has to be a multiple of 4")
- self.session.params['locations'] = ','.join(['%.2f' % l for l in locations])
+ self.session.params['locations'] = ','.join([f'{l:.2f}' for l in locations])
if track:
self.session.params['track'] = ','.join(track).encode(encoding)
self.session.params = {'delimited': 'length'}
if self.running:
raise TweepError('Stream object already connected!')
- self.url = '/%s/statuses/firehose.json' % STREAM_VERSION
+ self.url = f'/{STREAM_VERSION}/statuses/firehose.json'
if count:
- self.url += '&count=%s' % count
+ self.url += f'&count={count}'
self._start(is_async)
def retweet(self, is_async=False):
self.session.params = {'delimited': 'length'}
if self.running:
raise TweepError('Stream object already connected!')
- self.url = '/%s/statuses/retweet.json' % STREAM_VERSION
+ self.url = f'/{STREAM_VERSION}/statuses/retweet.json'
self._start(is_async)
def sample(self, is_async=False, languages=None, stall_warnings=False):
self.session.params = {'delimited': 'length'}
if self.running:
raise TweepError('Stream object already connected!')
- self.url = '/%s/statuses/sample.json' % STREAM_VERSION
+ self.url = f'/{STREAM_VERSION}/statuses/sample.json'
if languages:
self.session.params['language'] = ','.join(map(str, languages))
if stall_warnings:
self.session.headers['Content-type'] = "application/x-www-form-urlencoded"
if self.running:
raise TweepError('Stream object already connected!')
- self.url = '/%s/statuses/filter.json' % STREAM_VERSION
+ self.url = f'/{STREAM_VERSION}/statuses/filter.json'
if follow:
self.body['follow'] = ','.join(follow).encode(encoding)
if track:
if len(locations) % 4 != 0:
raise TweepError("Wrong number of locations points, "
"it has to be a multiple of 4")
- self.body['locations'] = ','.join(['%.4f' % l for l in locations])
+ self.body['locations'] = ','.join([f'{l:.4f}' for l in locations])
if stall_warnings:
self.body['stall_warnings'] = stall_warnings
if languages:
self.body = {}
if self.running:
raise TweepError('Stream object already connected!')
- self.url = '/%s/site.json' % STREAM_VERSION
+ self.url = f'/{STREAM_VERSION}/site.json'
self.body['follow'] = ','.join(map(str, follow))
self.body['delimited'] = 'length'
if stall_warnings: