Merge remote-tracking branch 'refs/remotes/tsyesika/master'
[mediagoblin.git] / mediagoblin / tests / test_storage.py
index 45cb35c1b85ee61d0d5873b93fdcb9b55a0e92a8..f6f1d18f532004b9e4280638fab10e1dfc70554c 100644 (file)
@@ -1,5 +1,5 @@
 # GNU MediaGoblin -- federated, autonomous media hosting
-# Copyright (C) 2011 Free Software Foundation, Inc
+# Copyright (C) 2011, 2012 MediaGoblin contributors.  See AUTHORS.
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as published by
@@ -18,7 +18,7 @@
 import os
 import tempfile
 
-from nose.tools import assert_raises
+import pytest
 from werkzeug.utils import secure_filename
 
 from mediagoblin import storage
@@ -41,10 +41,8 @@ def test_clean_listy_filepath():
     assert storage.clean_listy_filepath(
         ['../../../etc/', 'passwd']) == expected
 
-    assert_raises(
-        storage.InvalidFilepath,
-        storage.clean_listy_filepath,
-        ['../../', 'linooks.jpg'])
+    with pytest.raises(storage.InvalidFilepath):
+        storage.clean_listy_filepath(['../../', 'linooks.jpg'])
 
 
 class FakeStorageSystem():
@@ -52,11 +50,15 @@ class FakeStorageSystem():
         self.foobie = foobie
         self.blech = blech
 
-class FakeRemoteStorage(storage.BasicFileStorage):
+class FakeRemoteStorage(storage.filestorage.BasicFileStorage):
     # Theoretically despite this, all the methods should work but it
     # should force copying to the workbench
     local_storage = False
 
+    def copy_local_to_storage(self, *args, **kwargs):
+        return storage.StorageInterface.copy_local_to_storage(
+            self, *args, **kwargs)
+
 
 def test_storage_system_from_config():
     this_storage = storage.storage_system_from_config(
@@ -66,7 +68,7 @@ def test_storage_system_from_config():
          'garbage_arg': 'trash'})
     assert this_storage.base_url == 'http://example.org/moodia/'
     assert this_storage.base_dir == '/tmp/'
-    assert this_storage.__class__ is storage.BasicFileStorage
+    assert this_storage.__class__ is storage.filestorage.BasicFileStorage
 
     this_storage = storage.storage_system_from_config(
         {'foobie': 'eiboof',
@@ -76,7 +78,8 @@ def test_storage_system_from_config():
              'mediagoblin.tests.test_storage:FakeStorageSystem'})
     assert this_storage.foobie == 'eiboof'
     assert this_storage.blech == 'hcelb'
-    assert this_storage.__class__ is FakeStorageSystem
+    assert unicode(this_storage.__class__) == \
+        u'mediagoblin.tests.test_storage.FakeStorageSystem'
 
 
 ##########################
@@ -84,14 +87,22 @@ def test_storage_system_from_config():
 ##########################
 
 def get_tmp_filestorage(mount_url=None, fake_remote=False):
-    tmpdir = tempfile.mkdtemp()
+    tmpdir = tempfile.mkdtemp(prefix="test_gmg_storage")
     if fake_remote:
         this_storage = FakeRemoteStorage(tmpdir, mount_url)
     else:
-        this_storage = storage.BasicFileStorage(tmpdir, mount_url)
+        this_storage = storage.filestorage.BasicFileStorage(tmpdir, mount_url)
     return tmpdir, this_storage
 
 
+def cleanup_storage(this_storage, tmpdir, *paths):
+    for p in paths:
+        while p:
+            assert this_storage.delete_dir(p) == True
+            p.pop(-1)
+    os.rmdir(tmpdir)
+
+
 def test_basic_storage__resolve_filepath():
     tmpdir, this_storage = get_tmp_filestorage()
 
@@ -103,11 +114,13 @@ def test_basic_storage__resolve_filepath():
     assert result == os.path.join(
         tmpdir, 'etc/passwd')
 
-    assert_raises(
+    pytest.raises(
         storage.InvalidFilepath,
         this_storage._resolve_filepath,
         ['../../', 'etc', 'passwd'])
 
+    cleanup_storage(this_storage, tmpdir)
+
 
 def test_basic_storage_file_exists():
     tmpdir, this_storage = get_tmp_filestorage()
@@ -121,6 +134,9 @@ def test_basic_storage_file_exists():
     assert not this_storage.file_exists(['dir1', 'dir2', 'thisfile.lol'])
     assert not this_storage.file_exists(['dnedir1', 'dnedir2', 'somefile.lol'])
 
+    this_storage.delete_file(['dir1', 'dir2', 'filename.txt'])
+    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
+
 
 def test_basic_storage_get_unique_filepath():
     tmpdir, this_storage = get_tmp_filestorage()
@@ -141,6 +157,9 @@ def test_basic_storage_get_unique_filepath():
     assert len(new_filename) > len('filename.txt')
     assert new_filename == secure_filename(new_filename)
 
+    os.remove(filename)
+    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
+
 
 def test_basic_storage_get_file():
     tmpdir, this_storage = get_tmp_filestorage()
@@ -177,6 +196,11 @@ def test_basic_storage_get_file():
     with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile:
         assert testyfile.read() == 'testy file!  so testy.'
 
+    this_storage.delete_file(filepath)
+    this_storage.delete_file(new_filepath)
+    this_storage.delete_file(['testydir', 'testyfile.txt'])
+    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'], ['testydir'])
+
 
 def test_basic_storage_delete_file():
     tmpdir, this_storage = get_tmp_filestorage()
@@ -191,19 +215,24 @@ def test_basic_storage_delete_file():
     assert os.path.exists(
         os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
 
+    assert this_storage.delete_dir(['dir1', 'dir2']) == False
     this_storage.delete_file(filepath)
+    assert this_storage.delete_dir(['dir1', 'dir2']) == True
     
     assert not os.path.exists(
         os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
 
+    cleanup_storage(this_storage, tmpdir, ['dir1'])
+
 
 def test_basic_storage_url_for_file():
     # Not supplying a base_url should actually just bork.
     tmpdir, this_storage = get_tmp_filestorage()
-    assert_raises(
+    pytest.raises(
         storage.NoWebServing,
         this_storage.file_url,
         ['dir1', 'dir2', 'filename.txt'])
+    cleanup_storage(this_storage, tmpdir)
 
     # base_url without domain
     tmpdir, this_storage = get_tmp_filestorage('/media/')
@@ -211,6 +240,7 @@ def test_basic_storage_url_for_file():
         ['dir1', 'dir2', 'filename.txt'])
     expected = '/media/dir1/dir2/filename.txt'
     assert result == expected
+    cleanup_storage(this_storage, tmpdir)
 
     # base_url with domain
     tmpdir, this_storage = get_tmp_filestorage(
@@ -219,6 +249,7 @@ def test_basic_storage_url_for_file():
         ['dir1', 'dir2', 'filename.txt'])
     expected = 'http://media.example.org/ourmedia/dir1/dir2/filename.txt'
     assert result == expected
+    cleanup_storage(this_storage, tmpdir)
 
 
 def test_basic_storage_get_local_path():
@@ -232,10 +263,13 @@ def test_basic_storage_get_local_path():
 
     assert result == expected
 
+    cleanup_storage(this_storage, tmpdir)
+
 
 def test_basic_storage_is_local():
     tmpdir, this_storage = get_tmp_filestorage()
     assert this_storage.local_storage is True
+    cleanup_storage(this_storage, tmpdir)
 
 
 def test_basic_storage_copy_locally():
@@ -250,5 +284,38 @@ def test_basic_storage_copy_locally():
     new_file_dest = os.path.join(dest_tmpdir, 'file2.txt')
 
     this_storage.copy_locally(filepath, new_file_dest)
+    this_storage.delete_file(filepath)
     
     assert file(new_file_dest).read() == 'Testing this file'
+
+    os.remove(new_file_dest)
+    os.rmdir(dest_tmpdir)
+    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
+
+
+def _test_copy_local_to_storage_works(tmpdir, this_storage):
+    local_filename = tempfile.mktemp()
+    with file(local_filename, 'w') as tmpfile:
+        tmpfile.write('haha')
+
+    this_storage.copy_local_to_storage(
+        local_filename, ['dir1', 'dir2', 'copiedto.txt'])
+
+    os.remove(local_filename)
+
+    assert file(
+        os.path.join(tmpdir, 'dir1/dir2/copiedto.txt'),
+        'r').read() == 'haha'
+
+    this_storage.delete_file(['dir1', 'dir2', 'copiedto.txt'])
+    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
+
+
+def test_basic_storage_copy_local_to_storage():
+    tmpdir, this_storage = get_tmp_filestorage()
+    _test_copy_local_to_storage_works(tmpdir, this_storage)
+
+
+def test_general_storage_copy_local_to_storage():
+    tmpdir, this_storage = get_tmp_filestorage(fake_remote=True)
+    _test_copy_local_to_storage_works(tmpdir, this_storage)