X-Git-Url: https://vcs.fsf.org/?a=blobdiff_plain;f=mediagoblin%2Ftests%2Ftest_misc.py;h=b3e59c0924ce120fd9e32c661603014a0d8a26ef;hb=c7c26b17406cdcb03de4d2579f4851d2513d11e1;hp=755d863fbf1a14f3601c11f8f3852a8de87cd215;hpb=b69dd853049fd7560542b47bac73675b33c78364;p=mediagoblin.git diff --git a/mediagoblin/tests/test_misc.py b/mediagoblin/tests/test_misc.py index 755d863f..b3e59c09 100644 --- a/mediagoblin/tests/test_misc.py +++ b/mediagoblin/tests/test_misc.py @@ -14,7 +14,16 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +import pytz +import datetime + +from werkzeug.datastructures import FileStorage + +from .resources import GOOD_JPG from mediagoblin.db.base import Session +from mediagoblin.media_types import sniff_media +from mediagoblin.submit.lib import new_upload_entry +from mediagoblin.submit.task import collect_garbage from mediagoblin.db.models import User, MediaEntry, MediaComment from mediagoblin.tests.tools import fixture_add_user, fixture_media_entry @@ -28,8 +37,10 @@ def test_user_deletes_other_comments(test_app): user_a = fixture_add_user(u"chris_a") user_b = fixture_add_user(u"chris_b") - media_a = fixture_media_entry(uploader=user_a.id, save=False) - media_b = fixture_media_entry(uploader=user_b.id, save=False) + media_a = fixture_media_entry(uploader=user_a.id, save=False, + expunge=False, fake_upload=False) + media_b = fixture_media_entry(uploader=user_b.id, save=False, + expunge=False, fake_upload=False) Session.add(media_a) Session.add(media_b) Session.flush() @@ -79,7 +90,7 @@ def test_user_deletes_other_comments(test_app): def test_media_deletes_broken_attachment(test_app): user_a = fixture_add_user(u"chris_a") - media = fixture_media_entry(uploader=user_a.id, save=False) + media = fixture_media_entry(uploader=user_a.id, save=False, expunge=False) media.attachment_files.append(dict( name=u"some name", filepath=[u"does", u"not", u"exist"], @@ -89,3 +100,35 @@ def test_media_deletes_broken_attachment(test_app): MediaEntry.query.get(media.id).delete() User.query.get(user_a.id).delete() + +def test_garbage_collection_task(test_app): + """ Test old media entry are removed by GC task """ + user = fixture_add_user() + + # Create a media entry that's unprocessed and over an hour old. + entry_id = 72 + now = datetime.datetime.now(pytz.UTC) + file_data = FileStorage( + stream=open(GOOD_JPG, "rb"), + filename="mah_test.jpg", + content_type="image/jpeg" + ) + + # Find media manager + media_type, media_manager = sniff_media(file_data, "mah_test.jpg") + entry = new_upload_entry(user) + entry.id = entry_id + entry.title = "Mah Image" + entry.slug = "slugy-slug-slug" + entry.media_type = 'image' + entry.created = now - datetime.timedelta(days=2) + entry.save() + + # Validate the model exists + assert MediaEntry.query.filter_by(id=entry_id).first() is not None + + # Call the garbage collection task + collect_garbage() + + # Now validate the image has been deleted + assert MediaEntry.query.filter_by(id=entry_id).first() is None