import unittest
import random
+from time import sleep
from tweepy import *
self.assert_(isinstance(source, Friendship))
self.assert_(isinstance(target, Friendship))
+# Authentication tests
class TweepyAuthTests(unittest.TestCase):
consumer_key = 'ZbzSsdQj7t68VYlqIFvdcA'
api = API(auth)
api.update_status('test %i' % random.randint(1,1000))
+
+# Cache tests
+class TweepyCacheTests(unittest.TestCase):
+
+ timeout = 2.0
+
+ def _run_tests(self):
+ # test store and get
+ self.cache.store('testkey', 'testvalue')
+ self.assertEqual(self.cache.count(), 1, 'Count is wrong')
+ self.assertEqual(self.cache.get('testkey'), 'testvalue', 'Stored value does not match retrieved value')
+
+ # test timeout
+ sleep(self.timeout)
+ self.assertEqual(self.cache.get('testkey'), None, 'Cache entry should have expired')
+
+ # test cleanup
+ self.cache.store('testkey', 'testvalue')
+ sleep(self.timeout)
+ self.cache.cleanup()
+ self.assertEqual(self.cache.count(), 0, 'Cache cleanup failed')
+
+ # test flush
+ for i in range(0,10):
+ self.cache.store('testkey%i' % i, 'testvalue')
+ self.cache.flush()
+ self.assertEqual(self.cache.count(), 0, 'Cache failed to flush')
+
+ def testmemorycache(self):
+ self.cache = MemoryCache(timeout=self.timeout)
+ self._run_tests()
+
+ def testfilecache(self):
+ os.mkdir('cache_test_dir')
+ self.cache = FileCache('cache_test_dir', self.timeout)
+ self._run_tests()
+ self.cache.flush()
+ os.rmdir('cache_test_dir')
+
+
if __name__ == '__main__':
unittest.main()
"""
raise NotImplementedError
+ def count(self):
+ """Get count of entries currently stored in cache"""
+ raise NotImplementedError
+
def cleanup(self):
"""Delete any expired entries in cache."""
raise NotImplementedError
# entry found and not expired, return it
return entry[1]
+ def count(self):
+ return len(self._entries)
+
def cleanup(self):
with self.lock:
for k,v in self._entries.items():
f_lock.close()
return value
+ def count(self):
+ c = 0
+ for entry in os.listdir(self.cache_dir):
+ if entry.endswith('.lock'): continue
+ c += 1
+ return c
+
def cleanup(self):
for entry in os.listdir(self.cache_dir):
if entry.endswith('.lock'): continue
return value
+ def count(self):
+ # TODO: implement
+ raise NotImplementedError
+
def cleanup(self):
# not implemented for this cache
return