Commit | Line | Data |
---|---|---|
8e1e744d | 1 | # GNU MediaGoblin -- federated, autonomous media hosting |
a6b378ef CAW |
2 | # Copyright (C) 2011 Free Software Foundation, Inc |
3 | # | |
4 | # This program is free software: you can redistribute it and/or modify | |
5 | # it under the terms of the GNU Affero General Public License as published by | |
6 | # the Free Software Foundation, either version 3 of the License, or | |
7 | # (at your option) any later version. | |
8 | # | |
9 | # This program is distributed in the hope that it will be useful, | |
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
12 | # GNU Affero General Public License for more details. | |
13 | # | |
14 | # You should have received a copy of the GNU Affero General Public License | |
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
16 | ||
17 | ||
17e7093e CAW |
18 | import os |
19 | import tempfile | |
20 | ||
21 | from nose.tools import assert_raises | |
20e3ee11 | 22 | from werkzeug.utils import secure_filename |
17e7093e | 23 | |
a6b378ef CAW |
24 | from mediagoblin import storage |
25 | ||
26 | ||
ffa22935 CAW |
27 | ################ |
28 | # Test utilities | |
29 | ################ | |
30 | ||
a6b378ef CAW |
def test_clean_listy_filepath():
    """Check clean_listy_filepath against benign and hostile path lists.

    Each case pairs a raw list of path components with the cleaned list
    the function is expected to return.  A path that consists of nothing
    but parent-directory references must raise InvalidFilepath.
    """
    cases = [
        # Already-clean components come back unchanged (as unicode).
        (['dir1', 'dir2', 'linooks.jpg'],
         [u'dir1', u'dir2', u'linooks.jpg']),
        # Separators inside a component must not survive cleaning.
        (['/dir1/', 'foo/../nasty', 'linooks.jpg'],
         [u'dir1', u'foo_.._nasty', u'linooks.jpg']),
        # Leading parent-directory hops are dropped.
        (['../../../etc/', 'passwd'],
         [u'etc', u'passwd']),
    ]
    for raw_components, cleaned_components in cases:
        assert storage.clean_listy_filepath(
            raw_components) == cleaned_components

    # A component that cleans down to nothing is rejected outright.
    assert_raises(
        storage.InvalidFilepath,
        storage.clean_listy_filepath,
        ['../../', 'linooks.jpg'])
48 | ||
49 | ||
ffa22935 CAW |
class FakeStorageSystem(object):
    """Minimal stand-in storage class for the config-loading test.

    It only records its two required constructor arguments so a test can
    verify that they were passed through.  Inherits from ``object``
    explicitly so that it is a new-style class on Python 2 as well.
    """

    def __init__(self, foobie, blech, **kwargs):
        """Store ``foobie`` and ``blech``; extra keyword args are ignored.

        ``**kwargs`` is accepted on purpose: the config-loading test in
        this module supplies an additional ``garbage_arg`` setting that
        this class has no use for.
        """
        self.foobie = foobie
        self.blech = blech
54 | ||
d91b5a7c CAW |
class FakeRemoteStorage(storage.BasicFileStorage):
    """BasicFileStorage variant that claims not to be local storage.

    In theory every inherited method should keep working despite the
    flag; its purpose is to force copying to the workbench.
    """

    local_storage = False
59 | ||
ffa22935 | 60 | |
3c7d11ff CAW |
def test_storage_system_from_config():
    """Build storage systems from 'somestorage'-prefixed config entries.

    The first config names no storage class and is expected to yield a
    BasicFileStorage; the second names FakeStorageSystem explicitly via
    the ``somestorage_storage_class`` key.
    """
    # No storage_class entry: expect the basic file storage.
    basic_config = {
        'somestorage_base_url': 'http://example.org/moodia/',
        'somestorage_base_dir': '/tmp/',
        'somestorage_garbage_arg': 'garbage_arg',
        'garbage_arg': 'trash'}
    basic = storage.storage_system_from_config(basic_config, 'somestorage')
    assert basic.base_url == 'http://example.org/moodia/'
    assert basic.base_dir == '/tmp/'
    assert basic.__class__ is storage.BasicFileStorage

    # Explicit storage_class entry: expect that class to be instantiated.
    fake_config = {
        'somestorage_foobie': 'eiboof',
        'somestorage_blech': 'hcelb',
        'somestorage_garbage_arg': 'garbage_arg',
        'garbage_arg': 'trash',
        'somestorage_storage_class':
            'mediagoblin.tests.test_storage:FakeStorageSystem'}
    fake = storage.storage_system_from_config(fake_config, 'somestorage')
    assert fake.foobie == 'eiboof'
    assert fake.blech == 'hcelb'
    assert fake.__class__ is FakeStorageSystem
83 | ||
84 | ||
17e7093e CAW |
85 | ########################## |
86 | # Basic file storage tests | |
87 | ########################## | |
88 | ||
def get_tmp_filestorage(mount_url=None, fake_remote=False):
    """Create a storage object rooted in a fresh temporary directory.

    mount_url is passed as the second constructor argument of the
    storage class.  When fake_remote is true a FakeRemoteStorage is
    built instead of a plain BasicFileStorage.

    Returns a ``(tmpdir, storage_object)`` tuple.
    """
    tmpdir = tempfile.mkdtemp()
    storage_class = (
        FakeRemoteStorage if fake_remote else storage.BasicFileStorage)
    return tmpdir, storage_class(tmpdir, mount_url)
96 | ||
97 | ||
def test_basic_storage__resolve_filepath():
    """Check that _resolve_filepath maps listy paths under the base dir."""
    tmpdir, this_storage = get_tmp_filestorage()
    resolve = this_storage._resolve_filepath

    # An ordinary path lands directly beneath the storage directory.
    wanted = os.path.join(tmpdir, 'dir1/dir2/filename.jpg')
    assert resolve(['dir1', 'dir2', 'filename.jpg']) == wanted

    # Parent-directory hops must not escape the storage directory.
    wanted = os.path.join(tmpdir, 'etc/passwd')
    assert resolve(['../../etc/', 'passwd']) == wanted

    # A component made only of parent-directory hops is invalid.
    assert_raises(
        storage.InvalidFilepath,
        resolve,
        ['../../', 'etc', 'passwd'])
113 | ||
114 | ||
def test_basic_storage_file_exists():
    """Check file_exists for a present file and for two missing ones."""
    tmpdir, this_storage = get_tmp_filestorage()

    # Put a real file on disk, bypassing the storage API.
    parent_dir = os.path.join(tmpdir, 'dir1', 'dir2')
    os.makedirs(parent_dir)
    with open(os.path.join(parent_dir, 'filename.txt'), 'w') as handle:
        handle.write("I'm having a lovely day!")

    exists = this_storage.file_exists
    assert exists(['dir1', 'dir2', 'filename.txt'])
    # Missing file inside an existing directory.
    assert not exists(['dir1', 'dir2', 'thisfile.lol'])
    # Missing file inside directories that do not exist either.
    assert not exists(['dnedir1', 'dnedir2', 'somefile.lol'])
126 | ||
127 | ||
20e3ee11 CAW |
def test_basic_storage_get_unique_filepath():
    """Check get_unique_filepath when the requested name is taken."""
    tmpdir, this_storage = get_tmp_filestorage()

    # Occupy the name we are about to ask for.
    parent_dir = os.path.join(tmpdir, 'dir1', 'dir2')
    os.makedirs(parent_dir)
    with open(os.path.join(parent_dir, 'filename.txt'), 'w') as handle:
        handle.write("I'm having a lovely day!")

    # Asking for the same name must give back something new.
    unique_path = this_storage.get_unique_filepath(
        ['dir1', 'dir2', 'filename.txt'])
    directories, unique_name = unique_path[:-1], unique_path[-1]

    assert directories == [u'dir1', u'dir2']
    # The original name is kept as a suffix, with something in front.
    assert unique_name.endswith('filename.txt')
    assert len(unique_name) > len('filename.txt')
    # The generated name must already be a safe filename.
    assert unique_name == secure_filename(unique_name)
17e7093e CAW |
146 | |
147 | ||
def test_basic_storage_get_file():
    """Exercise get_file for writing new files and reading existing ones.

    Three scenarios are covered: writing and re-reading a brand new
    file, writing to a path obtained from get_unique_filepath, and
    reading a file that was put on disk without going through the
    storage API.
    """
    tmpdir, this_storage = get_tmp_filestorage()

    # Write a brand new file
    filepath = ['dir1', 'dir2', 'ourfile.txt']

    with this_storage.get_file(filepath, 'w') as our_file:
        our_file.write('First file')
    with this_storage.get_file(filepath, 'r') as our_file:
        assert our_file.read() == 'First file'
    first_on_disk = os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')
    assert os.path.exists(first_on_disk)
    # open() is used instead of file(): file() exists only on Python 2.
    with open(first_on_disk, 'r') as our_file:
        assert our_file.read() == 'First file'

    # Write to the same path but try to get a unique file.
    new_filepath = this_storage.get_unique_filepath(filepath)
    second_on_disk = os.path.join(tmpdir, *new_filepath)
    assert not os.path.exists(second_on_disk)

    with this_storage.get_file(new_filepath, 'w') as our_file:
        our_file.write('Second file')
    with this_storage.get_file(new_filepath, 'r') as our_file:
        assert our_file.read() == 'Second file'
    assert os.path.exists(second_on_disk)
    with open(second_on_disk, 'r') as our_file:
        assert our_file.read() == 'Second file'

    # Read from an existing file.  os.makedirs() returns None, so its
    # result is not bound to a name.
    os.makedirs(os.path.join(tmpdir, 'testydir'))
    manual_path = os.path.join(tmpdir, 'testydir/testyfile.txt')
    with open(manual_path, 'w') as testyfile:
        testyfile.write('testy file! so testy.')

    # No mode argument here: this relies on get_file's default mode.
    with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile:
        assert testyfile.read() == 'testy file! so testy.'
17e7093e CAW |
182 | |
183 | ||
def test_basic_storage_delete_file():
    """Check that delete_file removes a file written through get_file."""
    tmpdir, this_storage = get_tmp_filestorage()
    on_disk = os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')

    # Nothing is there before we write.
    assert not os.path.exists(on_disk)

    filepath = ['dir1', 'dir2', 'ourfile.txt']
    with this_storage.get_file(filepath, 'w') as handle:
        handle.write('Testing this file')

    # The write created the file on disk...
    assert os.path.exists(on_disk)

    this_storage.delete_file(filepath)

    # ...and deleting it removes it again.
    assert not os.path.exists(on_disk)
17e7093e CAW |
201 | |
202 | ||
def test_basic_storage_url_for_file():
    """Check file_url with no base_url, a path-only one, and a full one."""
    # Not supplying a base_url should actually just bork.
    _, unserved_storage = get_tmp_filestorage()
    assert_raises(
        storage.NoWebServing,
        unserved_storage.file_url,
        ['dir1', 'dir2', 'filename.txt'])

    url_cases = [
        # base_url without domain
        ('/media/',
         '/media/dir1/dir2/filename.txt'),
        # base_url with domain
        ('http://media.example.org/ourmedia/',
         'http://media.example.org/ourmedia/dir1/dir2/filename.txt'),
    ]
    for base_url, expected in url_cases:
        _, served_storage = get_tmp_filestorage(base_url)
        # A fresh list is passed on every call.
        result = served_storage.file_url(['dir1', 'dir2', 'filename.txt'])
        assert result == expected
3a89c23e CAW |
225 | |
226 | ||
def test_basic_storage_get_local_path():
    """Check that get_local_path points beneath the storage directory."""
    tmpdir, this_storage = get_tmp_filestorage()

    wanted = os.path.join(tmpdir, 'dir1/dir2/filename.txt')
    local_path = this_storage.get_local_path(
        ['dir1', 'dir2', 'filename.txt'])

    assert local_path == wanted
237 | ||
238 | ||
def test_basic_storage_is_local():
    """Check that a plain BasicFileStorage reports itself as local."""
    basic_storage = get_tmp_filestorage()[1]
    assert basic_storage.local_storage is True
6a07362d CAW |
242 | |
243 | ||
def test_basic_storage_copy_locally():
    """Check that copy_locally copies a stored file to a given path.

    A file is written through get_file, copied into a second temporary
    directory with copy_locally, and the copy's content is compared
    with what was written.
    """
    tmpdir, this_storage = get_tmp_filestorage()

    dest_tmpdir = tempfile.mkdtemp()

    filepath = ['dir1', 'dir2', 'ourfile.txt']
    with this_storage.get_file(filepath, 'w') as our_file:
        our_file.write('Testing this file')

    new_file_dest = os.path.join(dest_tmpdir, 'file2.txt')

    this_storage.copy_locally(filepath, new_file_dest)

    # open() inside a with-block replaces the Python 2-only file()
    # builtin and ensures the handle is closed after reading.
    with open(new_file_dest) as copied_file:
        assert copied_file.read() == 'Testing this file'