Merge remote-tracking branch 'refs/remotes/upstream/master' into 569-application...
[mediagoblin.git] / mediagoblin / tests / test_storage.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17
18 import os
19 import tempfile
20
21 from nose.tools import assert_raises
22 from werkzeug.utils import secure_filename
23
24 from mediagoblin import storage
25
26
27 ################
28 # Test utilities
29 ################
30
def test_clean_listy_filepath():
    """Check clean_listy_filepath against known inputs and one bad path."""
    # (listy path handed in, cleaned listy path expected back)
    cases = [
        (['dir1', 'dir2', 'linooks.jpg'],
         [u'dir1', u'dir2', u'linooks.jpg']),
        (['/dir1/', 'foo/../nasty', 'linooks.jpg'],
         [u'dir1', u'foo_.._nasty', u'linooks.jpg']),
        (['../../../etc/', 'passwd'],
         [u'etc', u'passwd']),
    ]
    for raw_path, cleaned_path in cases:
        assert storage.clean_listy_filepath(raw_path) == cleaned_path

    # A path whose first component is nothing but '../../' must be rejected.
    assert_raises(
        storage.InvalidFilepath,
        storage.clean_listy_filepath,
        ['../../', 'linooks.jpg'])
48
49
class FakeStorageSystem(object):
    """Stand-in storage class that just records its constructor arguments.

    test_storage_system_from_config names this class through the
    'storage_class' config key and then checks the recorded attributes.

    :param foobie: arbitrary value, stored as ``self.foobie``
    :param blech: arbitrary value, stored as ``self.blech``
    :param kwargs: any further keyword arguments are accepted and ignored
    """
    def __init__(self, foobie, blech, **kwargs):
        self.foobie = foobie
        self.blech = blech
54
class FakeRemoteStorage(storage.filestorage.BasicFileStorage):
    """BasicFileStorage variant that reports itself as non-local.

    Theoretically, despite this flag, all the methods should still work,
    but it should force copying to the workbench.
    """
    local_storage = False
59
60
def test_storage_system_from_config():
    """Check that storage_system_from_config builds the right storage class.

    With no 'storage_class' key the result is expected to be a
    BasicFileStorage; with one, the named class is instantiated from the
    remaining keys.
    """
    # The original literal listed 'garbage_arg' twice; Python keeps only the
    # last value, so this single entry yields exactly the same dict.
    this_storage = storage.storage_system_from_config(
        {'base_url': 'http://example.org/moodia/',
         'base_dir': '/tmp/',
         'garbage_arg': 'trash'})
    assert this_storage.base_url == 'http://example.org/moodia/'
    assert this_storage.base_dir == '/tmp/'
    assert this_storage.__class__ is storage.filestorage.BasicFileStorage

    this_storage = storage.storage_system_from_config(
        {'foobie': 'eiboof',
         'blech': 'hcelb',
         'garbage_arg': 'garbage_arg',
         'storage_class':
             'mediagoblin.tests.test_storage:FakeStorageSystem'})
    assert this_storage.foobie == 'eiboof'
    assert this_storage.blech == 'hcelb'
    assert this_storage.__class__ is FakeStorageSystem
80
81
82 ##########################
83 # Basic file storage tests
84 ##########################
85
def get_tmp_filestorage(mount_url=None, fake_remote=False):
    """Build a file storage rooted at a newly created temporary directory.

    :param mount_url: passed through as the storage's second argument
    :param fake_remote: when true, use FakeRemoteStorage instead of
        BasicFileStorage
    :returns: a ``(tmpdir, storage_instance)`` tuple
    """
    tmpdir = tempfile.mkdtemp()
    storage_class = (
        FakeRemoteStorage if fake_remote
        else storage.filestorage.BasicFileStorage)
    return tmpdir, storage_class(tmpdir, mount_url)
93
94
def test_basic_storage__resolve_filepath():
    """Check that _resolve_filepath maps listy paths under the base dir."""
    tmpdir, this_storage = get_tmp_filestorage()

    # (listy path, expected location relative to the storage's tmpdir)
    cases = [
        (['dir1', 'dir2', 'filename.jpg'], 'dir1/dir2/filename.jpg'),
        (['../../etc/', 'passwd'], 'etc/passwd'),
    ]
    for listy_path, relative_path in cases:
        resolved = this_storage._resolve_filepath(listy_path)
        assert resolved == os.path.join(tmpdir, relative_path)

    # A leading component of just '../../' is expected to be refused.
    assert_raises(
        storage.InvalidFilepath,
        this_storage._resolve_filepath,
        ['../../', 'etc', 'passwd'])
110
111
def test_basic_storage_file_exists():
    """Check file_exists for a present file and for two missing ones."""
    tmpdir, this_storage = get_tmp_filestorage()

    # Put a real file on disk, bypassing the storage API.
    target_dir = os.path.join(tmpdir, 'dir1', 'dir2')
    os.makedirs(target_dir)
    with open(os.path.join(target_dir, 'filename.txt'), 'w') as handle:
        handle.write("I'm having a lovely day!")

    assert this_storage.file_exists(['dir1', 'dir2', 'filename.txt'])

    # Missing file in an existing directory, then a missing directory.
    for absent_path in (['dir1', 'dir2', 'thisfile.lol'],
                        ['dnedir1', 'dnedir2', 'somefile.lol']):
        assert not this_storage.file_exists(absent_path)
123
124
def test_basic_storage_get_unique_filepath():
    """Check that get_unique_filepath avoids a name that is already taken."""
    tmpdir, this_storage = get_tmp_filestorage()

    # Occupy dir1/dir2/filename.txt on disk first.
    occupied_dir = os.path.join(tmpdir, 'dir1', 'dir2')
    os.makedirs(occupied_dir)
    with open(os.path.join(occupied_dir, 'filename.txt'), 'w') as handle:
        handle.write("I'm having a lovely day!")

    # Ask for that same path: the directories stay, the filename changes.
    unique_path = this_storage.get_unique_filepath(
        ['dir1', 'dir2', 'filename.txt'])
    assert unique_path[:-1] == [u'dir1', u'dir2']

    unique_name = unique_path[-1]
    assert unique_name.endswith('filename.txt')
    assert len(unique_name) > len('filename.txt')
    # The generated name must already be in secure_filename form.
    assert unique_name == secure_filename(unique_name)
143
144
def test_basic_storage_get_file():
    """Exercise get_file for writing new files and reading existing ones.

    Each write through the storage API is verified twice: once by reading
    back through get_file, and once by reading the file directly from disk.
    """
    tmpdir, this_storage = get_tmp_filestorage()

    # Write a brand new file
    filepath = ['dir1', 'dir2', 'ourfile.txt']

    with this_storage.get_file(filepath, 'w') as our_file:
        our_file.write('First file')
    with this_storage.get_file(filepath, 'r') as our_file:
        assert our_file.read() == 'First file'
    assert os.path.exists(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
    # open() is used instead of file(), which only exists on Python 2.
    with open(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file:
        assert our_file.read() == 'First file'

    # Write to the same path but try to get a unique file.
    new_filepath = this_storage.get_unique_filepath(filepath)
    assert not os.path.exists(os.path.join(tmpdir, *new_filepath))

    with this_storage.get_file(new_filepath, 'w') as our_file:
        our_file.write('Second file')
    with this_storage.get_file(new_filepath, 'r') as our_file:
        assert our_file.read() == 'Second file'
    assert os.path.exists(os.path.join(tmpdir, *new_filepath))
    with open(os.path.join(tmpdir, *new_filepath), 'r') as our_file:
        assert our_file.read() == 'Second file'

    # Read from an existing file that was written without the storage API.
    # (os.makedirs returns None, so its result is not kept.)
    os.makedirs(os.path.join(tmpdir, 'testydir'))
    with open(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile:
        testyfile.write('testy file!  so testy.')

    # No mode is passed here, so this relies on get_file's default mode.
    with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile:
        assert testyfile.read() == 'testy file!  so testy.'
179
180
def test_basic_storage_delete_file():
    """Check that delete_file removes a file written through get_file."""
    tmpdir, this_storage = get_tmp_filestorage()
    on_disk = os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')

    # Nothing there to begin with.
    assert not os.path.exists(on_disk)

    filepath = ['dir1', 'dir2', 'ourfile.txt']
    with this_storage.get_file(filepath, 'w') as handle:
        handle.write('Testing this file')

    # Present after the write...
    assert os.path.exists(on_disk)

    this_storage.delete_file(filepath)

    # ...and gone again after the delete.
    assert not os.path.exists(on_disk)
198
199
def test_basic_storage_url_for_file():
    """Check file_url with no base_url, a path-only one, and a full one."""
    listy_path = ['dir1', 'dir2', 'filename.txt']

    # Not supplying a base_url should actually just bork.
    tmpdir, this_storage = get_tmp_filestorage()
    assert_raises(
        storage.NoWebServing,
        this_storage.file_url,
        listy_path)

    # (base_url, URL expected for listy_path)
    cases = [
        # base_url without domain
        ('/media/',
         '/media/dir1/dir2/filename.txt'),
        # base_url with domain
        ('http://media.example.org/ourmedia/',
         'http://media.example.org/ourmedia/dir1/dir2/filename.txt'),
    ]
    for base_url, expected_url in cases:
        tmpdir, this_storage = get_tmp_filestorage(base_url)
        assert this_storage.file_url(listy_path) == expected_url
222
223
def test_basic_storage_get_local_path():
    """Check that get_local_path returns the location under the temp dir."""
    tmpdir, this_storage = get_tmp_filestorage()

    local_path = this_storage.get_local_path(
        ['dir1', 'dir2', 'filename.txt'])

    assert local_path == os.path.join(tmpdir, 'dir1/dir2/filename.txt')
234
235
def test_basic_storage_is_local():
    """Check that a plain BasicFileStorage has local_storage set to True."""
    _, basic_storage = get_tmp_filestorage()
    assert basic_storage.local_storage is True
239
240
def test_basic_storage_copy_locally():
    """Check that copy_locally copies a stored file to a plain local path."""
    tmpdir, this_storage = get_tmp_filestorage()

    # Separate directory outside the storage to receive the copy.
    dest_tmpdir = tempfile.mkdtemp()

    filepath = ['dir1', 'dir2', 'ourfile.txt']
    with this_storage.get_file(filepath, 'w') as our_file:
        our_file.write('Testing this file')

    new_file_dest = os.path.join(dest_tmpdir, 'file2.txt')

    this_storage.copy_locally(filepath, new_file_dest)

    # open() in a with-block replaces the Python-2-only file() call and
    # makes sure the handle is closed once the content has been checked.
    with open(new_file_dest) as copied_file:
        assert copied_file.read() == 'Testing this file'