X-Git-Url: https://vcs.fsf.org/?a=blobdiff_plain;f=mediagoblin%2Ftests%2Ftest_storage.py;h=f6f1d18f532004b9e4280638fab10e1dfc70554c;hb=f339b76a4ee04571bd0a94d20a5d53d7f3d8d235;hp=cdcddf098130f7f91e9cf08ad3f535a175b8ace2;hpb=770c12be8d29ab0d6960bbc20ca5c58363da753d;p=mediagoblin.git diff --git a/mediagoblin/tests/test_storage.py b/mediagoblin/tests/test_storage.py index cdcddf09..f6f1d18f 100644 --- a/mediagoblin/tests/test_storage.py +++ b/mediagoblin/tests/test_storage.py @@ -1,5 +1,5 @@ -# GNU Mediagoblin -- federated, autonomous media hosting -# Copyright (C) 2011 Free Software Foundation, Inc +# GNU MediaGoblin -- federated, autonomous media hosting +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -15,9 +15,19 @@ # along with this program. If not, see . +import os +import tempfile + +import pytest +from werkzeug.utils import secure_filename + from mediagoblin import storage +################ +# Test utilities +################ + def test_clean_listy_filepath(): expected = [u'dir1', u'dir2', u'linooks.jpg'] assert storage.clean_listy_filepath( @@ -31,11 +41,281 @@ def test_clean_listy_filepath(): assert storage.clean_listy_filepath( ['../../../etc/', 'passwd']) == expected - try: - storage.clean_listy_filepath( - ['../../', 'linooks.jpg']) - except storage.InvalidFilepath: - # Yes, this error should be raise - pass + with pytest.raises(storage.InvalidFilepath): + storage.clean_listy_filepath(['../../', 'linooks.jpg']) + + +class FakeStorageSystem(): + def __init__(self, foobie, blech, **kwargs): + self.foobie = foobie + self.blech = blech + +class FakeRemoteStorage(storage.filestorage.BasicFileStorage): + # Theoretically despite this, all the methods should work but it + # should force copying to the workbench + local_storage = False + + def copy_local_to_storage(self, *args, **kwargs): + return storage.StorageInterface.copy_local_to_storage( + self, *args, **kwargs) + + +def test_storage_system_from_config(): + this_storage = storage.storage_system_from_config( + {'base_url': 'http://example.org/moodia/', + 'base_dir': '/tmp/', + 'garbage_arg': 'garbage_arg', + 'garbage_arg': 'trash'}) + assert this_storage.base_url == 'http://example.org/moodia/' + assert this_storage.base_dir == '/tmp/' + assert this_storage.__class__ is storage.filestorage.BasicFileStorage + + this_storage = storage.storage_system_from_config( + {'foobie': 'eiboof', + 'blech': 'hcelb', + 'garbage_arg': 'garbage_arg', + 'storage_class': + 'mediagoblin.tests.test_storage:FakeStorageSystem'}) + assert this_storage.foobie == 'eiboof' + assert this_storage.blech == 'hcelb' + assert unicode(this_storage.__class__) == \ + u'mediagoblin.tests.test_storage.FakeStorageSystem' + + +########################## +# Basic file storage tests +########################## + +def get_tmp_filestorage(mount_url=None, fake_remote=False): + tmpdir = tempfile.mkdtemp(prefix="test_gmg_storage") + if fake_remote: + this_storage = FakeRemoteStorage(tmpdir, mount_url) else: - assert "success" == "failboat" + this_storage = storage.filestorage.BasicFileStorage(tmpdir, mount_url) + return tmpdir, this_storage + + +def cleanup_storage(this_storage, tmpdir, *paths): + for p in paths: + while p: + assert this_storage.delete_dir(p) == True + p.pop(-1) + os.rmdir(tmpdir) + + +def test_basic_storage__resolve_filepath(): + tmpdir, this_storage = get_tmp_filestorage() + + result = this_storage._resolve_filepath(['dir1', 'dir2', 'filename.jpg']) + assert result == os.path.join( + tmpdir, 'dir1/dir2/filename.jpg') + + result = this_storage._resolve_filepath(['../../etc/', 'passwd']) + assert result == os.path.join( + tmpdir, 'etc/passwd') + + pytest.raises( + storage.InvalidFilepath, + this_storage._resolve_filepath, + ['../../', 'etc', 'passwd']) + + cleanup_storage(this_storage, tmpdir) + + +def test_basic_storage_file_exists(): + tmpdir, this_storage = get_tmp_filestorage() + + os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2')) + filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt') + with open(filename, 'w') as ourfile: + ourfile.write("I'm having a lovely day!") + + assert this_storage.file_exists(['dir1', 'dir2', 'filename.txt']) + assert not this_storage.file_exists(['dir1', 'dir2', 'thisfile.lol']) + assert not this_storage.file_exists(['dnedir1', 'dnedir2', 'somefile.lol']) + + this_storage.delete_file(['dir1', 'dir2', 'filename.txt']) + cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2']) + + +def test_basic_storage_get_unique_filepath(): + tmpdir, this_storage = get_tmp_filestorage() + + # write something that exists + os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2')) + filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt') + with open(filename, 'w') as ourfile: + ourfile.write("I'm having a lovely day!") + + # now we want something new, with the same name! + new_filepath = this_storage.get_unique_filepath( + ['dir1', 'dir2', 'filename.txt']) + assert new_filepath[:-1] == [u'dir1', u'dir2'] + + new_filename = new_filepath[-1] + assert new_filename.endswith('filename.txt') + assert len(new_filename) > len('filename.txt') + assert new_filename == secure_filename(new_filename) + + os.remove(filename) + cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2']) + + +def test_basic_storage_get_file(): + tmpdir, this_storage = get_tmp_filestorage() + + # Write a brand new file + filepath = ['dir1', 'dir2', 'ourfile.txt'] + + with this_storage.get_file(filepath, 'w') as our_file: + our_file.write('First file') + with this_storage.get_file(filepath, 'r') as our_file: + assert our_file.read() == 'First file' + assert os.path.exists(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) + with file(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file: + assert our_file.read() == 'First file' + + # Write to the same path but try to get a unique file. + new_filepath = this_storage.get_unique_filepath(filepath) + assert not os.path.exists(os.path.join(tmpdir, *new_filepath)) + + with this_storage.get_file(new_filepath, 'w') as our_file: + our_file.write('Second file') + with this_storage.get_file(new_filepath, 'r') as our_file: + assert our_file.read() == 'Second file' + assert os.path.exists(os.path.join(tmpdir, *new_filepath)) + with file(os.path.join(tmpdir, *new_filepath), 'r') as our_file: + assert our_file.read() == 'Second file' + + # Read from an existing file + manually_written_file = os.makedirs( + os.path.join(tmpdir, 'testydir')) + with file(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile: + testyfile.write('testy file! so testy.') + + with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile: + assert testyfile.read() == 'testy file! so testy.' + + this_storage.delete_file(filepath) + this_storage.delete_file(new_filepath) + this_storage.delete_file(['testydir', 'testyfile.txt']) + cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'], ['testydir']) + + +def test_basic_storage_delete_file(): + tmpdir, this_storage = get_tmp_filestorage() + + assert not os.path.exists( + os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) + + filepath = ['dir1', 'dir2', 'ourfile.txt'] + with this_storage.get_file(filepath, 'w') as our_file: + our_file.write('Testing this file') + + assert os.path.exists( + os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) + + assert this_storage.delete_dir(['dir1', 'dir2']) == False + this_storage.delete_file(filepath) + assert this_storage.delete_dir(['dir1', 'dir2']) == True + + assert not os.path.exists( + os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) + + cleanup_storage(this_storage, tmpdir, ['dir1']) + + +def test_basic_storage_url_for_file(): + # Not supplying a base_url should actually just bork. + tmpdir, this_storage = get_tmp_filestorage() + pytest.raises( + storage.NoWebServing, + this_storage.file_url, + ['dir1', 'dir2', 'filename.txt']) + cleanup_storage(this_storage, tmpdir) + + # base_url without domain + tmpdir, this_storage = get_tmp_filestorage('/media/') + result = this_storage.file_url( + ['dir1', 'dir2', 'filename.txt']) + expected = '/media/dir1/dir2/filename.txt' + assert result == expected + cleanup_storage(this_storage, tmpdir) + + # base_url with domain + tmpdir, this_storage = get_tmp_filestorage( + 'http://media.example.org/ourmedia/') + result = this_storage.file_url( + ['dir1', 'dir2', 'filename.txt']) + expected = 'http://media.example.org/ourmedia/dir1/dir2/filename.txt' + assert result == expected + cleanup_storage(this_storage, tmpdir) + + +def test_basic_storage_get_local_path(): + tmpdir, this_storage = get_tmp_filestorage() + + result = this_storage.get_local_path( + ['dir1', 'dir2', 'filename.txt']) + + expected = os.path.join( + tmpdir, 'dir1/dir2/filename.txt') + + assert result == expected + + cleanup_storage(this_storage, tmpdir) + + +def test_basic_storage_is_local(): + tmpdir, this_storage = get_tmp_filestorage() + assert this_storage.local_storage is True + cleanup_storage(this_storage, tmpdir) + + +def test_basic_storage_copy_locally(): + tmpdir, this_storage = get_tmp_filestorage() + + dest_tmpdir = tempfile.mkdtemp() + + filepath = ['dir1', 'dir2', 'ourfile.txt'] + with this_storage.get_file(filepath, 'w') as our_file: + our_file.write('Testing this file') + + new_file_dest = os.path.join(dest_tmpdir, 'file2.txt') + + this_storage.copy_locally(filepath, new_file_dest) + this_storage.delete_file(filepath) + + assert file(new_file_dest).read() == 'Testing this file' + + os.remove(new_file_dest) + os.rmdir(dest_tmpdir) + cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2']) + + +def _test_copy_local_to_storage_works(tmpdir, this_storage): + local_filename = tempfile.mktemp() + with file(local_filename, 'w') as tmpfile: + tmpfile.write('haha') + + this_storage.copy_local_to_storage( + local_filename, ['dir1', 'dir2', 'copiedto.txt']) + + os.remove(local_filename) + + assert file( + os.path.join(tmpdir, 'dir1/dir2/copiedto.txt'), + 'r').read() == 'haha' + + this_storage.delete_file(['dir1', 'dir2', 'copiedto.txt']) + cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2']) + + +def test_basic_storage_copy_local_to_storage(): + tmpdir, this_storage = get_tmp_filestorage() + _test_copy_local_to_storage_works(tmpdir, this_storage) + + +def test_general_storage_copy_local_to_storage(): + tmpdir, this_storage = get_tmp_filestorage(fake_remote=True) + _test_copy_local_to_storage_works(tmpdir, this_storage)