pod, user = handle[1], handle[0]
return (pod, user)
- def _gethandle(self, diaspora_id, protocol='https'):
+ def _postproc(self, request):
+ """Makes necessary modifications to user data and
+ sets up a stream.
+
+ :param request: request object
+ :type request: request
+ """
+ if request.status_code != 200:
+ raise Exception('wrong error code: {0}'.format(request.status_code))
+ else:
+ request = request.json()
+ data, final = request[0]['author'], {}
+ names = [('id', 'id'),
+ ('diaspora_id', 'diaspora_id'),
+ ('guid', 'guid'),
+ ('name', 'diaspora_name'),
+ ('avatar', 'image_urls'),
+ ]
+ for d, f in names:
+ final[f] = data[d]
+ self.data = final
+ self.stream = Outer(self._connection, location='people/{0}.json'.format(self.data['guid']))
+
- def _gethandle(self, diaspora_id, protocol='https'):
++ def _getbyhandle(self, diaspora_id, protocol='https'):
"""Get user data using handle.
"""
pod, user = self._sephandle(diaspora_id)
- request = self._connection.session.get('{0}://{1}/u/{2}.json'.format(protocol, pod, user)).json()
- data = request[0]['author']
- self.data = data
- self.stream = Outer(self._connection, location='people/{0}.json'.format(self.data['guid']))
+ request = self._connection.session.get('{0}://{1}/u/{2}.json'.format(protocol, pod, user))
+ self._postproc(request)
-- def _getguid(self, guid):
++ def _getbyguid(self, guid):
"""Get user data using guid.
"""
- request = self._connection.get('people/{0}.json'.format(guid)).json()
- data = request[0]['author']
- self.data = data
- self.stream = Outer(self._connection, location='people/{0}.json'.format(self.data['guid']))
+ request = self._connection.get('people/{0}.json'.format(guid))
+ self._postproc(request)
def fetchguid(self, guid):
"""Fetch user data using guid.
"""
-- self._getguid(guid)
++ self._getbyguid(guid)
def fetchhandle(self, diaspora_id, protocol='https'):
"""Fetch user data using diaspora id.
"""
-- self._gethandle(diaspora_id, protocol)
++ self._getbyhandle(diaspora_id, protocol)
test_count_file = open('TEST_COUNT', 'w')
test_count_file.write(str(test_count))
test_count_file.close()
-
-# Test connection setup
print('Running test no. {0}'.format(test_count))
-print('Running tests on connection to pod: "{0}"'.format(__pod__))
-print('Connecting to pod...\t', end='')
-try:
- test_connection = diaspy.connection.Connection(pod=__pod__, username=__username__, password=__passwd__)
- test_connection.login()
- print('[ CONNECTED ]\n')
- err = False
-except:
- print('[ FAIL ]')
- input('Hit [Return] to continue...')
- err = True
-finally:
- if err: raise
+print('Running tests on connection to pod: "{0}"\t'.format(__pod__), end='')
+test_connection = diaspy.connection.Connection(pod=__pod__, username=__username__, password=__passwd__)
+test_connection.login()
+print('[ CONNECTED ]\n')
+
+post_text = '#diaspy test no. {0}'.format(test_count)
- #### Test suite code
- class StreamTest(unittest.TestCase):
- def testGetting(self):
- stream = diaspy.streams.Generic(test_connection)
-
- def testGettingLength(self):
- stream = diaspy.streams.Generic(test_connection)
- len(stream)
-
- def testClearing(self):
- stream = diaspy.streams.Stream(test_connection)
- stream.clear()
- self.assertEqual(0, len(stream))
-
- def testPurging(self):
- stream = diaspy.streams.Stream(test_connection)
- post = stream.post('#diaspy test')
- stream.update()
- post.delete()
- stream.purge()
- self.assertNotIn(post.post_id, [p.post_id for p in stream])
-
- def testPostingText(self):
- stream = diaspy.streams.Stream(test_connection)
- post = stream.post(post_text)
- self.assertEqual(diaspy.models.Post, type(post))
-
- def testPostingImage(self):
- stream = diaspy.streams.Stream(test_connection)
- stream.post_picture('./test-image.png', post_text)
-
- def testingAddingTag(self):
- ft = diaspy.streams.FollowedTags(test_connection)
- ft.add('test')
-
-
+ #######################################
+ #### TEST SUITE CODE ####
+ #######################################
class ConnectionTest(unittest.TestCase):
def testLoginWithoutUsername(self):
connection = diaspy.connection.Connection(pod=__pod__)
self.assertEqual(diaspy.conversations.Conversation, type(mailbox[0]))
- if __name__ == '__main__':
- unittest.main()
- c = diaspy.connection.Connection(__pod__, __username__, __passwd__)
- c.login()
- stream = diaspy.modules.Stream(c)
- for post in stream:
- if post['text'] == '#diaspy test': post.delete()
+ class StreamTest(unittest.TestCase):
+ def testGetting(self):
+ stream = diaspy.streams.Generic(test_connection)
+
+ def testGettingLength(self):
+ stream = diaspy.streams.Generic(test_connection)
+ len(stream)
+
+ def testClearing(self):
+ stream = diaspy.streams.Stream(test_connection)
+ stream.clear()
+ self.assertEqual(0, len(stream))
+
+ def testPurging(self):
+ stream = diaspy.streams.Stream(test_connection)
+ post = stream.post('#diaspy test')
+ stream.update()
+ post.delete()
+ stream.purge()
+ self.assertNotIn(post.post_id, [p.post_id for p in stream])
+
+ def testPostingText(self):
+ stream = diaspy.streams.Stream(test_connection)
+ post = stream.post('#diaspy test no. {0}'.format(test_count))
+ self.assertEqual(diaspy.models.Post, type(post))
+
+ def testPostingImage(self):
+ stream = diaspy.streams.Stream(test_connection)
+ stream.post_picture('test-image.png')
+
+ def testingAddingTag(self):
+ ft = diaspy.streams.FollowedTags(test_connection)
+ ft.add('test')
+
+
+ class UserTests(unittest.TestCase):
+ def testHandleSeparatorRaisingExceptions(self):
+ user = diaspy.people.User(test_connection)
+ handles = ['user.pod.example.com',
+ 'user@podexamplecom',
+ '@pod.example.com',
+ 'use r@pod.example.com',
+ 'user0@pod300 example.com',
+ ]
+ for h in handles:
+ self.assertRaises(Exception, user._sephandle, h)
+
+ def testGettingUserByHandle(self):
+ user = diaspy.people.User(test_connection)
+ user.fetchhandle(testconf.diaspora_id)
+ self.assertEqual(testconf.guid, user['guid'])
+ self.assertEqual(testconf.diaspora_name, user['diaspora_name'])
+ self.assertIn('id', user.data)
+ self.assertIn('image_urls', user.data)
+ self.assertEqual(type(user.stream), diaspy.streams.Outer)
+
+ def testGettingUserByGUID(self):
+ user = diaspy.people.User(test_connection)
+ user.fetchguid(testconf.guid)
+ self.assertEqual(testconf.diaspora_id, user['diaspora_id'])
+ self.assertEqual(testconf.diaspora_name, user['diaspora_name'])
+ self.assertIn('id', user.data)
+ self.assertIn('image_urls', user.data)
+ self.assertEqual(type(user.stream), diaspy.streams.Outer)
+
+
-if __name__ == '__main__':
- unittest.main()
- c = diaspy.connection.Connection(__pod__, __username__, __passwd__)
- c.login()
- stream = diaspy.modules.Stream(c)
- for post in stream:
- if post['text'] == '#diaspy test': post.delete()
++if __name__ == '__main__': unittest.main()