Code fixes, new features and refactoring in the field of user data and
[diaspy.git] / diaspy / streams.py
1 """Docstrings for this module are taken from:
2 https://gist.github.com/MrZYX/01c93096c30dc44caf71
3
4 Documentation for D* JSON API taken from:
5 http://pad.spored.de/ro/r.qWmvhSZg7rk4OQam
6 """
7
8
9 import json
10 import time
11 from diaspy.models import Post, Aspect
12 from diaspy import errors
13
14
class Generic():
    """Object representing generic stream.

    Keeps a list of posts downloaded from ``_location`` and exposes it
    through the container protocol (``in``, iteration, indexing, ``len()``).
    """
    _location = 'stream.json'

    def __init__(self, connection, location='', fetch=True):
        """
        :param connection: Connection() object
        :type connection: diaspy.connection.Connection
        :param location: location of json (optional)
        :type location: str
        :param fetch: will call .fill() if true
        :type fetch: bool
        """
        self._connection = connection
        if location:
            self._location = location
        self._stream = []
        # Seconds since epoch. time.time() is already epoch based; passing
        # gmtime() to mktime() (which expects *local* time) shifted the value
        # by the local UTC offset.
        self.max_time = int(time.time())
        if fetch:
            self.fill()

    def __contains__(self, post):
        """Returns True if stream contains given post.
        """
        return post in self._stream

    def __iter__(self):
        """Provides iterable interface for stream.
        """
        return iter(self._stream)

    def __getitem__(self, n):
        """Returns n-th item in Stream.
        """
        return self._stream[n]

    def __len__(self):
        """Returns length of the Stream.
        """
        return len(self._stream)

    def _obtain(self, max_time=0):
        """Obtains stream from pod.

        :param max_time: if non-zero, sent to the pod as ``max_time``
                         together with a ``_`` millisecond timestamp
        :type max_time: int
        :returns: list of Post objects built from the guids in the response
        :raises errors.StreamError: when the pod does not answer with 200
        """
        params = {}
        if max_time:
            params['max_time'] = max_time
            # NOTE(review): presumably a cache-busting value - confirm
            params['_'] = int(time.time() * 1000)
        request = self._connection.get(self._location, params=params)
        if request.status_code != 200:
            raise errors.StreamError('wrong status code: {0}'.format(request.status_code))
        return [Post(self._connection, guid=post['guid']) for post in request.json()]

    def _expand(self, new_stream):
        """Appends older posts to stream.

        Posts whose id is already present in the stream are skipped.
        """
        ids = {post.id for post in self._stream}
        for post in new_stream:
            if post.id not in ids:
                self._stream.append(post)
                ids.add(post.id)

    def _update(self, new_stream):
        """Updates stream with new posts.

        Unknown posts are placed in front of the posts already held,
        keeping the order in which they appear in ``new_stream``.
        """
        ids = {post.id for post in self._stream}
        fresh = []
        for post in new_stream:
            if post.id not in ids:
                fresh.append(post)
                ids.add(post.id)
        self._stream = fresh + self._stream

    def clear(self):
        """Set stream to empty.
        """
        self._stream = []

    def purge(self):
        """Removes all unexistent posts from stream.

        Every post is refreshed with ``post.update()``; posts for which
        that call raises are dropped.
        """
        alive = []
        for post in self._stream:
            try:
                # error will tell us that the post has been deleted
                post.update()
            except Exception:
                continue
            alive.append(post)
        self._stream = alive

    def update(self):
        """Updates stream with new posts.
        """
        self._update(self._obtain())

    def fill(self):
        """Fills the stream with posts.

        **Notice:** this will create entirely new list of posts.
        If you want to preserve posts already present in stream use update().
        """
        self._stream = self._obtain()

    def more(self, max_time=0, backtime=84600):
        """Tries to download more (older posts) posts from Stream.

        :param backtime: how many seconds to subtract each time
                         (defaults to 84600)
        :type backtime: int
        :param max_time: seconds since epoch (optional, diaspy'll figure everything on its own)
        :type max_time: int
        """
        # NOTE(review): one day is 86400 seconds; the 84600 default looks like
        # a typo but is kept so existing callers see no change.
        if not max_time:
            max_time = self.max_time - backtime
        self.max_time = max_time
        new_stream = self._obtain(max_time=max_time)
        self._expand(new_stream)

    def full(self, backtime=84600, retry=42, callback=None):
        """Fetches full stream - containing all posts.
        WARNING: this is a **VERY** long running function.
        Use callback parameter to access information about the stream during its
        run.

        Sometimes a user might not have any activity for a period longer
        than one ``backtime`` step. The role of retry is to handle such
        situations by trying to go further back in time.
        If a post is found the counter is restored.

        Default retry is 42. If you don't know why go to the nearest library (or to the nearest
        Piratebay mirror) and grab a copy of "A Hitchhiker's Guide to the Galaxy" and read the
        book to find out. This will also increase your level of geekiness and you'll have a
        great time reading the book.

        :param backtime: how many seconds to subtract each time
        :type backtime: int
        :param retry: how many times the function should look deeper than your last post
        :type retry: int
        :param callback: callable taking diaspy.streams.Generic as an argument
        :returns: integer, length of the stream
        """
        # Only the previous length matters for progress detection, so it is
        # tracked as a number instead of copying the whole stream every pass.
        known = len(self)
        # honour the caller's backtime on the very first step as well
        self.more(backtime=backtime)
        while known < len(self):
            known = len(self)
            if callback is not None:
                callback(self)
            self.more(backtime=backtime)
            if known < len(self):
                continue
            # but if no posts were found start retrying...
            print('retrying... {0}'.format(retry))
            n = retry
            while n > 0:
                print('\t', n, self.max_time)
                # try to get even more posts...
                self.more(backtime=backtime)
                print('\t', known, len(self))
                # check if it was a success...
                if known < len(self):
                    # and if so restore normal order of execution by
                    # going one loop higher
                    break
                known = len(self)
                # if it was not a success keep calm and
                # try going further back in time...
                n -= 1
        return len(self)

    def copy(self):
        """Returns copy (list of posts) of current stream.
        """
        return list(self._stream)

    def json(self, comments=False, **kwargs):
        """Returns JSON encoded string containing stream's data.

        :param comments: whether to fetch each post's comments and include
                         them under ``['interactions']['comments']``
        :type comments: bool
        :param kwargs: passed through to ``json.dumps()``
        """
        if comments:
            for post in self._stream:
                post._fetchcomments()
                post['interactions']['comments'] = [c.data for c in post.comments]
        return json.dumps([post._data for post in self._stream], **kwargs)
209
210
class Outer(Generic):
    """Stream of another user's posts, used by diaspy.models.User.
    """
    def _obtain(self, max_time=0):
        """Downloads the stream from the pod.

        :param max_time: sent to the pod as ``max_time`` when non-zero
        :type max_time: int
        :returns: list of Post objects built from the ids in the response
        :raises errors.StreamError: when the pod does not answer with 200
        """
        query = {'max_time': max_time} if max_time else {}
        response = self._connection.get(self._location, params=query)
        if response.status_code != 200:
            raise errors.StreamError('wrong status code: {0}'.format(response.status_code))
        posts = []
        for entry in response.json():
            posts.append(Post(self._connection, entry['id']))
        return posts
224
225
class Stream(Generic):
    """The main stream containing the combined posts of the
    followed users and tags and the community spotlights posts
    if the user enabled those.
    """
    # `_location` is the attribute the base class actually reads
    _location = 'stream.json'
    # public alias retained so code reading `Stream.location` keeps working
    location = _location

    def post(self, text='', aspect_ids='public', photos=None, photo='', provider_display_name=''):
        """This function sends a post to an aspect.
        If both `photo` and `photos` are specified `photos` takes precedence.

        :param text: Text to post.
        :type text: str
        :param aspect_ids: Aspect ids to send post to.
        :type aspect_ids: str
        :param photo: filename of photo to post
        :type photo: str
        :param photos: id of photo to post (obtained from _photoupload())
        :type photos: int
        :param provider_display_name: name of provider displayed under the post
        :type provider_display_name: str

        :returns: diaspy.models.Post -- the Post which has been created
        :raises Exception: when the pod does not answer with 201
        """
        data = {
            'aspect_ids': aspect_ids,
            'status_message': {'text': text, 'provider_display_name': provider_display_name},
        }
        if photo:
            data['photos'] = self._photoupload(photo)
        if photos:
            # explicit photo id overrides the id of the file uploaded above
            data['photos'] = photos

        request = self._connection.post('status_messages',
                                        data=json.dumps(data),
                                        headers={'content-type': 'application/json',
                                                 'accept': 'application/json',
                                                 'x-csrf-token': repr(self._connection)})
        if request.status_code != 201:
            raise Exception('{0}: Post could not be posted.'.format(request.status_code))
        return Post(self._connection, request.json()['id'])

    def _photoupload(self, filename, aspects=None):
        """Uploads picture to the pod.

        :param filename: path to picture file
        :type filename: str
        :param aspects: aspects (mappings with an ``'id'`` key) to which the
                        photo is uploaded; when empty or omitted the aspects
                        from ``connection.getUserData()`` are used
        :type aspects: list

        :returns: id of the photo being uploaded
        :raises errors.StreamError: when the pod does not answer with 200
        """
        # context manager guarantees the file is closed even if read() fails
        with open(filename, 'rb') as handle:
            image = handle.read()

        params = {}
        params['photo[pending]'] = 'true'
        params['set_profile_image'] = ''
        params['qqfile'] = filename
        if not aspects:
            aspects = self._connection.getUserData()['aspects']
        for i, aspect in enumerate(aspects):
            params['photo[aspect_ids][{0}]'.format(i)] = aspect['id']

        headers = {'content-type': 'application/octet-stream',
                   'x-csrf-token': repr(self._connection),
                   'x-file-name': filename}

        request = self._connection.post('photos', data=image, params=params, headers=headers)
        if request.status_code != 200:
            raise errors.StreamError('photo cannot be uploaded: {0}'.format(request.status_code))
        return request.json()['data']['photo']['id']
296
297
class Activity(Stream):
    """Stream representing user's activity.
    """
    _location = 'activity.json'

    def _delid(self, id):
        """Deletes post with given id.

        Does nothing when no post in the stream has that id.
        """
        for p in self._stream:
            if p['id'] == id:
                p.delete()
                break

    def delete(self, post):
        """Deletes post from users activity.
        `post` can be either post id or Post()
        object which will be identified and deleted.
        After deleting post the stream will be purged.

        :param post: post identifier
        :type post: str, diaspy.models.Post
        :raises TypeError: when `post` is neither a str nor a Post
        """
        if isinstance(post, str):
            self._delid(post)
        elif isinstance(post, Post):
            post.delete()
        else:
            # the placeholder was previously emitted verbatim, never filled in
            raise TypeError('this method accepts str or Post types: {0} given'.format(type(post)))
        self.purge()
326
327
class Aspects(Generic):
    """This stream contains the posts filtered by
    the specified aspect IDs. You can choose the aspect IDs with
    the parameter `aspect_ids` which value should be
    a comma separated list of aspect IDs.
    If the parameter is omitted all aspects are assumed.
    An example call would be `aspects.json?aspect_ids=23,5,42`
    """
    _location = 'aspects.json'

    def getAspectID(self, aspect_name):
        """Returns id of an aspect of given name.
        Returns -1 if aspect is not found.

        :param aspect_name: aspect name (must be spelled exactly as when created)
        :type aspect_name: str
        :returns: int
        """
        found = -1
        for aspect in self._connection.getUserData()['aspects']:
            # no early exit: if several aspects share the name the last one wins
            if aspect['name'] == aspect_name:
                found = aspect['id']
        return found

    def filter(self, ids):
        """Filters posts by given aspect ids.

        :parameter ids: list of aspect ids
        :type ids: list of integers (strings are accepted too)
        """
        # str() lets the documented list of integers through; join() alone
        # raises TypeError on non-string items.
        # NOTE(review): the query string carries no parameter name although
        # the class docstring shows `?aspect_ids=...` - confirm against the pod.
        self._location = 'aspects.json' + '?{0}'.format(','.join(str(i) for i in ids))
        self.fill()

    def add(self, aspect_name, visible=0):
        """This function adds a new aspect.
        Status code 422 is accepted because it is returned by D* when
        you try to add aspect already present on your aspect list.

        :param aspect_name: name of aspect to create
        :param visible: whether the contacts in this aspect are visible to each other or not

        :returns: Aspect() object of just created aspect
        :raises Exception: when the status code is neither 200 nor 422
        """
        data = {'authenticity_token': repr(self._connection),
                'aspect[name]': aspect_name,
                'aspect[contacts_visible]': visible}

        request = self._connection.post('aspects', data=data)
        if request.status_code not in [200, 422]:
            raise Exception('wrong status code: {0}'.format(request.status_code))

        return Aspect(self._connection, self.getAspectID(aspect_name))

    def remove(self, id=-1, name=''):
        """This method removes an aspect.
        You can give it either id or name of the aspect.
        When both are specified, id takes precedence over name.

        Status code 500 is accepted because although the D* will
        go nuts it will remove the aspect anyway.

        :param id: id of aspect to remove
        :type id: int
        :param name: name of aspect to remove
        :type name: str
        :raises Exception: when the status code is not one of 200, 302, 500
        """
        if id == -1 and name:
            id = self.getAspectID(name)
        data = {'_method': 'delete',
                'authenticity_token': repr(self._connection)}
        request = self._connection.post('aspects/{0}'.format(id), data=data)
        if request.status_code not in [200, 302, 500]:
            raise Exception('wrong status code: {0}: cannot remove aspect'.format(request.status_code))
401
402
class Commented(Generic):
    """Stream of every post the user has commented on.

    Reads from ``commented.json``.
    """
    _location = 'commented.json'
408
409
class Liked(Generic):
    """Stream of every post the user has liked.

    Reads from ``liked.json``.
    """
    _location = 'liked.json'
414
415
class Mentions(Generic):
    """Stream of every post in which the user is mentioned.

    Reads from ``mentions.json``.
    """
    _location = 'mentions.json'
421
422
class FollowedTags(Generic):
    """Stream of all posts carrying tags the user follows.

    Reads from ``followed_tags.json``.
    """
    _location = 'followed_tags.json'

    def get(self):
        """Returns list of followed tags.

        Currently always returns an empty list.
        """
        return []

    def remove(self, tag_id):
        """Stop following a tag.

        :param tag_id: tag id
        :type tag_id: int
        :raises Exception: for every status code other than 404
        """
        payload = {'authenticity_token': self._connection.get_token()}
        target = 'tag_followings/{0}'.format(tag_id)
        response = self._connection.delete(target, data=payload)
        # NOTE(review): 404 is the only status treated as success here -
        # confirm this matches what the pod really returns.
        if response.status_code == 404:
            return
        raise Exception('wrong status code: {0}'.format(response.status_code))

    def add(self, tag_name):
        """Follow new tag.
        Status code 403 is accepted because pods respond with it when
        asked to follow a tag that the user already follows.

        :param tag_name: tag name
        :type tag_name: str
        :returns: int (response code)
        :raises Exception: when the status code is neither 201 nor 403
        """
        body = json.dumps({
            'name': tag_name,
            'authenticity_token': repr(self._connection),
        })
        request_headers = {
            'content-type': 'application/json',
            'x-csrf-token': repr(self._connection),
            'accept': 'application/json',
        }

        response = self._connection.post('tag_followings', data=body, headers=request_headers)

        code = response.status_code
        if code not in [201, 403]:
            raise Exception('wrong error code: {0}'.format(code))
        return code
467
468
class Tag(Generic):
    """This stream contains all posts containing a tag.
    """
    def __init__(self, connection, tag, fetch=True):
        """
        :param connection: Connection() object
        :type connection: diaspy.connection.Connection
        :param tag: tag name
        :type tag: str
        :param fetch: will call .fill() if true
        :type fetch: bool
        """
        # Delegate to the base initialiser so `_stream` and `max_time` are
        # always set; without them len(), iteration and more() raise
        # AttributeError on a Tag created with fetch=False.
        Generic.__init__(self, connection,
                         location='tags/{0}.json'.format(tag),
                         fetch=fetch)