Commit | Line | Data |
---|---|---|
8e1e744d | 1 | # GNU MediaGoblin -- federated, autonomous media hosting |
a6b378ef CAW |
2 | # Copyright (C) 2011 Free Software Foundation, Inc |
3 | # | |
4 | # This program is free software: you can redistribute it and/or modify | |
5 | # it under the terms of the GNU Affero General Public License as published by | |
6 | # the Free Software Foundation, either version 3 of the License, or | |
7 | # (at your option) any later version. | |
8 | # | |
9 | # This program is distributed in the hope that it will be useful, | |
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
12 | # GNU Affero General Public License for more details. | |
13 | # | |
14 | # You should have received a copy of the GNU Affero General Public License | |
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
16 | ||
17 | ||
17e7093e CAW |
18 | import os |
19 | import tempfile | |
20 | ||
21 | from nose.tools import assert_raises | |
20e3ee11 | 22 | from werkzeug.utils import secure_filename |
17e7093e | 23 | |
a6b378ef CAW |
24 | from mediagoblin import storage |
25 | ||
26 | ||
27 | def test_clean_listy_filepath(): | |
28 | expected = [u'dir1', u'dir2', u'linooks.jpg'] | |
29 | assert storage.clean_listy_filepath( | |
30 | ['dir1', 'dir2', 'linooks.jpg']) == expected | |
31 | ||
32 | expected = [u'dir1', u'foo_.._nasty', u'linooks.jpg'] | |
33 | assert storage.clean_listy_filepath( | |
34 | ['/dir1/', 'foo/../nasty', 'linooks.jpg']) == expected | |
35 | ||
36 | expected = [u'etc', u'passwd'] | |
37 | assert storage.clean_listy_filepath( | |
38 | ['../../../etc/', 'passwd']) == expected | |
770c12be | 39 | |
17e7093e CAW |
40 | assert_raises( |
41 | storage.InvalidFilepath, | |
42 | storage.clean_listy_filepath, | |
43 | ['../../', 'linooks.jpg']) | |
44 | ||
45 | ||
46 | ########################## | |
47 | # Basic file storage tests | |
48 | ########################## | |
49 | ||
50 | def get_tmp_filestorage(mount_url=None): | |
51 | tmpdir = tempfile.mkdtemp() | |
52 | this_storage = storage.BasicFileStorage(tmpdir, mount_url) | |
53 | return tmpdir, this_storage | |
54 | ||
55 | ||
56 | def test_basic_storage__resolve_filepath(): | |
57 | tmpdir, this_storage = get_tmp_filestorage() | |
58 | ||
59 | result = this_storage._resolve_filepath(['dir1', 'dir2', 'filename.jpg']) | |
60 | assert result == os.path.join( | |
61 | tmpdir, 'dir1/dir2/filename.jpg') | |
62 | ||
63 | result = this_storage._resolve_filepath(['../../etc/', 'passwd']) | |
64 | assert result == os.path.join( | |
65 | tmpdir, 'etc/passwd') | |
66 | ||
67 | assert_raises( | |
68 | storage.InvalidFilepath, | |
69 | this_storage._resolve_filepath, | |
70 | ['../../', 'etc', 'passwd']) | |
71 | ||
72 | ||
73 | def test_basic_storage_file_exists(): | |
92fb87ae CAW |
74 | tmpdir, this_storage = get_tmp_filestorage() |
75 | ||
76 | os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2')) | |
77 | filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt') | |
78 | with open(filename, 'w') as ourfile: | |
79 | ourfile.write("I'm having a lovely day!") | |
80 | ||
81 | assert this_storage.file_exists(['dir1', 'dir2', 'filename.txt']) | |
82 | assert not this_storage.file_exists(['dir1', 'dir2', 'thisfile.lol']) | |
83 | assert not this_storage.file_exists(['dnedir1', 'dnedir2', 'somefile.lol']) | |
84 | ||
85 | ||
20e3ee11 CAW |
86 | def test_basic_storage_get_unique_filepath(): |
87 | tmpdir, this_storage = get_tmp_filestorage() | |
88 | ||
89 | # write something that exists | |
90 | os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2')) | |
91 | filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt') | |
92 | with open(filename, 'w') as ourfile: | |
93 | ourfile.write("I'm having a lovely day!") | |
94 | ||
95 | # now we want something new, with the same name! | |
96 | new_filepath = this_storage.get_unique_filepath( | |
97 | ['dir1', 'dir2', 'filename.txt']) | |
98 | assert new_filepath[:-1] == [u'dir1', u'dir2'] | |
99 | ||
100 | new_filename = new_filepath[-1] | |
101 | assert new_filename.endswith('filename.txt') | |
102 | assert len(new_filename) > len('filename.txt') | |
103 | assert new_filename == secure_filename(new_filename) | |
17e7093e CAW |
104 | |
105 | ||
106 | def test_basic_storage_get_file(): | |
d2be0838 CAW |
107 | tmpdir, this_storage = get_tmp_filestorage() |
108 | ||
109 | # Write a brand new file | |
110 | filepath = ['dir1', 'dir2', 'ourfile.txt'] | |
111 | ||
112 | with this_storage.get_file(filepath, 'w') as our_file: | |
113 | our_file.write('First file') | |
114 | with this_storage.get_file(filepath, 'r') as our_file: | |
115 | assert our_file.read() == 'First file' | |
116 | assert os.path.exists(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) | |
117 | with file(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file: | |
118 | assert our_file.read() == 'First file' | |
119 | ||
120 | # Write to the same path but try to get a unique file. | |
121 | new_filepath = this_storage.get_unique_filepath(filepath) | |
122 | assert not os.path.exists(os.path.join(tmpdir, *new_filepath)) | |
123 | ||
124 | with this_storage.get_file(new_filepath, 'w') as our_file: | |
125 | our_file.write('Second file') | |
126 | with this_storage.get_file(new_filepath, 'r') as our_file: | |
127 | assert our_file.read() == 'Second file' | |
128 | assert os.path.exists(os.path.join(tmpdir, *new_filepath)) | |
129 | with file(os.path.join(tmpdir, *new_filepath), 'r') as our_file: | |
130 | assert our_file.read() == 'Second file' | |
131 | ||
132 | # Read from an existing file | |
133 | manually_written_file = os.makedirs( | |
134 | os.path.join(tmpdir, 'testydir')) | |
135 | with file(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile: | |
136 | testyfile.write('testy file! so testy.') | |
137 | ||
138 | with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile: | |
139 | assert testyfile.read() == 'testy file! so testy.' | |
17e7093e CAW |
140 | |
141 | ||
142 | def test_basic_storage_delete_file(): | |
d024806a CAW |
143 | tmpdir, this_storage = get_tmp_filestorage() |
144 | ||
145 | assert not os.path.exists( | |
146 | os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) | |
147 | ||
148 | filepath = ['dir1', 'dir2', 'ourfile.txt'] | |
149 | with this_storage.get_file(filepath, 'w') as our_file: | |
150 | our_file.write('Testing this file') | |
151 | ||
152 | assert os.path.exists( | |
153 | os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) | |
154 | ||
155 | this_storage.delete_file(filepath) | |
156 | ||
157 | assert not os.path.exists( | |
158 | os.path.join(tmpdir, 'dir1/dir2/ourfile.txt')) | |
17e7093e CAW |
159 | |
160 | ||
161 | def test_basic_storage_url_for_file(): | |
01da9e6a CAW |
162 | # Not supplying a base_url should actually just bork. |
163 | tmpdir, this_storage = get_tmp_filestorage() | |
164 | assert_raises( | |
165 | storage.NoWebServing, | |
166 | this_storage.file_url, | |
167 | ['dir1', 'dir2', 'filename.txt']) | |
168 | ||
169 | # base_url without domain | |
170 | tmpdir, this_storage = get_tmp_filestorage('/media/') | |
171 | result = this_storage.file_url( | |
172 | ['dir1', 'dir2', 'filename.txt']) | |
173 | expected = '/media/dir1/dir2/filename.txt' | |
174 | assert result == expected | |
175 | ||
176 | # base_url with domain | |
177 | tmpdir, this_storage = get_tmp_filestorage( | |
178 | 'http://media.example.org/ourmedia/') | |
179 | result = this_storage.file_url( | |
180 | ['dir1', 'dir2', 'filename.txt']) | |
181 | expected = 'http://media.example.org/ourmedia/dir1/dir2/filename.txt' | |
182 | assert result == expected |