And the test suite was updated. Yay!
+* __new__: `diaspy.errors.SettingsError`,
+
+
+* __upd__: `diaspy.settings.Account.setEmail()` can now raise `SettingsError` when request fails,
+
+
**`0.4.1-rc.3` (2013-09-08):**
* __new__: `diaspy.settings.Profile.load()` method for loading profile information,
+# flake8: noqa
+
import diaspy.connection as connection
import diaspy.models as models
import diaspy.streams as streams
import diaspy.settings as settings
-__version__ = '0.4.1-rc.3'
+__version__ = '0.4.1'
request = self._connection.get('conversations.json')
if request.status_code != 200:
- raise errors.DiaspyError('wrong status code: {0}'.format(r.status_code))
+ raise errors.DiaspyError('wrong status code: {0}'.format(request.status_code))
mailbox = request.json()
self._mailbox = [models.Conversation(self._connection, c['conversation']['id']) for c in mailbox]
pass
+class SettingsError(DiaspyError):
+ """Exception raised when something related to settings goes wrong.
+ """
+ pass
+
+
def react(r, message='', accepted=[200, 201, 202, 203, 204, 205, 206], exception=DiaspyError):
"""This method tries to decides how to react
to a response code passed to it. If it's an
headers={'accept': 'application/json'})
if request.status_code != 200:
raise errors.ConversationError('{0}: Answer could not be posted.'
- .format(request.status_code))
+ .format(request.status_code))
return request.json()
def delete(self):
data = {'authenticity_token': repr(self._connection)}
request = self._connection.delete('conversations/{0}/visibility/'
- .format(self.id),
- data=data,
- headers={'accept': 'application/json'})
+ .format(self.id),
+ data=data,
+ headers={'accept': 'application/json'})
if request.status_code != 404:
raise errors.ConversationError('{0}: Conversation could not be deleted.'
- .format(request.status_code))
+ .format(request.status_code))
def get_subject(self):
"""Returns the subject of this conversation
class Comment():
"""Represents comment on post.
-
+
Does not require Connection() object. Note that you should not manually
create `Comment()` objects -- they are designed to be created automatically
by `Post()` objects.
data = {'authenticity_token': repr(self._connection)}
request = self._connection.post('posts/{0}/likes'.format(self.id),
- data=data,
- headers={'accept': 'application/json'})
+ data=data,
+ headers={'accept': 'application/json'})
if request.status_code != 201:
raise errors.PostError('{0}: Post could not be liked.'
- .format(request.status_code))
+ .format(request.status_code))
return request.json()
def reshare(self):
'authenticity_token': repr(self._connection)}
request = self._connection.post('reshares',
- data=data,
- headers={'accept': 'application/json'})
+ data=data,
+ headers={'accept': 'application/json'})
if request.status_code != 201:
raise Exception('{0}: Post could not be reshared'.format(request.status_code))
return request.json()
data = {'text': text,
'authenticity_token': repr(self._connection)}
request = self._connection.post('posts/{0}/comments'.format(self.id),
- data=data,
- headers={'accept': 'application/json'})
+ data=data,
+ headers={'accept': 'application/json'})
if request.status_code != 201:
raise Exception('{0}: Comment could not be posted.'
"""
data = {'authenticity_token': repr(self._connection)}
request = self._connection.delete('posts/{0}'.format(self.id),
- data=data,
- headers={'accept': 'application/json'})
+ data=data,
+ headers={'accept': 'application/json'})
if request.status_code != 204:
raise errors.PostError('{0}: Post could not be deleted'.format(request.status_code))
"""
data = {'authenticity_token': repr(self._connection)}
request = self._connection.delete('posts/{0}/comments/{1}'
- .format(self.id,
- comment_id),
- data=data,
- headers={'accept': 'application/json'})
+ .format(self.id, comment_id),
+ data=data,
+ headers={'accept': 'application/json'})
if request.status_code != 204:
raise errors.PostError('{0}: Comment could not be deleted'
- .format(request.status_code))
+ .format(request.status_code))
def delete_like(self):
"""This function removes a like from a post
"""
data = {'authenticity_token': self._connection.get_token()}
-
- request = self._connection.delete('posts/{0}/likes/{1}'
- .format(self.id,
- self.data['interactions']
- ['likes'][0]['id']),
- data=data)
+ url = 'posts/{0}/likes/{1}'.format(self.id, self.data['interactions']['likes'][0]['id'])
+ request = self._connection.delete(url, data=data)
if request.status_code != 204:
raise errors.PostError('{0}: Like could not be removed.'
- .format(request.status_code))
+ .format(request.status_code))
def author(self, key='name'):
"""Returns author of the post.
def __init__(self, connection, guid='', handle='', fetch='posts', id=0):
self._connection = connection
self.stream = []
- self.handle = handle
- self.guid = guid
self.data = {
'guid': guid,
'handle': handle,
"""Fetch user posts or data.
"""
if fetch == 'posts':
- if self.handle and not self.guid: self.fetchhandle()
+ if self['handle'] and not self['guid']: self.fetchhandle()
else: self.fetchguid()
- elif fetch == 'data' and self.handle:
+ elif fetch == 'data' and self['handle']:
self.fetchprofile()
def _finalize_data(self, data):
"""Adjustments are needed to have similar results returned
- by search feature and fetchguid/handle().
+ by search feature and fetchguid()/fetchhandle().
"""
- names = [ ('id', 'id'),
- ('guid', 'guid'),
- ('name', 'name'),
- ('avatar', 'avatar'),
- ('handle', 'diaspora_id'),
- ]
+ names = [('id', 'id'),
+ ('guid', 'guid'),
+ ('name', 'name'),
+ ('avatar', 'avatar'),
+ ('handle', 'diaspora_id'),
+ ]
final = {}
for f, d in names:
final[f] = data[d]
:type request: request
"""
if request.status_code != 200: raise Exception('wrong error code: {0}'.format(request.status_code))
- else: request = request.json()
+ request = request.json()
if not len(request): raise errors.UserError('cannot extract user data: no posts to analyze')
self.data = self._finalize_data(request[0]['author'])
self.stream = Outer(self._connection, location='people/{0}.json'.format(self['guid']))
def fetchhandle(self, protocol='https'):
"""Fetch user data and posts using Diaspora handle.
"""
- pod, user = sephandle(self.handle)
+ pod, user = sephandle(self['handle'])
request = self._connection.get('{0}://{1}/u/{2}.json'.format(protocol, pod, user), direct=True)
self._postproc(request)
def fetchguid(self):
"""Fetch user data and posts using guid.
"""
- if self.guid:
- request = self._connection.get('people/{0}.json'.format(self.guid))
+ if self['guid']:
+ request = self._connection.get('people/{0}.json'.format(self['guid']))
self._postproc(request)
else:
raise errors.UserError('GUID not set')
def fetchprofile(self):
"""Fetches user data.
"""
- data = search.Search(self._connection).user(self.handle)[0]
+ data = search.Search(self._connection).user(self['handle'])[0]
self.data = data
request = self._connection.get('contacts.json', params=params)
if request.status_code != 200:
raise Exception('status code {0}: cannot get contacts'.format(request.status_code))
- contacts = [User(self._connection, guid=user['guid'], handle=user['handle'], fetch=None, id=user['id']) for user in request.json()]
- return contacts
+ return [User(self._connection, guid=user['guid'], handle=user['handle'], fetch=None) for user in request.json()]
class Account():
"""Provides interface to account settings.
"""
+ email_regexp = re.compile('<input id="user_email" name="user\[email\]" size="30" type="text" value="(.+?)"')
+
def __init__(self, connection):
self._connection = connection
else: nsfw = ''
if post['photos']:
for n, photo in enumerate(post['photos']):
- name = '{0}_{1}{2}.{3}'.format(post['guid'], photo['guid'], nsfw, photo['sizes'][size].split('.')[-1])
+ # photo format -- .jpg, .png etc.
+ ext = photo['sizes'][size].split('.')[-1]
+ name = '{0}_{1}{2}.{3}'.format(post['guid'], photo['guid'], nsfw, ext)
filename = os.path.join(path, name)
try:
urllib.request.urlretrieve(url=photo['sizes'][size], filename=filename)
"""
data = {'_method': 'put', 'utf8': '✓', 'user[email]': email, 'authenticity_token': repr(self._connection)}
request = self._connection.post('user', data=data, allow_redirects=False)
+ if request.status_code != 302:
+ raise errors.SettingsError('setting email failed: {0}'.format(request.status_code))
def getEmail(self):
"""Returns currently used email.
"""
data = self._connection.get('user/edit')
- email = re.compile('<input id="user_email" name="user\[email\]" size="30" type="text" value=".+?"').search(data.text)
+ email = self.email_regexp.search(data.text)
if email is None: raise errors.DiaspyError('cannot fetch email')
- email = email.group(0)[:-1]
- email = email[email.rfind('"')+1:]
+ email = email.group(1)
return email
def setLanguage(self, lang):
Setters can then be used to adjust the data.
Finally, `update()` can be called to send data back to pod.
"""
- firstname_regexp = re.compile('<input id="profile_first_name" name="profile\[first_name\]" type="text" value="(.*?)" />')
- lastname_regexp = re.compile('<input id="profile_last_name" name="profile\[last_name\]" type="text" value="(.*?)" />')
+ firstname_regexp = re.compile('id="profile_first_name" name="profile\[first_name\]" type="text" value="(.*?)" />')
+ lastname_regexp = re.compile('id="profile_last_name" name="profile\[last_name\]" type="text" value="(.*?)" />')
bio_regexp = re.compile('<textarea id="profile_bio" name="profile\[bio\]" placeholder="Fill me out" rows="5">\n(.*?)</textarea>')
- location_regexp = re.compile('<input id="profile_location" name="profile\[location\]" placeholder="Fill me out" type="text" value="(.*?)" />')
- gender_regexp = re.compile('<input id="profile_gender" name="profile\[gender\]" placeholder="Fill me out" type="text" value="(.*?)" />')
- birth_year_regexp = re.compile('<option selected="selected" value="([0-9]{4,4})">[0-9]{4,4}</option>')
- birth_month_regexp = re.compile('<option selected="selected" value="([0-9]{1,2})">(.*?)</option>')
- birth_day_regexp = re.compile('<option selected="selected" value="([0-9]{1,2})">[0-9]{1,2}</option>')
- is_searchable_regexp = re.compile('<input checked="checked" id="profile_searchable" name="profile\[searchable\]" type="checkbox" value="(.*?)" />')
- is_nsfw_regexp = re.compile('<input checked="checked" id="profile_nsfw" name="profile\[nsfw\]" type="checkbox" value="(.*?)" />')
+ location_regexp = re.compile('id="profile_location" name="profile\[location\]" placeholder="Fill me out" type="text" value="(.*?)" />')
+ gender_regexp = re.compile('id="profile_gender" name="profile\[gender\]" placeholder="Fill me out" type="text" value="(.*?)" />')
+ birth_year_regexp = re.compile('selected="selected" value="([0-9]{4,4})">[0-9]{4,4}</option>')
+ birth_month_regexp = re.compile('selected="selected" value="([0-9]{1,2})">(.*?)</option>')
+ birth_day_regexp = re.compile('selected="selected" value="([0-9]{1,2})">[0-9]{1,2}</option>')
+ is_searchable_regexp = re.compile('checked="checked" id="profile_searchable" name="profile\[searchable\]" type="checkbox" value="(.*?)" />')
+ is_nsfw_regexp = re.compile('checked="checked" id="profile_nsfw" name="profile\[nsfw\]" type="checkbox" value="(.*?)" />')
def __init__(self, connection):
self._connection = connection
# Details of your account
diaspora_name = 'Foo Bar'
user_names_tuple = ('Foo', 'Bar')
+ user_email = 'email@example.com'
user_location_string = 'Nowhere'
user_gender_string = 'Gender'
user_date_of_birth = (2013, 9, 7)
def testGettingLanguages(self):
self.assertIn(('English', 'en'), self.account.getLanguages())
+ def testGettingEmail(self):
+ self.assertEqual(testconf.user_email, self.account.getEmail())
+
if __name__ == '__main__':
print('Hello World!')