Commit | Line | Data |
---|---|---|
8e1e744d | 1 | # GNU MediaGoblin -- federated, autonomous media hosting |
cf29e8a8 | 2 | # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. |
a6b378ef CAW |
3 | # |
4 | # This program is free software: you can redistribute it and/or modify | |
5 | # it under the terms of the GNU Affero General Public License as published by | |
6 | # the Free Software Foundation, either version 3 of the License, or | |
7 | # (at your option) any later version. | |
8 | # | |
9 | # This program is distributed in the hope that it will be useful, | |
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
12 | # GNU Affero General Public License for more details. | |
13 | # | |
14 | # You should have received a copy of the GNU Affero General Public License | |
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
16 | ||
17 | ||
17e7093e CAW |
18 | import os |
19 | import tempfile | |
20 | ||
7d503a89 | 21 | import pytest |
e49b7e02 BP |
22 | import six |
23 | ||
20e3ee11 | 24 | from werkzeug.utils import secure_filename |
17e7093e | 25 | |
a6b378ef CAW |
26 | from mediagoblin import storage |
27 | ||
28 | ||
ffa22935 CAW |
29 | ################ |
30 | # Test utilities | |
31 | ################ | |
32 | ||
a6b378ef CAW |
def test_clean_listy_filepath():
    """Check storage.clean_listy_filepath against known inputs.

    Each case pairs a list of raw path components with the list the
    function is expected to return for it.
    """
    cases = [
        # Already-clean components come back as unicode strings.
        (['dir1', 'dir2', 'linooks.jpg'],
         [u'dir1', u'dir2', u'linooks.jpg']),
        # Slashes and '..' inside a component must not survive as separators.
        (['/dir1/', 'foo/../nasty', 'linooks.jpg'],
         [u'dir1', u'foo_.._nasty', u'linooks.jpg']),
        # Leading traversal is stripped from a component.
        (['../../../etc/', 'passwd'],
         [u'etc', u'passwd']),
    ]
    for raw_components, cleaned_components in cases:
        assert storage.clean_listy_filepath(raw_components) == \
            cleaned_components

    # This input is expected to be rejected rather than cleaned.
    with pytest.raises(storage.InvalidFilepath):
        storage.clean_listy_filepath(['../../', 'linooks.jpg'])
17e7093e CAW |
48 | |
49 | ||
class FakeStorageSystem(object):
    """Minimal stand-in storage class used by the config-loading test.

    It only records the ``foobie`` and ``blech`` arguments; any further
    keyword arguments are accepted and ignored.
    """

    def __init__(self, foobie, blech, **kwargs):
        self.foobie, self.blech = foobie, blech
54 | ||
class FakeRemoteStorage(storage.filestorage.BasicFileStorage):
    """A BasicFileStorage that claims not to be local.

    Theoretically despite this, all the methods should work but it
    should force copying to the workbench.
    """
    local_storage = False

    def copy_local_to_storage(self, *args, **kwargs):
        # Call the StorageInterface implementation directly instead of the
        # one inherited from BasicFileStorage.
        generic_copy = storage.StorageInterface.copy_local_to_storage
        return generic_copy(self, *args, **kwargs)
63 | ||
ffa22935 | 64 | |
3c7d11ff CAW |
def test_storage_system_from_config():
    """Check storage_system_from_config with and without 'storage_class'.

    Both configs also carry a 'garbage_arg' key that neither assertion
    block looks at; it is there to show that an extra key does not stop
    the storage object from being built.
    """
    # No 'storage_class' key: the result must be a BasicFileStorage set up
    # with the given base_url and base_dir.
    this_storage = storage.storage_system_from_config({
        'base_url': 'http://example.org/moodia/',
        'base_dir': '/tmp/',
        # The original literal listed 'garbage_arg' twice; only the last
        # value ever reached the function, so that is the one kept here.
        'garbage_arg': 'trash'})
    assert this_storage.base_url == 'http://example.org/moodia/'
    assert this_storage.base_dir == '/tmp/'
    assert this_storage.__class__ is storage.filestorage.BasicFileStorage

    # Explicit 'storage_class' given as a "module:ClassName" string.
    this_storage = storage.storage_system_from_config({
        'foobie': 'eiboof',
        'blech': 'hcelb',
        'garbage_arg': 'garbage_arg',
        'storage_class':
            'mediagoblin.tests.test_storage:FakeStorageSystem'})
    assert this_storage.foobie == 'eiboof'
    assert this_storage.blech == 'hcelb'
    # Compared by repr text rather than by identity.
    assert six.text_type(this_storage.__class__) == \
        u"<class 'mediagoblin.tests.test_storage.FakeStorageSystem'>"
ffa22935 CAW |
85 | |
86 | ||
17e7093e CAW |
87 | ########################## |
88 | # Basic file storage tests | |
89 | ########################## | |
90 | ||
def get_tmp_filestorage(mount_url=None, fake_remote=False):
    """Build a file storage rooted in a freshly created temp directory.

    ``mount_url`` is passed through as the second constructor argument.
    When ``fake_remote`` is true a FakeRemoteStorage is built instead of
    a BasicFileStorage.

    Returns a ``(tmpdir, storage_object)`` tuple.
    """
    tmpdir = tempfile.mkdtemp(prefix="test_gmg_storage")
    storage_class = (
        FakeRemoteStorage if fake_remote
        else storage.filestorage.BasicFileStorage)
    return tmpdir, storage_class(tmpdir, mount_url)
98 | ||
99 | ||
fd1202b7 E |
def cleanup_storage(this_storage, tmpdir, *paths):
    """Delete the directories a test created, then the temp root itself.

    Each entry of ``paths`` is a sequence of directory components.  For
    ``['dir1', 'dir2']`` the storage is asked to delete ``['dir1', 'dir2']``
    and then ``['dir1']``, i.e. deepest directory first.  Every
    ``delete_dir`` call must report success.  Finally ``tmpdir`` is removed
    with ``os.rmdir``.

    The sequences passed in ``paths`` are left unmodified.
    """
    for path in paths:
        # Work on a copy so the caller's list is not emptied as a side effect.
        components = list(path)
        for depth in range(len(components), 0, -1):
            # Keep the call outside the assert so it still runs when
            # assertions are disabled.
            deleted = this_storage.delete_dir(components[:depth])
            assert deleted
    os.rmdir(tmpdir)
106 | ||
107 | ||
17e7093e CAW |
def test_basic_storage__resolve_filepath():
    """_resolve_filepath must map listy paths to paths under the temp dir."""
    tmpdir, this_storage = get_tmp_filestorage()

    resolved = this_storage._resolve_filepath(
        ['dir1', 'dir2', 'filename.jpg'])
    assert resolved == os.path.join(tmpdir, 'dir1/dir2/filename.jpg')

    # Leading traversal in a component must still resolve below tmpdir.
    resolved = this_storage._resolve_filepath(['../../etc/', 'passwd'])
    assert resolved == os.path.join(tmpdir, 'etc/passwd')

    # This input is expected to be rejected.
    with pytest.raises(storage.InvalidFilepath):
        this_storage._resolve_filepath(['../../', 'etc', 'passwd'])

    cleanup_storage(this_storage, tmpdir)
25067d81 | 125 | |
17e7093e CAW |
126 | |
def test_basic_storage_file_exists():
    """file_exists is true only for a file that is really on disk."""
    tmpdir, this_storage = get_tmp_filestorage()

    # Create dir1/dir2/filename.txt directly on disk, bypassing the storage.
    nested_dir = os.path.join(tmpdir, 'dir1', 'dir2')
    os.makedirs(nested_dir)
    with open(os.path.join(nested_dir, 'filename.txt'), 'w') as handle:
        handle.write("I'm having a lovely day!")

    present = this_storage.file_exists(['dir1', 'dir2', 'filename.txt'])
    assert present
    # Missing file in an existing directory.
    missing_file = this_storage.file_exists(['dir1', 'dir2', 'thisfile.lol'])
    assert not missing_file
    # Missing file in a directory that does not exist either.
    missing_dir = this_storage.file_exists(
        ['dnedir1', 'dnedir2', 'somefile.lol'])
    assert not missing_dir

    this_storage.delete_file(['dir1', 'dir2', 'filename.txt'])
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
25067d81 | 141 | |
92fb87ae | 142 | |
20e3ee11 CAW |
def test_basic_storage_get_unique_filepath():
    """get_unique_filepath must return a new name next to an existing file."""
    tmpdir, this_storage = get_tmp_filestorage()

    # write something that exists
    nested_dir = os.path.join(tmpdir, 'dir1', 'dir2')
    os.makedirs(nested_dir)
    existing_file = os.path.join(nested_dir, 'filename.txt')
    with open(existing_file, 'w') as handle:
        handle.write("I'm having a lovely day!")

    # now we want something new, with the same name!
    unique_path = this_storage.get_unique_filepath(
        ['dir1', 'dir2', 'filename.txt'])
    parent_parts, unique_name = unique_path[:-1], unique_path[-1]

    # Same directory, original name kept as a suffix, something added to it.
    assert parent_parts == [u'dir1', u'dir2']
    assert unique_name.endswith('filename.txt')
    assert len(unique_name) > len('filename.txt')
    # The generated name must already be in secure_filename's output form.
    assert unique_name == secure_filename(unique_name)

    os.remove(existing_file)
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
25067d81 | 164 | |
17e7093e CAW |
165 | |
def test_basic_storage_get_file():
    """Write and read files through get_file and check them on disk.

    Covers three cases: a brand new file, a second file at a path from
    get_unique_filepath, and a file written directly to disk.
    """
    tmpdir, this_storage = get_tmp_filestorage()

    # Write a brand new file
    filepath = ['dir1', 'dir2', 'ourfile.txt']

    with this_storage.get_file(filepath, 'w') as our_file:
        our_file.write('First file')
    with this_storage.get_file(filepath, 'r') as our_file:
        assert our_file.read() == 'First file'
    first_on_disk = os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')
    assert os.path.exists(first_on_disk)
    with open(first_on_disk, 'r') as our_file:
        assert our_file.read() == 'First file'

    # Write to the same path but try to get a unique file.
    new_filepath = this_storage.get_unique_filepath(filepath)
    second_on_disk = os.path.join(tmpdir, *new_filepath)
    assert not os.path.exists(second_on_disk)

    with this_storage.get_file(new_filepath, 'w') as our_file:
        our_file.write('Second file')
    with this_storage.get_file(new_filepath, 'r') as our_file:
        assert our_file.read() == 'Second file'
    assert os.path.exists(second_on_disk)
    with open(second_on_disk, 'r') as our_file:
        assert our_file.read() == 'Second file'

    # Read from an existing file
    # os.makedirs returns None, so its result is not kept.
    os.makedirs(os.path.join(tmpdir, 'testydir'))
    with open(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile:
        testyfile.write('testy file! so testy.')

    # No mode argument here: this goes through get_file's default mode.
    with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile:
        assert testyfile.read() == 'testy file! so testy.'

    this_storage.delete_file(filepath)
    this_storage.delete_file(new_filepath)
    this_storage.delete_file(['testydir', 'testyfile.txt'])
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'], ['testydir'])
25067d81 | 205 | |
17e7093e CAW |
206 | |
def test_basic_storage_delete_file():
    """delete_file removes the file; delete_dir only succeeds once empty."""
    tmpdir, this_storage = get_tmp_filestorage()
    on_disk = os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')

    assert not os.path.exists(on_disk)

    filepath = ['dir1', 'dir2', 'ourfile.txt']
    with this_storage.get_file(filepath, 'w') as handle:
        handle.write('Testing this file')

    assert os.path.exists(on_disk)

    # The directory still holds the file, so deleting it must report False.
    refused = this_storage.delete_dir(['dir1', 'dir2'])
    assert refused == False
    this_storage.delete_file(filepath)
    # With the file gone the same call must now report True.
    removed = this_storage.delete_dir(['dir1', 'dir2'])
    assert removed == True

    assert not os.path.exists(on_disk)

    cleanup_storage(this_storage, tmpdir, ['dir1'])
228 | ||
17e7093e CAW |
229 | |
def test_basic_storage_url_for_file():
    """file_url needs a base URL and prefixes the file path with it."""
    # Not supplying a base_url should actually just bork.
    tmpdir, this_storage = get_tmp_filestorage()
    with pytest.raises(storage.NoWebServing):
        this_storage.file_url(['dir1', 'dir2', 'filename.txt'])
    cleanup_storage(this_storage, tmpdir)

    url_cases = [
        # base_url without domain
        ('/media/',
         '/media/dir1/dir2/filename.txt'),
        # base_url with domain
        ('http://media.example.org/ourmedia/',
         'http://media.example.org/ourmedia/dir1/dir2/filename.txt'),
    ]
    for base_url, expected in url_cases:
        tmpdir, this_storage = get_tmp_filestorage(base_url)
        result = this_storage.file_url(['dir1', 'dir2', 'filename.txt'])
        assert result == expected
        cleanup_storage(this_storage, tmpdir)
3a89c23e CAW |
255 | |
256 | ||
def test_basic_storage_get_local_path():
    """get_local_path must return the file's path under the temp dir."""
    tmpdir, this_storage = get_tmp_filestorage()

    wanted = os.path.join(tmpdir, 'dir1/dir2/filename.txt')
    actual = this_storage.get_local_path(['dir1', 'dir2', 'filename.txt'])
    assert actual == wanted

    cleanup_storage(this_storage, tmpdir)
25067d81 | 269 | |
3a89c23e CAW |
270 | |
def test_basic_storage_is_local():
    """The default temp storage must report local_storage as exactly True."""
    scratch_dir, local_store = get_tmp_filestorage()
    assert local_store.local_storage is True
    cleanup_storage(local_store, scratch_dir)
6a07362d CAW |
275 | |
276 | ||
def test_basic_storage_copy_locally():
    """copy_locally must copy a stored file to a plain filesystem path.

    The stored original is deleted before the copy is read, so the check
    only passes if the copy exists on its own.
    """
    tmpdir, this_storage = get_tmp_filestorage()

    dest_tmpdir = tempfile.mkdtemp()

    filepath = ['dir1', 'dir2', 'ourfile.txt']
    with this_storage.get_file(filepath, 'w') as our_file:
        our_file.write('Testing this file')

    new_file_dest = os.path.join(dest_tmpdir, 'file2.txt')

    this_storage.copy_locally(filepath, new_file_dest)
    this_storage.delete_file(filepath)

    # Read through a context manager so the handle is closed before the
    # file is removed below.
    with open(new_file_dest) as copied_file:
        assert copied_file.read() == 'Testing this file'

    os.remove(new_file_dest)
    os.rmdir(dest_tmpdir)
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
25067d81 | 296 | |
e56e5f8c CAW |
297 | |
def _test_copy_local_to_storage_works(tmpdir, this_storage):
    """Shared body for the copy_local_to_storage tests.

    Writes a small local file, copies it into ``this_storage`` as
    dir1/dir2/copiedto.txt, and checks the content by reading the file
    straight from ``tmpdir``.  Cleans up both the local file and the
    storage directories afterwards.
    """
    # mkstemp creates the file itself, avoiding the name race of the
    # deprecated tempfile.mktemp().
    local_fd, local_filename = tempfile.mkstemp()
    with os.fdopen(local_fd, 'w') as tmpfile:
        tmpfile.write('haha')

    this_storage.copy_local_to_storage(
        local_filename, ['dir1', 'dir2', 'copiedto.txt'])

    # The local source is removed first, so the read below can only
    # succeed against the copy.
    os.remove(local_filename)

    copied_path = os.path.join(tmpdir, 'dir1/dir2/copiedto.txt')
    with open(copied_path, 'r') as copied_file:
        assert copied_file.read() == 'haha'

    this_storage.delete_file(['dir1', 'dir2', 'copiedto.txt'])
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
25067d81 | 314 | |
e56e5f8c CAW |
315 | |
def test_basic_storage_copy_local_to_storage():
    """Run the shared copy_local_to_storage check on the default storage."""
    _test_copy_local_to_storage_works(*get_tmp_filestorage())
319 | ||
320 | ||
def test_general_storage_copy_local_to_storage():
    """Run the shared copy_local_to_storage check on the fake remote storage."""
    _test_copy_local_to_storage_works(*get_tmp_filestorage(fake_remote=True))