request = self.get('stream')
token = self._token_regex.search(request.text).group(1)
return token
-
+
def get_token(self, new=False):
"""This function returns a token needed for authentication in most cases.
Each time it is run a _fetchtoken() is called and refreshed token is stored.
import time
+
from diaspy.models import Notification
def __iter__(self):
return iter(self._notifications)
+ def __getitem__(self, n):
+ return self._notifications[n]
+
def last(self):
"""Returns list of most recent notifications.
"""
raise Exception('status code: {0}: cannot retreive notifications'.format(request.status_code))
return [Notification(self._connection, n) for n in request.json()]
-
def get(self, per_page=5, page=1):
"""Returns list of notifications.
"""
headers = {'x-csrf-token': self._connection.get_token()}
request = self._connection.get('notifications.json', headers=headers, params=params)
-
+
if request.status_code != 200:
raise Exception('status code: {0}: cannot retreive notifications'.format(request.status_code))
if request.status_code not in [201, 403]:
raise Exception('wrong error code: {0}'.format(request.status_code))
return request.status_code
-
+
class Tag(Generic):
"""This stream contains all posts containing a tag.
import unittest
# failure to import any of the modules below indicates failed tests
+# =================================================================
# modules used by diaspy
-import requests
import re
+import requests
+import warnings
# actual diaspy code
import diaspy
def testGettingNotifications(self):
client = diaspy.client.Client(test_connection)
notifications = client.get_notifications()
- self.assertEqual(list, type(notifications))
- if notifications: self.assertEqual(dict, type(notifications[0]))
+ self.assertEqual(diaspy.notifications.Notifications, type(notifications))
+ if notifications: self.assertEqual(diaspy.models.Notification, type(notifications[0]))
def testGettingTag(self):
client = diaspy.client.Client(test_connection)
def testAspectsAdd(self):
aspects = diaspy.streams.Aspects(test_connection)
aspects.add(testconf.test_aspect_name_fake)
- testconf.test_aspect_id = aspects.add(testconf.test_aspect_name)
+ testconf.test_aspect_id = aspects.add(testconf.test_aspect_name).id
def testAspectsGettingID(self):
aspects = diaspy.streams.Aspects(test_connection)