self.auth = create_auth()
self.api = API(self.auth)
self.api.retry_count = 2
- self.api.retry_delay = 5
+ self.api.retry_delay = 0 if use_replay else 5
def create_auth():
import unittest
import random
import shutil
-from time import sleep
+import time
import os
from ast import literal_eval
class TweepyCacheTests(unittest.TestCase):
- timeout = 2.0
+ timeout = 0.5
memcache_servers = ['127.0.0.1:11211'] # must be running for test to pass
def _run_tests(self, do_cleanup=True):
'Stored value does not match retrieved value')
# test timeout
- sleep(self.timeout)
+ sleep(self.timeout, True)
self.assertEqual(self.cache.get('testkey'), None,
'Cache entry should have expired')
# test cleanup
if do_cleanup:
self.cache.store('testkey', 'testvalue')
- sleep(self.timeout)
+ sleep(self.timeout, True)
self.cache.cleanup()
self.assertEqual(self.cache.count(), 0, 'Cache cleanup failed')
if os.path.exists('cache_test_dir'):
shutil.rmtree('cache_test_dir')
+old_sleep = time.sleep
+
+def sleep(t, override=False):
+ if not use_replay or override:
+ old_sleep(t)
if __name__ == '__main__':
unittest.main()