-# -------------------------------------------------------------------------
-# Copyright (C) 2020 Wipro Limited.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# -------------------------------------------------------------------------
-#
-
-import json
-import unittest
-from requests import RequestException
-
-from apps.slice_selection.optimizers.conductor.remote_opt_processor import process_nsi_selection_opt
-from osdf.adapters.local_data import local_policies
-from osdf.utils.interfaces import json_from_file, yaml_from_file
-from osdf.utils.programming_utils import DotDict
-import osdf.config.loader as config_loader
-from mock import patch, MagicMock
-import json
-from osdf.logging.osdf_logging import error_log, debug_log
-from osdf.adapters.policy.interface import get_policies
-
-
-class TestRemoteOptProcessor(unittest.TestCase):
- def setUp(self):
- self.config_spec = {
- "deployment": "config/osdf_config.yaml",
- "core": "config/common_config.yaml"
- }
- self.osdf_config = DotDict(config_loader.all_configs(**self.config_spec))
-
- def tearDown(self):
- patch.stopall()
-
- def test_process_nsi_selection_opt(self):
- main_dir = ""
- request_file = main_dir + 'test/apps/slice_selection/nsi_request.json'
- new_solution_response_file = main_dir + 'test/apps/slice_selection/new_solution_nsi_response.json'
- shared_solution_response_file = main_dir + 'test/apps/slice_selection/shared_solution_nsi_response.json'
- error_response_file = main_dir + 'test/apps/slice_selection/nsi_error_response.json'
-
- request_json = json_from_file(request_file)
- new_solution_response_json = json_from_file(new_solution_response_file)
- shared_solution_response_json = json_from_file(shared_solution_response_file)
- error_response_json = json_from_file(error_response_file)
-
- policies_path = main_dir + 'test/policy-local-files'
- slice_policies_file = main_dir + 'test/apps/slice_selection/slice_policies.txt'
-
- valid_policies_files = local_policies.get_policy_names_from_file(slice_policies_file)
- policies = [json_from_file(policies_path + '/' + name) for name in valid_policies_files]
- self.patcher_get_policies = patch('osdf.adapters.policy.interface.remote_api',
- return_value=policies)
- self.Mock_get_policies = self.patcher_get_policies.start()
-
- new_solution_conductor_response_file = 'test/apps/slice_selection/new_solution_conductor_response.json'
- new_solution_conductor_response = json_from_file(new_solution_conductor_response_file)
- self.patcher_req = patch('osdf.adapters.conductor.conductor.request',
- return_value=new_solution_conductor_response)
- self.Mock_req = self.patcher_req.start()
- self.assertEquals(new_solution_response_json, process_nsi_selection_opt(request_json, self.osdf_config))
- self.patcher_req.stop()
-
- shared_solution_conductor_response_file = 'test/apps/slice_selection/shared_solution_conductor_response.json'
- shared_solution_conductor_response = json_from_file(shared_solution_conductor_response_file)
- self.patcher_req = patch('osdf.adapters.conductor.conductor.request',
- return_value=shared_solution_conductor_response)
- self.Mock_req = self.patcher_req.start()
- self.assertEquals(shared_solution_response_json,
- process_nsi_selection_opt(request_json, self.osdf_config))
- self.patcher_req.stop()
-
- conductor_error_response_file = 'test/apps/slice_selection/conductor_error_response.json'
- conductor_error_response = json_from_file(conductor_error_response_file)
-
- self.patcher_req = patch('osdf.adapters.conductor.conductor.request',
- side_effect=RequestException(response=json.dumps(conductor_error_response)))
- self.Mock_req = self.patcher_req.start()
- self.assertEquals(error_response_json, process_nsi_selection_opt(request_json, self.osdf_config))
- self.patcher_req.stop()
-
- self.patcher_req = patch('osdf.adapters.conductor.conductor.request',
- side_effect=Exception("test_exception"))
- self.Mock_req = self.patcher_req.start()
- self.assertEquals('test_exception',
- process_nsi_selection_opt(request_json, self.osdf_config).get('statusMessage'))
- self.patcher_req.stop()
-
-
-if __name__ == "__main__":
- unittest.main()
-
+# -------------------------------------------------------------------------\r
+# Copyright (C) 2020 Wipro Limited.\r
+#\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+#\r
+# -------------------------------------------------------------------------\r
+#\r
+\r
+import json\r
+import unittest\r
+from requests import RequestException\r
+\r
+from apps.slice_selection.optimizers.conductor.remote_opt_processor import process_nsi_selection_opt\r
+from osdf.adapters.local_data import local_policies\r
+from osdf.utils.interfaces import json_from_file, yaml_from_file\r
+from osdf.utils.programming_utils import DotDict\r
+import osdf.config.loader as config_loader\r
+from mock import patch, MagicMock\r
+import json\r
+from osdf.logging.osdf_logging import error_log, debug_log\r
+from osdf.adapters.policy.interface import get_policies\r
+\r
+\r
+class TestRemoteOptProcessor(unittest.TestCase):\r
+ def setUp(self):\r
+ self.config_spec = {\r
+ "deployment": "config/osdf_config.yaml",\r
+ "core": "config/common_config.yaml"\r
+ }\r
+ self.osdf_config = DotDict(config_loader.all_configs(**self.config_spec))\r
+\r
+ def tearDown(self):\r
+ patch.stopall()\r
+\r
+ def test_process_nsi_selection_opt(self):\r
+ main_dir = ""\r
+ request_file = main_dir + 'test/apps/slice_selection/nsi_request.json'\r
+ not_shared_request_file = main_dir + 'test/apps/slice_selection/not_shared_nsi_request.json'\r
+ #response files\r
+ new_solution_response_file = main_dir + 'test/apps/slice_selection/new_solution_nsi_response.json'\r
+ shared_solution_response_file = main_dir + 'test/apps/slice_selection/shared_solution_nsi_response.json'\r
+ no_solution_response_file = main_dir + 'test/apps/slice_selection/no_recomm_nsi_response.json'\r
+ not_shared_response_file = main_dir + 'test/apps/slice_selection/not_shared_nsi_response.json'\r
+ error_response_file = main_dir + 'test/apps/slice_selection/nsi_error_response.json'\r
+\r
+ not_shared_request_json = json_from_file(not_shared_request_file)\r
+ not_shared_response_json = json_from_file(not_shared_response_file)\r
+ request_json = json_from_file(request_file)\r
+ new_solution_response_json = json_from_file(new_solution_response_file)\r
+ shared_solution_response_json = json_from_file(shared_solution_response_file)\r
+ no_solution_response_json = json_from_file(no_solution_response_file)\r
+ error_response_json = json_from_file(error_response_file)\r
+\r
+ policies_path = main_dir + 'test/policy-local-files'\r
+ slice_policies_file = main_dir + 'test/apps/slice_selection/slice_policies.txt'\r
+\r
+ valid_policies_files = local_policies.get_policy_names_from_file(slice_policies_file)\r
+ policies = [json_from_file(policies_path + '/' + name) for name in valid_policies_files]\r
+ self.patcher_get_policies = patch('osdf.adapters.policy.interface.remote_api',\r
+ return_value=policies)\r
+ self.Mock_get_policies = self.patcher_get_policies.start()\r
+ # new solution\r
+ new_solution_conductor_response_file = 'test/apps/slice_selection/new_solution_conductor_response.json'\r
+ new_solution_conductor_response = json_from_file(new_solution_conductor_response_file)\r
+ self.patcher_req = patch('osdf.adapters.conductor.conductor.request',\r
+ return_value=new_solution_conductor_response)\r
+ self.Mock_req = self.patcher_req.start()\r
+ self.assertEquals(new_solution_response_json, process_nsi_selection_opt(request_json, self.osdf_config))\r
+ self.patcher_req.stop()\r
+ # shared solution\r
+ shared_solution_conductor_response_file = 'test/apps/slice_selection/shared_solution_conductor_response.json'\r
+ shared_solution_conductor_response = json_from_file(shared_solution_conductor_response_file)\r
+ self.patcher_req = patch('osdf.adapters.conductor.conductor.request',\r
+ return_value=shared_solution_conductor_response)\r
+ self.Mock_req = self.patcher_req.start()\r
+ self.assertEquals(shared_solution_response_json,\r
+ process_nsi_selection_opt(request_json, self.osdf_config))\r
+ self.patcher_req.stop()\r
+ # not-shared solution\r
+ self.assertEquals(not_shared_response_json,\r
+ process_nsi_selection_opt(not_shared_request_json, self.osdf_config))\r
+ # no recommendation\r
+ no_solution_conductor_response_file = 'test/apps/slice_selection/no_rec.json'\r
+ no_solution_conductor_response = json_from_file(no_solution_conductor_response_file)\r
+ self.patcher_req = patch('osdf.adapters.conductor.conductor.request',\r
+ return_value=no_solution_conductor_response)\r
+ self.Mock_req = self.patcher_req.start()\r
+ self.assertEquals(no_solution_response_json,\r
+ process_nsi_selection_opt(request_json, self.osdf_config))\r
+ self.patcher_req.stop()\r
+\r
+ conductor_error_response_file = 'test/apps/slice_selection/conductor_error_response.json'\r
+ conductor_error_response = json_from_file(conductor_error_response_file)\r
+\r
+ self.patcher_req = patch('osdf.adapters.conductor.conductor.request',\r
+ side_effect=RequestException(response=json.dumps(conductor_error_response)))\r
+ self.Mock_req = self.patcher_req.start()\r
+ self.assertEquals(error_response_json, process_nsi_selection_opt(request_json, self.osdf_config))\r
+ self.patcher_req.stop()\r
+\r
+ self.patcher_req = patch('osdf.adapters.conductor.conductor.request',\r
+ side_effect=Exception("test_exception"))\r
+ self.Mock_req = self.patcher_req.start()\r
+ self.assertEquals('test_exception',\r
+ process_nsi_selection_opt(request_json, self.osdf_config).get('statusMessage'))\r
+ self.patcher_req.stop()\r
+\r
+\r
+if __name__ == "__main__":\r
+ unittest.main()\r
+\r