Commit | Line | Data |
---|---|---|
2616d709 | 1 | # GNU MediaGoblin -- federated, autonomous media hosting |
cf29e8a8 | 2 | # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. |
2616d709 CAW |
3 | # |
4 | # This program is free software: you can redistribute it and/or modify | |
5 | # it under the terms of the GNU Affero General Public License as published by | |
6 | # the Free Software Foundation, either version 3 of the License, or | |
7 | # (at your option) any later version. | |
8 | # | |
9 | # This program is distributed in the hope that it will be useful, | |
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
12 | # GNU Affero General Public License for more details. | |
13 | # | |
14 | # You should have received a copy of the GNU Affero General Public License | |
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
16 | ||
17 | import os | |
18 | import tempfile | |
19 | ||
2ecee34f | 20 | |
626a093c | 21 | from mediagoblin.tools import workbench |
9408938b SS |
22 | from mediagoblin.mg_globals import setup_globals |
23 | from mediagoblin.decorators import get_workbench | |
fd1202b7 | 24 | from mediagoblin.tests.test_storage import get_tmp_filestorage, cleanup_storage |
2616d709 CAW |
25 | |
26 | ||
27 | class TestWorkbench(object): | |
958080be | 28 | def setup(self): |
25067d81 | 29 | self.workbench_base = tempfile.mkdtemp(prefix='gmg_workbench_testing') |
2616d709 | 30 | self.workbench_manager = workbench.WorkbenchManager( |
25067d81 E |
31 | self.workbench_base) |
32 | ||
33 | def teardown(self): | |
34 | # If the workbench is empty, this should work. | |
35 | os.rmdir(self.workbench_base) | |
2616d709 CAW |
36 | |
37 | def test_create_workbench(self): | |
bd6fe977 | 38 | workbench = self.workbench_manager.create() |
52426ae0 E |
39 | assert os.path.isdir(workbench.dir) |
40 | assert workbench.dir.startswith(self.workbench_manager.base_workbench_dir) | |
bd6fe977 | 41 | workbench.destroy() |
52426ae0 E |
42 | |
43 | def test_joinpath(self): | |
bd6fe977 | 44 | this_workbench = self.workbench_manager.create() |
52426ae0 E |
45 | tmpname = this_workbench.joinpath('temp.txt') |
46 | assert tmpname == os.path.join(this_workbench.dir, 'temp.txt') | |
bd6fe977 | 47 | this_workbench.destroy() |
2616d709 CAW |
48 | |
49 | def test_destroy_workbench(self): | |
50 | # kill a workbench | |
bd6fe977 | 51 | this_workbench = self.workbench_manager.create() |
52426ae0 E |
52 | tmpfile_name = this_workbench.joinpath('temp.txt') |
53 | tmpfile = file(tmpfile_name, 'w') | |
2616d709 CAW |
54 | with tmpfile: |
55 | tmpfile.write('lollerskates') | |
56 | ||
52426ae0 | 57 | assert os.path.exists(tmpfile_name) |
2ecee34f | 58 | |
b67a983a | 59 | wb_dir = this_workbench.dir |
bd6fe977 | 60 | this_workbench.destroy() |
52426ae0 | 61 | assert not os.path.exists(tmpfile_name) |
b67a983a | 62 | assert not os.path.exists(wb_dir) |
f43ecb0f | 63 | |
68ffb136 | 64 | def test_localized_file(self): |
f43ecb0f | 65 | tmpdir, this_storage = get_tmp_filestorage() |
bd6fe977 | 66 | this_workbench = self.workbench_manager.create() |
9408938b | 67 | |
f43ecb0f CAW |
68 | # Write a brand new file |
69 | filepath = ['dir1', 'dir2', 'ourfile.txt'] | |
70 | ||
71 | with this_storage.get_file(filepath, 'w') as our_file: | |
72 | our_file.write('Our file') | |
73 | ||
74 | # with a local file storage | |
52426ae0 | 75 | filename = this_workbench.localized_file(this_storage, filepath) |
f43ecb0f CAW |
76 | assert filename == os.path.join( |
77 | tmpdir, 'dir1/dir2/ourfile.txt') | |
25067d81 | 78 | this_storage.delete_file(filepath) |
fd1202b7 | 79 | cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2']) |
f43ecb0f CAW |
80 | |
81 | # with a fake remote file storage | |
82 | tmpdir, this_storage = get_tmp_filestorage(fake_remote=True) | |
83 | ||
84 | # ... write a brand new file, again ;) | |
85 | with this_storage.get_file(filepath, 'w') as our_file: | |
86 | our_file.write('Our file') | |
87 | ||
8bfa533f | 88 | filename = this_workbench.localized_file(this_storage, filepath) |
f43ecb0f | 89 | assert filename == os.path.join( |
52426ae0 | 90 | this_workbench.dir, 'ourfile.txt') |
9408938b | 91 | |
f43ecb0f | 92 | # fake remote file storage, filename_if_copying set |
8bfa533f E |
93 | filename = this_workbench.localized_file( |
94 | this_storage, filepath, 'thisfile') | |
f43ecb0f | 95 | assert filename == os.path.join( |
52426ae0 | 96 | this_workbench.dir, 'thisfile.txt') |
f43ecb0f CAW |
97 | |
98 | # fake remote file storage, filename_if_copying set, | |
99 | # keep_extension_if_copying set to false | |
8bfa533f E |
100 | filename = this_workbench.localized_file( |
101 | this_storage, filepath, 'thisfile.text', False) | |
f43ecb0f | 102 | assert filename == os.path.join( |
52426ae0 | 103 | this_workbench.dir, 'thisfile.text') |
9408938b | 104 | |
25067d81 | 105 | this_storage.delete_file(filepath) |
fd1202b7 | 106 | cleanup_storage(this_storage, tmpdir, ['dir1', 'dir2']) |
25067d81 E |
107 | this_workbench.destroy() |
108 | ||
9408938b SS |
109 | def test_workbench_decorator(self): |
110 | """Test @get_workbench decorator and automatic cleanup""" | |
111 | # The decorator needs mg_globals.workbench_manager | |
112 | setup_globals(workbench_manager=self.workbench_manager) | |
113 | ||
114 | @get_workbench | |
115 | def create_it(workbench=None): | |
116 | # workbench dir exists? | |
117 | assert os.path.isdir(workbench.dir) | |
118 | return workbench.dir | |
119 | ||
120 | benchdir = create_it() | |
121 | # workbench dir has been cleaned up automatically? | |
122 | assert not os.path.isdir(benchdir) |