* Removed `backtime` parameter from `diaspy.streams.Generic()` it's `more()` method...
[diaspy.git] / diaspy / streams.py
1 """Docstrings for this module are taken from:
2 https://gist.github.com/MrZYX/01c93096c30dc44caf71
3
4 Documentation for D* JSON API taken from:
5 http://pad.spored.de/ro/r.qWmvhSZg7rk4OQam
6 """
7
8
9 import json
10 import time
11 from diaspy.models import Post, Aspect
12 from diaspy import errors
13
14 """
15 Remember created_at is in UTC
16
17 We need this to get a UTC timestamp from the latest loaded post in the
18 stream, so we can use it for the more() function.
19 """
20 import dateutil.parser
def parse_utc_timestamp(date_str):
    """Convert a date string into whole seconds since the epoch.

    The string is parsed with ``dateutil.parser.parse``. If it carries no
    timezone information it is treated as UTC, because ``created_at``
    values are documented in this module as being in UTC.

    :param date_str: date/time string (e.g. a post's ``created_at``)
    :type date_str: str
    :returns: int -- rounded POSIX timestamp
    """
    # Local import so this function does not depend on the file's import block.
    import datetime

    parsed = dateutil.parser.parse(date_str)
    if parsed.tzinfo is None:
        # .timestamp() on a naive datetime assumes the machine's local
        # timezone, which would shift the result by the local UTC offset.
        parsed = parsed.replace(tzinfo=datetime.timezone.utc)
    return round(parsed.timestamp())
23
class Generic():
    """Object representing generic stream.

    Holds a list of posts obtained from a JSON endpoint (``_location``)
    through the given connection, and offers list-like access to them
    (iteration, indexing, ``len()``, ``in``).
    """
    _location = 'stream.json'

    def __init__(self, connection, location='', fetch=True):
        """
        :param connection: Connection() object
        :type connection: diaspy.connection.Connection
        :param location: location of json (optional)
        :type location: str
        :param fetch: will call .fill() if true
        :type fetch: bool
        """
        self._connection = connection
        if location:
            self._location = location
        # Cache-busting counter sent as the `_` request parameter by
        # _obtain(); initialised lazily on the first paginated request.
        self.latest = None
        self._stream = []
        # Seconds since the epoch. time.time() is already epoch based;
        # mktime(gmtime()) would be off by the local UTC offset.
        self.max_time = int(time.time())
        if fetch:
            self.fill()

    def __contains__(self, post):
        """Returns True if stream contains given post.
        """
        return post in self._stream

    def __iter__(self):
        """Provides iterable interface for stream.
        """
        return iter(self._stream)

    def __getitem__(self, n):
        """Returns n-th item in Stream.
        """
        return self._stream[n]

    def __len__(self):
        """Returns length of the Stream.
        """
        return len(self._stream)

    def _obtain(self, max_time=0, suppress=True):
        """Obtains stream from pod.

        When at least one received post has a truthy ``created_at``,
        ``self.max_time`` is set to the timestamp of the last such post so
        that a following ``more()`` continues from there.

        :param max_time: seconds since epoch; when non-zero it is sent as
                         the ``max_time`` request parameter
        :type max_time: int
        :param suppress: suppress post-fetching errors (e.g. 404)
        :type suppress: bool
        :returns: list of diaspy.models.Post
        :raises errors.StreamError: when the response status is not 200
        """
        params = {}
        if max_time:
            if self.latest is None:
                self.latest = int(time.time() * 1000)
                self.latest -= max_time
            else:
                self.latest += 1
            params['max_time'] = max_time
            params['_'] = self.latest
        request = self._connection.get(self._location, params=params)
        if request.status_code != 200:
            raise errors.StreamError('wrong status code: {0}'.format(request.status_code))
        posts = []
        # created_at of the last post received that had one.
        latest_time = None
        for post in request.json():
            try:
                comments = post['interactions']['comments_count'] > 3
                posts.append(Post(self._connection, id=post['id'], guid=post['guid'], fetch=False, comments=comments, post_data=post))
                if post['created_at']:
                    latest_time = post['created_at']
            except errors.PostError:
                if not suppress:
                    raise
        if latest_time:
            self.max_time = parse_utc_timestamp(latest_time)
        return posts

    def _expand(self, new_stream):
        """Appends older posts to stream.

        Posts whose guid is already present in the stream are skipped.
        """
        known = {post.guid for post in self._stream}
        for post in new_stream:
            if post.guid not in known:
                self._stream.append(post)
                known.add(post.guid)

    def _update(self, new_stream):
        """Updates stream with new posts.

        Posts not yet in the stream are put in front of the existing ones,
        keeping the order in which they appear in ``new_stream``.
        """
        known = {post.guid for post in self._stream}
        fresh = []
        for post in new_stream:
            if post.guid not in known:
                fresh.append(post)
                known.add(post.guid)
        self._stream = fresh + self._stream

    def clear(self):
        """Set stream to empty.
        """
        self._stream = []

    def purge(self):
        """Removes all unexistent posts from stream.

        Every post is refreshed with ``post.update()``; posts for which
        that call raises are dropped.
        """
        alive = []
        for post in self._stream:
            try:
                # error will tell us that the post has been deleted
                post.update()
            except Exception:
                # deliberate best-effort: any failure counts as "deleted"
                continue
            alive.append(post)
        self._stream = alive

    def update(self):
        """Updates stream with new posts.
        """
        self._update(self._obtain())

    def fill(self):
        """Fills the stream with posts.

        **Notice:** this will create entirely new list of posts.
        If you want to preserve posts already present in stream use update().
        """
        self._stream = self._obtain()

    def more(self, max_time=0):
        """Tries to download more (older posts) posts from Stream.

        :param max_time: seconds since epoch (optional, diaspy'll figure everything on its own)
        :type max_time: int
        """
        if not max_time:
            max_time = self.max_time
        self.max_time = max_time
        self._expand(self._obtain(max_time=max_time))

    def full(self, backtime=86400, retry=42, callback=None):
        """Fetches full stream - containing all posts.
        WARNING: this is a **VERY** long running function.
        Use callback parameter to access information about the stream during its
        run.

        The method keeps calling ``more()`` while each call makes the
        stream longer. When a call brings nothing new it retries up to
        ``retry`` times before giving up; if a retry finds posts, normal
        fetching resumes.

        Default retry is 42. If you don't know why, grab a copy of
        "The Hitchhiker's Guide to the Galaxy" and read the book to find out.

        :param backtime: kept for backward compatibility; ``more()`` no
                         longer accepts it, so it is not used
        :type backtime: int
        :param retry: how many times the function should look deeper than your last post
        :type retry: int
        :param callback: callable taking diaspy.streams.Generic as an argument
        :returns: integer, length of the stream
        """
        oldstream = self.copy()
        self.more()
        while len(oldstream) < len(self):
            oldstream = self.copy()
            if callback is not None:
                callback(self)
            self.more()
            if len(oldstream) < len(self):
                continue
            # but if no posts were found start retrying...
            print('[diaspy] retrying... {0}'.format(retry))
            n = retry
            while n > 0:
                # try to get even more posts...
                self.more()
                # check if it was a success...
                if len(oldstream) < len(self):
                    # and if so restore normal order of execution by
                    # going one loop higher
                    break
                oldstream = self.copy()
                n -= 1
        return len(self)

    def copy(self):
        """Returns copy (list of posts) of current stream.
        """
        return list(self._stream)

    def json(self, comments=False, **kwargs):
        """Returns JSON encoded string containing stream's data.

        :param comments: to include comments or not to include 'em, that is the question this param holds answer to
        :type comments: bool
        :param kwargs: passed through to ``json.dumps``
        :returns: str
        """
        if comments:
            for post in self._stream:
                post._fetchcomments()
                post['interactions']['comments'] = [c.data for c in post.comments]
        return json.dumps([post._data for post in self._stream], **kwargs)
235
236
class Outer(Generic):
    """Stream of another user's posts.

    Used by diaspy.models.User; the posts are read from
    ``people/<guid>/stream.json``.
    """
    def __init__(self, connection, guid, fetch=True):
        """
        :param connection: Connection() object
        :param guid: guid of the user whose stream is wanted
        :param fetch: will call .fill() if true
        :type fetch: bool
        """
        stream_path = 'people/{}/stream.json'.format(guid)
        super().__init__(connection, location=stream_path, fetch=fetch)
244
class Stream(Generic):
    """The main stream containing the combined posts of the
    followed users and tags and the community spotlights posts
    if the user enabled those.
    """
    # The base class reads `_location`; the attribute used to be spelled
    # `location` here, which the base class never looks at.
    _location = 'stream.json'
    # Old spelling kept so code reading `Stream.location` keeps working.
    location = _location

    def post(self, text='', aspect_ids='public', photos=None, photo='', poll_question=None, poll_answers=None, location_coords=None, provider_display_name=''):
        """This function sends a post to an aspect.
        If both `photo` and `photos` are specified `photos` takes precedence.

        :param text: Text to post.
        :type text: str

        :param aspect_ids: Aspect ids to send post to.
        :type aspect_ids: str

        :param photo: filename of photo to post
        :type photo: str

        :param photos: id of photo to post (obtained from _photoupload())
        :type photos: int

        :param provider_display_name: name of provider displayed under the post
        :type provider_display_name: str

        :param poll_question: Question string
        :type poll_question: str

        :param poll_answers: Answers to the poll
        :type poll_answers: list with strings

        :param location_coords: TODO
        :type location_coords: TODO

        :returns: diaspy.models.Post -- the Post which has been created
        :raises Exception: when the pod does not answer with status 201
        """
        data = {}
        data['aspect_ids'] = aspect_ids
        data['status_message'] = {'text': text, 'provider_display_name': provider_display_name}
        if photo:
            data['photos'] = self._photoupload(photo)
        if photos:
            # explicit photo ids override an uploaded `photo`
            data['photos'] = photos
        if poll_question and poll_answers:
            data['poll_question'] = poll_question
            data['poll_answers'] = poll_answers
        if location_coords:
            data['location_coords'] = location_coords

        request = self._connection.post('status_messages',
                                        data=json.dumps(data),
                                        headers={'content-type': 'application/json',
                                                 'accept': 'application/json',
                                                 'x-csrf-token': repr(self._connection)})
        if request.status_code != 201:
            raise Exception('{0}: Post could not be posted.'.format(request.status_code))
        post_json = request.json()
        return Post(self._connection, id=post_json['id'], guid=post_json['guid'], post_data=post_json)

    def _photoupload(self, filename, aspects=None):
        """Uploads picture to the pod.

        :param filename: path to picture file
        :type filename: str
        :param aspects: aspects (items with an ``'id'`` key) to which the
                        photo is uploaded; when empty or omitted the
                        aspects from ``getUserData()`` are used
        :type aspects: list

        :returns: id of the photo being uploaded
        :raises errors.StreamError: when the pod does not answer with status 200
        """
        # `with` closes the file even if reading fails
        with open(filename, 'rb') as picture:
            image = picture.read()

        params = {}
        params['photo[pending]'] = 'true'
        params['set_profile_image'] = ''
        params['qqfile'] = filename
        if not aspects:
            aspects = self._connection.getUserData()['aspects']
        for i, aspect in enumerate(aspects):
            params['photo[aspect_ids][{0}]'.format(i)] = aspect['id']

        headers = {'content-type': 'application/octet-stream',
                   'x-csrf-token': repr(self._connection),
                   'x-file-name': filename}

        request = self._connection.post('photos', data=image, params=params, headers=headers)
        if request.status_code != 200:
            raise errors.StreamError('photo cannot be uploaded: {0}'.format(request.status_code))
        return request.json()['data']['photo']['id']
333
334
class Activity(Stream):
    """Stream representing user's activity.
    """
    _location = 'activity.json'

    def _delid(self, id):
        """Deletes post with given id.

        Does nothing when no post in the stream has that id.
        """
        for p in self._stream:
            # Compare as strings so a str id also matches an id stored as
            # a number (presumably ids are ints in pod JSON -- TODO confirm).
            if str(p['id']) == str(id):
                p.delete()
                break

    def delete(self, post):
        """Deletes post from users activity.
        `post` can be either post id or Post()
        object which will be identified and deleted.
        After deleting post the stream will be purged.

        :param post: post identifier
        :type post: str, diaspy.models.Post
        :raises TypeError: when `post` is neither str nor Post
        """
        if isinstance(post, str):
            self._delid(post)
        elif isinstance(post, Post):
            post.delete()
        else:
            # the message used to be emitted with a literal, unfilled "{0}"
            raise TypeError('this method accepts str or Post types: {0} given'.format(type(post)))
        self.purge()
363
364
class Aspects(Generic):
    """This stream contains the posts filtered by
    the specified aspect IDs. You can choose the aspect IDs with
    the parameter `aspect_ids` which value should be
    a comma separated list of aspect IDs.
    If the parameter is omitted all aspects are assumed.
    An example call would be `aspects.json?aspect_ids=23,5,42`
    """
    _location = 'aspects.json'

    def getAspectID(self, aspect_name):
        """Returns id of an aspect of given name.
        Returns -1 if aspect is not found.

        :param aspect_name: aspect name (must be spelled exactly as when created)
        :type aspect_name: str
        :returns: int
        """
        aspect_id = -1
        for aspect in self._connection.getUserData()['aspects']:
            # no early exit: if several aspects share the name the last one wins
            if aspect['name'] == aspect_name:
                aspect_id = aspect['id']
        return aspect_id

    def filter(self, ids):
        """Filters posts by given aspect ids.

        :parameter ids: list of aspect ids
        :type ids: list of integers
        """
        # str.join() only accepts strings, so integer ids are converted first
        self._location = 'aspects.json?a_ids[]=' + '&a_ids[]='.join(str(i) for i in ids)
        self.fill()  # this will create entirely new list of posts.

    def add(self, aspect_name, visible=0):
        """This function adds a new aspect.
        Status code 422 is accepted because it is returned by D* when
        you try to add aspect already present on your aspect list.

        :param aspect_name: name of aspect to create
        :param visible: whether the contacts in this aspect are visible to each other or not

        :returns: Aspect() object of just created aspect
        :raises Exception: when the status code is neither 200 nor 422
        """
        data = {'authenticity_token': repr(self._connection),
                'aspect[name]': aspect_name,
                'aspect[contacts_visible]': visible}

        request = self._connection.post('aspects', data=data)
        if request.status_code not in [200, 422]:
            raise Exception('wrong status code: {0}'.format(request.status_code))

        return Aspect(self._connection, self.getAspectID(aspect_name))

    def remove(self, id=-1, name=''):
        """This method removes an aspect.
        You can give it either id or name of the aspect.
        When both are specified, id takes precedence over name.

        Status code 500 is accepted because although the D* will
        go nuts it will remove the aspect anyway.

        :param id: id of aspect to remove
        :type id: int
        :param name: name of aspect to remove
        :type name: str
        :raises Exception: when the status code is not one of 200, 302, 500
        """
        if id == -1 and name:
            id = self.getAspectID(name)
        data = {'_method': 'delete',
                'authenticity_token': repr(self._connection)}
        request = self._connection.post('aspects/{0}'.format(id), data=data)
        if request.status_code not in [200, 302, 500]:
            raise Exception('wrong status code: {0}: cannot remove aspect'.format(request.status_code))
438
439
class Commented(Generic):
    """Stream of every post the user has commented on.

    Read from ``commented.json``.
    """
    _location = 'commented.json'
445
446
class Liked(Generic):
    """Stream of every post the user liked.

    Read from ``liked.json``.
    """
    _location = 'liked.json'
451
452
class Mentions(Generic):
    """Stream of every post the user is mentioned in.

    Read from ``mentions.json``.
    """
    _location = 'mentions.json'
458
459
class FollowedTags(Generic):
    """Stream of all posts containing tags the user is following.

    Read from ``followed_tags.json``.
    """
    _location = 'followed_tags.json'

    def get(self):
        """Returns list of followed tags.

        Currently a stub: always returns an empty list.
        """
        return []

    def remove(self, tag_id):
        """Stop following a tag.

        :param tag_id: tag id
        :type tag_id: int
        :raises Exception: when the response status is anything but 404
        """
        payload = {'authenticity_token': self._connection.get_token()}
        endpoint = 'tag_followings/{0}'.format(tag_id)
        response = self._connection.delete(endpoint, data=payload)
        # NOTE(review): only 404 is treated as success here -- confirm
        # against the pod's real behaviour.
        if response.status_code == 404:
            return
        raise Exception('wrong status code: {0}'.format(response.status_code))

    def add(self, tag_name):
        """Follow new tag.
        Error code 403 is accepted because pods respond with it when request
        is sent to follow a tag that a user already follows.

        :param tag_name: tag name
        :type tag_name: str
        :returns: int (response code)
        :raises Exception: when the response status is neither 201 nor 403
        """
        payload = {
            'name': tag_name,
            'authenticity_token': repr(self._connection),
        }
        request_headers = {
            'content-type': 'application/json',
            'x-csrf-token': repr(self._connection),
            'accept': 'application/json',
        }
        body = json.dumps(payload)
        response = self._connection.post('tag_followings', data=body, headers=request_headers)

        status = response.status_code
        if status not in [201, 403]:
            raise Exception('wrong error code: {0}'.format(status))
        return status
504
505
class Tag(Generic):
    """This stream contains all posts containing a tag.
    """
    def __init__(self, connection, tag, fetch=True):
        """
        :param connection: Connection() object
        :type connection: diaspy.connection.Connection
        :param tag: tag name
        :type tag: str
        :param fetch: will call .fill() if true
        :type fetch: bool
        """
        # Delegate to Generic.__init__ so that _stream, latest and max_time
        # are always initialised; without them len() on an unfetched Tag
        # and any call to more() fail with AttributeError.
        super().__init__(connection, 'tags/{0}.json'.format(tag), fetch)