Merge remote-tracking branch 'refs/remotes/tsyesika/master'
[mediagoblin.git] / mediagoblin / tests / test_storage.py
index b7da467c8c99769bdcbda03c16e509993a236100..f6f1d18f532004b9e4280638fab10e1dfc70554c 100644 (file)
@@ -1,5 +1,5 @@
-# GNU Mediagoblin -- federated, autonomous media hosting
-# Copyright (C) 2011 Free Software Foundation, Inc
+# GNU MediaGoblin -- federated, autonomous media hosting
+# Copyright (C) 2011, 2012 MediaGoblin contributors.  See AUTHORS.
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as published by
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 
+import os
+import tempfile
+
+import pytest
+from werkzeug.utils import secure_filename
+
 from mediagoblin import storage
 
 
+################
+# Test utilities
+################
+
 def test_clean_listy_filepath():
     expected = [u'dir1', u'dir2', u'linooks.jpg']
     assert storage.clean_listy_filepath(
@@ -30,3 +40,282 @@ def test_clean_listy_filepath():
     expected = [u'etc', u'passwd']
     assert storage.clean_listy_filepath(
         ['../../../etc/', 'passwd']) == expected
+
+    with pytest.raises(storage.InvalidFilepath):
+        storage.clean_listy_filepath(['../../', 'linooks.jpg'])
+
+
+class FakeStorageSystem():
+    def __init__(self, foobie, blech, **kwargs):
+        self.foobie = foobie
+        self.blech = blech
+
+class FakeRemoteStorage(storage.filestorage.BasicFileStorage):
+    # Theoretically despite this, all the methods should work but it
+    # should force copying to the workbench
+    local_storage = False
+
+    def copy_local_to_storage(self, *args, **kwargs):
+        return storage.StorageInterface.copy_local_to_storage(
+            self, *args, **kwargs)
+
+
+def test_storage_system_from_config():
+    this_storage = storage.storage_system_from_config(
+        {'base_url': 'http://example.org/moodia/',
+         'base_dir': '/tmp/',
+         'garbage_arg': 'garbage_arg',
+         'garbage_arg': 'trash'})
+    assert this_storage.base_url == 'http://example.org/moodia/'
+    assert this_storage.base_dir == '/tmp/'
+    assert this_storage.__class__ is storage.filestorage.BasicFileStorage
+
+    this_storage = storage.storage_system_from_config(
+        {'foobie': 'eiboof',
+         'blech': 'hcelb',
+         'garbage_arg': 'garbage_arg',
+         'storage_class':
+             'mediagoblin.tests.test_storage:FakeStorageSystem'})
+    assert this_storage.foobie == 'eiboof'
+    assert this_storage.blech == 'hcelb'
+    assert unicode(this_storage.__class__) == \
+        u'mediagoblin.tests.test_storage.FakeStorageSystem'
+
+
+##########################
+# Basic file storage tests
+##########################
+
+def get_tmp_filestorage(mount_url=None, fake_remote=False):
+    tmpdir = tempfile.mkdtemp(prefix="test_gmg_storage")
+    if fake_remote:
+        this_storage = FakeRemoteStorage(tmpdir, mount_url)
+    else:
+        this_storage = storage.filestorage.BasicFileStorage(tmpdir, mount_url)
+    return tmpdir, this_storage
+
+
+def cleanup_storage(this_storage, tmpdir, *paths):
+    for p in paths:
+        while p:
+            assert this_storage.delete_dir(p) == True
+            p.pop(-1)
+    os.rmdir(tmpdir)
+
+
+def test_basic_storage__resolve_filepath():
+    tmpdir, this_storage = get_tmp_filestorage()
+
+    result = this_storage._resolve_filepath(['dir1', 'dir2', 'filename.jpg'])
+    assert result == os.path.join(
+        tmpdir, 'dir1/dir2/filename.jpg')
+
+    result = this_storage._resolve_filepath(['../../etc/', 'passwd'])
+    assert result == os.path.join(
+        tmpdir, 'etc/passwd')
+
+    pytest.raises(
+        storage.InvalidFilepath,
+        this_storage._resolve_filepath,
+        ['../../', 'etc', 'passwd'])
+
+    cleanup_storage(this_storage, tmpdir)
+
+
+def test_basic_storage_file_exists():
+    tmpdir, this_storage = get_tmp_filestorage()
+
+    os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2'))
+    filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt')
+    with open(filename, 'w') as ourfile:
+        ourfile.write("I'm having a lovely day!")
+
+    assert this_storage.file_exists(['dir1', 'dir2', 'filename.txt'])
+    assert not this_storage.file_exists(['dir1', 'dir2', 'thisfile.lol'])
+    assert not this_storage.file_exists(['dnedir1', 'dnedir2', 'somefile.lol'])
+
+    this_storage.delete_file(['dir1', 'dir2', 'filename.txt'])
+    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
+
+
+def test_basic_storage_get_unique_filepath():
+    tmpdir, this_storage = get_tmp_filestorage()
+    
+    # write something that exists
+    os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2'))
+    filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt')
+    with open(filename, 'w') as ourfile:
+        ourfile.write("I'm having a lovely day!")
+
+    # now we want something new, with the same name!
+    new_filepath = this_storage.get_unique_filepath(
+        ['dir1', 'dir2', 'filename.txt'])
+    assert new_filepath[:-1] == [u'dir1', u'dir2']
+
+    new_filename = new_filepath[-1]
+    assert new_filename.endswith('filename.txt')
+    assert len(new_filename) > len('filename.txt')
+    assert new_filename == secure_filename(new_filename)
+
+    os.remove(filename)
+    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
+
+
+def test_basic_storage_get_file():
+    tmpdir, this_storage = get_tmp_filestorage()
+
+    # Write a brand new file
+    filepath = ['dir1', 'dir2', 'ourfile.txt']
+
+    with this_storage.get_file(filepath, 'w') as our_file:
+        our_file.write('First file')
+    with this_storage.get_file(filepath, 'r') as our_file:
+        assert our_file.read() == 'First file'
+    assert os.path.exists(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
+    with file(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file:
+        assert our_file.read() == 'First file'
+
+    # Write to the same path but try to get a unique file.
+    new_filepath = this_storage.get_unique_filepath(filepath)
+    assert not os.path.exists(os.path.join(tmpdir, *new_filepath))
+
+    with this_storage.get_file(new_filepath, 'w') as our_file:
+        our_file.write('Second file')
+    with this_storage.get_file(new_filepath, 'r') as our_file:
+        assert our_file.read() == 'Second file'
+    assert os.path.exists(os.path.join(tmpdir, *new_filepath))
+    with file(os.path.join(tmpdir, *new_filepath), 'r') as our_file:
+        assert our_file.read() == 'Second file'
+
+    # Read from an existing file
+    manually_written_file = os.makedirs(
+        os.path.join(tmpdir, 'testydir'))
+    with file(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile:
+        testyfile.write('testy file!  so testy.')
+
+    with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile:
+        assert testyfile.read() == 'testy file!  so testy.'
+
+    this_storage.delete_file(filepath)
+    this_storage.delete_file(new_filepath)
+    this_storage.delete_file(['testydir', 'testyfile.txt'])
+    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'], ['testydir'])
+
+
+def test_basic_storage_delete_file():
+    tmpdir, this_storage = get_tmp_filestorage()
+
+    assert not os.path.exists(
+        os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
+
+    filepath = ['dir1', 'dir2', 'ourfile.txt']
+    with this_storage.get_file(filepath, 'w') as our_file:
+        our_file.write('Testing this file')
+
+    assert os.path.exists(
+        os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
+
+    assert this_storage.delete_dir(['dir1', 'dir2']) == False
+    this_storage.delete_file(filepath)
+    assert this_storage.delete_dir(['dir1', 'dir2']) == True
+    
+    assert not os.path.exists(
+        os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
+
+    cleanup_storage(this_storage, tmpdir, ['dir1'])
+
+
+def test_basic_storage_url_for_file():
+    # Not supplying a base_url should actually just bork.
+    tmpdir, this_storage = get_tmp_filestorage()
+    pytest.raises(
+        storage.NoWebServing,
+        this_storage.file_url,
+        ['dir1', 'dir2', 'filename.txt'])
+    cleanup_storage(this_storage, tmpdir)
+
+    # base_url without domain
+    tmpdir, this_storage = get_tmp_filestorage('/media/')
+    result = this_storage.file_url(
+        ['dir1', 'dir2', 'filename.txt'])
+    expected = '/media/dir1/dir2/filename.txt'
+    assert result == expected
+    cleanup_storage(this_storage, tmpdir)
+
+    # base_url with domain
+    tmpdir, this_storage = get_tmp_filestorage(
+        'http://media.example.org/ourmedia/')
+    result = this_storage.file_url(
+        ['dir1', 'dir2', 'filename.txt'])
+    expected = 'http://media.example.org/ourmedia/dir1/dir2/filename.txt'
+    assert result == expected
+    cleanup_storage(this_storage, tmpdir)
+
+
+def test_basic_storage_get_local_path():
+    tmpdir, this_storage = get_tmp_filestorage()
+    
+    result = this_storage.get_local_path(
+        ['dir1', 'dir2', 'filename.txt'])
+
+    expected = os.path.join(
+        tmpdir, 'dir1/dir2/filename.txt')
+
+    assert result == expected
+
+    cleanup_storage(this_storage, tmpdir)
+
+
+def test_basic_storage_is_local():
+    tmpdir, this_storage = get_tmp_filestorage()
+    assert this_storage.local_storage is True
+    cleanup_storage(this_storage, tmpdir)
+
+
+def test_basic_storage_copy_locally():
+    tmpdir, this_storage = get_tmp_filestorage()
+
+    dest_tmpdir = tempfile.mkdtemp()
+
+    filepath = ['dir1', 'dir2', 'ourfile.txt']
+    with this_storage.get_file(filepath, 'w') as our_file:
+        our_file.write('Testing this file')
+
+    new_file_dest = os.path.join(dest_tmpdir, 'file2.txt')
+
+    this_storage.copy_locally(filepath, new_file_dest)
+    this_storage.delete_file(filepath)
+    
+    assert file(new_file_dest).read() == 'Testing this file'
+
+    os.remove(new_file_dest)
+    os.rmdir(dest_tmpdir)
+    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
+
+
+def _test_copy_local_to_storage_works(tmpdir, this_storage):
+    local_filename = tempfile.mktemp()
+    with file(local_filename, 'w') as tmpfile:
+        tmpfile.write('haha')
+
+    this_storage.copy_local_to_storage(
+        local_filename, ['dir1', 'dir2', 'copiedto.txt'])
+
+    os.remove(local_filename)
+
+    assert file(
+        os.path.join(tmpdir, 'dir1/dir2/copiedto.txt'),
+        'r').read() == 'haha'
+
+    this_storage.delete_file(['dir1', 'dir2', 'copiedto.txt'])
+    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
+
+
+def test_basic_storage_copy_local_to_storage():
+    tmpdir, this_storage = get_tmp_filestorage()
+    _test_copy_local_to_storage_works(tmpdir, this_storage)
+
+
+def test_general_storage_copy_local_to_storage():
+    tmpdir, this_storage = get_tmp_filestorage(fake_remote=True)
+    _test_copy_local_to_storage_works(tmpdir, this_storage)