Replace old C-style string formatting/interpolation with f-strings
authorHarmon <Harmon758@gmail.com>
Mon, 28 Dec 2020 04:49:52 +0000 (22:49 -0600)
committerHarmon <Harmon758@gmail.com>
Mon, 28 Dec 2020 04:49:52 +0000 (22:49 -0600)
setup.py
tests/test_api.py
tests/test_auth.py
tests/test_streaming.py
tweepy/api.py
tweepy/auth.py
tweepy/binder.py
tweepy/models.py
tweepy/parsers.py
tweepy/streaming.py

index 77e2dedffbe6e769a8ce33efc8415591b5296b0f..07fab6f2d437135e8f224dec8212edbcff1d9c4d 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -11,7 +11,7 @@ with open(VERSION_FILE) as version_file:
 if match:
     version = match.group(1)
 else:
-    raise RuntimeError("Unable to find version string in %s." % (VERSION_FILE,))
+    raise RuntimeError(f"Unable to find version string in {VERSION_FILE}.")
 
 with open("README.md") as readme_file:
     long_description = readme_file.read()
index cf1f6cba2b37e299aa9430e784ecb3b52397655e..f86bbcf08164121c7128f9b630603bd4209a3f39 100644 (file)
@@ -428,7 +428,7 @@ class TweepyCacheTests(unittest.TestCase):
 
         # test count
         for i in range(0, 20):
-            self.cache.store('testkey%i' % i, 'testvalue')
+            self.cache.store(f'testkey{i}', 'testvalue')
         self.assertEqual(self.cache.count(), 20, 'Count is wrong')
 
         # test flush
index d82ff73fc9007bd035e2064c3a67e001d7dbe657..c1dfb0006f7f15043277a7cc662ed499bfa323a1 100644 (file)
@@ -20,7 +20,7 @@ class TweepyAuthTests(unittest.TestCase):
 
         # build api object test using oauth
         api = API(auth)
-        s = api.update_status('test %i' % random.randint(0, 1000))
+        s = api.update_status(f'test {random.randint(0, 1000)}')
         api.destroy_status(s.id)
 
     def testaccesstype(self):
index c64b516c3452cba1e46d0d61ac606e06427bb27e..592a4cc0cfee2162ce83594a50ced72565104ae1 100644 (file)
@@ -29,7 +29,7 @@ class MockStreamListener(StreamListener):
         return False
 
     def on_error(self, code):
-        print("response: %s" % code)
+        print(f"response: {code}")
         return True
 
     def on_status(self, status):
index 7276d839ac5841ed59dd82ffcd31bd96ca3e5370..e514fc6a9743130a0108879666e5bf1a897111ef 100644 (file)
@@ -1381,18 +1381,16 @@ class API:
         if f is None:
             try:
                 if os.path.getsize(filename) > (max_size * 1024):
-                    raise TweepError('File is too big, must be less than %skb.'
-                                     % max_size)
+                    raise TweepError(f'File is too big, must be less than {max_size}kb.')
             except os.error as e:
-                raise TweepError('Unable to access file: %s' % e.strerror)
+                raise TweepError(f'Unable to access file: {e.strerror}')
 
             # build the mulitpart-formdata body
             fp = open(filename, 'rb')
         else:
             f.seek(0, 2)  # Seek to end of file
             if f.tell() > (max_size * 1024):
-                raise TweepError('File is too big, must be less than %skb.'
-                                 % max_size)
+                raise TweepError(f'File is too big, must be less than {max_size}kb.')
             f.seek(0)  # Reset to beginning of file
             fp = f
 
@@ -1408,7 +1406,7 @@ class API:
         if file_type in ['gif', 'jpeg', 'png', 'webp']:
             file_type = 'image/' + file_type
         elif file_type not in ['image/gif', 'image/jpeg', 'image/png']:
-            raise TweepError('Invalid file type for image: %s' % file_type)
+            raise TweepError(f'Invalid file type for image: {file_type}')
 
         if isinstance(filename, str):
             filename = filename.encode('utf-8')
index 3cd7efa886272cc920aeac5c8df7b30c13d39433..b283d6567499a56c61409b9d705faa878305a55b 100644 (file)
@@ -67,7 +67,7 @@ class OAuthHandler(AuthHandler):
         try:
             url = self._get_oauth_url('request_token')
             if access_type:
-                url += '?x_auth_access_type=%s' % access_type
+                url += f'?x_auth_access_type={access_type}'
             return self.oauth.fetch_request_token(url)
         except Exception as e:
             raise TweepError(e)
@@ -172,7 +172,7 @@ class AppAuthHandler(AuthHandler):
         data = resp.json()
         if data.get('token_type') != 'bearer':
             raise TweepError('Expected token_type to equal "bearer", '
-                             'but got %s instead' % data.get('token_type'))
+                             f'but got {data.get("token_type")} instead')
 
         self._bearer_token = data['access_token']
 
index 757733be71b9a976c6673e6a2e838fe8a12bbefb..60bdeffcefaf0ea6e6bc1ea7da0c1abeec8c7959 100644 (file)
@@ -100,7 +100,7 @@ def bind_api(**config):
                 if arg is None:
                     continue
                 if k in self.session.params:
-                    raise TweepError('Multiple values for parameter %s supplied!' % k)
+                    raise TweepError(f'Multiple values for parameter {k} supplied!')
 
                 self.session.params[k] = convert_to_utf8_str(arg)
 
@@ -117,7 +117,7 @@ def bind_api(**config):
                     try:
                         value = quote(self.session.params[name])
                     except KeyError:
-                        raise TweepError('No parameter value found for path variable: %s' % name)
+                        raise TweepError(f'No parameter value found for path variable: {name}')
                     del self.session.params[name]
 
                 self.path = self.path.replace(variable, value)
@@ -132,7 +132,7 @@ def bind_api(**config):
             # Query the cache if one is available
             # and this request uses a GET method.
             if self.use_cache and self.api.cache and self.method == 'GET':
-                cache_result = self.api.cache.get('%s?%s' % (url, urlencode(self.session.params)))
+                cache_result = self.api.cache.get(f'{url}?{urlencode(self.session.params)}')
                 # if cache result found and not expired, return it
                 if cache_result:
                     # must restore api reference
@@ -158,7 +158,7 @@ def bind_api(**config):
                                 sleep_time = self._reset_time - int(time.time())
                                 if sleep_time > 0:
                                     if self.wait_on_rate_limit_notify:
-                                        log.warning("Rate limit reached. Sleeping for: %d" % sleep_time)
+                                        log.warning(f"Rate limit reached. Sleeping for: {sleep_time}")
                                     time.sleep(sleep_time + 5)  # sleep for few extra sec
 
                 # if self.wait_on_rate_limit and self._reset_time is not None and \
@@ -166,7 +166,7 @@ def bind_api(**config):
                 #     sleep_time = self._reset_time - int(time.time())
                 #     if sleep_time > 0:
                 #         if self.wait_on_rate_limit_notify:
-                #             log.warning("Rate limit reached. Sleeping for: %d" % sleep_time)
+                #             log.warning(f"Rate limit reached. Sleeping for: {sleep_time}")
                 #         time.sleep(sleep_time + 5)  # sleep for few extra sec
 
                 # Apply authentication
@@ -188,7 +188,7 @@ def bind_api(**config):
                                                 auth=auth,
                                                 proxies=self.api.proxy)
                 except Exception as e:
-                    raise TweepError('Failed to send request: %s' % e).with_traceback(sys.exc_info()[2])
+                    raise TweepError(f'Failed to send request: {e}').with_traceback(sys.exc_info()[2])
 
                 rem_calls = resp.headers.get('x-rate-limit-remaining')
 
@@ -224,7 +224,7 @@ def bind_api(**config):
                     error_msg, api_error_code = \
                         self.parser.parse_error(resp.text)
                 except Exception:
-                    error_msg = "Twitter error response: status code = %s" % resp.status_code
+                    error_msg = f"Twitter error response: status code = {resp.status_code}"
                     api_error_code = None
 
                 if is_rate_limit_error_message(error_msg):
@@ -239,7 +239,7 @@ def bind_api(**config):
 
             # Store result into cache if one is available.
             if self.use_cache and self.api.cache and self.method == 'GET' and result:
-                self.api.cache.store('%s?%s' % (url, urlencode(self.session.params)), result)
+                self.api.cache.store(f'{url}?{urlencode(self.session.params)}', result)
 
             return result
 
index f330a7f093dafa5f7e9952267ff55f0fe3d1c4f1..d3bdbcb5a8a82767c14a1d52356548c56c4cb49a 100644 (file)
@@ -78,8 +78,8 @@ class Model:
         return results
 
     def __repr__(self):
-        state = ['%s=%s' % (k, repr(v)) for (k, v) in vars(self).items()]
-        return '%s(%s)' % (self.__class__.__name__, ', '.join(state))
+        state = [f'{k}={v!r}' for (k, v) in vars(self).items()]
+        return f'{self.__class__.__name__}({", ".join(state)})'
 
 
 class Status(Model):
index cd2fb8b115ca30329ad32d8dfefa2925cbe0465b..e8c1abff6542b6b8c18a007692933e5eb5deb20d 100644 (file)
@@ -47,7 +47,7 @@ class JSONParser(Parser):
         try:
             json = json_lib.loads(payload)
         except Exception as e:
-            raise TweepError('Failed to parse JSON payload: %s' % e)
+            raise TweepError(f'Failed to parse JSON payload: {e}')
 
         if return_cursors and isinstance(json, dict):
             if 'next' in json:
@@ -87,8 +87,7 @@ class ModelParser(JSONParser):
                 return
             model = getattr(self.model_factory, method.payload_type)
         except AttributeError:
-            raise TweepError('No model for this payload type: '
-                             '%s' % method.payload_type)
+            raise TweepError(f'No model for this payload type: {method.payload_type}')
 
         json = JSONParser.parse(self, method, payload, return_cursors=return_cursors)
         if isinstance(json, tuple):
index 8c22fe80b5deddc814042fae99426056f7c9044c..649e70fe26ebe695cb095f2d77d377a76bd7a361 100644 (file)
@@ -246,7 +246,7 @@ class Stream:
 
     def _run(self):
         # Authenticate
-        url = "https://%s%s" % (self.host, self.url)
+        url = f"https://{self.host}{self.url}"
 
         # Connect and process the stream
         error_counter = 0
@@ -399,7 +399,7 @@ class Stream:
         self.session.params = {'delimited': 'length'}
         if self.running:
             raise TweepError('Stream object already connected!')
-        self.url = '/%s/user.json' % STREAM_VERSION
+        self.url = f'/{STREAM_VERSION}/user.json'
         self.host = 'userstream.twitter.com'
         if stall_warnings:
             self.session.params['stall_warnings'] = stall_warnings
@@ -411,7 +411,7 @@ class Stream:
             if len(locations) % 4 != 0:
                 raise TweepError("Wrong number of locations points, "
                                  "it has to be a multiple of 4")
-            self.session.params['locations'] = ','.join(['%.2f' % l for l in locations])
+            self.session.params['locations'] = ','.join([f'{l:.2f}' for l in locations])
         if track:
             self.session.params['track'] = ','.join(track).encode(encoding)
 
@@ -421,23 +421,23 @@ class Stream:
         self.session.params = {'delimited': 'length'}
         if self.running:
             raise TweepError('Stream object already connected!')
-        self.url = '/%s/statuses/firehose.json' % STREAM_VERSION
+        self.url = f'/{STREAM_VERSION}/statuses/firehose.json'
         if count:
-            self.url += '&count=%s' % count
+            self.url += f'&count={count}'
         self._start(is_async)
 
     def retweet(self, is_async=False):
         self.session.params = {'delimited': 'length'}
         if self.running:
             raise TweepError('Stream object already connected!')
-        self.url = '/%s/statuses/retweet.json' % STREAM_VERSION
+        self.url = f'/{STREAM_VERSION}/statuses/retweet.json'
         self._start(is_async)
 
     def sample(self, is_async=False, languages=None, stall_warnings=False):
         self.session.params = {'delimited': 'length'}
         if self.running:
             raise TweepError('Stream object already connected!')
-        self.url = '/%s/statuses/sample.json' % STREAM_VERSION
+        self.url = f'/{STREAM_VERSION}/statuses/sample.json'
         if languages:
             self.session.params['language'] = ','.join(map(str, languages))
         if stall_warnings:
@@ -450,7 +450,7 @@ class Stream:
         self.session.headers['Content-type'] = "application/x-www-form-urlencoded"
         if self.running:
             raise TweepError('Stream object already connected!')
-        self.url = '/%s/statuses/filter.json' % STREAM_VERSION
+        self.url = f'/{STREAM_VERSION}/statuses/filter.json'
         if follow:
             self.body['follow'] = ','.join(follow).encode(encoding)
         if track:
@@ -459,7 +459,7 @@ class Stream:
             if len(locations) % 4 != 0:
                 raise TweepError("Wrong number of locations points, "
                                  "it has to be a multiple of 4")
-            self.body['locations'] = ','.join(['%.4f' % l for l in locations])
+            self.body['locations'] = ','.join([f'{l:.4f}' for l in locations])
         if stall_warnings:
             self.body['stall_warnings'] = stall_warnings
         if languages:
@@ -474,7 +474,7 @@ class Stream:
         self.body = {}
         if self.running:
             raise TweepError('Stream object already connected!')
-        self.url = '/%s/site.json' % STREAM_VERSION
+        self.url = f'/{STREAM_VERSION}/site.json'
         self.body['follow'] = ','.join(map(str, follow))
         self.body['delimited'] = 'length'
         if stall_warnings: