Commit | Line | Data |
---|---|---|
8e1e744d | 1 | # GNU MediaGoblin -- federated, autonomous media hosting |
cf29e8a8 | 2 | # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. |
a6b378ef CAW |
3 | # |
4 | # This program is free software: you can redistribute it and/or modify | |
5 | # it under the terms of the GNU Affero General Public License as published by | |
6 | # the Free Software Foundation, either version 3 of the License, or | |
7 | # (at your option) any later version. | |
8 | # | |
9 | # This program is distributed in the hope that it will be useful, | |
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
12 | # GNU Affero General Public License for more details. | |
13 | # | |
14 | # You should have received a copy of the GNU Affero General Public License | |
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
16 | ||
17 | ||
17e7093e CAW |
18 | import os |
19 | import tempfile | |
20 | ||
21 | from nose.tools import assert_raises | |
20e3ee11 | 22 | from werkzeug.utils import secure_filename |
17e7093e | 23 | |
a6b378ef CAW |
24 | from mediagoblin import storage |
25 | ||
26 | ||
ffa22935 CAW |
27 | ################ |
28 | # Test utilities | |
29 | ################ | |
30 | ||
a6b378ef CAW |
31 | def test_clean_listy_filepath(): |
32 | expected = [u'dir1', u'dir2', u'linooks.jpg'] | |
33 | assert storage.clean_listy_filepath( | |
34 | ['dir1', 'dir2', 'linooks.jpg']) == expected | |
35 | ||
36 | expected = [u'dir1', u'foo_.._nasty', u'linooks.jpg'] | |
37 | assert storage.clean_listy_filepath( | |
38 | ['/dir1/', 'foo/../nasty', 'linooks.jpg']) == expected | |
39 | ||
40 | expected = [u'etc', u'passwd'] | |
41 | assert storage.clean_listy_filepath( | |
42 | ['../../../etc/', 'passwd']) == expected | |
770c12be | 43 | |
17e7093e CAW |
44 | assert_raises( |
45 | storage.InvalidFilepath, | |
46 | storage.clean_listy_filepath, | |
47 | ['../../', 'linooks.jpg']) | |
48 | ||
49 | ||
ffa22935 CAW |
50 | class FakeStorageSystem(): |
51 | def __init__(self, foobie, blech, **kwargs): | |
52 | self.foobie = foobie | |
53 | self.blech = blech | |
54 | ||
a2468d18 | 55 | class FakeRemoteStorage(storage.filestorage.BasicFileStorage): |
d91b5a7c CAW |
56 | # Theoretically despite this, all the methods should work but it |
57 | # should force copying to the workbench | |
58 | local_storage = False | |
59 | ||
e56e5f8c CAW |
60 | def copy_local_to_storage(self, *args, **kwargs): |
61 | return storage.StorageInterface.copy_local_to_storage( | |
62 | self, *args, **kwargs) | |
63 | ||
ffa22935 | 64 | |
3c7d11ff CAW |
65 | def test_storage_system_from_config(): |
66 | this_storage = storage.storage_system_from_config( | |
63c9a0c7 CAW |
67 | {'base_url': 'http://example.org/moodia/', |
68 | 'base_dir': '/tmp/', | |
69 | 'garbage_arg': 'garbage_arg', | |
70 | 'garbage_arg': 'trash'}) | |
ffa22935 CAW |
71 | assert this_storage.base_url == 'http://example.org/moodia/' |
72 | assert this_storage.base_dir == '/tmp/' | |
a2468d18 | 73 | assert this_storage.__class__ is storage.filestorage.BasicFileStorage |
ffa22935 | 74 | |
3c7d11ff | 75 | this_storage = storage.storage_system_from_config( |
63c9a0c7 CAW |
76 | {'foobie': 'eiboof', |
77 | 'blech': 'hcelb', | |
78 | 'garbage_arg': 'garbage_arg', | |
79 | 'storage_class': | |
80 | 'mediagoblin.tests.test_storage:FakeStorageSystem'}) | |
ffa22935 CAW |
81 | assert this_storage.foobie == 'eiboof' |
82 | assert this_storage.blech == 'hcelb' | |
83 | assert this_storage.__class__ is FakeStorageSystem | |
84 | ||
85 | ||
17e7093e CAW |
86 | ########################## |
87 | # Basic file storage tests | |
88 | ########################## | |
89 | ||
d91b5a7c | 90 | def get_tmp_filestorage(mount_url=None, fake_remote=False): |
17e7093e | 91 | tmpdir = tempfile.mkdtemp() |
d91b5a7c CAW |
92 | if fake_remote: |
93 | this_storage = FakeRemoteStorage(tmpdir, mount_url) | |
94 | else: | |
a2468d18 | 95 | this_storage = storage.filestorage.BasicFileStorage(tmpdir, mount_url) |
17e7093e CAW |
96 | return tmpdir, this_storage |
97 | ||
98 | ||
99 | def test_basic_storage__resolve_filepath(): | |
100 | tmpdir, this_storage = get_tmp_filestorage() | |
101 | ||
102 | result = this_storage._resolve_filepath(['dir1', 'dir2', 'filename.jpg']) | |
103 | assert result == os.path.join( | |
104 | tmpdir, 'dir1/dir2/filename.jpg') | |
105 | ||
106 | result = this_storage._resolve_filepath(['../../etc/', 'passwd']) | |
107 | assert result == os.path.join( | |
108 | tmpdir, 'etc/passwd') | |
109 | ||
110 | assert_raises( | |
111 | storage.InvalidFilepath, | |
112 | this_storage._resolve_filepath, | |
113 | ['../../', 'etc', 'passwd']) | |
114 | ||
115 | ||
116 | def test_basic_storage_file_exists(): | |
92fb87ae CAW |
117 | tmpdir, this_storage = get_tmp_filestorage() |
118 | ||
119 | os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2')) | |
120 | filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt') | |
121 | with open(filename, 'w') as ourfile: | |
122 | ourfile.write("I'm having a lovely day!") | |
123 | ||
124 | assert this_storage.file_exists(['dir1', 'dir2', 'filename.txt']) | |
125 | assert not this_storage.file_exists(['dir1', 'dir2', 'thisfile.lol']) | |
126 | assert not this_storage.file_exists(['dnedir1', 'dnedir2', 'somefile.lol']) | |
127 | ||
128 | ||
20e3ee11 CAW |
129 | def test_basic_storage_get_unique_filepath(): |
130 | tmpdir, this_storage = get_tmp_filestorage() | |
131 | ||
132 | # write something that exists | |
133 | os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2')) | |
134 | filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt') | |
135 | with open(filename, 'w') as ourfile: | |
136 | ourfile.write("I'm having a lovely day!") | |
137 | ||
138 | # now we want something new, with the same name! | |
139 | new_filepath = this_storage.get_unique_filepath( | |
140 | ['dir1', 'dir2', 'filename.txt']) | |
141 | assert new_filepath[:-1] == [u'dir1', u'dir2'] | |
142 | ||
143 | new_filename = new_filepath[-1] | |
144 | assert new_filename.endswith('filename.txt') | |
145 | assert len(new_filename) > len('filename.txt') | |
146 | assert new_filename == secure_filename(new_filename) | |
17e7093e CAW |
147 | |
148 | ||
149 | def test_basic_storage_get_file(): | |
d2be0838 CAW |
150 | tmpdir, this_storage = get_tmp_filestorage() |
151 | ||
152 | # Write a brand new file | |
153 | filepath = ['dir1', 'dir2', 'ourfile.txt'] | |
154 | ||
155 | with this_storage.get_file(filepath, 'w') as our_file: | |
156 | our_file.write('First file') | |
157 | with this_storage.get_file(filepath, 'r') as our_file: | |
158 | assert our_file.read() == 'First file' | |
159 | assert os.path.exists(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) | |
160 | with file(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file: | |
161 | assert our_file.read() == 'First file' | |
162 | ||
163 | # Write to the same path but try to get a unique file. | |
164 | new_filepath = this_storage.get_unique_filepath(filepath) | |
165 | assert not os.path.exists(os.path.join(tmpdir, *new_filepath)) | |
166 | ||
167 | with this_storage.get_file(new_filepath, 'w') as our_file: | |
168 | our_file.write('Second file') | |
169 | with this_storage.get_file(new_filepath, 'r') as our_file: | |
170 | assert our_file.read() == 'Second file' | |
171 | assert os.path.exists(os.path.join(tmpdir, *new_filepath)) | |
172 | with file(os.path.join(tmpdir, *new_filepath), 'r') as our_file: | |
173 | assert our_file.read() == 'Second file' | |
174 | ||
175 | # Read from an existing file | |
176 | manually_written_file = os.makedirs( | |
177 | os.path.join(tmpdir, 'testydir')) | |
178 | with file(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile: | |
179 | testyfile.write('testy file! so testy.') | |
180 | ||
181 | with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile: | |
182 | assert testyfile.read() == 'testy file! so testy.' | |
17e7093e CAW |
183 | |
184 | ||
185 | def test_basic_storage_delete_file(): | |
d024806a CAW |
186 | tmpdir, this_storage = get_tmp_filestorage() |
187 | ||
188 | assert not os.path.exists( | |
189 | os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) | |
190 | ||
191 | filepath = ['dir1', 'dir2', 'ourfile.txt'] | |
192 | with this_storage.get_file(filepath, 'w') as our_file: | |
193 | our_file.write('Testing this file') | |
194 | ||
195 | assert os.path.exists( | |
196 | os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) | |
197 | ||
198 | this_storage.delete_file(filepath) | |
199 | ||
200 | assert not os.path.exists( | |
201 | os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) | |
17e7093e CAW |
202 | |
203 | ||
204 | def test_basic_storage_url_for_file(): | |
01da9e6a CAW |
205 | # Not supplying a base_url should actually just bork. |
206 | tmpdir, this_storage = get_tmp_filestorage() | |
207 | assert_raises( | |
208 | storage.NoWebServing, | |
209 | this_storage.file_url, | |
210 | ['dir1', 'dir2', 'filename.txt']) | |
211 | ||
212 | # base_url without domain | |
213 | tmpdir, this_storage = get_tmp_filestorage('/media/') | |
214 | result = this_storage.file_url( | |
215 | ['dir1', 'dir2', 'filename.txt']) | |
216 | expected = '/media/dir1/dir2/filename.txt' | |
217 | assert result == expected | |
218 | ||
219 | # base_url with domain | |
220 | tmpdir, this_storage = get_tmp_filestorage( | |
221 | 'http://media.example.org/ourmedia/') | |
222 | result = this_storage.file_url( | |
223 | ['dir1', 'dir2', 'filename.txt']) | |
224 | expected = 'http://media.example.org/ourmedia/dir1/dir2/filename.txt' | |
225 | assert result == expected | |
3a89c23e CAW |
226 | |
227 | ||
228 | def test_basic_storage_get_local_path(): | |
229 | tmpdir, this_storage = get_tmp_filestorage() | |
230 | ||
231 | result = this_storage.get_local_path( | |
232 | ['dir1', 'dir2', 'filename.txt']) | |
233 | ||
234 | expected = os.path.join( | |
235 | tmpdir, 'dir1/dir2/filename.txt') | |
236 | ||
237 | assert result == expected | |
238 | ||
239 | ||
240 | def test_basic_storage_is_local(): | |
241 | tmpdir, this_storage = get_tmp_filestorage() | |
242 | assert this_storage.local_storage is True | |
6a07362d CAW |
243 | |
244 | ||
245 | def test_basic_storage_copy_locally(): | |
246 | tmpdir, this_storage = get_tmp_filestorage() | |
247 | ||
248 | dest_tmpdir = tempfile.mkdtemp() | |
249 | ||
250 | filepath = ['dir1', 'dir2', 'ourfile.txt'] | |
251 | with this_storage.get_file(filepath, 'w') as our_file: | |
252 | our_file.write('Testing this file') | |
253 | ||
254 | new_file_dest = os.path.join(dest_tmpdir, 'file2.txt') | |
255 | ||
256 | this_storage.copy_locally(filepath, new_file_dest) | |
257 | ||
258 | assert file(new_file_dest).read() == 'Testing this file' | |
e56e5f8c CAW |
259 | |
260 | ||
261 | def _test_copy_local_to_storage_works(tmpdir, this_storage): | |
262 | local_filename = tempfile.mktemp() | |
263 | with file(local_filename, 'w') as tmpfile: | |
264 | tmpfile.write('haha') | |
265 | ||
266 | this_storage.copy_local_to_storage( | |
267 | local_filename, ['dir1', 'dir2', 'copiedto.txt']) | |
268 | ||
269 | assert file( | |
270 | os.path.join(tmpdir, 'dir1/dir2/copiedto.txt'), | |
271 | 'r').read() == 'haha' | |
272 | ||
273 | ||
274 | def test_basic_storage_copy_local_to_storage(): | |
275 | tmpdir, this_storage = get_tmp_filestorage() | |
276 | _test_copy_local_to_storage_works(tmpdir, this_storage) | |
277 | ||
278 | ||
279 | def test_general_storage_copy_local_to_storage(): | |
280 | tmpdir, this_storage = get_tmp_filestorage(fake_remote=True) | |
281 | _test_copy_local_to_storage_works(tmpdir, this_storage) |