* __new__: `diaspy.people.User._fetchstream()` method,
* __new__: `diaspy.people.Me()` object representing current user,
* __new__: `**kwargs` added to `diaspy.streams.Generic.json()` methdo to give developers control over the creation of JSON,
+* __new__: `.getHCard()` method added to `diaspy.people.User()`,
* __upd__: `diaspy.connection.Connection.login()` modifies connection object in-place **and** returns it (this allows more fluent API),
* __upd__: `diaspy.connection.Connection._login()` no longer returns status code (if login was unsuccessful it'll raise an exception),
* __upd__: better error message in `diaspy.models.Post().__init__()`,
* __upd__: `data` variable in `diaspy.models.Post()` renamed to `_data` to indicate that it's considered private,
+* __upd__: after deleting a post `Activity` stream is purged instead of being refilled (this preserves state of stream which is not reset to last 15 posts),
+* __upd__: `filterByIDs()` method in `Aspects` stream renamed to `filter()`,
-* __rem__: `diaspy.connection.Connection.getUserInfo()` moved to `diaspy.people.Me.getInfo()`,
-* __rem__: `diaspy.people.Me.getInfo()` (moved from `diaspy.connection.Connection.getUserInfo()`),
+* __rem__: `diaspy.connection.Connection.getUserInfo()` moved to `diaspy.connection.Connection.getUserData()`,
+* __rem__: `fetch` parameter removed from `diaspy.connection.Connection.getUserData()`,
* __dep__: `max_time` parameter in `diaspy.streams.*.more()` method is deprecated,
"""Object representing connection with the pod.
"""
_token_regex = re.compile(r'content="(.*?)"\s+name="csrf-token')
+ _userinfo_regex = re.compile(r'window.current_user_attributes = ({.*})')
+ # this is for older version of D*
+ _userinfo_regex_2 = re.compile(r'gon.user=({.*});gon.preloads')
def __init__(self, pod, username, password, schema='https'):
"""
"""Returns session token string (_diaspora_session).
"""
return self._diaspora_session
+
+ def getUserData(self):
+ """Returns user data.
+ """
+ request = self.get('bookmarklet')
+ userdata = self._userinfo_regex.search(request.text)
+ if userdata is None: userdata = self._userinfo_regex_2.search(request.text)
+ if userdata is None: raise errors.DiaspyError('cannot find user data')
+ userdata = userdata.group(1)
+ return json.loads(userdata)
#!/usr/bin/env python3
+"""This module contains custom exceptions that are raised by diaspy.
+These are not described by DIASPORA* protocol as exceptions that should be
+raised by API implementations but are specific to this particular implementation.
+
+If your program should catch all exceptions raised by diaspy and
+does not need to handle them specifically you can use following code:
+
+ # this line imports all errors
+ from diaspy.errors import *
+
+ try:
+ # your code...
+ except DiaspyError as e:
+ # your error handling code...
+ finally:
+ # closing code...
+"""
+
+import warnings
+
class DiaspyError(Exception):
"""Base exception for all errors
pass
-class TokenError(Exception):
+class TokenError(DiaspyError):
pass
:param exception: preferred exception to raise
:type exception: valid exception type (default: DiaspyError)
"""
+ warnings.warn(DeprecationWarning)
if r in accepted: e = None
else: e = DiaspyError
:param message: message for exception
:type message: str
"""
+ warnings.warn(DeprecationWarning)
if e is None: pass
else: raise e(message)
"""Finds name for aspect.
"""
name = None
- aspects = self._connection.getUserInfo()['aspects']
+ aspects = self._connection.getUserData()['aspects']
for a in aspects:
if a['id'] == self.id:
name = a['name']
"""Finds id for aspect.
"""
id = None
- aspects = self._connection.getUserInfo()['aspects']
+ aspects = self._connection.getUserData()['aspects']
for a in aspects:
if a['name'] == self.name:
id = a['id']
def __repr__(self):
"""Returns string containing more information then str().
"""
- return '{0} ({1}): {2}'.format(self._data['author']['name'], self.data['author']['guid'], self.data['text'])
+ return '{0} ({1}): {2}'.format(self._data['author']['name'], self._data['author']['guid'], self._data['text'])
def __str__(self):
"""Returns text of a post.
def _fetchcomments(self):
"""Retreives comments for this post.
+ Retrieving comments via GUID will result in 404 error.
+ DIASPORA* does not supply comments through /posts/:guid/ endpoint.
"""
- if self.id: id = self.id
- if self.guid: id = self.guid
+ id = self._data['id']
if self['interactions']['comments_count']:
request = self._connection.get('posts/{0}/comments.json'.format(id))
if request.status_code != 200:
data = search.Search(self._connection).user(self['handle'])[0]
self.data = data
+ def getHCard(self):
+ """Returns XML string containing user HCard.
+ """
+ request = self._connection.get('hcard/users/{0}'.format(self['guid']))
+ if request.status_code != 200: raise errors.UserError('could not fetch hcard for user: {0}'.format(self['guid']))
+ return request.text
+
class Me():
"""Object represetnting current user.
def __init__(self, connection):
self._connection = connection
- def getInfo(self, fetch=False):
+ def getInfo(self):
"""This function returns the current user's attributes.
:returns: dict -- json formatted user info.
return json.loads(userdata)
-
class Contacts():
"""This class represents user's list of contacts.
"""
def getTags(self):
"""Returns tags user had selected when describing him/her-self.
"""
- guid = self._connection.getUserInfo()['guid']
+ guid = self._connection.getUserData()['guid']
html = self._connection.get('people/{0}'.format(guid)).text
description_regexp = re.compile('<a href="/tags/(.*?)" class="tag">#.*?</a>')
return [tag.lower() for tag in re.findall(description_regexp, html)]
if max_time:
params['max_time'] = max_time
params['_'] = int(time.time() * 1000)
- request = self._connection.get(self._location, params=params, headers={'cookie': ''})
+ request = self._connection.get(self._location, params=params)
if request.status_code != 200:
raise errors.StreamError('wrong status code: {0}'.format(request.status_code))
return [Post(self._connection, guid=post['guid']) for post in request.json()]
for post in self._stream:
deleted = False
try:
+ # error will tell us that the post has been deleted
post.update()
- stream.append(post)
except Exception:
deleted = True
finally:
post = Post(self._connection, request.json()['id'])
return post
- def _photoupload(self, filename):
+ def _photoupload(self, filename, aspects=[]):
"""Uploads picture to the pod.
:param filename: path to picture file
:type filename: str
+ :param aspect_ids: list of ids of aspects to which you want to upload this photo
+ :type aspect_ids: list of integers
:returns: id of the photo being uploaded
"""
params['photo[pending]'] = 'true'
params['set_profile_image'] = ''
params['qqfile'] = filename
- aspects = self._connection.getUserInfo()['aspects']
+ if not aspects: aspects = self._connection.getUserData()['aspects']
for i, aspect in enumerate(aspects):
params['photo[aspect_ids][{0}]'.format(i)] = aspect['id']
"""Deletes post from users activity.
`post` can be either post id or Post()
object which will be identified and deleted.
- After deleting post the stream will be filled.
+ After deleting post the stream will be purged.
:param post: post identifier
:type post: str, diaspy.models.Post
if type(post) == str: self._delid(post)
elif type(post) == Post: post.delete()
else: raise TypeError('this method accepts str or Post types: {0} given')
- self.fill()
+ self.purge()
class Aspects(Generic):
:returns: int
"""
id = -1
- aspects = self._connection.getUserInfo()['aspects']
+ aspects = self._connection.getUserData()['aspects']
for aspect in aspects:
if aspect['name'] == aspect_name: id = aspect['id']
return id
- def filterByIDs(self, ids):
- self._location += '?{0}'.format(','.join(ids))
+ def filter(self, ids):
+ """Filters posts by given aspect ids.
+
+ :parameter ids: list of apsect ids
+ :type ids: list of integers
+ """
+ self._location = 'aspects.json' + '?{0}'.format(','.join(ids))
self.fill()
def add(self, aspect_name, visible=0):
"""
_location = 'followed_tags.json'
+ def get(self):
+ """Returns list of followed tags.
+ """
+ return []
+
def remove(self, tag_id):
"""Stop following a tag.
class Tag(Generic):
"""This stream contains all posts containing a tag.
"""
- def __init__(self, connection, tag):
+ def __init__(self, connection, tag, fetch=True):
"""
:param connection: Connection() object
:type connection: diaspy.connection.Connection
"""
self._connection = connection
self._location = 'tags/{0}.json'.format(tag)
- self.fill()
+ if fetch: self.fill()
import warnings
# actual diaspy code
import diaspy
-from diaspy import client as dclient
#### SETUP STUFF
testconf.test_aspect_id = diaspy.streams.Aspects(test_connection).add(testconf.test_aspect_name).id
print('OK')
-print([i['name'] for i in test_connection.getUserInfo()['aspects']])
+print([i['name'] for i in test_connection.getUserData()['aspects']])
post_text = '#diaspy test no. {0}'.format(test_count)
#######################################
class ConnectionTest(unittest.TestCase):
def testGettingUserInfo(self):
- info = test_connection.getUserInfo()
+ info = test_connection.getUserData()
self.assertEqual(dict, type(info))
def testAspectsRemoveById(self):
aspects = diaspy.streams.Aspects(test_connection)
- for i in test_connection.getUserInfo()['aspects']:
+ for i in test_connection.getUserData()['aspects']:
if i['name'] == testconf.test_aspect_name:
print(i['id'], end=' ')
aspects.remove(id=i['id'])
break
- names = [i['name'] for i in test_connection.getUserInfo()['aspects']]
+ names = [i['name'] for i in test_connection.getUserData()['aspects']]
print(names)
self.assertNotIn(testconf.test_aspect_name, names)
aspects = diaspy.streams.Aspects(test_connection)
print(testconf.test_aspect_name_fake, end=' ')
aspects.remove(name=testconf.test_aspect_name_fake)
- names = [i['name'] for i in test_connection.getUserInfo()['aspects']]
+ names = [i['name'] for i in test_connection.getUserData()['aspects']]
print(names)
self.assertNotIn(testconf.test_aspect_name_fake, names)
print('It\'s testing time!')
n = unittest.main()
print(n)
- print('It was testing time!')
+ print('Good! All tests passed!')