b3e59c0924ce120fd9e32c661603014a0d8a26ef
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import datetime

import pytz
from werkzeug.datastructures import FileStorage

from mediagoblin.db.base import Session
from mediagoblin.db.models import User, MediaEntry, MediaComment
from mediagoblin.media_types import sniff_media
from mediagoblin.submit.lib import new_upload_entry
from mediagoblin.submit.task import collect_garbage
from mediagoblin.tests.tools import fixture_add_user, fixture_media_entry
from .resources import GOOD_JPG


def test_404_for_non_existent(test_app):
    """ Test that an unrouted URL is answered with HTTP 404 """
    # NOTE(review): test_app is presumably a WebTest-style app fixture;
    # expect_errors=True is passed so the error response is handed back to
    # the test for inspection -- confirm against the fixture definition.
    response = test_app.get('/does-not-exist/', expect_errors=True)
    status = response.status_int
    assert status == 404


def test_user_deletes_other_comments(test_app):
    """ Test that deleting a user also removes related media and comments

    Two users each own one media entry and every user comments on every
    entry (four comments in total).  After the first user is deleted one
    user, one media entry and three of the four comments must be gone;
    after the second user is deleted nothing created here may remain.
    """
    user_a = fixture_add_user(u"chris_a")
    user_b = fixture_add_user(u"chris_b")

    media_a = fixture_media_entry(uploader=user_a.id, save=False,
                                  expunge=False, fake_upload=False)
    media_b = fixture_media_entry(uploader=user_b.id, save=False,
                                  expunge=False, fake_upload=False)
    # The entries were built with save=False, so register them with the
    # session and flush before their ids are read below.
    Session.add(media_a)
    Session.add(media_b)
    Session.flush()

    # Create all 4 possible comments:
    for u_id in (user_a.id, user_b.id):
        for m_id in (media_a.id, media_b.id):
            cmt = MediaComment()
            cmt.media_entry = m_id
            # NOTE(review): the author column name is assumed to be
            # ``author`` -- confirm against the MediaComment model.
            cmt.author = u_id
            cmt.content = u"Some Comment"
            Session.add(cmt)

    Session.flush()

    # Baseline counts taken before anything is deleted.
    usr_cnt1 = User.query.count()
    med_cnt1 = MediaEntry.query.count()
    cmt_cnt1 = MediaComment.query.count()

    User.query.get(user_a.id).delete(commit=False)

    usr_cnt2 = User.query.count()
    med_cnt2 = MediaEntry.query.count()
    cmt_cnt2 = MediaComment.query.count()

    # One user deleted.
    assert usr_cnt2 == usr_cnt1 - 1
    # One media entry gone.
    assert med_cnt2 == med_cnt1 - 1
    # Three of four comments gone.
    assert cmt_cnt2 == cmt_cnt1 - 3

    User.query.get(user_b.id).delete()

    usr_cnt2 = User.query.count()
    med_cnt2 = MediaEntry.query.count()
    cmt_cnt2 = MediaComment.query.count()

    # Both users gone.
    assert usr_cnt2 == usr_cnt1 - 2
    # Both media entries gone.
    assert med_cnt2 == med_cnt1 - 2
    # All four comments gone.
    assert cmt_cnt2 == cmt_cnt1 - 4


def test_media_deletes_broken_attachment(test_app):
    """ Test that media with a dangling attachment can still be deleted

    The media entry gets an attachment record whose file path does not
    exist; deleting the entry and then its uploader must complete without
    raising.
    """
    user_a = fixture_add_user(u"chris_a")

    media = fixture_media_entry(uploader=user_a.id, save=False, expunge=False)
    media.attachment_files.append(dict(
        # NOTE(review): the ``name`` key is an assumption about what an
        # attachment record requires -- confirm against the model.
        name=u"some name",
        filepath=[u"does", u"not", u"exist"],
    ))
    # The entry was built with save=False, so register it with the session
    # and flush before it is looked up by id.
    Session.add(media)
    Session.flush()

    MediaEntry.query.get(media.id).delete()
    User.query.get(user_a.id).delete()


def test_garbage_collection_task(test_app):
    """ Test old media entry are removed by GC task """
    user = fixture_add_user()

    # Create a media entry that's unprocessed and over an hour old.
    now = datetime.datetime.now(pytz.UTC)

    # Keep the image open only while it is being sniffed so the file
    # handle is closed again even if sniffing fails.
    with open(GOOD_JPG, "rb") as image_stream:
        file_data = FileStorage(
            stream=image_stream,
            filename="mah_test.jpg",
            content_type="image/jpeg"
        )

        # Find media manager
        media_type, media_manager = sniff_media(file_data, "mah_test.jpg")

    entry = new_upload_entry(user)
    entry.title = "Mah Image"
    entry.slug = "slugy-slug-slug"
    entry.media_type = 'image'
    entry.created = now - datetime.timedelta(days=2)
    # NOTE(review): assumes model instances expose save() next to the
    # delete() used by the other tests in this file -- confirm.
    entry.save()
    # Read the id back after persisting instead of relying on a fixed value.
    entry_id = entry.id

    # Validate the model exists
    assert MediaEntry.query.filter_by(id=entry_id).first() is not None

    # Call the garbage collection task
    collect_garbage()

    # Now validate the image has been deleted
    assert MediaEntry.query.filter_by(id=entry_id).first() is None