#!/usr/bin/env python3 """This module is only imported in other diaspy modules and MUST NOT import anything. """ import json import copy BS4_SUPPORT=False try: from bs4 import BeautifulSoup except ImportError: import re print("[diaspy] BeautifulSoup not found, falling back on regex.") else: BS4_SUPPORT=True from diaspy import errors class Aspect(): """This class represents an aspect. Class can be initialized by passing either an id and/or name as parameters. If both are missing, an exception will be raised. """ def __init__(self, connection, id, name=None): self._connection = connection self.id, self.name = id, name self._cached = [] def getUsers(self, fetch = True): """Returns list of GUIDs of users who are listed in this aspect. """ if fetch: request = self._connection.get('contacts.json?a_id={}'.format(self.id)) self._cached = request.json() return self._cached def removeAspect(self): """ --> POST /aspects/{id} HTTP/1.1 --> _method=delete&authenticity_token={token} <-- HTTP/1.1 302 Found Removes whole aspect. :returns: None """ request = self._connection.tokenFrom('contacts').delete('aspects/{}'.format(self.id)) if request.status_code != 302: raise errors.AspectError('wrong status code: {0}'.format(request.status_code)) def addUser(self, user_id): """Add user to current aspect. :param user_id: user to add to aspect :type user_id: int :returns: JSON from request --> POST /aspect_memberships HTTP/1.1 --> Accept: application/json, text/javascript, */*; q=0.01 --> Content-Type: application/json; charset=UTF-8 --> {"aspect_id":123,"person_id":123} <-- HTTP/1.1 200 OK """ data = {'aspect_id': self.id, 'person_id': user_id} headers = {'content-type': 'application/json', 'accept': 'application/json'} request = self._connection.tokenFrom('contacts').post('aspect_memberships', data=json.dumps(data), headers=headers) if request.status_code == 400: raise errors.AspectError('duplicate record, user already exists in aspect: {0}'.format(request.status_code)) elif request.status_code == 404: raise errors.AspectError('user not found from this pod: {0}'.format(request.status_code)) elif request.status_code != 200: raise errors.AspectError('wrong status code: {0}'.format(request.status_code)) response = None try: response = request.json() except json.decoder.JSONDecodeError: """ Should be OK now, but I'll leave this commentary here at first to see if anything comes up """ # FIXME For some (?) reason removing users from aspects works, but # adding them is a no-go and Diaspora* kicks us out with CSRF errors. # Weird. pass if response is None: raise errors.CSRFProtectionKickedIn() # Now you should fetchguid(fetch_stream=False) on User to update aspect membership_id's # Or update it locally with the response return response def removeUser(self, user): """Remove user from current aspect. :param user: user to remove from aspect :type user: diaspy.people.User object """ membership_id = None to_remove = None for each in user.aspectMemberships(): if each.get('aspect', {}).get('id') == self.id: membership_id = each.get('id') to_remove = each break # no need to continue if membership_id is None: raise errors.UserIsNotMemberOfAspect(user, self) request = self._connection.delete('aspect_memberships/{0}'.format(membership_id)) if request.status_code == 404: raise errors.AspectError('cannot remove user from aspect, probably tried too fast after adding: {0}'.format(request.status_code)) elif request.status_code != 200: raise errors.AspectError('cannot remove user from aspect: {0}'.format(request.status_code)) if 'contact' in user.data: # User object if to_remove: user.data['contact']['aspect_memberships'].remove( to_remove ) # remove local aspect membership_id else: # User object from Contacts() if to_remove: user.data['aspect_memberships'].remove( to_remove ) # remove local aspect membership_id return request.json() class Notification(): """This class represents single notification. """ if not BS4_SUPPORT: _who_regexp = re.compile(r'/people/([0-9a-f]+)["\']{1} class=["\']{1}hovercardable') _aboutid_regexp = re.compile(r'/posts/[0-9a-f]+') _htmltag_regexp = re.compile('') def __init__(self, connection, data): self._connection = connection self.type = data['type'] self._data = data[self.type] self.id = self._data['id'] self.unread = self._data['unread'] def __getitem__(self, key): """Returns a key from notification data. """ return self._data[key] def __str__(self): """Returns notification note. """ if BS4_SUPPORT: soup = BeautifulSoup(self._data['note_html'], 'lxml') media_body = soup.find('div', {"class": "media-body"}) div = media_body.find('div') if div: div.decompose() return media_body.getText().strip() else: string = re.sub(self._htmltag_regexp, '', self._data['note_html']) string = string.strip().split('\n')[0] while ' ' in string: string = string.replace(' ', ' ') return string def __repr__(self): """Returns notification note with more details. """ return '{0}: {1}'.format(self.when(), str(self)) def about(self): """Returns id of post about which the notification is informing OR: If the id is None it means that it's about user so .who() is called. """ if BS4_SUPPORT: soup = BeautifulSoup(self._data['note_html'], 'lxml') id = soup.find('a', {"data-ref": True}) if id: return id['data-ref'] else: return self.who()[0] else: about = self._aboutid_regexp.search(self._data['note_html']) if about is None: about = self.who()[0] else: about = int(about.group(0)[7:]) return about def who(self): """Returns list of guids of the users who caused you to get the notification. """ if BS4_SUPPORT: # Parse the HTML with BS4 soup = BeautifulSoup(self._data['note_html'], 'lxml') hovercardable_soup = soup.findAll('a', {"class": "hovercardable"}) return list(set([soup['href'][8:] for soup in hovercardable_soup])) else: return list(set([who for who in self._who_regexp.findall(self._data['note_html'])])) def when(self): """Returns UTC time as found in note_html. """ return self._data['created_at'] def mark(self, unread=False): """Marks notification to read/unread. Marks notification to read if `unread` is False. Marks notification to unread if `unread` is True. :param unread: which state set for notification :type unread: bool """ headers = {'x-csrf-token': repr(self._connection)} params = {'set_unread': json.dumps(unread)} response = self._connection.put('notifications/{0}'.format(self['id']), params=params, headers=headers) if response.status_code != 200: raise errors.NotificationError('Cannot mark notification: {0}'.format(response.status_code)) self._data['unread'] = unread self.unread = unread class Conversation(): """This class represents a conversation. .. note:: Remember that you need to have access to the conversation. """ if not BS4_SUPPORT: _message_stream_regexp = re.compile(r'
(.*?)
', re.DOTALL) _message_guid_regexp = re.compile(r'data-guid=["\']{1}([0-9]+)["\']{1}') _message_created_at_regexp = re.compile(r'
', re.DOTALL) _message_author_guid_regexp = re.compile(r' PUT /share_visibilities/42 HTTP/1.1 post_id=123 <- HTTP/1.1 200 OK """ headers = {'x-csrf-token': repr(self._connection)} params = {'post_id': json.dumps(self.id)} request = self._connection.put('share_visibilities/42', params=params, headers=headers) if request.status_code != 200: raise Exception('{0}: Failed to hide post.' .format(request.status_code)) def mute(self): """ -> POST /blocks HTTP/1.1 {"block":{"person_id":123}} <- HTTP/1.1 204 No Content """ headers = {'content-type':'application/json', 'x-csrf-token': repr(self._connection)} data = json.dumps({ 'block': { 'person_id' : self._data['author']['id'] } }) request = self._connection.post('blocks', data=data, headers=headers) if request.status_code != 204: raise Exception('{0}: Failed to block person' .format(request.status_code)) def subscribe(self): """ -> POST /posts/123/participation HTTP/1.1 <- HTTP/1.1 201 Created """ headers = {'x-csrf-token': repr(self._connection)} data = {} request = self._connection.post('posts/{}/participation' .format( self.id ), data=data, headers=headers) if request.status_code != 201: raise Exception('{0}: Failed to subscribe to post' .format(request.status_code)) self._data.update({"participation" : True}) def unsubscribe(self): """ -> POST /posts/123/participation HTTP/1.1 _method=delete <- HTTP/1.1 200 OK """ headers = {'x-csrf-token': repr(self._connection)} data = { "_method": "delete" } request = self._connection.post('posts/{}/participation' .format( self.id ), headers=headers, data=data) if request.status_code != 200: raise Exception('{0}: Failed to unsubscribe to post' .format(request.status_code)) self._data.update({"participation" : False}) def report(self): """ TODO """ pass def delete(self): """ This function deletes this post """ data = {'authenticity_token': repr(self._connection)} request = self._connection.delete('posts/{0}'.format(self.id), data=data, headers={'accept': 'application/json'}) if request.status_code != 204: raise errors.PostError('{0}: Post could not be deleted'.format(request.status_code)) def delete_comment(self, comment_id): """This function removes a comment from a post :param comment_id: id of the comment to remove. :type comment_id: str """ data = {'authenticity_token': repr(self._connection)} request = self._connection.delete('posts/{0}/comments/{1}' .format(self.id, comment_id), data=data, headers={'accept': 'application/json'}) if request.status_code != 204: raise errors.PostError('{0}: Comment could not be deleted' .format(request.status_code)) self.comments.delete(comment_id) def delete_like(self): """This function removes a like from a post """ data = {'authenticity_token': repr(self._connection)} url = 'posts/{0}/likes/{1}'.format(self.id, self._data['interactions']['likes'][0]['id']) request = self._connection.delete(url, data=data) if request.status_code != 204: raise errors.PostError('{0}: Like could not be removed.' .format(request.status_code)) self._data['interactions']['likes'].pop(0); self._data['interactions']['likes_count'] = str(int(self._data['interactions']['likes_count'])-1) def author(self, key='name'): """Returns author of the post. :param key: all keys available in data['author'] """ return self._data['author'][key] class FollowedTag(): """This class represents a followed tag. `diaspy.tagFollowings.TagFollowings()` uses it. """ def __init__(self, connection, id, name, taggings_count): self._connection = connection self._id, self._name, self._taggings_count = id, name, taggings_count def id(self): return self._id def name(self): return self._name def count(self): return self._taggings_count def delete(self): data = {'authenticity_token': repr(self._connection)} request = self._connection.delete('tag_followings/{0}'.format(self._id), data=data, headers={'accept': 'application/json'}) if request.status_code != 204: raise errors.TagError('{0}: Tag could not be deleted.' .format(request.status_code))