Merge remote-tracking branch 'refs/remotes/tilly-q/ticket-874' into mergetest
[mediagoblin.git] / mediagoblin / tests / test_storage.py
CommitLineData
8e1e744d 1# GNU MediaGoblin -- federated, autonomous media hosting
cf29e8a8 2# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
a6b378ef
CAW
3#
4# This program is free software: you can redistribute it and/or modify
5# it under the terms of the GNU Affero General Public License as published by
6# the Free Software Foundation, either version 3 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU Affero General Public License for more details.
13#
14# You should have received a copy of the GNU Affero General Public License
15# along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17
17e7093e
CAW
18import os
19import tempfile
20
7d503a89 21import pytest
20e3ee11 22from werkzeug.utils import secure_filename
17e7093e 23
a6b378ef
CAW
24from mediagoblin import storage
25
26
ffa22935
CAW
27################
28# Test utilities
29################
30
a6b378ef
CAW
def test_clean_listy_filepath():
    """Check storage.clean_listy_filepath against a table of known inputs."""
    # Each entry pairs the raw path components with the cleaned list the
    # function is expected to return for them.
    cases = [
        (['dir1', 'dir2', 'linooks.jpg'],
         [u'dir1', u'dir2', u'linooks.jpg']),
        (['/dir1/', 'foo/../nasty', 'linooks.jpg'],
         [u'dir1', u'foo_.._nasty', u'linooks.jpg']),
        (['../../../etc/', 'passwd'],
         [u'etc', u'passwd']),
    ]
    for raw_components, cleaned_components in cases:
        assert storage.clean_listy_filepath(
            raw_components) == cleaned_components

    # This particular input is expected to be rejected outright.
    with pytest.raises(storage.InvalidFilepath):
        storage.clean_listy_filepath(['../../', 'linooks.jpg'])
17e7093e
CAW
46
47
ffa22935
CAW
class FakeStorageSystem():
    """Minimal stand-in storage class that only records its two arguments."""

    def __init__(self, foobie, blech, **kwargs):
        # Extra keyword arguments are accepted but never stored.
        self.foobie, self.blech = foobie, blech
52
class FakeRemoteStorage(storage.filestorage.BasicFileStorage):
    """BasicFileStorage subclass that flags itself as non-local storage."""

    # Theoretically despite this, all the methods should work but it
    # should force copying to the workbench
    local_storage = False

    def copy_local_to_storage(self, *args, **kwargs):
        # Call StorageInterface's implementation explicitly, bypassing
        # whatever the BasicFileStorage parent may provide for this method.
        interface_copy = storage.StorageInterface.copy_local_to_storage
        return interface_copy(self, *args, **kwargs)
61
ffa22935 62
3c7d11ff
CAW
def test_storage_system_from_config():
    """Exercise storage_system_from_config with and without 'storage_class'."""
    # No 'storage_class' key: the assertions below expect a BasicFileStorage
    # configured from base_url / base_dir.  'garbage_arg' is an extra key
    # passed on purpose.  (The literal used to list 'garbage_arg' twice; a
    # repeated key in a dict display keeps only the last value, so only
    # 'trash' ever reached the call and only that entry is kept.)
    this_storage = storage.storage_system_from_config(
        {'base_url': 'http://example.org/moodia/',
         'base_dir': '/tmp/',
         'garbage_arg': 'trash'})
    assert this_storage.base_url == 'http://example.org/moodia/'
    assert this_storage.base_dir == '/tmp/'
    assert this_storage.__class__ is storage.filestorage.BasicFileStorage

    # Explicit 'storage_class' given as a "module:ClassName" string.
    this_storage = storage.storage_system_from_config(
        {'foobie': 'eiboof',
         'blech': 'hcelb',
         'garbage_arg': 'garbage_arg',
         'storage_class':
             'mediagoblin.tests.test_storage:FakeStorageSystem'})
    assert this_storage.foobie == 'eiboof'
    assert this_storage.blech == 'hcelb'
    # Compare the dotted module/class name directly instead of relying on
    # the string form of the class object, which differs between old-style
    # classes, new-style classes and Python versions.
    loaded_class = this_storage.__class__
    assert '%s.%s' % (loaded_class.__module__, loaded_class.__name__) == \
        'mediagoblin.tests.test_storage.FakeStorageSystem'
ffa22935
CAW
83
84
17e7093e
CAW
85##########################
86# Basic file storage tests
87##########################
88
def get_tmp_filestorage(mount_url=None, fake_remote=False):
    """Create a fresh temporary directory and a storage object rooted in it.

    Returns a ``(tmpdir, storage)`` tuple.  The storage object is a
    FakeRemoteStorage when ``fake_remote`` is true and a BasicFileStorage
    otherwise; ``mount_url`` is passed through as its second argument.
    """
    storage_root = tempfile.mkdtemp(prefix="test_gmg_storage")
    storage_class = (
        FakeRemoteStorage if fake_remote
        else storage.filestorage.BasicFileStorage)
    return storage_root, storage_class(storage_root, mount_url)
96
97
fd1202b7
E
def cleanup_storage(this_storage, tmpdir, *paths):
    """Remove the given directory chains from storage, then remove tmpdir.

    Each entry of ``paths`` is a sequence of path components.  For every
    entry, ``this_storage.delete_dir`` is called on the full sequence and
    then on each shorter prefix (deepest directory first), and every call
    must report success.  Finally ``tmpdir`` itself is removed with
    ``os.rmdir``, which requires it to be empty by then.

    The sequences in ``paths`` are not modified.
    """
    for path in paths:
        # Walk from the full path down to its first component using slices,
        # so the caller's list is left intact.
        for depth in range(len(path), 0, -1):
            prefix = path[:depth]
            # Call outside the assert so the directory is still removed
            # when assertions are stripped (python -O).
            removed = this_storage.delete_dir(prefix)
            assert removed, 'could not delete directory %r' % (prefix,)
    os.rmdir(tmpdir)
104
105
17e7093e
CAW
def test_basic_storage__resolve_filepath():
    """Check _resolve_filepath results relative to the temporary root."""
    tmpdir, this_storage = get_tmp_filestorage()

    # (path components handed in, expected location below tmpdir)
    cases = [
        (['dir1', 'dir2', 'filename.jpg'], 'dir1/dir2/filename.jpg'),
        (['../../etc/', 'passwd'], 'etc/passwd'),
    ]
    for components, relative_location in cases:
        resolved = this_storage._resolve_filepath(components)
        assert resolved == os.path.join(tmpdir, relative_location)

    # This input is expected to be refused.
    with pytest.raises(storage.InvalidFilepath):
        this_storage._resolve_filepath(['../../', 'etc', 'passwd'])

    cleanup_storage(this_storage, tmpdir)
25067d81 123
17e7093e
CAW
124
def test_basic_storage_file_exists():
    """file_exists should be true only for the file written to disk here."""
    tmpdir, this_storage = get_tmp_filestorage()

    # Put one real file on disk, bypassing the storage API.
    containing_dir = os.path.join(tmpdir, 'dir1', 'dir2')
    os.makedirs(containing_dir)
    with open(os.path.join(containing_dir, 'filename.txt'), 'w') as handle:
        handle.write("I'm having a lovely day!")

    assert this_storage.file_exists(['dir1', 'dir2', 'filename.txt'])

    # Neither a missing file in an existing directory nor a file in a
    # missing directory should be reported as existing.
    for absent in (['dir1', 'dir2', 'thisfile.lol'],
                   ['dnedir1', 'dnedir2', 'somefile.lol']):
        assert not this_storage.file_exists(absent)

    this_storage.delete_file(['dir1', 'dir2', 'filename.txt'])
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
25067d81 139
92fb87ae 140
20e3ee11
CAW
def test_basic_storage_get_unique_filepath():
    """get_unique_filepath should yield a new name when the path is taken."""
    tmpdir, this_storage = get_tmp_filestorage()

    # write something that exists
    occupied_dir = os.path.join(tmpdir, 'dir1', 'dir2')
    os.makedirs(occupied_dir)
    occupied_file = os.path.join(occupied_dir, 'filename.txt')
    with open(occupied_file, 'w') as handle:
        handle.write("I'm having a lovely day!")

    # now we want something new, with the same name!
    unique_path = this_storage.get_unique_filepath(
        ['dir1', 'dir2', 'filename.txt'])
    assert unique_path[:-1] == [u'dir1', u'dir2']

    # The final component must keep the original name as a suffix, be
    # longer than it, and already be in secure_filename form.
    unique_name = unique_path[-1]
    assert unique_name.endswith('filename.txt')
    assert len(unique_name) > len('filename.txt')
    assert unique_name == secure_filename(unique_name)

    os.remove(occupied_file)
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
25067d81 162
17e7093e
CAW
163
def test_basic_storage_get_file():
    """Write and read files through get_file and cross-check them on disk.

    Uses the built-in ``open`` for the direct on-disk checks, since the
    ``file`` builtin does not exist on Python 3.
    """
    tmpdir, this_storage = get_tmp_filestorage()

    # Write a brand new file
    filepath = ['dir1', 'dir2', 'ourfile.txt']

    with this_storage.get_file(filepath, 'w') as our_file:
        our_file.write('First file')
    with this_storage.get_file(filepath, 'r') as our_file:
        assert our_file.read() == 'First file'
    assert os.path.exists(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
    with open(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file:
        assert our_file.read() == 'First file'

    # Write to the same path but try to get a unique file.
    new_filepath = this_storage.get_unique_filepath(filepath)
    assert not os.path.exists(os.path.join(tmpdir, *new_filepath))

    with this_storage.get_file(new_filepath, 'w') as our_file:
        our_file.write('Second file')
    with this_storage.get_file(new_filepath, 'r') as our_file:
        assert our_file.read() == 'Second file'
    assert os.path.exists(os.path.join(tmpdir, *new_filepath))
    with open(os.path.join(tmpdir, *new_filepath), 'r') as our_file:
        assert our_file.read() == 'Second file'

    # Read from an existing file that was created behind the storage's back.
    # (os.makedirs returns None, so its result is not kept.)
    os.makedirs(os.path.join(tmpdir, 'testydir'))
    with open(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile:
        testyfile.write('testy file! so testy.')

    # No mode argument here: this relies on get_file's default mode.
    with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile:
        assert testyfile.read() == 'testy file! so testy.'

    this_storage.delete_file(filepath)
    this_storage.delete_file(new_filepath)
    this_storage.delete_file(['testydir', 'testyfile.txt'])
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'], ['testydir'])
25067d81 203
17e7093e
CAW
204
def test_basic_storage_delete_file():
    """delete_file should remove the file; delete_dir should then succeed."""
    tmpdir, this_storage = get_tmp_filestorage()
    on_disk_location = os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')

    assert not os.path.exists(on_disk_location)

    filepath = ['dir1', 'dir2', 'ourfile.txt']
    with this_storage.get_file(filepath, 'w') as handle:
        handle.write('Testing this file')

    assert os.path.exists(on_disk_location)

    # delete_dir is expected to report False while the directory still
    # holds the file, and True once the file has been deleted.
    assert this_storage.delete_dir(['dir1', 'dir2']) == False
    this_storage.delete_file(filepath)
    assert this_storage.delete_dir(['dir1', 'dir2']) == True

    assert not os.path.exists(on_disk_location)

    cleanup_storage(this_storage, tmpdir, ['dir1'])
226
17e7093e
CAW
227
def test_basic_storage_url_for_file():
    """Check file_url with no base_url, a path-only one and a full URL."""
    # Not supplying a base_url should actually just bork.
    tmpdir, this_storage = get_tmp_filestorage()
    with pytest.raises(storage.NoWebServing):
        this_storage.file_url(['dir1', 'dir2', 'filename.txt'])
    cleanup_storage(this_storage, tmpdir)

    # (base_url handed to the storage, URL expected back from file_url)
    url_cases = [
        # base_url without domain
        ('/media/',
         '/media/dir1/dir2/filename.txt'),
        # base_url with domain
        ('http://media.example.org/ourmedia/',
         'http://media.example.org/ourmedia/dir1/dir2/filename.txt'),
    ]
    for base_url, expected in url_cases:
        tmpdir, this_storage = get_tmp_filestorage(base_url)
        result = this_storage.file_url(['dir1', 'dir2', 'filename.txt'])
        assert result == expected
        cleanup_storage(this_storage, tmpdir)
3a89c23e
CAW
253
254
def test_basic_storage_get_local_path():
    """get_local_path should point below the storage's temporary root."""
    storage_root, basic_storage = get_tmp_filestorage()

    local_path = basic_storage.get_local_path(
        ['dir1', 'dir2', 'filename.txt'])
    assert local_path == os.path.join(storage_root, 'dir1/dir2/filename.txt')

    cleanup_storage(basic_storage, storage_root)
25067d81 267
3a89c23e
CAW
268
def test_basic_storage_is_local():
    """The storage from get_tmp_filestorage() must flag itself as local."""
    storage_root, basic_storage = get_tmp_filestorage()
    local_flag = basic_storage.local_storage
    # Identity check: the flag has to be the True singleton itself.
    assert local_flag is True
    cleanup_storage(basic_storage, storage_root)
6a07362d
CAW
273
274
def test_basic_storage_copy_locally():
    """copy_locally should leave a readable copy outside the storage."""
    tmpdir, this_storage = get_tmp_filestorage()

    dest_tmpdir = tempfile.mkdtemp()

    filepath = ['dir1', 'dir2', 'ourfile.txt']
    with this_storage.get_file(filepath, 'w') as our_file:
        our_file.write('Testing this file')

    new_file_dest = os.path.join(dest_tmpdir, 'file2.txt')

    this_storage.copy_locally(filepath, new_file_dest)
    # Delete the stored original first, so the check below can only be
    # satisfied by the local copy.
    this_storage.delete_file(filepath)

    # Use open() in a with-block: the handle is closed deterministically
    # (needed before os.remove on some platforms) and, unlike the file()
    # builtin, open() exists on Python 3 as well.
    with open(new_file_dest) as copied_file:
        assert copied_file.read() == 'Testing this file'

    os.remove(new_file_dest)
    os.rmdir(dest_tmpdir)
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
25067d81 294
e56e5f8c
CAW
295
def _test_copy_local_to_storage_works(tmpdir, this_storage):
    """Shared check that copy_local_to_storage puts a local file in storage.

    ``tmpdir`` is the directory ``this_storage`` is rooted in; it is removed
    again (via cleanup_storage) at the end.
    """
    # mkstemp creates the file atomically and hands back an open descriptor,
    # avoiding the name race of the deprecated tempfile.mktemp().
    local_fd, local_filename = tempfile.mkstemp()
    with os.fdopen(local_fd, 'w') as tmpfile:
        tmpfile.write('haha')

    this_storage.copy_local_to_storage(
        local_filename, ['dir1', 'dir2', 'copiedto.txt'])

    # Remove the source before checking, so only the stored copy can match.
    os.remove(local_filename)

    # open() in a with-block closes the handle before the file is deleted
    # and, unlike the file() builtin, is available on Python 3.
    with open(os.path.join(tmpdir, 'dir1/dir2/copiedto.txt'), 'r') as copied:
        assert copied.read() == 'haha'

    this_storage.delete_file(['dir1', 'dir2', 'copiedto.txt'])
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
25067d81 312
e56e5f8c
CAW
313
def test_basic_storage_copy_local_to_storage():
    """Run the shared copy_local_to_storage check on the default storage."""
    # get_tmp_filestorage() returns (tmpdir, storage), which is exactly the
    # argument order the shared helper takes.
    _test_copy_local_to_storage_works(*get_tmp_filestorage())
317
318
def test_general_storage_copy_local_to_storage():
    """Run the shared copy_local_to_storage check on the fake-remote storage."""
    storage_root, remote_like_storage = get_tmp_filestorage(fake_remote=True)
    _test_copy_local_to_storage_works(storage_root, remote_like_storage)