python3 -m unittest --verbose --catch --failfast tests.py
test-python2:
- python3 -m unittest --verbose --catch --failfast tests.py
+ python2 -m unittest --verbose --catch --failfast tests.py
+
+clean:
+ rm -rv {*/,}__pycache__/
+ rm -rv {*/,}*.pyc
import diaspy.conversations as conversations
import diaspy.streams as streams
import diaspy.client as client
-"""
-from diaspy import client
-from diaspy import models
-from diaspy import conversations
-from diaspy import streams
-"""
+import diaspy.people as people
--- /dev/null
+import re
+from diaspy.streams import Outer
+
+
+class User:
+ """This class abstracts a D* user.
+ This object goes around the limitations of current D* API and will
+ extract user data using black magic.
+ However, no chickens are harmed when you use it.
+ """
+ def __init__(self, connection):
+ self._connection = connection
+ self.data = {}
+
+ def __getitem__(self, key):
+ return self.data[key]
+
+ def _sephandle(self, handle):
+ """Separate D* handle into pod pod and user.
+
+ :param handle: diaspora id: user@pod.example.com
+ :type handle: str
+ :returns: two-tuple (pod, user)
+ """
+ if re.match('^[a-zA-Z]+[a-zA-Z0-9_-]*@[a-z0-9.]+\.[a-z]+$', handle) is None:
+ raise Exception('invalid handle: {0}'.format(handle))
+ handle = handle.split('@')
+ pod, user = handle[1], handle[0]
+ return (pod, user)
+
+ def _gethandle(self, diaspora_id, protocol='https'):
+ """Get user data using handle.
+ """
+ pod, user = self._sephandle(diaspora_id)
+ request = self._connection.session.get('{0}://{1}/u/{2}.json'.format(protocol, pod, user)).json()
+ data = request[0]['author']
+ self.data = data
+ self.stream = Outer(self._connection, location='people/{0}.json'.format(self.data['guid']))
+
+ def _getguid(self, guid):
+ """Get user data using guid.
+ """
+ request = self._connection.get('people/{0}.json'.format(guid)).json()
+ data = request[0]['author']
+ self.data = data
+ self.stream = Outer(self._connection, location='people/{0}.json'.format(self.data['guid']))
+
+ def fetchguid(self, guid):
+ """Fetch user data using guid.
+ """
+ self._getguid(guid)
+
+ def fetchhandle(self, diaspora_id, protocol='https'):
+ """Fetch user data using diaspora id.
+ """
+ self._gethandle(diaspora_id, protocol)
self._stream = self._obtain()
+class Outer(Generic):
+ """Object used by diaspy.models.User to represent
+ stream of other user.
+ """
+ def _obtain(self):
+ """Obtains stream of other user.
+ """
+ request = self._connection.get(self._location)
+ if request.status_code != 200:
+ raise Exception('wrong status code: {0}'.format(request.status_code))
+ return [Post(str(post['id']), self._connection) for post in request.json()]
+
+
class Stream(Generic):
"""The main stream containing the combined posts of the
followed users and tags and the community spotlights posts
--- /dev/null
+#### `User()` object
+
+This object is used to represent a D\* user.
+
+----
+
+##### Getting user data
+
+You have to know either GUID or *diaspora_id* of a user.
+Assume that *12345678abcdefgh* and *otheruser@pod.example.com* point to
+the same user.
+
+
+ >>> c = diaspy.connection.Connection('https://pod.example.com', 'foo', 'bar')
+ >>>
+ >>> user_guid = diaspy.people.User(c)
+ >>> user_guid.fetchguid('12345678abcdefgh')
+ >>>
+ >>> user_handle = diaspy.people.User(c)
+ >>> user_handle.fetchhandle('otheruser@pod.example.com')
+
+Now, you have two `User()` objects containing the data of one user.
+
+The object is subscriptable so you can do like this:
+
+ >>> user_guid['diaspora_id']
+ 'otheruser@pod.example.com'
+ >>>
+ >>> user_handle['guid']
+ '12345678abcdefgh'
+
+
+User object contains following items:
+
+* `id`, `str`, id of the user;
+* `guid`, `str`, guid of the user;
+* `diaspora_id`, `str`, D\* id of the user;
+* `name`, `str`, name of the user;
+* `avatar`, `dict`, links to avatar of the user;
+* `stream`, `diaspy.streams.Outer`, stream of the user (provides all methods of generic stream);
+
+
+----
+
+###### Manual for `diaspy`, written by Marek Marecki
stream = diaspy.streams.Stream(test_connection)
stream.post_picture('./test-image.png')
- def testingAddingTag(self)
+ def testingAddingTag(self):
ft = diaspy.streams.FollowedTags(test_connection)
ft.add('test')