-# GNU Mediagoblin -- federated, autonomous media hosting
-# Copyright (C) 2011 Free Software Foundation, Inc
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import os
+import tempfile
+
+import pytest
+from werkzeug.utils import secure_filename
+
from mediagoblin import storage
+################
+# Test utilities
+################
+
def test_clean_listy_filepath():
expected = [u'dir1', u'dir2', u'linooks.jpg']
assert storage.clean_listy_filepath(
expected = [u'etc', u'passwd']
assert storage.clean_listy_filepath(
['../../../etc/', 'passwd']) == expected
+
+ with pytest.raises(storage.InvalidFilepath):
+ storage.clean_listy_filepath(['../../', 'linooks.jpg'])
+
+
+class FakeStorageSystem():
+ def __init__(self, foobie, blech, **kwargs):
+ self.foobie = foobie
+ self.blech = blech
+
+class FakeRemoteStorage(storage.filestorage.BasicFileStorage):
+ # Theoretically despite this, all the methods should work but it
+ # should force copying to the workbench
+ local_storage = False
+
+ def copy_local_to_storage(self, *args, **kwargs):
+ return storage.StorageInterface.copy_local_to_storage(
+ self, *args, **kwargs)
+
+
+def test_storage_system_from_config():
+ this_storage = storage.storage_system_from_config(
+ {'base_url': 'http://example.org/moodia/',
+ 'base_dir': '/tmp/',
+ 'garbage_arg': 'garbage_arg',
+ 'garbage_arg': 'trash'})
+ assert this_storage.base_url == 'http://example.org/moodia/'
+ assert this_storage.base_dir == '/tmp/'
+ assert this_storage.__class__ is storage.filestorage.BasicFileStorage
+
+ this_storage = storage.storage_system_from_config(
+ {'foobie': 'eiboof',
+ 'blech': 'hcelb',
+ 'garbage_arg': 'garbage_arg',
+ 'storage_class':
+ 'mediagoblin.tests.test_storage:FakeStorageSystem'})
+ assert this_storage.foobie == 'eiboof'
+ assert this_storage.blech == 'hcelb'
+ assert unicode(this_storage.__class__) == \
+ u'mediagoblin.tests.test_storage.FakeStorageSystem'
+
+
+##########################
+# Basic file storage tests
+##########################
+
+def get_tmp_filestorage(mount_url=None, fake_remote=False):
+ tmpdir = tempfile.mkdtemp(prefix="test_gmg_storage")
+ if fake_remote:
+ this_storage = FakeRemoteStorage(tmpdir, mount_url)
+ else:
+ this_storage = storage.filestorage.BasicFileStorage(tmpdir, mount_url)
+ return tmpdir, this_storage
+
+
+def cleanup_storage(this_storage, tmpdir, *paths):
+ for p in paths:
+ while p:
+ assert this_storage.delete_dir(p) == True
+ p.pop(-1)
+ os.rmdir(tmpdir)
+
+
+def test_basic_storage__resolve_filepath():
+ tmpdir, this_storage = get_tmp_filestorage()
+
+ result = this_storage._resolve_filepath(['dir1', 'dir2', 'filename.jpg'])
+ assert result == os.path.join(
+ tmpdir, 'dir1/dir2/filename.jpg')
+
+ result = this_storage._resolve_filepath(['../../etc/', 'passwd'])
+ assert result == os.path.join(
+ tmpdir, 'etc/passwd')
+
+ pytest.raises(
+ storage.InvalidFilepath,
+ this_storage._resolve_filepath,
+ ['../../', 'etc', 'passwd'])
+
+ cleanup_storage(this_storage, tmpdir)
+
+
+def test_basic_storage_file_exists():
+ tmpdir, this_storage = get_tmp_filestorage()
+
+ os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2'))
+ filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt')
+ with open(filename, 'w') as ourfile:
+ ourfile.write("I'm having a lovely day!")
+
+ assert this_storage.file_exists(['dir1', 'dir2', 'filename.txt'])
+ assert not this_storage.file_exists(['dir1', 'dir2', 'thisfile.lol'])
+ assert not this_storage.file_exists(['dnedir1', 'dnedir2', 'somefile.lol'])
+
+ this_storage.delete_file(['dir1', 'dir2', 'filename.txt'])
+ cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
+
+
+def test_basic_storage_get_unique_filepath():
+ tmpdir, this_storage = get_tmp_filestorage()
+
+ # write something that exists
+ os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2'))
+ filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt')
+ with open(filename, 'w') as ourfile:
+ ourfile.write("I'm having a lovely day!")
+
+ # now we want something new, with the same name!
+ new_filepath = this_storage.get_unique_filepath(
+ ['dir1', 'dir2', 'filename.txt'])
+ assert new_filepath[:-1] == [u'dir1', u'dir2']
+
+ new_filename = new_filepath[-1]
+ assert new_filename.endswith('filename.txt')
+ assert len(new_filename) > len('filename.txt')
+ assert new_filename == secure_filename(new_filename)
+
+ os.remove(filename)
+ cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
+
+
+def test_basic_storage_get_file():
+ tmpdir, this_storage = get_tmp_filestorage()
+
+ # Write a brand new file
+ filepath = ['dir1', 'dir2', 'ourfile.txt']
+
+ with this_storage.get_file(filepath, 'w') as our_file:
+ our_file.write('First file')
+ with this_storage.get_file(filepath, 'r') as our_file:
+ assert our_file.read() == 'First file'
+ assert os.path.exists(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
+ with file(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file:
+ assert our_file.read() == 'First file'
+
+ # Write to the same path but try to get a unique file.
+ new_filepath = this_storage.get_unique_filepath(filepath)
+ assert not os.path.exists(os.path.join(tmpdir, *new_filepath))
+
+ with this_storage.get_file(new_filepath, 'w') as our_file:
+ our_file.write('Second file')
+ with this_storage.get_file(new_filepath, 'r') as our_file:
+ assert our_file.read() == 'Second file'
+ assert os.path.exists(os.path.join(tmpdir, *new_filepath))
+ with file(os.path.join(tmpdir, *new_filepath), 'r') as our_file:
+ assert our_file.read() == 'Second file'
+
+ # Read from an existing file
+ manually_written_file = os.makedirs(
+ os.path.join(tmpdir, 'testydir'))
+ with file(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile:
+ testyfile.write('testy file! so testy.')
+
+ with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile:
+ assert testyfile.read() == 'testy file! so testy.'
+
+ this_storage.delete_file(filepath)
+ this_storage.delete_file(new_filepath)
+ this_storage.delete_file(['testydir', 'testyfile.txt'])
+ cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'], ['testydir'])
+
+
+def test_basic_storage_delete_file():
+ tmpdir, this_storage = get_tmp_filestorage()
+
+ assert not os.path.exists(
+ os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
+
+ filepath = ['dir1', 'dir2', 'ourfile.txt']
+ with this_storage.get_file(filepath, 'w') as our_file:
+ our_file.write('Testing this file')
+
+ assert os.path.exists(
+ os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
+
+ assert this_storage.delete_dir(['dir1', 'dir2']) == False
+ this_storage.delete_file(filepath)
+ assert this_storage.delete_dir(['dir1', 'dir2']) == True
+
+ assert not os.path.exists(
+ os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
+
+ cleanup_storage(this_storage, tmpdir, ['dir1'])
+
+
+def test_basic_storage_url_for_file():
+ # Not supplying a base_url should actually just bork.
+ tmpdir, this_storage = get_tmp_filestorage()
+ pytest.raises(
+ storage.NoWebServing,
+ this_storage.file_url,
+ ['dir1', 'dir2', 'filename.txt'])
+ cleanup_storage(this_storage, tmpdir)
+
+ # base_url without domain
+ tmpdir, this_storage = get_tmp_filestorage('/media/')
+ result = this_storage.file_url(
+ ['dir1', 'dir2', 'filename.txt'])
+ expected = '/media/dir1/dir2/filename.txt'
+ assert result == expected
+ cleanup_storage(this_storage, tmpdir)
+
+ # base_url with domain
+ tmpdir, this_storage = get_tmp_filestorage(
+ 'http://media.example.org/ourmedia/')
+ result = this_storage.file_url(
+ ['dir1', 'dir2', 'filename.txt'])
+ expected = 'http://media.example.org/ourmedia/dir1/dir2/filename.txt'
+ assert result == expected
+ cleanup_storage(this_storage, tmpdir)
+
+
+def test_basic_storage_get_local_path():
+ tmpdir, this_storage = get_tmp_filestorage()
+
+ result = this_storage.get_local_path(
+ ['dir1', 'dir2', 'filename.txt'])
+
+ expected = os.path.join(
+ tmpdir, 'dir1/dir2/filename.txt')
+
+ assert result == expected
+
+ cleanup_storage(this_storage, tmpdir)
+
+
+def test_basic_storage_is_local():
+ tmpdir, this_storage = get_tmp_filestorage()
+ assert this_storage.local_storage is True
+ cleanup_storage(this_storage, tmpdir)
+
+
+def test_basic_storage_copy_locally():
+ tmpdir, this_storage = get_tmp_filestorage()
+
+ dest_tmpdir = tempfile.mkdtemp()
+
+ filepath = ['dir1', 'dir2', 'ourfile.txt']
+ with this_storage.get_file(filepath, 'w') as our_file:
+ our_file.write('Testing this file')
+
+ new_file_dest = os.path.join(dest_tmpdir, 'file2.txt')
+
+ this_storage.copy_locally(filepath, new_file_dest)
+ this_storage.delete_file(filepath)
+
+ assert file(new_file_dest).read() == 'Testing this file'
+
+ os.remove(new_file_dest)
+ os.rmdir(dest_tmpdir)
+ cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
+
+
+def _test_copy_local_to_storage_works(tmpdir, this_storage):
+ local_filename = tempfile.mktemp()
+ with file(local_filename, 'w') as tmpfile:
+ tmpfile.write('haha')
+
+ this_storage.copy_local_to_storage(
+ local_filename, ['dir1', 'dir2', 'copiedto.txt'])
+
+ os.remove(local_filename)
+
+ assert file(
+ os.path.join(tmpdir, 'dir1/dir2/copiedto.txt'),
+ 'r').read() == 'haha'
+
+ this_storage.delete_file(['dir1', 'dir2', 'copiedto.txt'])
+ cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
+
+
+def test_basic_storage_copy_local_to_storage():
+ tmpdir, this_storage = get_tmp_filestorage()
+ _test_copy_local_to_storage_works(tmpdir, this_storage)
+
+
+def test_general_storage_copy_local_to_storage():
+ tmpdir, this_storage = get_tmp_filestorage(fake_remote=True)
+ _test_copy_local_to_storage_works(tmpdir, this_storage)