Commit | Line | Data |
---|---|---|
8e1e744d | 1 | # GNU MediaGoblin -- federated, autonomous media hosting |
cf29e8a8 | 2 | # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. |
a6b378ef CAW |
3 | # |
4 | # This program is free software: you can redistribute it and/or modify | |
5 | # it under the terms of the GNU Affero General Public License as published by | |
6 | # the Free Software Foundation, either version 3 of the License, or | |
7 | # (at your option) any later version. | |
8 | # | |
9 | # This program is distributed in the hope that it will be useful, | |
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
12 | # GNU Affero General Public License for more details. | |
13 | # | |
14 | # You should have received a copy of the GNU Affero General Public License | |
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
16 | ||
17 | ||
17e7093e CAW |
18 | import os |
19 | import tempfile | |
20 | ||
7d503a89 | 21 | import pytest |
20e3ee11 | 22 | from werkzeug.utils import secure_filename |
17e7093e | 23 | |
a6b378ef CAW |
24 | from mediagoblin import storage |
25 | ||
26 | ||
ffa22935 CAW |
27 | ################ |
28 | # Test utilities | |
29 | ################ | |
30 | ||
a6b378ef CAW |
31 | def test_clean_listy_filepath(): |
32 | expected = [u'dir1', u'dir2', u'linooks.jpg'] | |
33 | assert storage.clean_listy_filepath( | |
34 | ['dir1', 'dir2', 'linooks.jpg']) == expected | |
35 | ||
36 | expected = [u'dir1', u'foo_.._nasty', u'linooks.jpg'] | |
37 | assert storage.clean_listy_filepath( | |
38 | ['/dir1/', 'foo/../nasty', 'linooks.jpg']) == expected | |
39 | ||
40 | expected = [u'etc', u'passwd'] | |
41 | assert storage.clean_listy_filepath( | |
42 | ['../../../etc/', 'passwd']) == expected | |
770c12be | 43 | |
7d503a89 CAW |
44 | with pytest.raises(storage.InvalidFilepath): |
45 | storage.clean_listy_filepath(['../../', 'linooks.jpg']) | |
17e7093e CAW |
46 | |
47 | ||
ffa22935 CAW |
48 | class FakeStorageSystem(): |
49 | def __init__(self, foobie, blech, **kwargs): | |
50 | self.foobie = foobie | |
51 | self.blech = blech | |
52 | ||
a2468d18 | 53 | class FakeRemoteStorage(storage.filestorage.BasicFileStorage): |
d91b5a7c CAW |
54 | # Theoretically despite this, all the methods should work but it |
55 | # should force copying to the workbench | |
56 | local_storage = False | |
57 | ||
e56e5f8c CAW |
58 | def copy_local_to_storage(self, *args, **kwargs): |
59 | return storage.StorageInterface.copy_local_to_storage( | |
60 | self, *args, **kwargs) | |
61 | ||
ffa22935 | 62 | |
3c7d11ff CAW |
63 | def test_storage_system_from_config(): |
64 | this_storage = storage.storage_system_from_config( | |
63c9a0c7 CAW |
65 | {'base_url': 'http://example.org/moodia/', |
66 | 'base_dir': '/tmp/', | |
67 | 'garbage_arg': 'garbage_arg', | |
68 | 'garbage_arg': 'trash'}) | |
ffa22935 CAW |
69 | assert this_storage.base_url == 'http://example.org/moodia/' |
70 | assert this_storage.base_dir == '/tmp/' | |
a2468d18 | 71 | assert this_storage.__class__ is storage.filestorage.BasicFileStorage |
ffa22935 | 72 | |
3c7d11ff | 73 | this_storage = storage.storage_system_from_config( |
63c9a0c7 CAW |
74 | {'foobie': 'eiboof', |
75 | 'blech': 'hcelb', | |
76 | 'garbage_arg': 'garbage_arg', | |
77 | 'storage_class': | |
78 | 'mediagoblin.tests.test_storage:FakeStorageSystem'}) | |
7d503a89 CAW |
79 | assert this_storage.foobie == 'eiboof' |
80 | assert this_storage.blech == 'hcelb' | |
81 | assert unicode(this_storage.__class__) == \ | |
82 | u'mediagoblin.tests.test_storage.FakeStorageSystem' | |
ffa22935 CAW |
83 | |
84 | ||
17e7093e CAW |
85 | ########################## |
86 | # Basic file storage tests | |
87 | ########################## | |
88 | ||
d91b5a7c | 89 | def get_tmp_filestorage(mount_url=None, fake_remote=False): |
25067d81 | 90 | tmpdir = tempfile.mkdtemp(prefix="test_gmg_storage") |
d91b5a7c CAW |
91 | if fake_remote: |
92 | this_storage = FakeRemoteStorage(tmpdir, mount_url) | |
93 | else: | |
a2468d18 | 94 | this_storage = storage.filestorage.BasicFileStorage(tmpdir, mount_url) |
17e7093e CAW |
95 | return tmpdir, this_storage |
96 | ||
97 | ||
fd1202b7 E |
98 | def cleanup_storage(this_storage, tmpdir, *paths): |
99 | for p in paths: | |
100 | while p: | |
101 | assert this_storage.delete_dir(p) == True | |
102 | p.pop(-1) | |
103 | os.rmdir(tmpdir) | |
104 | ||
105 | ||
17e7093e CAW |
106 | def test_basic_storage__resolve_filepath(): |
107 | tmpdir, this_storage = get_tmp_filestorage() | |
108 | ||
109 | result = this_storage._resolve_filepath(['dir1', 'dir2', 'filename.jpg']) | |
110 | assert result == os.path.join( | |
111 | tmpdir, 'dir1/dir2/filename.jpg') | |
112 | ||
113 | result = this_storage._resolve_filepath(['../../etc/', 'passwd']) | |
114 | assert result == os.path.join( | |
115 | tmpdir, 'etc/passwd') | |
116 | ||
7d503a89 | 117 | pytest.raises( |
17e7093e CAW |
118 | storage.InvalidFilepath, |
119 | this_storage._resolve_filepath, | |
120 | ['../../', 'etc', 'passwd']) | |
121 | ||
fd1202b7 | 122 | cleanup_storage(this_storage, tmpdir) |
25067d81 | 123 | |
17e7093e CAW |
124 | |
125 | def test_basic_storage_file_exists(): | |
92fb87ae CAW |
126 | tmpdir, this_storage = get_tmp_filestorage() |
127 | ||
128 | os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2')) | |
129 | filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt') | |
130 | with open(filename, 'w') as ourfile: | |
131 | ourfile.write("I'm having a lovely day!") | |
132 | ||
133 | assert this_storage.file_exists(['dir1', 'dir2', 'filename.txt']) | |
134 | assert not this_storage.file_exists(['dir1', 'dir2', 'thisfile.lol']) | |
135 | assert not this_storage.file_exists(['dnedir1', 'dnedir2', 'somefile.lol']) | |
136 | ||
25067d81 | 137 | this_storage.delete_file(['dir1', 'dir2', 'filename.txt']) |
fd1202b7 | 138 | cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2']) |
25067d81 | 139 | |
92fb87ae | 140 | |
20e3ee11 CAW |
141 | def test_basic_storage_get_unique_filepath(): |
142 | tmpdir, this_storage = get_tmp_filestorage() | |
143 | ||
144 | # write something that exists | |
145 | os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2')) | |
146 | filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt') | |
147 | with open(filename, 'w') as ourfile: | |
148 | ourfile.write("I'm having a lovely day!") | |
149 | ||
150 | # now we want something new, with the same name! | |
151 | new_filepath = this_storage.get_unique_filepath( | |
152 | ['dir1', 'dir2', 'filename.txt']) | |
153 | assert new_filepath[:-1] == [u'dir1', u'dir2'] | |
154 | ||
155 | new_filename = new_filepath[-1] | |
156 | assert new_filename.endswith('filename.txt') | |
157 | assert len(new_filename) > len('filename.txt') | |
158 | assert new_filename == secure_filename(new_filename) | |
17e7093e | 159 | |
25067d81 | 160 | os.remove(filename) |
fd1202b7 | 161 | cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2']) |
25067d81 | 162 | |
17e7093e CAW |
163 | |
164 | def test_basic_storage_get_file(): | |
d2be0838 CAW |
165 | tmpdir, this_storage = get_tmp_filestorage() |
166 | ||
167 | # Write a brand new file | |
168 | filepath = ['dir1', 'dir2', 'ourfile.txt'] | |
169 | ||
170 | with this_storage.get_file(filepath, 'w') as our_file: | |
171 | our_file.write('First file') | |
172 | with this_storage.get_file(filepath, 'r') as our_file: | |
173 | assert our_file.read() == 'First file' | |
174 | assert os.path.exists(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) | |
175 | with file(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file: | |
176 | assert our_file.read() == 'First file' | |
177 | ||
178 | # Write to the same path but try to get a unique file. | |
179 | new_filepath = this_storage.get_unique_filepath(filepath) | |
180 | assert not os.path.exists(os.path.join(tmpdir, *new_filepath)) | |
181 | ||
182 | with this_storage.get_file(new_filepath, 'w') as our_file: | |
183 | our_file.write('Second file') | |
184 | with this_storage.get_file(new_filepath, 'r') as our_file: | |
185 | assert our_file.read() == 'Second file' | |
186 | assert os.path.exists(os.path.join(tmpdir, *new_filepath)) | |
187 | with file(os.path.join(tmpdir, *new_filepath), 'r') as our_file: | |
188 | assert our_file.read() == 'Second file' | |
189 | ||
190 | # Read from an existing file | |
191 | manually_written_file = os.makedirs( | |
192 | os.path.join(tmpdir, 'testydir')) | |
193 | with file(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile: | |
194 | testyfile.write('testy file! so testy.') | |
195 | ||
196 | with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile: | |
197 | assert testyfile.read() == 'testy file! so testy.' | |
17e7093e | 198 | |
25067d81 E |
199 | this_storage.delete_file(filepath) |
200 | this_storage.delete_file(new_filepath) | |
201 | this_storage.delete_file(['testydir', 'testyfile.txt']) | |
fd1202b7 | 202 | cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'], ['testydir']) |
25067d81 | 203 | |
17e7093e CAW |
204 | |
205 | def test_basic_storage_delete_file(): | |
d024806a CAW |
206 | tmpdir, this_storage = get_tmp_filestorage() |
207 | ||
208 | assert not os.path.exists( | |
209 | os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) | |
210 | ||
211 | filepath = ['dir1', 'dir2', 'ourfile.txt'] | |
212 | with this_storage.get_file(filepath, 'w') as our_file: | |
213 | our_file.write('Testing this file') | |
214 | ||
215 | assert os.path.exists( | |
216 | os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) | |
217 | ||
fd1202b7 | 218 | assert this_storage.delete_dir(['dir1', 'dir2']) == False |
d024806a | 219 | this_storage.delete_file(filepath) |
fd1202b7 | 220 | assert this_storage.delete_dir(['dir1', 'dir2']) == True |
d024806a CAW |
221 | |
222 | assert not os.path.exists( | |
223 | os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) | |
17e7093e | 224 | |
fd1202b7 E |
225 | cleanup_storage(this_storage, tmpdir, ['dir1']) |
226 | ||
17e7093e CAW |
227 | |
228 | def test_basic_storage_url_for_file(): | |
01da9e6a CAW |
229 | # Not supplying a base_url should actually just bork. |
230 | tmpdir, this_storage = get_tmp_filestorage() | |
7d503a89 | 231 | pytest.raises( |
01da9e6a CAW |
232 | storage.NoWebServing, |
233 | this_storage.file_url, | |
234 | ['dir1', 'dir2', 'filename.txt']) | |
fd1202b7 | 235 | cleanup_storage(this_storage, tmpdir) |
01da9e6a CAW |
236 | |
237 | # base_url without domain | |
238 | tmpdir, this_storage = get_tmp_filestorage('/media/') | |
239 | result = this_storage.file_url( | |
240 | ['dir1', 'dir2', 'filename.txt']) | |
241 | expected = '/media/dir1/dir2/filename.txt' | |
242 | assert result == expected | |
fd1202b7 | 243 | cleanup_storage(this_storage, tmpdir) |
01da9e6a CAW |
244 | |
245 | # base_url with domain | |
246 | tmpdir, this_storage = get_tmp_filestorage( | |
247 | 'http://media.example.org/ourmedia/') | |
248 | result = this_storage.file_url( | |
249 | ['dir1', 'dir2', 'filename.txt']) | |
250 | expected = 'http://media.example.org/ourmedia/dir1/dir2/filename.txt' | |
251 | assert result == expected | |
fd1202b7 | 252 | cleanup_storage(this_storage, tmpdir) |
3a89c23e CAW |
253 | |
254 | ||
255 | def test_basic_storage_get_local_path(): | |
256 | tmpdir, this_storage = get_tmp_filestorage() | |
257 | ||
258 | result = this_storage.get_local_path( | |
259 | ['dir1', 'dir2', 'filename.txt']) | |
260 | ||
261 | expected = os.path.join( | |
262 | tmpdir, 'dir1/dir2/filename.txt') | |
263 | ||
264 | assert result == expected | |
265 | ||
fd1202b7 | 266 | cleanup_storage(this_storage, tmpdir) |
25067d81 | 267 | |
3a89c23e CAW |
268 | |
269 | def test_basic_storage_is_local(): | |
270 | tmpdir, this_storage = get_tmp_filestorage() | |
271 | assert this_storage.local_storage is True | |
fd1202b7 | 272 | cleanup_storage(this_storage, tmpdir) |
6a07362d CAW |
273 | |
274 | ||
275 | def test_basic_storage_copy_locally(): | |
276 | tmpdir, this_storage = get_tmp_filestorage() | |
277 | ||
278 | dest_tmpdir = tempfile.mkdtemp() | |
279 | ||
280 | filepath = ['dir1', 'dir2', 'ourfile.txt'] | |
281 | with this_storage.get_file(filepath, 'w') as our_file: | |
282 | our_file.write('Testing this file') | |
283 | ||
284 | new_file_dest = os.path.join(dest_tmpdir, 'file2.txt') | |
285 | ||
286 | this_storage.copy_locally(filepath, new_file_dest) | |
25067d81 | 287 | this_storage.delete_file(filepath) |
6a07362d CAW |
288 | |
289 | assert file(new_file_dest).read() == 'Testing this file' | |
e56e5f8c | 290 | |
25067d81 E |
291 | os.remove(new_file_dest) |
292 | os.rmdir(dest_tmpdir) | |
fd1202b7 | 293 | cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2']) |
25067d81 | 294 | |
e56e5f8c CAW |
295 | |
296 | def _test_copy_local_to_storage_works(tmpdir, this_storage): | |
297 | local_filename = tempfile.mktemp() | |
298 | with file(local_filename, 'w') as tmpfile: | |
299 | tmpfile.write('haha') | |
300 | ||
301 | this_storage.copy_local_to_storage( | |
302 | local_filename, ['dir1', 'dir2', 'copiedto.txt']) | |
303 | ||
25067d81 E |
304 | os.remove(local_filename) |
305 | ||
e56e5f8c CAW |
306 | assert file( |
307 | os.path.join(tmpdir, 'dir1/dir2/copiedto.txt'), | |
308 | 'r').read() == 'haha' | |
309 | ||
25067d81 | 310 | this_storage.delete_file(['dir1', 'dir2', 'copiedto.txt']) |
fd1202b7 | 311 | cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2']) |
25067d81 | 312 | |
e56e5f8c CAW |
313 | |
314 | def test_basic_storage_copy_local_to_storage(): | |
315 | tmpdir, this_storage = get_tmp_filestorage() | |
316 | _test_copy_local_to_storage_works(tmpdir, this_storage) | |
317 | ||
318 | ||
319 | def test_general_storage_copy_local_to_storage(): | |
320 | tmpdir, this_storage = get_tmp_filestorage(fake_remote=True) | |
321 | _test_copy_local_to_storage_works(tmpdir, this_storage) |