Update version for release.
[mediagoblin.git] / mediagoblin / tests / test_storage.py
CommitLineData
8e1e744d 1# GNU MediaGoblin -- federated, autonomous media hosting
cf29e8a8 2# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
a6b378ef
CAW
3#
4# This program is free software: you can redistribute it and/or modify
5# it under the terms of the GNU Affero General Public License as published by
6# the Free Software Foundation, either version 3 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU Affero General Public License for more details.
13#
14# You should have received a copy of the GNU Affero General Public License
15# along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17
17e7093e
CAW
18import os
19import tempfile
20
7d503a89 21import pytest
e49b7e02
BP
22import six
23
20e3ee11 24from werkzeug.utils import secure_filename
17e7093e 25
a6b378ef
CAW
26from mediagoblin import storage
27
28
ffa22935
CAW
29################
30# Test utilities
31################
32
a6b378ef
CAW
def test_clean_listy_filepath():
    """Exercise storage.clean_listy_filepath on clean and hostile inputs."""
    # Components that are already plain names come back as-is.
    cleaned = storage.clean_listy_filepath(['dir1', 'dir2', 'linooks.jpg'])
    assert cleaned == [u'dir1', u'dir2', u'linooks.jpg']

    # Separators and parent references inside a component are neutralised.
    cleaned = storage.clean_listy_filepath(
        ['/dir1/', 'foo/../nasty', 'linooks.jpg'])
    assert cleaned == [u'dir1', u'foo_.._nasty', u'linooks.jpg']

    cleaned = storage.clean_listy_filepath(['../../../etc/', 'passwd'])
    assert cleaned == [u'etc', u'passwd']

    # A component made up only of parent references must be rejected.
    with pytest.raises(storage.InvalidFilepath):
        storage.clean_listy_filepath(['../../', 'linooks.jpg'])
17e7093e
CAW
48
49
class FakeStorageSystem(object):
    """Minimal stand-in storage class that only records its arguments.

    It is loaded by import path in test_storage_system_from_config to check
    that a custom ``storage_class`` setting is honoured.
    """

    def __init__(self, foobie, blech, **kwargs):
        # Extra keyword arguments are accepted and deliberately ignored.
        self.foobie, self.blech = foobie, blech
54
class FakeRemoteStorage(storage.filestorage.BasicFileStorage):
    """Basic file storage that presents itself as non-local storage."""

    # In theory every inherited method still works with this flag off;
    # marking the storage as non-local should force copying to the
    # workbench.
    local_storage = False

    def copy_local_to_storage(self, *args, **kwargs):
        """Delegate to the StorageInterface implementation of the copy."""
        # Deliberately bypass the parent class and call the interface's
        # own method directly.
        interface_copy = storage.StorageInterface.copy_local_to_storage
        return interface_copy(self, *args, **kwargs)
63
ffa22935 64
3c7d11ff
CAW
def test_storage_system_from_config():
    """Check that storage_system_from_config builds the configured class.

    Covers both the default storage class and an explicit ``storage_class``
    given as an import path.
    """
    # No storage_class given: expect a BasicFileStorage.  The extra
    # 'garbage_arg' entry checks that unknown settings are tolerated.  (The
    # literal used to list 'garbage_arg' twice; only the last value ever
    # reached the call, so just that one is kept.)
    this_storage = storage.storage_system_from_config(
        {'base_url': 'http://example.org/moodia/',
         'base_dir': '/tmp/',
         'garbage_arg': 'trash'})
    assert this_storage.base_url == 'http://example.org/moodia/'
    assert this_storage.base_dir == '/tmp/'
    assert this_storage.__class__ is storage.filestorage.BasicFileStorage

    # Explicit storage_class: the remaining settings become constructor
    # arguments of the named class.
    this_storage = storage.storage_system_from_config(
        {'foobie': 'eiboof',
         'blech': 'hcelb',
         'garbage_arg': 'garbage_arg',
         'storage_class':
             'mediagoblin.tests.test_storage:FakeStorageSystem'})
    assert this_storage.foobie == 'eiboof'
    assert this_storage.blech == 'hcelb'
    # Compare by textual class name rather than by identity.
    assert six.text_type(this_storage.__class__) == \
        u"<class 'mediagoblin.tests.test_storage.FakeStorageSystem'>"
ffa22935
CAW
85
86
17e7093e
CAW
87##########################
88# Basic file storage tests
89##########################
90
def get_tmp_filestorage(mount_url=None, fake_remote=False):
    """Create a storage object rooted in a fresh temporary directory.

    ``mount_url`` is passed through as the storage's second constructor
    argument.  With ``fake_remote`` set, a FakeRemoteStorage is built
    instead of a BasicFileStorage.

    Returns a ``(tmpdir, storage)`` tuple.
    """
    tmpdir = tempfile.mkdtemp(prefix="test_gmg_storage")
    storage_class = (
        FakeRemoteStorage if fake_remote
        else storage.filestorage.BasicFileStorage)
    return tmpdir, storage_class(tmpdir, mount_url)
98
99
fd1202b7
E
def cleanup_storage(this_storage, tmpdir, *paths):
    """Remove the given directory chains from storage, then ``tmpdir``.

    Each entry of ``paths`` is a listy directory path.  It is deleted from
    the deepest directory upwards, e.g. ``['dir1', 'dir2']`` deletes
    ``['dir1', 'dir2']`` and then ``['dir1']``.  Every ``delete_dir`` call
    must report success.

    The path sequences passed in are not modified.

    Raises AssertionError if a directory could not be deleted, and OSError
    from ``os.rmdir`` if ``tmpdir`` is not empty afterwards.
    """
    for path in paths:
        # Hand out successively shorter slices instead of popping from the
        # caller's own list.
        for depth in range(len(path), 0, -1):
            assert this_storage.delete_dir(path[:depth]) == True
    os.rmdir(tmpdir)
106
107
17e7093e
CAW
def test_basic_storage__resolve_filepath():
    """Check that _resolve_filepath maps listy paths under the base dir."""
    tmpdir, this_storage = get_tmp_filestorage()

    resolved = this_storage._resolve_filepath(
        ['dir1', 'dir2', 'filename.jpg'])
    assert resolved == os.path.join(tmpdir, 'dir1/dir2/filename.jpg')

    # Parent references are stripped, keeping the result inside tmpdir.
    resolved = this_storage._resolve_filepath(['../../etc/', 'passwd'])
    assert resolved == os.path.join(tmpdir, 'etc/passwd')

    # A component consisting only of parent references is refused.
    with pytest.raises(storage.InvalidFilepath):
        this_storage._resolve_filepath(['../../', 'etc', 'passwd'])

    cleanup_storage(this_storage, tmpdir)
25067d81 125
17e7093e
CAW
126
def test_basic_storage_file_exists():
    """Check file_exists for a present file and for missing ones."""
    tmpdir, this_storage = get_tmp_filestorage()

    # Put a file on disk behind the storage's back.
    nested_dir = os.path.join(tmpdir, 'dir1', 'dir2')
    os.makedirs(nested_dir)
    with open(os.path.join(nested_dir, 'filename.txt'), 'w') as ourfile:
        ourfile.write("I'm having a lovely day!")

    assert this_storage.file_exists(['dir1', 'dir2', 'filename.txt'])

    # Missing file in an existing directory, then a missing directory.
    absent_paths = (
        ['dir1', 'dir2', 'thisfile.lol'],
        ['dnedir1', 'dnedir2', 'somefile.lol'],
    )
    for absent in absent_paths:
        assert not this_storage.file_exists(absent)

    this_storage.delete_file(['dir1', 'dir2', 'filename.txt'])
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
25067d81 141
92fb87ae 142
20e3ee11
CAW
def test_basic_storage_get_unique_filepath():
    """Check get_unique_filepath returns a new name when the file exists."""
    tmpdir, this_storage = get_tmp_filestorage()

    # Occupy the target name on disk first.
    nested_dir = os.path.join(tmpdir, 'dir1', 'dir2')
    os.makedirs(nested_dir)
    filename = os.path.join(nested_dir, 'filename.txt')
    with open(filename, 'w') as ourfile:
        ourfile.write("I'm having a lovely day!")

    # Ask for the very same path: the directories must be kept...
    unique_path = this_storage.get_unique_filepath(
        ['dir1', 'dir2', 'filename.txt'])
    assert unique_path[:-1] == [u'dir1', u'dir2']

    # ...while the file name is longer, keeps the original name as its
    # suffix, and is still a secure filename.
    unique_name = unique_path[-1]
    assert unique_name.endswith('filename.txt')
    assert len(unique_name) > len('filename.txt')
    assert unique_name == secure_filename(unique_name)

    os.remove(filename)
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
25067d81 164
17e7093e
CAW
165
def test_basic_storage_get_file():
    """Check writing and reading files through the storage's get_file.

    Files obtained from the storage are written and compared as bytes,
    while the cross-checks through plain ``open`` use text mode.
    """
    tmpdir, this_storage = get_tmp_filestorage()

    # Write a brand new file
    filepath = ['dir1', 'dir2', 'ourfile.txt']

    with this_storage.get_file(filepath, 'w') as our_file:
        our_file.write(b'First file')
    with this_storage.get_file(filepath, 'r') as our_file:
        assert our_file.read() == b'First file'
    assert os.path.exists(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
    with open(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file:
        assert our_file.read() == 'First file'

    # Write to the same path but try to get a unique file.
    new_filepath = this_storage.get_unique_filepath(filepath)
    assert not os.path.exists(os.path.join(tmpdir, *new_filepath))

    with this_storage.get_file(new_filepath, 'w') as our_file:
        our_file.write(b'Second file')
    with this_storage.get_file(new_filepath, 'r') as our_file:
        assert our_file.read() == b'Second file'
    assert os.path.exists(os.path.join(tmpdir, *new_filepath))
    with open(os.path.join(tmpdir, *new_filepath), 'r') as our_file:
        assert our_file.read() == 'Second file'

    # Read from an existing file that was created outside the storage API.
    # (os.makedirs returns None, so its result is not kept.)
    os.makedirs(os.path.join(tmpdir, 'testydir'))
    with open(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile:
        testyfile.write('testy file! so testy.')

    with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile:
        assert testyfile.read() == b'testy file! so testy.'

    this_storage.delete_file(filepath)
    this_storage.delete_file(new_filepath)
    this_storage.delete_file(['testydir', 'testyfile.txt'])
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'], ['testydir'])
25067d81 205
17e7093e
CAW
206
def test_basic_storage_delete_file():
    """Check delete_file, and that delete_dir refuses a non-empty dir."""
    tmpdir, this_storage = get_tmp_filestorage()

    filepath = ['dir1', 'dir2', 'ourfile.txt']
    path_on_disk = os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')

    # Nothing there yet.
    assert not os.path.exists(path_on_disk)

    with this_storage.get_file(filepath, 'w') as our_file:
        our_file.write(b'Testing this file')
    assert os.path.exists(path_on_disk)

    # The directory cannot go while the file is in it, only afterwards.
    assert this_storage.delete_dir(['dir1', 'dir2']) == False
    this_storage.delete_file(filepath)
    assert this_storage.delete_dir(['dir1', 'dir2']) == True

    assert not os.path.exists(path_on_disk)

    # 'dir2' is already gone, so only 'dir1' is left to clean up.
    cleanup_storage(this_storage, tmpdir, ['dir1'])
228
17e7093e
CAW
229
def test_basic_storage_url_for_file():
    """Check file_url with no base URL, a path-only one and a full one."""
    # Without a base_url there is nothing to serve from: expect an error.
    tmpdir, this_storage = get_tmp_filestorage()
    with pytest.raises(storage.NoWebServing):
        this_storage.file_url(['dir1', 'dir2', 'filename.txt'])
    cleanup_storage(this_storage, tmpdir)

    # base_url without domain
    tmpdir, this_storage = get_tmp_filestorage('/media/')
    url = this_storage.file_url(['dir1', 'dir2', 'filename.txt'])
    assert url == '/media/dir1/dir2/filename.txt'
    cleanup_storage(this_storage, tmpdir)

    # base_url with domain
    tmpdir, this_storage = get_tmp_filestorage(
        'http://media.example.org/ourmedia/')
    url = this_storage.file_url(['dir1', 'dir2', 'filename.txt'])
    assert url == 'http://media.example.org/ourmedia/dir1/dir2/filename.txt'
    cleanup_storage(this_storage, tmpdir)
3a89c23e
CAW
255
256
def test_basic_storage_get_local_path():
    """Check get_local_path returns the path beneath the storage dir."""
    tmpdir, this_storage = get_tmp_filestorage()

    local_path = this_storage.get_local_path(
        ['dir1', 'dir2', 'filename.txt'])
    assert local_path == os.path.join(tmpdir, 'dir1/dir2/filename.txt')

    cleanup_storage(this_storage, tmpdir)
25067d81 269
3a89c23e
CAW
270
def test_basic_storage_is_local():
    """Check that a plain temporary file storage reports itself as local."""
    tmpdir, this_storage = get_tmp_filestorage()
    is_local = this_storage.local_storage
    assert is_local is True
    cleanup_storage(this_storage, tmpdir)
6a07362d
CAW
275
276
def test_basic_storage_copy_locally():
    """Check copy_locally writes a stored file's content to a local path."""
    tmpdir, this_storage = get_tmp_filestorage()

    dest_tmpdir = tempfile.mkdtemp()

    filepath = ['dir1', 'dir2', 'ourfile.txt']
    with this_storage.get_file(filepath, 'w') as our_file:
        our_file.write(b'Testing this file')

    new_file_dest = os.path.join(dest_tmpdir, 'file2.txt')

    this_storage.copy_locally(filepath, new_file_dest)
    # Delete the stored original first: the copy must stand on its own.
    this_storage.delete_file(filepath)

    # Read through a context manager so the handle is closed before the
    # file is removed below.
    with open(new_file_dest) as copied_file:
        assert copied_file.read() == 'Testing this file'

    os.remove(new_file_dest)
    os.rmdir(dest_tmpdir)
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
25067d81 296
e56e5f8c
CAW
297
def _test_copy_local_to_storage_works(tmpdir, this_storage):
    """Shared check that copy_local_to_storage places a file in storage.

    ``tmpdir`` is the directory backing ``this_storage``; both normally
    come from get_tmp_filestorage.  The storage and ``tmpdir`` are cleaned
    up at the end.
    """
    # mkstemp creates the file atomically, unlike the deprecated and
    # race-prone mktemp which only invents a name.
    handle, local_filename = tempfile.mkstemp()
    with os.fdopen(handle, 'w') as tmpfile:
        tmpfile.write('haha')

    this_storage.copy_local_to_storage(
        local_filename, ['dir1', 'dir2', 'copiedto.txt'])

    # The stored copy must not depend on the local source file.
    os.remove(local_filename)

    # Read through a context manager so the handle is closed before
    # the file is deleted below.
    copied_path = os.path.join(tmpdir, 'dir1/dir2/copiedto.txt')
    with open(copied_path, 'r') as copied_file:
        assert copied_file.read() == 'haha'

    this_storage.delete_file(['dir1', 'dir2', 'copiedto.txt'])
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
25067d81 314
e56e5f8c
CAW
315
def test_basic_storage_copy_local_to_storage():
    """Run the shared copy_local_to_storage check on plain file storage."""
    _test_copy_local_to_storage_works(*get_tmp_filestorage())
319
320
def test_general_storage_copy_local_to_storage():
    """Run the shared copy_local_to_storage check on the fake remote."""
    _test_copy_local_to_storage_works(
        *get_tmp_filestorage(fake_remote=True))