Commit | Line | Data |
---|---|---|
968dd9e7 | 1 | # GNU MediaGoblin -- federated, autonomous media hosting |
cf29e8a8 | 2 | # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. |
968dd9e7 E |
3 | # |
4 | # This program is free software: you can redistribute it and/or modify | |
5 | # it under the terms of the GNU Affero General Public License as published by | |
6 | # the Free Software Foundation, either version 3 of the License, or | |
7 | # (at your option) any later version. | |
8 | # | |
9 | # This program is distributed in the hope that it will be useful, | |
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
12 | # GNU Affero General Public License for more details. | |
13 | # | |
14 | # You should have received a copy of the GNU Affero General Public License | |
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
16 | ||
5e5d4458 JT |
17 | import pytz |
18 | import datetime | |
19 | ||
20 | from werkzeug.datastructures import FileStorage | |
21 | ||
22 | from .resources import GOOD_JPG | |
3f931680 | 23 | from mediagoblin.db.base import Session |
5e5d4458 JT |
24 | from mediagoblin.media_types import sniff_media |
25 | from mediagoblin.submit.lib import new_upload_entry | |
26 | from mediagoblin.submit.task import collect_garbage | |
e9b4e500 | 27 | from mediagoblin.db.models import User, MediaEntry, MediaComment |
5c2ece74 | 28 | from mediagoblin.tests.tools import fixture_add_user, fixture_media_entry |
e9b4e500 | 29 | |
968dd9e7 | 30 | |
5c2ece74 | 31 | def test_404_for_non_existent(test_app): |
b97144dc | 32 | res = test_app.get('/does-not-exist/', expect_errors=True) |
7d503a89 | 33 | assert res.status_int == 404 |
e9b4e500 E |
34 | |
35 | ||
5c2ece74 | 36 | def test_user_deletes_other_comments(test_app): |
e9b4e500 E |
37 | user_a = fixture_add_user(u"chris_a") |
38 | user_b = fixture_add_user(u"chris_b") | |
39 | ||
2d7b6bde | 40 | media_a = fixture_media_entry(uploader=user_a.id, save=False, |
56d13467 | 41 | expunge=False, fake_upload=False) |
2d7b6bde | 42 | media_b = fixture_media_entry(uploader=user_b.id, save=False, |
56d13467 | 43 | expunge=False, fake_upload=False) |
3f931680 E |
44 | Session.add(media_a) |
45 | Session.add(media_b) | |
46 | Session.flush() | |
e9b4e500 E |
47 | |
48 | # Create all 4 possible comments: | |
49 | for u_id in (user_a.id, user_b.id): | |
50 | for m_id in (media_a.id, media_b.id): | |
51 | cmt = MediaComment() | |
52 | cmt.media_entry = m_id | |
53 | cmt.author = u_id | |
54 | cmt.content = u"Some Comment" | |
3f931680 E |
55 | Session.add(cmt) |
56 | ||
57 | Session.flush() | |
e9b4e500 E |
58 | |
59 | usr_cnt1 = User.query.count() | |
60 | med_cnt1 = MediaEntry.query.count() | |
61 | cmt_cnt1 = MediaComment.query.count() | |
62 | ||
3f931680 | 63 | User.query.get(user_a.id).delete(commit=False) |
e9b4e500 E |
64 | |
65 | usr_cnt2 = User.query.count() | |
66 | med_cnt2 = MediaEntry.query.count() | |
67 | cmt_cnt2 = MediaComment.query.count() | |
68 | ||
69 | # One user deleted | |
7d503a89 | 70 | assert usr_cnt2 == usr_cnt1 - 1 |
e9b4e500 | 71 | # One media gone |
7d503a89 | 72 | assert med_cnt2 == med_cnt1 - 1 |
e9b4e500 | 73 | # Three of four comments gone. |
7d503a89 | 74 | assert cmt_cnt2 == cmt_cnt1 - 3 |
e9b4e500 E |
75 | |
76 | User.query.get(user_b.id).delete() | |
77 | ||
78 | usr_cnt2 = User.query.count() | |
79 | med_cnt2 = MediaEntry.query.count() | |
80 | cmt_cnt2 = MediaComment.query.count() | |
81 | ||
82 | # All users gone | |
7d503a89 | 83 | assert usr_cnt2 == usr_cnt1 - 2 |
e9b4e500 | 84 | # All media gone |
7d503a89 | 85 | assert med_cnt2 == med_cnt1 - 2 |
e9b4e500 | 86 | # All comments gone |
7d503a89 | 87 | assert cmt_cnt2 == cmt_cnt1 - 4 |
df5b142a E |
88 | |
89 | ||
5c2ece74 | 90 | def test_media_deletes_broken_attachment(test_app): |
df5b142a E |
91 | user_a = fixture_add_user(u"chris_a") |
92 | ||
2d7b6bde | 93 | media = fixture_media_entry(uploader=user_a.id, save=False, expunge=False) |
df5b142a E |
94 | media.attachment_files.append(dict( |
95 | name=u"some name", | |
96 | filepath=[u"does", u"not", u"exist"], | |
97 | )) | |
98 | Session.add(media) | |
99 | Session.flush() | |
100 | ||
101 | MediaEntry.query.get(media.id).delete() | |
102 | User.query.get(user_a.id).delete() | |
5e5d4458 JT |
103 | |
104 | def test_garbage_collection_task(test_app): | |
105 | """ Test old media entry are removed by GC task """ | |
106 | user = fixture_add_user() | |
107 | ||
108 | # Create a media entry that's unprocessed and over an hour old. | |
109 | entry_id = 72 | |
110 | now = datetime.datetime.now(pytz.UTC) | |
111 | file_data = FileStorage( | |
112 | stream=open(GOOD_JPG, "rb"), | |
113 | filename="mah_test.jpg", | |
114 | content_type="image/jpeg" | |
115 | ) | |
116 | ||
117 | # Find media manager | |
118 | media_type, media_manager = sniff_media(file_data, "mah_test.jpg") | |
119 | entry = new_upload_entry(user) | |
120 | entry.id = entry_id | |
121 | entry.title = "Mah Image" | |
122 | entry.slug = "slugy-slug-slug" | |
123 | entry.media_type = 'image' | |
124 | entry.created = now - datetime.timedelta(days=2) | |
125 | entry.save() | |
126 | ||
127 | # Validate the model exists | |
128 | assert MediaEntry.query.filter_by(id=entry_id).first() is not None | |
129 | ||
130 | # Call the garbage collection task | |
131 | collect_garbage() | |
132 | ||
133 | # Now validate the image has been deleted | |
134 | assert MediaEntry.query.filter_by(id=entry_id).first() is None |