Merge remote branch 'remotes/aaronw/feature410_markdown_bio'
[mediagoblin.git] / mediagoblin / tests / test_storage.py
CommitLineData
8e1e744d 1# GNU MediaGoblin -- federated, autonomous media hosting
a6b378ef
CAW
2# Copyright (C) 2011 Free Software Foundation, Inc
3#
4# This program is free software: you can redistribute it and/or modify
5# it under the terms of the GNU Affero General Public License as published by
6# the Free Software Foundation, either version 3 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU Affero General Public License for more details.
13#
14# You should have received a copy of the GNU Affero General Public License
15# along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17
17e7093e
CAW
18import os
19import tempfile
20
21from nose.tools import assert_raises
20e3ee11 22from werkzeug.utils import secure_filename
17e7093e 23
a6b378ef
CAW
24from mediagoblin import storage
25
26
ffa22935
CAW
27################
28# Test utilities
29################
30
a6b378ef
CAW
31def test_clean_listy_filepath():
32 expected = [u'dir1', u'dir2', u'linooks.jpg']
33 assert storage.clean_listy_filepath(
34 ['dir1', 'dir2', 'linooks.jpg']) == expected
35
36 expected = [u'dir1', u'foo_.._nasty', u'linooks.jpg']
37 assert storage.clean_listy_filepath(
38 ['/dir1/', 'foo/../nasty', 'linooks.jpg']) == expected
39
40 expected = [u'etc', u'passwd']
41 assert storage.clean_listy_filepath(
42 ['../../../etc/', 'passwd']) == expected
770c12be 43
17e7093e
CAW
44 assert_raises(
45 storage.InvalidFilepath,
46 storage.clean_listy_filepath,
47 ['../../', 'linooks.jpg'])
48
49
ffa22935
CAW
50class FakeStorageSystem():
51 def __init__(self, foobie, blech, **kwargs):
52 self.foobie = foobie
53 self.blech = blech
54
d91b5a7c
CAW
55class FakeRemoteStorage(storage.BasicFileStorage):
56 # Theoretically despite this, all the methods should work but it
57 # should force copying to the workbench
58 local_storage = False
59
ffa22935 60
3c7d11ff
CAW
61def test_storage_system_from_config():
62 this_storage = storage.storage_system_from_config(
ffa22935
CAW
63 {'somestorage_base_url': 'http://example.org/moodia/',
64 'somestorage_base_dir': '/tmp/',
65 'somestorage_garbage_arg': 'garbage_arg',
66 'garbage_arg': 'trash'},
67 'somestorage')
68 assert this_storage.base_url == 'http://example.org/moodia/'
69 assert this_storage.base_dir == '/tmp/'
70 assert this_storage.__class__ is storage.BasicFileStorage
71
3c7d11ff 72 this_storage = storage.storage_system_from_config(
ffa22935
CAW
73 {'somestorage_foobie': 'eiboof',
74 'somestorage_blech': 'hcelb',
75 'somestorage_garbage_arg': 'garbage_arg',
76 'garbage_arg': 'trash',
77 'somestorage_storage_class':
78 'mediagoblin.tests.test_storage:FakeStorageSystem'},
79 'somestorage')
80 assert this_storage.foobie == 'eiboof'
81 assert this_storage.blech == 'hcelb'
82 assert this_storage.__class__ is FakeStorageSystem
83
84
17e7093e
CAW
85##########################
86# Basic file storage tests
87##########################
88
d91b5a7c 89def get_tmp_filestorage(mount_url=None, fake_remote=False):
17e7093e 90 tmpdir = tempfile.mkdtemp()
d91b5a7c
CAW
91 if fake_remote:
92 this_storage = FakeRemoteStorage(tmpdir, mount_url)
93 else:
94 this_storage = storage.BasicFileStorage(tmpdir, mount_url)
17e7093e
CAW
95 return tmpdir, this_storage
96
97
98def test_basic_storage__resolve_filepath():
99 tmpdir, this_storage = get_tmp_filestorage()
100
101 result = this_storage._resolve_filepath(['dir1', 'dir2', 'filename.jpg'])
102 assert result == os.path.join(
103 tmpdir, 'dir1/dir2/filename.jpg')
104
105 result = this_storage._resolve_filepath(['../../etc/', 'passwd'])
106 assert result == os.path.join(
107 tmpdir, 'etc/passwd')
108
109 assert_raises(
110 storage.InvalidFilepath,
111 this_storage._resolve_filepath,
112 ['../../', 'etc', 'passwd'])
113
114
115def test_basic_storage_file_exists():
92fb87ae
CAW
116 tmpdir, this_storage = get_tmp_filestorage()
117
118 os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2'))
119 filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt')
120 with open(filename, 'w') as ourfile:
121 ourfile.write("I'm having a lovely day!")
122
123 assert this_storage.file_exists(['dir1', 'dir2', 'filename.txt'])
124 assert not this_storage.file_exists(['dir1', 'dir2', 'thisfile.lol'])
125 assert not this_storage.file_exists(['dnedir1', 'dnedir2', 'somefile.lol'])
126
127
20e3ee11
CAW
128def test_basic_storage_get_unique_filepath():
129 tmpdir, this_storage = get_tmp_filestorage()
130
131 # write something that exists
132 os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2'))
133 filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt')
134 with open(filename, 'w') as ourfile:
135 ourfile.write("I'm having a lovely day!")
136
137 # now we want something new, with the same name!
138 new_filepath = this_storage.get_unique_filepath(
139 ['dir1', 'dir2', 'filename.txt'])
140 assert new_filepath[:-1] == [u'dir1', u'dir2']
141
142 new_filename = new_filepath[-1]
143 assert new_filename.endswith('filename.txt')
144 assert len(new_filename) > len('filename.txt')
145 assert new_filename == secure_filename(new_filename)
17e7093e
CAW
146
147
148def test_basic_storage_get_file():
d2be0838
CAW
149 tmpdir, this_storage = get_tmp_filestorage()
150
151 # Write a brand new file
152 filepath = ['dir1', 'dir2', 'ourfile.txt']
153
154 with this_storage.get_file(filepath, 'w') as our_file:
155 our_file.write('First file')
156 with this_storage.get_file(filepath, 'r') as our_file:
157 assert our_file.read() == 'First file'
158 assert os.path.exists(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
159 with file(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file:
160 assert our_file.read() == 'First file'
161
162 # Write to the same path but try to get a unique file.
163 new_filepath = this_storage.get_unique_filepath(filepath)
164 assert not os.path.exists(os.path.join(tmpdir, *new_filepath))
165
166 with this_storage.get_file(new_filepath, 'w') as our_file:
167 our_file.write('Second file')
168 with this_storage.get_file(new_filepath, 'r') as our_file:
169 assert our_file.read() == 'Second file'
170 assert os.path.exists(os.path.join(tmpdir, *new_filepath))
171 with file(os.path.join(tmpdir, *new_filepath), 'r') as our_file:
172 assert our_file.read() == 'Second file'
173
174 # Read from an existing file
175 manually_written_file = os.makedirs(
176 os.path.join(tmpdir, 'testydir'))
177 with file(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile:
178 testyfile.write('testy file! so testy.')
179
180 with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile:
181 assert testyfile.read() == 'testy file! so testy.'
17e7093e
CAW
182
183
184def test_basic_storage_delete_file():
d024806a
CAW
185 tmpdir, this_storage = get_tmp_filestorage()
186
187 assert not os.path.exists(
188 os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
189
190 filepath = ['dir1', 'dir2', 'ourfile.txt']
191 with this_storage.get_file(filepath, 'w') as our_file:
192 our_file.write('Testing this file')
193
194 assert os.path.exists(
195 os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
196
197 this_storage.delete_file(filepath)
198
199 assert not os.path.exists(
200 os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
17e7093e
CAW
201
202
203def test_basic_storage_url_for_file():
01da9e6a
CAW
204 # Not supplying a base_url should actually just bork.
205 tmpdir, this_storage = get_tmp_filestorage()
206 assert_raises(
207 storage.NoWebServing,
208 this_storage.file_url,
209 ['dir1', 'dir2', 'filename.txt'])
210
211 # base_url without domain
212 tmpdir, this_storage = get_tmp_filestorage('/media/')
213 result = this_storage.file_url(
214 ['dir1', 'dir2', 'filename.txt'])
215 expected = '/media/dir1/dir2/filename.txt'
216 assert result == expected
217
218 # base_url with domain
219 tmpdir, this_storage = get_tmp_filestorage(
220 'http://media.example.org/ourmedia/')
221 result = this_storage.file_url(
222 ['dir1', 'dir2', 'filename.txt'])
223 expected = 'http://media.example.org/ourmedia/dir1/dir2/filename.txt'
224 assert result == expected
3a89c23e
CAW
225
226
227def test_basic_storage_get_local_path():
228 tmpdir, this_storage = get_tmp_filestorage()
229
230 result = this_storage.get_local_path(
231 ['dir1', 'dir2', 'filename.txt'])
232
233 expected = os.path.join(
234 tmpdir, 'dir1/dir2/filename.txt')
235
236 assert result == expected
237
238
239def test_basic_storage_is_local():
240 tmpdir, this_storage = get_tmp_filestorage()
241 assert this_storage.local_storage is True
6a07362d
CAW
242
243
244def test_basic_storage_copy_locally():
245 tmpdir, this_storage = get_tmp_filestorage()
246
247 dest_tmpdir = tempfile.mkdtemp()
248
249 filepath = ['dir1', 'dir2', 'ourfile.txt']
250 with this_storage.get_file(filepath, 'w') as our_file:
251 our_file.write('Testing this file')
252
253 new_file_dest = os.path.join(dest_tmpdir, 'file2.txt')
254
255 this_storage.copy_locally(filepath, new_file_dest)
256
257 assert file(new_file_dest).read() == 'Testing this file'