It's 2012 all up in here
[mediagoblin.git] / mediagoblin / tests / test_storage.py
CommitLineData
8e1e744d 1# GNU MediaGoblin -- federated, autonomous media hosting
cf29e8a8 2# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
a6b378ef
CAW
3#
4# This program is free software: you can redistribute it and/or modify
5# it under the terms of the GNU Affero General Public License as published by
6# the Free Software Foundation, either version 3 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU Affero General Public License for more details.
13#
14# You should have received a copy of the GNU Affero General Public License
15# along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17
17e7093e
CAW
18import os
19import tempfile
20
21from nose.tools import assert_raises
20e3ee11 22from werkzeug.utils import secure_filename
17e7093e 23
a6b378ef
CAW
24from mediagoblin import storage
25
26
ffa22935
CAW
27################
28# Test utilities
29################
30
a6b378ef
CAW
def test_clean_listy_filepath():
    """Exercise storage.clean_listy_filepath on plain and hostile paths."""
    # Each entry is (raw path components, expected cleaned components).
    cases = [
        (['dir1', 'dir2', 'linooks.jpg'],
         [u'dir1', u'dir2', u'linooks.jpg']),
        # Slashes and ".." inside a component are flattened, not followed.
        (['/dir1/', 'foo/../nasty', 'linooks.jpg'],
         [u'dir1', u'foo_.._nasty', u'linooks.jpg']),
        (['../../../etc/', 'passwd'],
         [u'etc', u'passwd']),
    ]
    for raw_path, cleaned_path in cases:
        assert storage.clean_listy_filepath(raw_path) == cleaned_path

    # This particular input must be rejected outright.
    assert_raises(
        storage.InvalidFilepath,
        storage.clean_listy_filepath,
        ['../../', 'linooks.jpg'])
48
49
ffa22935
CAW
class FakeStorageSystem(object):
    """Minimal stand-in storage class for the config-loading test.

    Only the two required arguments are recorded; any additional keyword
    arguments are accepted and ignored.  Deriving from ``object`` makes
    this a new-style class on Python 2 as well as Python 3.
    """

    def __init__(self, foobie, blech, **kwargs):
        self.foobie = foobie
        self.blech = blech
54
class FakeRemoteStorage(storage.filestorage.BasicFileStorage):
    """BasicFileStorage subclass that declares itself non-local.

    In theory every method should keep working despite the flag, but it
    should force files to be copied to the workbench.
    """
    local_storage = False

    def copy_local_to_storage(self, *args, **kwargs):
        # Call the StorageInterface implementation explicitly rather than
        # whatever this class would otherwise inherit.
        generic_copy = storage.StorageInterface.copy_local_to_storage
        return generic_copy(self, *args, **kwargs)
63
ffa22935 64
3c7d11ff
CAW
def test_storage_system_from_config():
    """Check which class storage_system_from_config instantiates.

    Without a 'storage_class' entry the result must be a
    BasicFileStorage; with one, the class named by the 'module:Class'
    string must be used.  Both configs carry an unrelated 'garbage_arg'
    entry as well.
    """
    # No 'storage_class' given.  The literal used to list 'garbage_arg'
    # twice, so only the last value ('trash') ever reached the function;
    # it is spelled once here, which yields exactly the same dict.
    this_storage = storage.storage_system_from_config(
        {'base_url': 'http://example.org/moodia/',
         'base_dir': '/tmp/',
         'garbage_arg': 'trash'})
    assert this_storage.base_url == 'http://example.org/moodia/'
    assert this_storage.base_dir == '/tmp/'
    assert this_storage.__class__ is storage.filestorage.BasicFileStorage

    # Explicit 'storage_class' pointing at the fake class in this module.
    this_storage = storage.storage_system_from_config(
        {'foobie': 'eiboof',
         'blech': 'hcelb',
         'garbage_arg': 'garbage_arg',
         'storage_class':
             'mediagoblin.tests.test_storage:FakeStorageSystem'})
    assert this_storage.foobie == 'eiboof'
    assert this_storage.blech == 'hcelb'
    assert this_storage.__class__ is FakeStorageSystem
84
85
17e7093e
CAW
86##########################
87# Basic file storage tests
88##########################
89
def get_tmp_filestorage(mount_url=None, fake_remote=False):
    """Create a storage object rooted in a brand new temporary directory.

    Returns a ``(tmpdir, storage)`` tuple.  The storage is a
    FakeRemoteStorage when ``fake_remote`` is true, otherwise a
    BasicFileStorage; ``mount_url`` is passed through as the second
    constructor argument.
    """
    tmpdir = tempfile.mkdtemp()
    storage_class = (
        FakeRemoteStorage if fake_remote
        else storage.filestorage.BasicFileStorage)
    return tmpdir, storage_class(tmpdir, mount_url)
97
98
def test_basic_storage__resolve_filepath():
    """_resolve_filepath must map listy paths to spots under the tmpdir."""
    tmpdir, this_storage = get_tmp_filestorage()

    plain = this_storage._resolve_filepath(['dir1', 'dir2', 'filename.jpg'])
    assert plain == os.path.join(tmpdir, 'dir1/dir2/filename.jpg')

    # Leading ".." components must not lead outside the storage directory.
    sneaky = this_storage._resolve_filepath(['../../etc/', 'passwd'])
    assert sneaky == os.path.join(tmpdir, 'etc/passwd')

    # This input has to be refused with InvalidFilepath.
    assert_raises(
        storage.InvalidFilepath,
        this_storage._resolve_filepath,
        ['../../', 'etc', 'passwd'])
114
115
def test_basic_storage_file_exists():
    """file_exists must see a file written directly into the tmpdir."""
    tmpdir, this_storage = get_tmp_filestorage()

    # Put a real file on disk without going through the storage API.
    target_dir = os.path.join(tmpdir, 'dir1', 'dir2')
    os.makedirs(target_dir)
    with open(os.path.join(target_dir, 'filename.txt'), 'w') as handle:
        handle.write("I'm having a lovely day!")

    assert this_storage.file_exists(['dir1', 'dir2', 'filename.txt'])

    # Neither a missing file nor a missing directory may be reported.
    for absent in (['dir1', 'dir2', 'thisfile.lol'],
                   ['dnedir1', 'dnedir2', 'somefile.lol']):
        assert not this_storage.file_exists(absent)
127
128
20e3ee11
CAW
def test_basic_storage_get_unique_filepath():
    """get_unique_filepath must not hand back an already-used name."""
    tmpdir, this_storage = get_tmp_filestorage()

    # Occupy dir1/dir2/filename.txt on disk first.
    occupied_dir = os.path.join(tmpdir, 'dir1', 'dir2')
    os.makedirs(occupied_dir)
    with open(os.path.join(occupied_dir, 'filename.txt'), 'w') as handle:
        handle.write("I'm having a lovely day!")

    # Ask for a path with the very same name.
    unique_path = this_storage.get_unique_filepath(
        ['dir1', 'dir2', 'filename.txt'])

    # The directory part is kept as is ...
    assert unique_path[:-1] == [u'dir1', u'dir2']

    # ... while the filename is longer, keeps its original tail and is
    # unchanged by secure_filename.
    unique_name = unique_path[-1]
    assert unique_name.endswith('filename.txt')
    assert len(unique_name) > len('filename.txt')
    assert unique_name == secure_filename(unique_name)
17e7093e
CAW
147
148
def test_basic_storage_get_file():
    """Round-trip files through get_file and verify them on disk.

    Covers writing a new file, writing to a path obtained from
    get_unique_filepath, and reading a file that was created outside the
    storage API.  Direct disk access uses open() instead of the
    Python 2-only file() builtin.
    """
    tmpdir, this_storage = get_tmp_filestorage()

    # Write a brand new file
    filepath = ['dir1', 'dir2', 'ourfile.txt']

    with this_storage.get_file(filepath, 'w') as our_file:
        our_file.write('First file')
    with this_storage.get_file(filepath, 'r') as our_file:
        assert our_file.read() == 'First file'
    first_on_disk = os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')
    assert os.path.exists(first_on_disk)
    with open(first_on_disk, 'r') as our_file:
        assert our_file.read() == 'First file'

    # Write to the same path but try to get a unique file.
    new_filepath = this_storage.get_unique_filepath(filepath)
    second_on_disk = os.path.join(tmpdir, *new_filepath)
    assert not os.path.exists(second_on_disk)

    with this_storage.get_file(new_filepath, 'w') as our_file:
        our_file.write('Second file')
    with this_storage.get_file(new_filepath, 'r') as our_file:
        assert our_file.read() == 'Second file'
    assert os.path.exists(second_on_disk)
    with open(second_on_disk, 'r') as our_file:
        assert our_file.read() == 'Second file'

    # Read from an existing file.  os.makedirs returns None, so its
    # result is no longer bound to a (never used) local.
    os.makedirs(os.path.join(tmpdir, 'testydir'))
    testy_on_disk = os.path.join(tmpdir, 'testydir/testyfile.txt')
    with open(testy_on_disk, 'w') as testyfile:
        testyfile.write('testy file! so testy.')

    with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile:
        assert testyfile.read() == 'testy file! so testy.'
17e7093e
CAW
183
184
def test_basic_storage_delete_file():
    """delete_file must remove a file previously written via get_file."""
    tmpdir, this_storage = get_tmp_filestorage()
    on_disk = os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')

    # Nothing there to begin with.
    assert not os.path.exists(on_disk)

    filepath = ['dir1', 'dir2', 'ourfile.txt']
    with this_storage.get_file(filepath, 'w') as handle:
        handle.write('Testing this file')

    # Present after writing ...
    assert os.path.exists(on_disk)

    this_storage.delete_file(filepath)

    # ... and gone again after deleting.
    assert not os.path.exists(on_disk)
17e7093e
CAW
202
203
def test_basic_storage_url_for_file():
    """file_url must prefix the joined path with the storage's base URL."""
    # Not supplying a base_url should actually just bork.
    tmpdir, this_storage = get_tmp_filestorage()
    assert_raises(
        storage.NoWebServing,
        this_storage.file_url,
        ['dir1', 'dir2', 'filename.txt'])

    # First a base_url without a domain, then one with a domain.
    url_cases = (
        ('/media/',
         '/media/dir1/dir2/filename.txt'),
        ('http://media.example.org/ourmedia/',
         'http://media.example.org/ourmedia/dir1/dir2/filename.txt'),
    )
    for base_url, expected in url_cases:
        tmpdir, this_storage = get_tmp_filestorage(base_url)
        result = this_storage.file_url(['dir1', 'dir2', 'filename.txt'])
        assert result == expected
3a89c23e
CAW
226
227
def test_basic_storage_get_local_path():
    """get_local_path must return the file's location under the tmpdir."""
    tmpdir, this_storage = get_tmp_filestorage()

    local_path = this_storage.get_local_path(
        ['dir1', 'dir2', 'filename.txt'])

    assert local_path == os.path.join(tmpdir, 'dir1/dir2/filename.txt')
238
239
def test_basic_storage_is_local():
    """A plain temporary file storage must flag itself as local."""
    _, plain_storage = get_tmp_filestorage()
    assert plain_storage.local_storage is True
6a07362d
CAW
243
244
def test_basic_storage_copy_locally():
    """copy_locally must copy a stored file to an arbitrary local path."""
    tmpdir, this_storage = get_tmp_filestorage()

    dest_tmpdir = tempfile.mkdtemp()

    filepath = ['dir1', 'dir2', 'ourfile.txt']
    with this_storage.get_file(filepath, 'w') as our_file:
        our_file.write('Testing this file')

    new_file_dest = os.path.join(dest_tmpdir, 'file2.txt')

    this_storage.copy_locally(filepath, new_file_dest)

    # Read the copy back with open() in a with-block: the handle is
    # closed deterministically and the Python 2-only file() is avoided.
    with open(new_file_dest) as copied_file:
        assert copied_file.read() == 'Testing this file'
e56e5f8c
CAW
259
260
def _test_copy_local_to_storage_works(tmpdir, this_storage):
    """Shared check that copy_local_to_storage puts a file into storage.

    ``tmpdir`` is the directory ``this_storage`` is rooted in; the copied
    file is read back from there directly to verify its contents.
    """
    # mkstemp creates the file atomically, unlike the race-prone
    # tempfile.mktemp, which only returns a name.
    local_fd, local_filename = tempfile.mkstemp()
    with os.fdopen(local_fd, 'w') as tmpfile:
        tmpfile.write('haha')

    this_storage.copy_local_to_storage(
        local_filename, ['dir1', 'dir2', 'copiedto.txt'])

    # open() in a with-block closes the handle and avoids the
    # Python 2-only file() builtin.
    copied_path = os.path.join(tmpdir, 'dir1/dir2/copiedto.txt')
    with open(copied_path, 'r') as copied_file:
        assert copied_file.read() == 'haha'
272
273
def test_basic_storage_copy_local_to_storage():
    """Run the shared copy_local_to_storage check on the default storage."""
    _test_copy_local_to_storage_works(*get_tmp_filestorage())
277
278
def test_general_storage_copy_local_to_storage():
    """Run the shared copy_local_to_storage check on the fake remote storage."""
    _test_copy_local_to_storage_works(
        *get_tmp_filestorage(fake_remote=True))