import diaspy.models
import diaspy.streams
import diaspy.connection
+from diaspy import notifications
class Client:
:returns: diaspy.streams.Activity
"""
- activity = diaspy.streams.Activity(self.connection, 'activity.json')
- return activity
+ return diaspy.streams.Activity(self.connection, 'activity.json')
def get_stream(self):
"""This functions returns stream.
:returns: list -- list of json formatted notifications
"""
- r = self.connection.get('notifications.json')
-
- if r.status_code != 200:
- raise Exception('wrong status code: {0}'.format(r.status_code))
-
- notifications = r.json()
- return notifications
+ return notifications.Notifications(self.connection)
def get_mailbox(self):
"""This functions returns a list of messages found in the conversation.
import json
+"""This module abstracts connection to pod.
+"""
+
+
class LoginError(Exception):
pass
request = self.session.post(string, data, headers=headers, params=params)
return request
+ def put(self, string, data=None, headers={}, params={}):
+ """This method PUTs to session.
+ """
+ string = '{0}/{1}'.format(self.pod, string)
+ if data is not None: request = self.session.put(string, data, headers=headers, params=params)
+ else: request = self.session.put(string, headers=headers, params=params)
+ return request
+
def delete(self, string, data, headers={}):
"""This method lets you send delete request to session.
Performs additional checks if needed.
#!/usr/bin/env python3
+import json
+
+
+"""This module is only imported in other diaspy modules and
+MUST NOT import anything.
+"""
+
+
class Aspect():
"""This class represents an aspect.
"""
return request.json()
+class Notification():
+ """This class represents single notification.
+ """
+ def __init__(self, connection, data):
+ self._connection = connection
+
+ self.type = list(data.keys())[0]
+ self.data = data[self.type]
+ self.id = self.data['id']
+ self.unread = self.data['unread']
+
+ def __getitem__(self, key):
+ return self.data[key]
+
+ def mark(self, unread=False):
+ """Marks notification to read/unread.
+ Marks notification to read if `unread` is False.
+ Marks notification to unread if `unread` is True.
+
+ :param unread: which state set for notification
+ :type unread: bool
+ """
+ headers = {'x-csrf-token': self._connection.get_token()}
+ params = {'set_unread': json.dumps(unread)}
+ print(json.dumps(False))
+ self._connection.put('notifications/{0}'.format(self['id']), params=params, headers=headers)
+ self.data['unread'] = unread
+
+
class Post():
"""This class represents a post.
--- /dev/null
+#!/usr/bin/env python3
+
+
+import time
+from diaspy.models import Notification
+
+
+"""This module abstracts notifications.
+"""
+
+
+class Notifications():
+ """This class represents notifications of a user.
+ """
+ def __init__(self, connection):
+ self._connection = connection
+ self._notifications = self.get()
+
+ def __iter__(self):
+ return iter(self._notifications)
+
+ def last(self):
+ """Returns list of most recent notifications.
+ """
+ params = {'per_page': 5, '_': int(round(time.time(), 3)*1000)}
+ headers = {'x-csrf-token': self._connection.get_token()}
+
+ request = self._connection.get('notifications.json', headers=headers, params=params)
+
+ if request.status_code != 200:
+ raise Exception('status code: {0}: cannot retreive notifications'.format(request.status_code))
+ return [Notification(self._connection, n) for n in request.json()]
+
+
+ def get(self, per_page=5, page=1):
+ """Returns list of notifications.
+ """
+ params = {'per_page': per_page, 'page': page}
+ headers = {'x-csrf-token': self._connection.get_token()}
+
+ request = self._connection.get('notifications.json', headers=headers, params=params)
+
+ if request.status_code != 200:
+ raise Exception('status code: {0}: cannot retreive notifications'.format(request.status_code))
+
+ notifications = request.json()
+ return [Notification(self._connection, n) for n in notifications]
def __contains__(self, post):
"""Returns True if stream contains given post.
"""
- if type(post) is not Post:
- raise TypeError('stream can contain only posts: checked for {0}'.format(type(post)))
return post in self._stream
def __iter__(self):
"""
self._stream = self._obtain()
- def more(self):
+ def more(self, max_time=0):
"""Tries to download more (older ones) Posts from Stream.
"""
- self.max_time -= 3000000
+ if not max_time: max_time = self.max_time - 3000000
+ self.max_time = max_time
params = {'max_time': self.max_time}
- request = self._connection.get('{0}', params=params)
+ request = self._connection.get(self._location, params=params)
if request.status_code != 200:
raise Exception('wrong status code: {0}'.format(request.status_code))
--- /dev/null
+In order to get list of notifications use `diaspy.notifications.Notifications()` object.
+
+Single notification (it should be obvious that it requires object of its own) is located in
+`diaspy.models.Notification()`.
+
+
+----
+
+
+To mark notification as `read` or `unread` use `Notification().mark()` method.
+It has only one parameter - `unread` which is boolean.