:returns: dict -- json formatted user info.
"""
- r = self.connection.get('bookmarklet')
- regex = re.compile(r'window.current_user_attributes = ({.*})')
- userdata = json.loads(regex.search(r.text).group(1))
- return userdata
+ return self.connection.getUserInfo()
def post_picture(self, filename):
"""This method posts a picture to D*.
:returns: list -- list of Post objects.
"""
- request = self.connection.get('stream.json')
-
- if request.status_code != 200:
- raise Exception('wrong status code: {0}'.format(request.status_code))
-
- stream = request.json()
- return [diaspy.models.Post(str(post['id']), self.connection) for post in stream]
+ stream = diaspy.models.Stream(self.connection)
+ stream.update()
+ return stream
def get_notifications(self):
"""This functions returns a list of notifications.
import re
import requests
+import json
class LoginError(Exception):
self.pod = pod
self.session = requests.Session()
self._token_regex = re.compile(r'content="(.*?)"\s+name="csrf-token')
+ self._userinfo_regex = re.compile(r'window.current_user_attributes = ({.*})')
self._setlogin(username, password)
def get(self, string):
Performs additional checks if needed.
Example:
- To obtain 'foo' from pod one should call `_sessionget('foo')`.
+ To obtain 'foo' from pod one should call `get('foo')`.
:param string: URL to get without the pod's URL and slash eg. 'stream'.
:type string: str
Performs additional checks if needed.
Example:
- To post to 'foo' one should call `_sessionpost('foo', data={})`.
+ To post to 'foo' one should call `post('foo', data={})`.
:param string: URL to post without the pod's URL and slash eg. 'status_messages'.
:type string: str
data=self.login_data,
headers={'accept': 'application/json'})
if request.status_code != 201:
- raise Exception('{0}: Login failed.'.format(request.status_code))
+ raise LoginError('{0}: Login failed.'.format(request.status_code))
def login(self, username='', password=''):
"""This function is used to log in to a pod.
self._login()
def podswitch(self, pod):
- """Switches pod.
+ """Switches pod from current to another one.
"""
self.pod = pod
self._login()
+ def getUserInfo(self):
+ """This function returns the current user's attributes.
+
+ :returns: dict -- json formatted user info.
+ """
+ request = self.get('bookmarklet')
+ userdata = json.loads(self._userinfo_regex.search(request.text).group(1))
+ return userdata
+
def getToken(self):
"""This function returns a token needed for authentication in most cases.
#!/usr/bin/env python3
+import json
+
+
class Post:
"""This class represents a post.
headers={'accept': 'application/json'})
if r.status_code != 204:
raise Exception('{0}: Post could not be deleted'.format(r.status_code))
+
+
+class Stream:
+ """Object representing user's stream.
+ """
+ def __init__(self, connection):
+ """
+ :param connection: Connection() object
+ :param type: diaspy.connection.Connection
+ """
+ self._connection = connection
+ self._stream = []
+ self.fill()
+
+ def __contains__(self, post):
+ """Returns True if stream contains given post.
+ """
+ if type(post) is not Post:
+ raise TypeError('stream can contain only posts: checked for {0}'.format(type(post)))
+ return post in self._stream
+
+ def __iter__(self):
+ """Provides iterable interface for stream.
+ """
+ return iter(self._stream)
+
+ def __getitem__(self, n):
+ """Returns n-th item in Stream.
+ """
+ return self._stream[n]
+
+ def __len__(self):
+ """Returns length of the Stream.
+ """
+ return len(self._stream)
+
+ def _obtain(self):
+ """Obtains stream from pod.
+ """
+ request = self._connection.get('stream.json')
+ if request.status_code != 200:
+ raise Exception('wrong status code: {0}'.format(request.status_code))
+ return [Post(str(post['id']), self._connection) for post in request.json()]
+
+ def clear(self):
+ """Removes all posts from stream.
+ """
+ self._stream = []
+
+ def update(self):
+ """Updates stream.
+ """
+ stream = self._obtain()
+ _stream = self._stream
+ for i in range(len(stream)):
+ if stream[-i] not in _stream:
+ _stream = [stream[-i]] + _stream
+ self._stream = _stream
+
+ def fill(self):
+ """Fills the stream with posts.
+ """
+ self._stream = self._obtain()
+
+ def post(self, text, aspect_ids='public', photos=None):
+ """This function sends a post to an aspect
+
+ :param text: Text to post.
+ :type text: str
+ :param aspect_ids: Aspect ids to send post to.
+ :type aspect_ids: str
+
+ :returns: diaspy.models.Post -- the Post which has been created
+ """
+ data = {}
+ data['aspect_ids'] = aspect_ids
+ data['status_message'] = {'text': text}
+ if photos: data['photos'] = photos
+ request = self._connection.post('status_messages',
+ data=json.dumps(data),
+ headers={'content-type': 'application/json',
+ 'accept': 'application/json',
+ 'x-csrf-token': self._connection.getToken()})
+ if request.status_code != 201:
+ raise Exception('{0}: Post could not be posted.'.format(
+ request.status_code))
+
+ post = Post(str(request.json()['id']), self._connection)
+ self.update()
+ return post
+
+ def post_picture(self, filename):
+ """This method posts a picture to D*.
+
+ :param filename: Path to picture file.
+ :type filename: str
+ """
+ aspects = self._connection.getUserInfo()['aspects']
+ params = {}
+ params['photo[pending]'] = 'true'
+ params['set_profile_image'] = ''
+ params['qqfile'] = filename
+ for i, aspect in enumerate(aspects):
+ params['photo[aspect_ids][%d]' % (i)] = aspect['id']
+
+ data = open(filename, 'rb')
+
+ headers = {'content-type': 'application/octet-stream',
+ 'x-csrf-token': self._connection.getToken(),
+ 'x-file-name': filename}
+ request = self._connection.post('photos', params=params, data=data, headers=headers)
+ data.close()
+ self.update()
+ return request
--- /dev/null
+#### `Stream()` object
+
+This object is used to represent user's stream on D\*.
+It is returned by `Client()`'s method `get_stream()` and
+is basically the list of posts.
+
+It can get more functionality in future (probably it would be
+moved from `Client()`).
+
+----
+
+##### Getting stream
+
+To get basic stream you have to have working `Connection()` as
+this is required by `Stream()`'s constructor.
+
+ c = diaspy.connection.Connection(pod='https://pod.example.com',
+ username='foo',
+ password='bar')
+ c.login()
+ stream = diaspy.models.Stream(c)
+
+Now you have a stream filled with posts (if any can be found on user's stream).
+
+----
+
+##### Length of a stream
+
+Stream's length can be checked by calling `len()` on it.
+
+ len(stream)
+ 10
__passwd__ = testconf.__passwd__
+class StreamTest(unittest.TestCase):
+ def testGetting(self):
+ c = diaspy.connection.Connection(pod=__pod__, username=__username__, password=__passwd__)
+ c.login()
+ stream = diaspy.models.Stream(c)
+ stream.update()
+
+ def testGettingLength(self):
+ c = diaspy.connection.Connection(pod=__pod__, username=__username__, password=__passwd__)
+ c.login()
+ stream = diaspy.models.Stream(c)
+ stream.update()
+ len(stream)
+
+ def testClearing(self):
+ c = diaspy.connection.Connection(pod=__pod__, username=__username__, password=__passwd__)
+ c.login()
+ stream = diaspy.models.Stream(c)
+ stream.update()
+ stream.clear()
+ self.assertEqual(0, len(stream))
+
+ def testPostingText(self):
+ c = diaspy.connection.Connection(pod=__pod__, username=__username__, password=__passwd__)
+ c.login()
+ stream = diaspy.models.Stream(c)
+ post = stream.post('`diaspy` test \n#diaspy')
+ self.assertEqual(diaspy.models.Post, type(post))
+
+ def testPostingImage(self):
+ c = diaspy.connection.Connection(pod=__pod__, username=__username__, password=__passwd__)
+ c.login()
+ stream = diaspy.models.Stream(c)
+ stream.post_picture('./test-image.png')
+
+
class ConnectionTest(unittest.TestCase):
def testLoginWithoutUsername(self):
connection = diaspy.connection.Connection(pod=__pod__)
connection = diaspy.connection.Connection(pod=__pod__)
self.assertRaises(diaspy.connection.LoginError, connection.login, username='user')
-class ClientTests(unittest.TestCase):
def testGettingUserInfo(self):
- client = diaspy.client.Client(__pod__, __username__, __passwd__)
- info = client.get_user_info()
+ connection = diaspy.connection.Connection(__pod__, __username__, __passwd__)
+ connection.login()
+ info = connection.getUserInfo()
self.assertEqual(dict, type(info))
+
+class ClientTests(unittest.TestCase):
def testGettingStream(self):
client = diaspy.client.Client(__pod__, __username__, __passwd__)
stream = client.get_stream()
- self.assertEqual(list, type(stream))
- if stream: self.assertEqual(diaspy.models.Post, type(stream[0]))
+ if len(stream): self.assertEqual(diaspy.models.Post, type(stream[0]))
def testGettingNotifications(self):
client = diaspy.client.Client(__pod__, __username__, __passwd__)