1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License. You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 from collections import namedtuple
21 from aria.modeling import models
22 from aria.orchestrator import exceptions
23 from aria.orchestrator.execution_plugin import common
26 class TestDownloadScript(object):
@pytest.fixture(autouse=True)
def patch_requests(self, mocker):
    """Swap ``requests.get`` for an offline stand-in before each test.

    The stand-in returns a two-field ``Response`` tuple: ``text`` echoes the
    requested URL and ``status_code`` is read from ``self.status_code`` at
    call time, so a test may change the status after this fixture has run.
    """
    # Default to a successful response; individual tests may overwrite it.
    self.status_code = 200

    def _fake_get(url):
        # Lightweight substitute for a real response object.
        fake_response_cls = namedtuple('Response', 'text status_code')
        return fake_response_cls(text=url, status_code=self.status_code)

    mocker.patch.object(requests, 'get', _fake_get)
# Shared helper for the URL tests: hands `Ctx` and `script_path` to
# common.download_script, then checks that the file at the returned path
# contains `script_path` and that the path ends with '-some_script.py'.
# NOTE(review): in the visible lines the `url` parameter is never used and
# neither `Ctx` nor `script_path` is defined -- the embedded numbering skips
# from 36 to 41, so the lines that set them up appear to be missing from this
# listing. Restore them from the original file; do not guess.
36 def _test_url(self, url):
41 result = common.download_script(Ctx, script_path)
42 with open(result) as f:
43 assert script_path == f.read()
44 assert result.endswith('-some_script.py')
def test_http_url(self):
    """Run the shared URL download check against a plain ``http://`` address."""
    http_url = 'http://localhost/some_script.py'
    self._test_url(http_url)
def test_https_url(self):
    """Run the shared URL download check against an ``https://`` address."""
    https_url = 'https://localhost/some_script.py'
    self._test_url(https_url)
# Sets self.status_code to 404 and expects an exceptions.TaskAbortException
# whose string form contains 'status code: 404'.
# NOTE(review): the body of the `with pytest.raises(...)` block is not
# visible (embedded numbering skips 55), so the call that is expected to
# raise is missing from this listing.
52 def test_url_status_code_404(self):
53 self.status_code = 404
54 with pytest.raises(exceptions.TaskAbortException) as exc_ctx:
56 exception = exc_ctx.value
57 assert 'status code: 404' in str(exception)
# Passes the bare script name 'my_script.py' to common.download_script and
# checks that the returned path ends with that name. The nested
# `download_resource` asserts that it is asked for exactly that path.
# NOTE(review): the definition of `Ctx` (presumably the object that owns
# `download_resource`) and anything `download_resource` returns are not
# visible -- embedded numbering skips 61-63 and 66.
59 def test_blueprint_resource(self):
60 test_script_path = 'my_script.py'
64 def download_resource(destination, path):
65 assert path == test_script_path
67 result = common.download_script(Ctx, test_script_path)
68 assert result.endswith(test_script_path)
71 class TestCreateProcessConfig(object):
# Checks that the 'command' entry produced by common.create_process_config
# equals the script path itself.
# NOTE(review): the assignment of `script_path`, the remaining call arguments
# and the closing parenthesis are missing from this listing (embedded
# numbering skips 74, 77 and 78).
73 def test_plain_command(self):
75 process = common.create_process_config(
76 script_path=script_path,
79 assert process['command'] == script_path
# Builds a process dict with 'args': [1, 2, 3] and expects the resulting
# 'command' to be the script path followed by ' 1 2 3'.
# NOTE(review): the assignment of `script_path` and the rest of the call
# (where `process` is presumably passed in) are missing from this listing
# (embedded numbering skips 82, 86 and 87).
81 def test_command_with_args(self):
83 process = {'args': [1, 2, 3]}
84 process = common.create_process_config(
85 script_path=script_path,
88 assert process['command'] == '{0} 1 2 3'.format(script_path)
# Builds a process dict with 'command_prefix': 'prefix' and expects the
# resulting 'command' to be '<prefix> <script_path>'.
# NOTE(review): the assignment of `script_path` and the rest of the call are
# missing from this listing (embedded numbering skips 91, 96 and 97).
90 def test_command_prefix(self):
92 command_prefix = 'prefix'
93 process = {'command_prefix': command_prefix}
94 process = common.create_process_config(
95 script_path=script_path,
98 assert process['command'] == '{0} {1}'.format(command_prefix, script_path)
# Expects the resulting 'command' to be '<prefix> <script_path> 1 2 3' when
# the process dict carries a command prefix (and, judging by the assertion,
# args -- TODO confirm against the original file).
# NOTE(review): the assignment of `script_path`, the remainder of the
# `process` dict literal and the rest of the call are missing from this
# listing (embedded numbering skips 101, 104, 107 and 108).
100 def test_command_with_args_and_prefix(self):
102 command_prefix = 'prefix'
103 process = {'command_prefix': command_prefix,
105 process = common.create_process_config(
106 script_path=script_path,
109 assert process['command'] == '{0} {1} 1 2 3'.format(command_prefix, script_path)
# Passes operation_kwargs containing a 'ctx' key and checks that 'ctx' does
# not appear in the resulting process['env'].
# NOTE(review): the other call arguments are missing from this listing
# (embedded numbering skips 113 and 114).
111 def test_ctx_is_removed(self):
112 process = common.create_process_config(
115 operation_kwargs={'ctx': 1})
116 assert 'ctx' not in process['env']
# Supplies an explicit env dict via process={'env': env} and expects the
# resulting process['env'] to equal it.
# NOTE(review): the first and last call arguments (and the closing
# parenthesis) are missing from this listing (embedded numbering skips 121
# and 123).
118 def test_env_passed_explicitly(self):
119 env = {'one': '1', 'two': '2'}
120 process = common.create_process_config(
122 process={'env': env},
124 assert process['env'] == env
# Passes string-valued operation_kwargs and expects the resulting
# process['env'] to equal them.
# NOTE(review): the leading call arguments are missing from this listing
# (embedded numbering skips 129 and 130).
126 def test_env_populated_from_operation_kwargs(self):
127 operation_kwargs = {'one': '1', 'two': '2'}
128 process = common.create_process_config(
131 operation_kwargs=operation_kwargs)
132 assert process['env'] == operation_kwargs
# Supplies disjoint keys through operation_kwargs and process['env'] and
# expects the resulting process['env'] to be the union of both.
# NOTE(review): the first call argument is missing from this listing
# (embedded numbering skips 138).
# NOTE(review): `operation_kwargs.items() + env.items()` concatenates two
# lists, which only works where dict.items() returns a list (Python 2).
134 def test_env_merged_from_operation_kwargs_and_process(self):
135 operation_kwargs = {'one': '1', 'two': '2'}
136 env = {'three': '3', 'four': '4'}
137 process = common.create_process_config(
139 process={'env': env},
140 operation_kwargs=operation_kwargs)
141 assert process['env'] == dict(operation_kwargs.items() + env.items())
# Supplies the same key 'one' through both operation_kwargs and
# process['env'] and expects the process['env'] value to win.
# NOTE(review): the first call argument is missing from this listing
# (embedded numbering skips 147).
143 def test_process_env_gets_precedence_over_operation_kwargs(self):
144 operation_kwargs = {'one': 'from_kwargs'}
145 env = {'one': 'from_env_process'}
146 process = common.create_process_config(
148 process={'env': env},
149 operation_kwargs=operation_kwargs)
150 assert process['env'] == env
# With common.is_windows patched to return False, passes dict, list and tuple
# values in operation_kwargs and expects each env entry to be a JSON string
# (the tuple is expected as a JSON array).
# NOTE(review): the tail of the `operation_kwargs` literal, the leading call
# arguments and the tail of the expected dict are missing from this listing
# (embedded numbering skips 157, 159-160 and 165-166), so there may be further
# keys that are not visible here.
152 def test_json_env_vars(self, mocker):
153 mocker.patch.object(common, 'is_windows', lambda: False)
154 operation_kwargs = {'a_dict': {'key': 'value'},
155 'a_list': ['a', 'b', 'c'],
156 'a_tuple': (4, 5, 6),
158 process = common.create_process_config(
161 operation_kwargs=operation_kwargs)
162 assert process['env'] == {'a_dict': '{"key": "value"}',
163 'a_list': '["a", "b", "c"]',
164 'a_tuple': '[4, 5, 6]',
# With quote_json_env_vars=True, an empty-list value under 'one' is expected
# to come back as the JSON text wrapped in single quotes: "'[]'".
# NOTE(review): the leading call arguments are missing from this listing
# (embedded numbering skips 170 and 171).
167 def test_quote_json_env_vars(self):
168 operation_kwargs = {'one': []}
169 process = common.create_process_config(
172 operation_kwargs=operation_kwargs,
173 quote_json_env_vars=True)
174 assert process['env']['one'] == "'[]'"
# With common.is_windows patched to return True, checks that the first key of
# the resulting process['env'] is a `str` instance.
# NOTE(review): the definition of `env` and the other call arguments are
# missing from this listing (embedded numbering skips 178, 180 and 182).
# NOTE(review): the `print` statement below looks like leftover debugging
# output and uses Python 2-only syntax; `.keys()[0]` likewise relies on
# keys() returning a list. Consider removing the print once the full block
# is restored.
176 def test_env_keys_converted_to_string_on_windows(self, mocker):
177 mocker.patch.object(common, 'is_windows', lambda: True)
179 process = common.create_process_config(
181 process={'env': env},
183 print type(process['env'].keys()[0])
184 assert isinstance(process['env'].keys()[0], str)
# With common.is_windows patched to return True, an env value of '"hello"' is
# expected to come back with each double quote preceded by a backslash.
# NOTE(review): the first and last call arguments (and the closing
# parenthesis) are missing from this listing (embedded numbering skips 190
# and 192).
186 def test_env_values_quotes_are_escaped_on_windows(self, mocker):
187 mocker.patch.object(common, 'is_windows', lambda: True)
188 env = {'one': '"hello"'}
189 process = common.create_process_config(
191 process={'env': env},
193 assert process['env']['one'] == '\\"hello\\"'