e5bc00deefe4f56834b301587d5bdbfea124f3a7
[mediagoblin.git] / mediagoblin / tests / test_storage.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17
18 import os
19 import tempfile
20
21 import pytest
22 from werkzeug.utils import secure_filename
23
24 from mediagoblin import storage
25 from mediagoblin.tools.testing import _activate_testing
26
27 _activate_testing()
28
29
30 ################
31 # Test utilities
32 ################
33
34 def test_clean_listy_filepath():
35 expected = [u'dir1', u'dir2', u'linooks.jpg']
36 assert storage.clean_listy_filepath(
37 ['dir1', 'dir2', 'linooks.jpg']) == expected
38
39 expected = [u'dir1', u'foo_.._nasty', u'linooks.jpg']
40 assert storage.clean_listy_filepath(
41 ['/dir1/', 'foo/../nasty', 'linooks.jpg']) == expected
42
43 expected = [u'etc', u'passwd']
44 assert storage.clean_listy_filepath(
45 ['../../../etc/', 'passwd']) == expected
46
47 with pytest.raises(storage.InvalidFilepath):
48 storage.clean_listy_filepath(['../../', 'linooks.jpg'])
49
50
51 class FakeStorageSystem():
52 def __init__(self, foobie, blech, **kwargs):
53 self.foobie = foobie
54 self.blech = blech
55
56 class FakeRemoteStorage(storage.filestorage.BasicFileStorage):
57 # Theoretically despite this, all the methods should work but it
58 # should force copying to the workbench
59 local_storage = False
60
61 def copy_local_to_storage(self, *args, **kwargs):
62 return storage.StorageInterface.copy_local_to_storage(
63 self, *args, **kwargs)
64
65
66 def test_storage_system_from_config():
67 this_storage = storage.storage_system_from_config(
68 {'base_url': 'http://example.org/moodia/',
69 'base_dir': '/tmp/',
70 'garbage_arg': 'garbage_arg',
71 'garbage_arg': 'trash'})
72 assert this_storage.base_url == 'http://example.org/moodia/'
73 assert this_storage.base_dir == '/tmp/'
74 assert this_storage.__class__ is storage.filestorage.BasicFileStorage
75
76 this_storage = storage.storage_system_from_config(
77 {'foobie': 'eiboof',
78 'blech': 'hcelb',
79 'garbage_arg': 'garbage_arg',
80 'storage_class':
81 'mediagoblin.tests.test_storage:FakeStorageSystem'})
82 assert this_storage.foobie == 'eiboof'
83 assert this_storage.blech == 'hcelb'
84 assert unicode(this_storage.__class__) == \
85 u'mediagoblin.tests.test_storage.FakeStorageSystem'
86
87
88 ##########################
89 # Basic file storage tests
90 ##########################
91
92 def get_tmp_filestorage(mount_url=None, fake_remote=False):
93 tmpdir = tempfile.mkdtemp(prefix="test_gmg_storage")
94 if fake_remote:
95 this_storage = FakeRemoteStorage(tmpdir, mount_url)
96 else:
97 this_storage = storage.filestorage.BasicFileStorage(tmpdir, mount_url)
98 return tmpdir, this_storage
99
100
101 def cleanup_storage(this_storage, tmpdir, *paths):
102 for p in paths:
103 while p:
104 assert this_storage.delete_dir(p) == True
105 p.pop(-1)
106 os.rmdir(tmpdir)
107
108
109 def test_basic_storage__resolve_filepath():
110 tmpdir, this_storage = get_tmp_filestorage()
111
112 result = this_storage._resolve_filepath(['dir1', 'dir2', 'filename.jpg'])
113 assert result == os.path.join(
114 tmpdir, 'dir1/dir2/filename.jpg')
115
116 result = this_storage._resolve_filepath(['../../etc/', 'passwd'])
117 assert result == os.path.join(
118 tmpdir, 'etc/passwd')
119
120 pytest.raises(
121 storage.InvalidFilepath,
122 this_storage._resolve_filepath,
123 ['../../', 'etc', 'passwd'])
124
125 cleanup_storage(this_storage, tmpdir)
126
127
128 def test_basic_storage_file_exists():
129 tmpdir, this_storage = get_tmp_filestorage()
130
131 os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2'))
132 filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt')
133 with open(filename, 'w') as ourfile:
134 ourfile.write("I'm having a lovely day!")
135
136 assert this_storage.file_exists(['dir1', 'dir2', 'filename.txt'])
137 assert not this_storage.file_exists(['dir1', 'dir2', 'thisfile.lol'])
138 assert not this_storage.file_exists(['dnedir1', 'dnedir2', 'somefile.lol'])
139
140 this_storage.delete_file(['dir1', 'dir2', 'filename.txt'])
141 cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
142
143
144 def test_basic_storage_get_unique_filepath():
145 tmpdir, this_storage = get_tmp_filestorage()
146
147 # write something that exists
148 os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2'))
149 filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt')
150 with open(filename, 'w') as ourfile:
151 ourfile.write("I'm having a lovely day!")
152
153 # now we want something new, with the same name!
154 new_filepath = this_storage.get_unique_filepath(
155 ['dir1', 'dir2', 'filename.txt'])
156 assert new_filepath[:-1] == [u'dir1', u'dir2']
157
158 new_filename = new_filepath[-1]
159 assert new_filename.endswith('filename.txt')
160 assert len(new_filename) > len('filename.txt')
161 assert new_filename == secure_filename(new_filename)
162
163 os.remove(filename)
164 cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
165
166
167 def test_basic_storage_get_file():
168 tmpdir, this_storage = get_tmp_filestorage()
169
170 # Write a brand new file
171 filepath = ['dir1', 'dir2', 'ourfile.txt']
172
173 with this_storage.get_file(filepath, 'w') as our_file:
174 our_file.write('First file')
175 with this_storage.get_file(filepath, 'r') as our_file:
176 assert our_file.read() == 'First file'
177 assert os.path.exists(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
178 with file(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file:
179 assert our_file.read() == 'First file'
180
181 # Write to the same path but try to get a unique file.
182 new_filepath = this_storage.get_unique_filepath(filepath)
183 assert not os.path.exists(os.path.join(tmpdir, *new_filepath))
184
185 with this_storage.get_file(new_filepath, 'w') as our_file:
186 our_file.write('Second file')
187 with this_storage.get_file(new_filepath, 'r') as our_file:
188 assert our_file.read() == 'Second file'
189 assert os.path.exists(os.path.join(tmpdir, *new_filepath))
190 with file(os.path.join(tmpdir, *new_filepath), 'r') as our_file:
191 assert our_file.read() == 'Second file'
192
193 # Read from an existing file
194 manually_written_file = os.makedirs(
195 os.path.join(tmpdir, 'testydir'))
196 with file(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile:
197 testyfile.write('testy file! so testy.')
198
199 with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile:
200 assert testyfile.read() == 'testy file! so testy.'
201
202 this_storage.delete_file(filepath)
203 this_storage.delete_file(new_filepath)
204 this_storage.delete_file(['testydir', 'testyfile.txt'])
205 cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'], ['testydir'])
206
207
208 def test_basic_storage_delete_file():
209 tmpdir, this_storage = get_tmp_filestorage()
210
211 assert not os.path.exists(
212 os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
213
214 filepath = ['dir1', 'dir2', 'ourfile.txt']
215 with this_storage.get_file(filepath, 'w') as our_file:
216 our_file.write('Testing this file')
217
218 assert os.path.exists(
219 os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
220
221 assert this_storage.delete_dir(['dir1', 'dir2']) == False
222 this_storage.delete_file(filepath)
223 assert this_storage.delete_dir(['dir1', 'dir2']) == True
224
225 assert not os.path.exists(
226 os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
227
228 cleanup_storage(this_storage, tmpdir, ['dir1'])
229
230
231 def test_basic_storage_url_for_file():
232 # Not supplying a base_url should actually just bork.
233 tmpdir, this_storage = get_tmp_filestorage()
234 pytest.raises(
235 storage.NoWebServing,
236 this_storage.file_url,
237 ['dir1', 'dir2', 'filename.txt'])
238 cleanup_storage(this_storage, tmpdir)
239
240 # base_url without domain
241 tmpdir, this_storage = get_tmp_filestorage('/media/')
242 result = this_storage.file_url(
243 ['dir1', 'dir2', 'filename.txt'])
244 expected = '/media/dir1/dir2/filename.txt'
245 assert result == expected
246 cleanup_storage(this_storage, tmpdir)
247
248 # base_url with domain
249 tmpdir, this_storage = get_tmp_filestorage(
250 'http://media.example.org/ourmedia/')
251 result = this_storage.file_url(
252 ['dir1', 'dir2', 'filename.txt'])
253 expected = 'http://media.example.org/ourmedia/dir1/dir2/filename.txt'
254 assert result == expected
255 cleanup_storage(this_storage, tmpdir)
256
257
258 def test_basic_storage_get_local_path():
259 tmpdir, this_storage = get_tmp_filestorage()
260
261 result = this_storage.get_local_path(
262 ['dir1', 'dir2', 'filename.txt'])
263
264 expected = os.path.join(
265 tmpdir, 'dir1/dir2/filename.txt')
266
267 assert result == expected
268
269 cleanup_storage(this_storage, tmpdir)
270
271
272 def test_basic_storage_is_local():
273 tmpdir, this_storage = get_tmp_filestorage()
274 assert this_storage.local_storage is True
275 cleanup_storage(this_storage, tmpdir)
276
277
278 def test_basic_storage_copy_locally():
279 tmpdir, this_storage = get_tmp_filestorage()
280
281 dest_tmpdir = tempfile.mkdtemp()
282
283 filepath = ['dir1', 'dir2', 'ourfile.txt']
284 with this_storage.get_file(filepath, 'w') as our_file:
285 our_file.write('Testing this file')
286
287 new_file_dest = os.path.join(dest_tmpdir, 'file2.txt')
288
289 this_storage.copy_locally(filepath, new_file_dest)
290 this_storage.delete_file(filepath)
291
292 assert file(new_file_dest).read() == 'Testing this file'
293
294 os.remove(new_file_dest)
295 os.rmdir(dest_tmpdir)
296 cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
297
298
299 def _test_copy_local_to_storage_works(tmpdir, this_storage):
300 local_filename = tempfile.mktemp()
301 with file(local_filename, 'w') as tmpfile:
302 tmpfile.write('haha')
303
304 this_storage.copy_local_to_storage(
305 local_filename, ['dir1', 'dir2', 'copiedto.txt'])
306
307 os.remove(local_filename)
308
309 assert file(
310 os.path.join(tmpdir, 'dir1/dir2/copiedto.txt'),
311 'r').read() == 'haha'
312
313 this_storage.delete_file(['dir1', 'dir2', 'copiedto.txt'])
314 cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
315
316
317 def test_basic_storage_copy_local_to_storage():
318 tmpdir, this_storage = get_tmp_filestorage()
319 _test_copy_local_to_storage_works(tmpdir, this_storage)
320
321
322 def test_general_storage_copy_local_to_storage():
323 tmpdir, this_storage = get_tmp_filestorage(fake_remote=True)
324 _test_copy_local_to_storage_works(tmpdir, this_storage)