IMPORTANT: these changes will require dateutil or pytz
[diaspy.git] / diaspy / streams.py
1 """Docstrings for this module are taken from:
2 https://gist.github.com/MrZYX/01c93096c30dc44caf71
3
4 Documentation for D* JSON API taken from:
5 http://pad.spored.de/ro/r.qWmvhSZg7rk4OQam
6 """
7
8
9 import json
10 import time
11 from diaspy.models import Post, Aspect
12 from diaspy import errors
13
# `created_at` values in stream JSON are UTC timestamps such as
# "2019-03-01T12:34:56.789Z". parse_utc_timestamp() converts one into integer
# seconds since the epoch, so the stream can request posts older than the last
# one it loaded (see Generic.more()).
#
# python-dateutil is used when it is installed. Otherwise the standard library
# parses the fixed ISO-8601 "Z" format, so no third-party package is required.
# Previously the fallback needed pytz, and a bare `exit` left the function
# undefined when neither package was installed.
try:
    import dateutil.parser

    def parse_utc_timestamp(date_str):
        """Return the UTC timestamp string `date_str` as integer epoch seconds.

        :param date_str: ISO-8601 timestamp, e.g. "2019-03-01T12:34:56.789Z"
        :type date_str: str
        :returns: int
        """
        return round(dateutil.parser.parse(date_str).timestamp())

except ImportError:
    from datetime import datetime, timezone

    # With and without fractional seconds; both carry a literal "Z" (UTC).
    _CREATED_AT_FORMATS = ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ")

    def parse_utc_timestamp(date_str):
        """Return the UTC timestamp string `date_str` as integer epoch seconds.

        :param date_str: ISO-8601 timestamp, e.g. "2019-03-01T12:34:56.789Z"
        :type date_str: str
        :returns: int
        :raises ValueError: if `date_str` matches none of the supported formats
        """
        for fmt in _CREATED_AT_FORMATS:
            try:
                parsed = datetime.strptime(date_str, fmt)
            except ValueError:
                continue
            # strptime yields a naive datetime; pin it to UTC before converting.
            return round(parsed.replace(tzinfo=timezone.utc).timestamp())
        raise ValueError('unrecognised created_at timestamp: {0!r}'.format(date_str))
37
class Generic:
    """Object representing generic stream.
    """
    _location = 'stream.json'

    def __init__(self, connection, location='', fetch=True):
        """
        :param connection: Connection() object
        :type connection: diaspy.connection.Connection
        :param location: location of json (optional)
        :type location: str
        :param fetch: will call .fill() if true
        :type fetch: bool
        """
        self._connection = connection
        if location:
            self._location = location
        self.latest = None
        self._stream = []
        # Seconds since the epoch. time.time() is already epoch-based; the
        # former time.mktime(time.gmtime()) reinterpreted a UTC struct as
        # local time and was off by the host's UTC offset.
        self.max_time = int(time.time())
        if fetch:
            self.fill()

    def __contains__(self, post):
        """Returns True if stream contains given post.
        """
        return post in self._stream

    def __iter__(self):
        """Provides iterable interface for stream.
        """
        return iter(self._stream)

    def __getitem__(self, n):
        """Returns n-th item in Stream.
        """
        return self._stream[n]

    def __len__(self):
        """Returns length of the Stream.
        """
        return len(self._stream)

    def _obtain(self, max_time=0, suppress=True):
        """Obtains stream from pod.

        As a side effect, sets self.max_time to the created_at timestamp of
        the last post in the response (when it has one), for use by more().

        :param max_time: when non-zero, sent as the `max_time` request
            parameter (seconds since epoch) to request older posts
        :type max_time: int
        :param suppress: suppress post-fetching errors (e.g. 404)
        :type suppress: bool
        :returns: list of diaspy.models.Post
        :raises: errors.StreamError if the pod does not answer with 200
        """
        params = {}
        if max_time:
            if self.latest is None:
                # millisecond clock value, sent as the `_` request parameter
                self.latest = int(time.time() * 1000)
                self.latest -= max_time
            else:
                self.latest += 1
            params['max_time'] = max_time
            params['_'] = self.latest
        request = self._connection.get(self._location, params=params)
        if request.status_code != 200:
            raise errors.StreamError('wrong status code: {0}'.format(request.status_code))
        posts = []
        latest_time = None  # created_at of the last post in the response
        for post in request.json():
            try:
                comments = post['interactions']['comments_count'] > 3
                posts.append(Post(self._connection, id=post['id'], guid=post['guid'],
                                  fetch=False, comments=comments, post_data=post))
                if post['created_at']:
                    latest_time = post['created_at']
            except errors.PostError:
                if not suppress:
                    raise
        if latest_time:
            self.max_time = parse_utc_timestamp(latest_time)
        return posts

    def _expand(self, new_stream):
        """Appends older posts to stream, skipping guids already present.
        """
        seen = {post.guid for post in self._stream}
        for post in new_stream:
            if post.guid not in seen:
                self._stream.append(post)
                seen.add(post.guid)

    def _update(self, new_stream):
        """Updates stream with new posts.

        Posts from `new_stream` whose guid is not yet in the stream are
        placed in front of it, in the same relative order they have in
        `new_stream`. (The old index loop used new_stream[-i], and -0 == 0,
        which scrambled that order.)
        """
        seen = {post.guid for post in self._stream}
        fresh = []
        for post in new_stream:
            if post.guid not in seen:
                fresh.append(post)
                seen.add(post.guid)
        self._stream = fresh + self._stream

    def clear(self):
        """Set stream to empty.
        """
        self._stream = []

    def purge(self):
        """Removes all nonexistent posts from stream.

        A post whose update() raises is treated as deleted and dropped.
        """
        alive = []
        for post in self._stream:
            try:
                # an error will tell us that the post has been deleted
                post.update()
            except Exception:
                continue
            alive.append(post)
        self._stream = alive

    def update(self):
        """Updates stream with new posts.
        """
        self._update(self._obtain())

    def fill(self):
        """Fills the stream with posts.

        **Notice:** this will create entirely new list of posts.
        If you want to preserve posts already present in stream use update().
        """
        self._stream = self._obtain()

    def more(self, max_time=0, backtime=86400):
        """Tries to download more (older) posts from Stream.

        TODO backtime isn't used anymore.
        Diaspora reference: https://github.com/diaspora/diaspora/blob/26a9e50ef935628c800f9a21d345057556fa5c31/app/helpers/stream_helper.rb#L48

        :param backtime: how many seconds to subtract each time (defaults to one day); currently unused
        :type backtime: int
        :param max_time: seconds since epoch (optional, diaspy'll figure everything on its own)
        :type max_time: int
        """
        if not max_time:
            max_time = self.max_time
        self.max_time = max_time
        self._expand(self._obtain(max_time=max_time))

    def full(self, backtime=86400, retry=42, callback=None):
        """Fetches full stream - containing all posts.
        WARNING: this is a **VERY** long running function.
        Use callback parameter to access information about the stream during its
        run.

        Default backtime is one day. But sometimes user might not have any activity for longer
        period (in the beginning of my D* activity I was posting once a month or so).
        The role of retry is to handle such situations by trying to go further back in time.
        If a post is found the counter is restored.

        Default retry is 42 (see "The Hitchhiker's Guide to the Galaxy").

        :param backtime: how many seconds to subtract each time
        :type backtime: int
        :param retry: how many times the function should look deeper than your last post
        :type retry: int
        :param callback: callable taking diaspy.streams.Generic as an argument
        :returns: integer, length of the stream
        """
        oldstream = self.copy()
        self.more()
        while len(oldstream) < len(self):
            oldstream = self.copy()
            if callback is not None:
                callback(self)
            self.more(backtime=backtime)
            if len(oldstream) < len(self):
                continue
            # no posts were found: start retrying further back in time
            print('retrying... {0}'.format(retry))
            n = retry
            while n > 0:
                print('\t', n, self.max_time)
                self.more(backtime=backtime)
                print('\t', len(oldstream), len(self))
                if len(oldstream) < len(self):
                    # success: resume the outer loop
                    break
                oldstream = self.copy()
                n -= 1
        return len(self)

    def copy(self):
        """Returns copy (list of posts) of current stream.
        """
        return list(self._stream)

    def json(self, comments=False, **kwargs):
        """Returns JSON encoded string containing stream's data.

        :param comments: to include comments or not to include 'em, that is the question this param holds answer to
        :type comments: bool
        :param kwargs: passed through to json.dumps()
        """
        stream = list(self._stream)
        if comments:
            for post in stream:
                post._fetchcomments()
                post['interactions']['comments'] = [c.data for c in post.comments]
        return json.dumps([post._data for post in stream], **kwargs)
257
258
class Outer(Generic):
    """Stream of another user's posts, as used by diaspy.models.User.
    """

    def __init__(self, connection, guid, fetch=True):
        """
        :param connection: Connection() object
        :type connection: diaspy.connection.Connection
        :param guid: guid of the user whose stream is loaded
        :type guid: str
        :param fetch: will call .fill() if true
        :type fetch: bool
        """
        super().__init__(connection, 'people/{}/stream.json'.format(guid), fetch)
266
class Stream(Generic):
    """The main stream containing the combined posts of the
    followed users and tags and the community spotlights posts
    if the user enabled those.
    """
    # Generic reads `_location`. The old attribute was spelled `location` and
    # never consulted; it is kept as an alias for backward compatibility.
    _location = 'stream.json'
    location = _location

    def post(self, text='', aspect_ids='public', photos=None, photo='', poll_question=None, poll_answers=None, location_coords=None, provider_display_name=''):
        """This function sends a post to an aspect.
        If both `photo` and `photos` are specified `photos` takes precedence.

        :param text: Text to post.
        :type text: str

        :param aspect_ids: Aspect ids to send post to.
        :type aspect_ids: str

        :param photo: filename of photo to post (uploaded via _photoupload())
        :type photo: str

        :param photos: id of photo to post (obtained from _photoupload())
        :type photos: int

        :param provider_display_name: name of provider displayed under the post
        :type provider_display_name: str

        :param poll_question: Question string (sent only together with poll_answers)
        :type poll_question: str

        :param poll_answers: Answers to the poll
        :type poll_answers: list with strings

        :param location_coords: TODO
        :type location_coords: TODO

        :returns: diaspy.models.Post -- the Post which has been created
        :raises: Exception if the pod does not answer with 201
        """
        data = {
            'aspect_ids': aspect_ids,
            'status_message': {'text': text, 'provider_display_name': provider_display_name},
        }
        if photo:
            data['photos'] = self._photoupload(photo)
        if photos:
            data['photos'] = photos
        if poll_question and poll_answers:
            data['poll_question'] = poll_question
            data['poll_answers'] = poll_answers
        if location_coords:
            data['location_coords'] = location_coords

        request = self._connection.post('status_messages',
                                        data=json.dumps(data),
                                        headers={'content-type': 'application/json',
                                                 'accept': 'application/json',
                                                 'x-csrf-token': repr(self._connection)})
        if request.status_code != 201:
            raise Exception('{0}: Post could not be posted.'.format(request.status_code))
        post_json = request.json()
        return Post(self._connection, id=post_json['id'], guid=post_json['guid'], post_data=post_json)

    def _photoupload(self, filename, aspects=None):
        """Uploads picture to the pod.

        :param filename: path to picture file
        :type filename: str
        :param aspects: aspects to which the photo is uploaded; each item must
            have an 'id' key. Defaults to all aspects from getUserData().
        :type aspects: list of dicts

        :returns: id of the photo being uploaded
        :raises: errors.StreamError if the pod does not answer with 200
        """
        # `with` guarantees the file is closed even if read() fails.
        with open(filename, 'rb') as handle:
            image = handle.read()

        params = {}
        params['photo[pending]'] = 'true'
        params['set_profile_image'] = ''
        params['qqfile'] = filename
        # None (the new default) and an empty list both mean "all aspects".
        if not aspects:
            aspects = self._connection.getUserData()['aspects']
        for i, aspect in enumerate(aspects):
            params['photo[aspect_ids][{0}]'.format(i)] = aspect['id']

        headers = {'content-type': 'application/octet-stream',
                   'x-csrf-token': repr(self._connection),
                   'x-file-name': filename}

        request = self._connection.post('photos', data=image, params=params, headers=headers)
        if request.status_code != 200:
            raise errors.StreamError('photo cannot be uploaded: {0}'.format(request.status_code))
        return request.json()['data']['photo']['id']
355
356
class Activity(Stream):
    """Stream representing user's activity.
    """
    _location = 'activity.json'

    def _delid(self, id):
        """Deletes the first post in the stream whose 'id' matches `id`.

        Ids are compared as strings, so a str id also matches a numeric id
        from the pod's JSON. Does nothing if no post matches.
        """
        post = next((p for p in self._stream if str(p['id']) == str(id)), None)
        if post is not None:
            post.delete()

    def delete(self, post):
        """Deletes post from users activity.
        `post` can be either post id or Post()
        object which will be identified and deleted.
        After deleting post the stream will be purged.

        :param post: post identifier
        :type post: str, diaspy.models.Post
        :raises TypeError: if `post` is neither str nor Post
        """
        if isinstance(post, str):
            self._delid(post)
        elif isinstance(post, Post):
            post.delete()
        else:
            # the placeholder was previously never filled in
            raise TypeError('this method accepts str or Post types: {0} given'.format(type(post).__name__))
        self.purge()
385
386
class Aspects(Generic):
    """This stream contains the posts filtered by
    the specified aspect IDs. You can choose the aspect IDs with
    the parameter `aspect_ids` which value should be
    a comma separated list of aspect IDs.
    If the parameter is omitted all aspects are assumed.
    An example call would be `aspects.json?aspect_ids=23,5,42`
    """
    _location = 'aspects.json'

    def getAspectID(self, aspect_name):
        """Returns id of an aspect of given name.
        Returns -1 if aspect is not found (the last match wins if several share the name).

        :param aspect_name: aspect name (must be spelled exactly as when created)
        :type aspect_name: str
        :returns: int
        """
        id = -1
        for aspect in self._connection.getUserData()['aspects']:
            if aspect['name'] == aspect_name:
                id = aspect['id']
        return id

    def filter(self, ids):
        """Filters posts by given aspect ids and refetches the stream.

        :param ids: list of aspect ids
        :type ids: list of integers (strings are accepted too)
        """
        # str() each id: str.join() raises TypeError on the documented ints.
        self._location = 'aspects.json?a_ids[]=' + '&a_ids[]='.join(str(i) for i in ids)
        self.fill()  # this will create entirely new list of posts.

    def add(self, aspect_name, visible=0):
        """This function adds a new aspect.
        Status code 422 is accepted because it is returned by D* when
        you try to add aspect already present on your aspect list.

        :param aspect_name: name of aspect to create
        :param visible: whether the contacts in this aspect are visible to each other or not

        :returns: Aspect() object of just created aspect
        :raises: Exception on a status code other than 200 or 422
        """
        data = {'authenticity_token': repr(self._connection),
                'aspect[name]': aspect_name,
                'aspect[contacts_visible]': visible}

        request = self._connection.post('aspects', data=data)
        if request.status_code not in [200, 422]:
            raise Exception('wrong status code: {0}'.format(request.status_code))

        return Aspect(self._connection, self.getAspectID(aspect_name))

    def remove(self, id=-1, name=''):
        """This method removes an aspect.
        You can give it either id or name of the aspect.
        When both are specified, id takes precedence over name.

        Status code 500 is accepted because although the D* will
        go nuts it will remove the aspect anyway.

        :param id: id of aspect to remove
        :type id: int
        :param name: name of aspect to remove
        :type name: str
        :raises: Exception on a status code other than 200, 302 or 500
        """
        if id == -1 and name:
            id = self.getAspectID(name)
        data = {'_method': 'delete',
                'authenticity_token': repr(self._connection)}
        request = self._connection.post('aspects/{0}'.format(id), data=data)
        if request.status_code not in [200, 302, 500]:
            raise Exception('wrong status code: {0}: cannot remove aspect'.format(request.status_code))
460
461
class Commented(Generic):
    """Stream of every post the user has commented on."""

    _location = 'commented.json'


class Liked(Generic):
    """Stream of every post the user has liked."""

    _location = 'liked.json'


class Mentions(Generic):
    """Stream of every post in which the user is mentioned."""

    _location = 'mentions.json'
480
481
class FollowedTags(Generic):
    """Stream of posts carrying any of the tags the user follows."""

    _location = 'followed_tags.json'

    def get(self):
        """Returns list of followed tags.

        Not implemented yet: always returns an empty list.
        """
        return []

    def remove(self, tag_id):
        """Stops following a tag.

        Note: only a 404 answer from the pod is treated as success; every
        other status code raises.

        :param tag_id: tag id
        :type tag_id: int
        :raises: Exception on any status code other than 404
        """
        form = {'authenticity_token': self._connection.get_token()}
        url = 'tag_followings/{0}'.format(tag_id)
        response = self._connection.delete(url, data=form)
        status = response.status_code
        if status == 404:
            return
        raise Exception('wrong status code: {0}'.format(status))

    def add(self, tag_name):
        """Starts following a tag.

        A 403 answer is accepted as well, because pods respond with it when
        the user already follows the tag.

        :param tag_name: tag name
        :type tag_name: str
        :returns: int (response code, 201 or 403)
        :raises: Exception on any other status code
        """
        payload = json.dumps({
            'name': tag_name,
            'authenticity_token': repr(self._connection),
        })
        headers = {
            'content-type': 'application/json',
            'x-csrf-token': repr(self._connection),
            'accept': 'application/json',
        }
        response = self._connection.post('tag_followings', data=payload, headers=headers)
        status = response.status_code
        if status in (201, 403):
            return status
        raise Exception('wrong error code: {0}'.format(status))
526
527
class Tag(Generic):
    """This stream contains all posts containing a tag.
    """
    def __init__(self, connection, tag, fetch=True):
        """
        :param connection: Connection() object
        :type connection: diaspy.connection.Connection
        :param tag: tag name
        :type tag: str
        :param fetch: will call .fill() if true
        :type fetch: bool
        """
        # Delegate to Generic so _stream, latest and max_time are initialised.
        # Skipping it made len()/iteration fail with fetch=False and made
        # more() fail on the missing `latest` attribute.
        super().__init__(connection, 'tags/{0}.json'.format(tag), fetch)