e5bc00deefe4f56834b301587d5bdbfea124f3a7
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
22 from werkzeug
.utils
import secure_filename
24 from mediagoblin
import storage
25 from mediagoblin
.tools
.testing
import _activate_testing
def test_clean_listy_filepath():
    """Exercise ``storage.clean_listy_filepath`` on benign and hostile input.

    Each case pairs the raw path components handed to the cleaner with
    the list of unicode components it is asserted to return.  The final
    check asserts that ``['../../', 'linooks.jpg']`` is refused with
    ``storage.InvalidFilepath`` instead of being cleaned.
    """
    # (raw components, components expected back)
    cases = [
        (['dir1', 'dir2', 'linooks.jpg'],
         [u'dir1', u'dir2', u'linooks.jpg']),
        (['/dir1/', 'foo/../nasty', 'linooks.jpg'],
         [u'dir1', u'foo_.._nasty', u'linooks.jpg']),
        (['../../../etc/', 'passwd'],
         [u'etc', u'passwd']),
    ]
    for raw_components, cleaned_components in cases:
        assert storage.clean_listy_filepath(
            raw_components) == cleaned_components

    with pytest.raises(storage.InvalidFilepath):
        storage.clean_listy_filepath(['../../', 'linooks.jpg'])
# NOTE(review): kept without an explicit base class on purpose.  The
# config test compares unicode(cls) against the bare dotted path
# u'mediagoblin.tests.test_storage.FakeStorageSystem'; presumably that
# relies on Python 2 old-style class formatting -- confirm before
# adding ``object`` as a base.
class FakeStorageSystem():
    """Stand-in storage class used to test loading a class by dotted path.

    It only records the two required constructor arguments so a test can
    check that configuration values were forwarded to the constructor.
    """

    def __init__(self, foobie, blech, **kwargs):
        """Store ``foobie`` and ``blech``; ignore any extra keyword arguments.

        ``**kwargs`` is accepted so that unrelated configuration entries
        passed along by the caller do not raise ``TypeError``.
        """
        self.foobie = foobie
        self.blech = blech
class FakeRemoteStorage(storage.filestorage.BasicFileStorage):
    """A ``BasicFileStorage`` that presents itself as non-local storage.

    Used to exercise the generic ``StorageInterface`` code path for
    copying a local file into storage.
    """
    # Theoretically despite this, all the methods should work but it
    # should force copying to the workbench
    # NOTE(review): this attribute was missing from the reviewed text and
    # is restored from the comment above and from the is_local test, which
    # asserts ``local_storage is True`` on the plain BasicFileStorage --
    # confirm against upstream.
    local_storage = False

    def copy_local_to_storage(self, *args, **kwargs):
        """Delegate to ``StorageInterface.copy_local_to_storage``.

        Calls the implementation on ``storage.StorageInterface`` directly,
        bypassing whatever ``BasicFileStorage`` itself provides.
        """
        return storage.StorageInterface.copy_local_to_storage(
            self, *args, **kwargs)
def test_storage_system_from_config():
    """Check ``storage.storage_system_from_config`` for two config dicts.

    The first dict names no storage class and is asserted to yield a
    ``BasicFileStorage`` carrying the configured ``base_url`` and
    ``base_dir``.  The second points at ``FakeStorageSystem`` by dotted
    path and is asserted to yield an instance holding the configured
    ``foobie`` / ``blech`` values.
    """
    this_storage = storage.storage_system_from_config(
        {'base_url': 'http://example.org/moodia/',
         'base_dir': '/tmp/',
         # The literal previously listed 'garbage_arg' twice; a duplicate
         # key is silently collapsed to the last value, so only the
         # effective entry is kept.
         'garbage_arg': 'trash'})
    assert this_storage.base_url == 'http://example.org/moodia/'
    assert this_storage.base_dir == '/tmp/'
    assert this_storage.__class__ is storage.filestorage.BasicFileStorage

    this_storage = storage.storage_system_from_config(
        {'foobie': 'eiboof',
         'blech': 'hcelb',
         'garbage_arg': 'garbage_arg',
         # NOTE(review): the key for this value was missing from the
         # reviewed text; 'storage_class' is presumed -- confirm against
         # storage_system_from_config.
         'storage_class':
             'mediagoblin.tests.test_storage:FakeStorageSystem'})
    assert this_storage.foobie == 'eiboof'
    assert this_storage.blech == 'hcelb'
    assert unicode(this_storage.__class__) == \
        u'mediagoblin.tests.test_storage.FakeStorageSystem'
##########################
# Basic file storage tests
##########################
def get_tmp_filestorage(mount_url=None, fake_remote=False):
    """Create a storage object rooted in a fresh temporary directory.

    :param mount_url: passed through as the second constructor argument
        of the storage class.
    :param fake_remote: when true, build a ``FakeRemoteStorage`` instead
        of a plain ``BasicFileStorage``.
    :returns: ``(tmpdir, this_storage)``; the caller is responsible for
        removing ``tmpdir`` again.
    """
    tmpdir = tempfile.mkdtemp(prefix="test_gmg_storage")
    # Select the class from the flag; without this branch the flag would
    # have no effect and the first instance would simply be overwritten.
    if fake_remote:
        this_storage = FakeRemoteStorage(tmpdir, mount_url)
    else:
        this_storage = storage.filestorage.BasicFileStorage(tmpdir, mount_url)
    return tmpdir, this_storage
def cleanup_storage(this_storage, tmpdir, *paths):
    """Remove the given directories from storage, then the temp dir itself.

    :param this_storage: object exposing ``delete_dir(listy_path)``.
    :param tmpdir: root directory of the storage; removed last with
        ``os.rmdir``, so it must be empty by then.
    :param paths: list-style directory paths.  Each path is deleted
        deepest-first (``['a', 'b']`` deletes ``a/b`` and then ``a``).

    Every ``delete_dir`` call is asserted to report success.

    NOTE(review): the loop and the final ``os.rmdir`` were missing from
    the reviewed text and are restored from the signature and the call
    sites -- confirm against upstream.
    """
    for p in paths:
        # Walk from the full path up to its first component, slicing
        # rather than popping so the caller's list is left untouched.
        for depth in range(len(p), 0, -1):
            assert this_storage.delete_dir(p[:depth]) == True
    os.rmdir(tmpdir)
def test_basic_storage__resolve_filepath():
    """Check ``_resolve_filepath`` results relative to the storage root.

    Two list-style paths are asserted to resolve to locations under the
    temporary storage directory (including one whose first component
    starts with ``../../``), and ``['../../', 'etc', 'passwd']`` is
    asserted to raise ``storage.InvalidFilepath``.
    """
    tmpdir, this_storage = get_tmp_filestorage()

    result = this_storage._resolve_filepath(['dir1', 'dir2', 'filename.jpg'])
    assert result == os.path.join(
        tmpdir, 'dir1/dir2/filename.jpg')

    result = this_storage._resolve_filepath(['../../etc/', 'passwd'])
    assert result == os.path.join(
        tmpdir, 'etc/passwd')

    # Context-manager form, matching the other raises-check in this file.
    with pytest.raises(storage.InvalidFilepath):
        this_storage._resolve_filepath(['../../', 'etc', 'passwd'])

    cleanup_storage(this_storage, tmpdir)
def test_basic_storage_file_exists():
    """Check ``file_exists`` for one present and two absent paths.

    A file is written straight to disk under the storage root (not via
    the storage API); ``file_exists`` is then asserted true for it and
    false for a missing file in an existing directory and for a file in
    directories that do not exist.
    """
    tmpdir, this_storage = get_tmp_filestorage()

    nested_dir = os.path.join(tmpdir, 'dir1', 'dir2')
    os.makedirs(nested_dir)
    on_disk = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt')
    with open(on_disk, 'w') as handle:
        handle.write("I'm having a lovely day!")

    assert this_storage.file_exists(['dir1', 'dir2', 'filename.txt'])
    for absent in (['dir1', 'dir2', 'thisfile.lol'],
                   ['dnedir1', 'dnedir2', 'somefile.lol']):
        assert not this_storage.file_exists(absent)

    this_storage.delete_file(['dir1', 'dir2', 'filename.txt'])
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
def test_basic_storage_get_unique_filepath():
    """Check ``get_unique_filepath`` when the requested name is taken.

    With ``dir1/dir2/filename.txt`` already on disk, the returned path is
    asserted to keep the same directory components, and its last
    component is asserted to end with the requested name, be longer than
    it, and be unchanged by ``secure_filename``.
    """
    tmpdir, this_storage = get_tmp_filestorage()

    # Occupy the name we are about to ask for.
    os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2'))
    taken = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt')
    with open(taken, 'w') as handle:
        handle.write("I'm having a lovely day!")

    # Ask for that same name again.
    unique_path = this_storage.get_unique_filepath(
        ['dir1', 'dir2', 'filename.txt'])
    directories, unique_name = unique_path[:-1], unique_path[-1]

    assert directories == [u'dir1', u'dir2']
    assert unique_name.endswith('filename.txt')
    assert len(unique_name) > len('filename.txt')
    assert unique_name == secure_filename(unique_name)

    os.remove(taken)
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
def test_basic_storage_get_file():
    """Round-trip file contents through ``get_file`` and the filesystem.

    Covers three situations: writing a brand new file, writing to a
    unique path derived from an already used one, and reading a file
    that was created on disk outside the storage API.  Each write is
    verified both through ``get_file(..., 'r')`` and by opening the file
    directly under the storage root.
    """
    tmpdir, this_storage = get_tmp_filestorage()

    # Write a brand new file
    filepath = ['dir1', 'dir2', 'ourfile.txt']

    with this_storage.get_file(filepath, 'w') as our_file:
        our_file.write('First file')
    with this_storage.get_file(filepath, 'r') as our_file:
        assert our_file.read() == 'First file'
    assert os.path.exists(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
    # open() rather than the Python-2-only file() builtin, matching the
    # other tests in this module.
    with open(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file:
        assert our_file.read() == 'First file'

    # Write to the same path but try to get a unique file.
    new_filepath = this_storage.get_unique_filepath(filepath)
    assert not os.path.exists(os.path.join(tmpdir, *new_filepath))

    with this_storage.get_file(new_filepath, 'w') as our_file:
        our_file.write('Second file')
    with this_storage.get_file(new_filepath, 'r') as our_file:
        assert our_file.read() == 'Second file'
    assert os.path.exists(os.path.join(tmpdir, *new_filepath))
    with open(os.path.join(tmpdir, *new_filepath), 'r') as our_file:
        assert our_file.read() == 'Second file'

    # Read from an existing file
    # (os.makedirs returns None, so its result is not kept.)
    os.makedirs(os.path.join(tmpdir, 'testydir'))
    with open(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile:
        testyfile.write('testy file! so testy.')

    with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile:
        assert testyfile.read() == 'testy file! so testy.'

    this_storage.delete_file(filepath)
    this_storage.delete_file(new_filepath)
    this_storage.delete_file(['testydir', 'testyfile.txt'])
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'], ['testydir'])
def test_basic_storage_delete_file():
    """Check ``delete_file`` and how ``delete_dir`` reacts around it.

    ``delete_dir`` on the directory holding the file is asserted to
    return False while the file is still there and True once it has been
    deleted; the file itself is asserted to be gone from disk afterwards.
    """
    tmpdir, this_storage = get_tmp_filestorage()
    disk_location = os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')

    assert not os.path.exists(disk_location)

    filepath = ['dir1', 'dir2', 'ourfile.txt']
    with this_storage.get_file(filepath, 'w') as handle:
        handle.write('Testing this file')

    assert os.path.exists(disk_location)

    # Directory still holds the file, so the first attempt must fail.
    assert this_storage.delete_dir(['dir1', 'dir2']) == False
    this_storage.delete_file(filepath)
    assert this_storage.delete_dir(['dir1', 'dir2']) == True

    assert not os.path.exists(disk_location)

    cleanup_storage(this_storage, tmpdir, ['dir1'])
def test_basic_storage_url_for_file():
    """Check ``file_url`` with no base URL, a path-only one and a full one.

    Without a base URL the call is asserted to raise
    ``storage.NoWebServing``; otherwise the result is asserted to be the
    base URL followed by the slash-joined path components.
    """
    # Not supplying a base_url should actually just bork.
    tmpdir, this_storage = get_tmp_filestorage()
    # Context-manager form, matching the raises-check used for
    # clean_listy_filepath in this module.
    with pytest.raises(storage.NoWebServing):
        this_storage.file_url(['dir1', 'dir2', 'filename.txt'])
    cleanup_storage(this_storage, tmpdir)

    # base_url without domain
    tmpdir, this_storage = get_tmp_filestorage('/media/')
    result = this_storage.file_url(
        ['dir1', 'dir2', 'filename.txt'])
    expected = '/media/dir1/dir2/filename.txt'
    assert result == expected
    cleanup_storage(this_storage, tmpdir)

    # base_url with domain
    tmpdir, this_storage = get_tmp_filestorage(
        'http://media.example.org/ourmedia/')
    result = this_storage.file_url(
        ['dir1', 'dir2', 'filename.txt'])
    expected = 'http://media.example.org/ourmedia/dir1/dir2/filename.txt'
    assert result == expected
    cleanup_storage(this_storage, tmpdir)
def test_basic_storage_get_local_path():
    """Check that ``get_local_path`` maps a listy path under the root.

    The returned value is asserted to equal the temporary storage
    directory joined with ``dir1/dir2/filename.txt``.
    """
    tmpdir, this_storage = get_tmp_filestorage()

    local_path = this_storage.get_local_path(['dir1', 'dir2', 'filename.txt'])
    assert local_path == os.path.join(tmpdir, 'dir1/dir2/filename.txt')

    cleanup_storage(this_storage, tmpdir)
def test_basic_storage_is_local():
    """Check that the default temp storage reports ``local_storage`` as True."""
    tmpdir, this_storage = get_tmp_filestorage()
    is_local = this_storage.local_storage
    assert is_local is True
    cleanup_storage(this_storage, tmpdir)
def test_basic_storage_copy_locally():
    """Check that ``copy_locally`` copies a stored file to a local path.

    A file is written through the storage API, copied into a separate
    temporary directory, and then deleted from storage; the local copy is
    asserted to still hold the original contents.
    """
    tmpdir, this_storage = get_tmp_filestorage()

    dest_tmpdir = tempfile.mkdtemp()

    filepath = ['dir1', 'dir2', 'ourfile.txt']
    with this_storage.get_file(filepath, 'w') as our_file:
        our_file.write('Testing this file')

    new_file_dest = os.path.join(dest_tmpdir, 'file2.txt')

    this_storage.copy_locally(filepath, new_file_dest)
    this_storage.delete_file(filepath)

    # Read inside a with-block so the handle is closed before the file
    # is removed below.
    with open(new_file_dest) as copied_file:
        assert copied_file.read() == 'Testing this file'

    os.remove(new_file_dest)
    os.rmdir(dest_tmpdir)
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
def _test_copy_local_to_storage_works(tmpdir, this_storage):
    """Shared check that ``copy_local_to_storage`` lands a file in storage.

    :param tmpdir: root directory backing ``this_storage``.
    :param this_storage: storage object under test.

    A small local file is created, copied to ``dir1/dir2/copiedto.txt``
    in storage, and removed locally; the stored copy is then read
    straight from disk and asserted to hold the same contents.
    """
    # mkstemp creates the file atomically and hands back an open
    # descriptor, avoiding the name race inherent in tempfile.mktemp().
    fd, local_filename = tempfile.mkstemp()
    with os.fdopen(fd, 'w') as tmpfile:
        tmpfile.write('haha')

    this_storage.copy_local_to_storage(
        local_filename, ['dir1', 'dir2', 'copiedto.txt'])

    os.remove(local_filename)

    stored_location = os.path.join(tmpdir, 'dir1/dir2/copiedto.txt')
    with open(stored_location, 'r') as stored_file:
        assert stored_file.read() == 'haha'

    this_storage.delete_file(['dir1', 'dir2', 'copiedto.txt'])
    cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2'])
def test_basic_storage_copy_local_to_storage():
    """Run the shared copy-to-storage check against the default storage."""
    storage_root, file_storage = get_tmp_filestorage()
    _test_copy_local_to_storage_works(storage_root, file_storage)
def test_general_storage_copy_local_to_storage():
    """Run the shared copy-to-storage check with ``fake_remote=True``."""
    storage_root, remote_like_storage = get_tmp_filestorage(fake_remote=True)
    _test_copy_local_to_storage_works(storage_root, remote_like_storage)