1 # -------------------------------------------------------------------------
2 # Copyright (C) 2020 Wipro Limited.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 # -------------------------------------------------------------------------
20 from unittest.mock import patch
21 import osdf.config.loader as config_loader
23 from apps.nxi_termination.optimizers.remote_opt_processor import process_nxi_termination_opt
24 from osdf.adapters.aai.fetch_aai_data import AAIException
26 from osdf.config.base import osdf_config
27 from osdf.utils.programming_utils import DotDict
28 from osdf.utils.interfaces import json_from_file
# Test case for process_nxi_termination_opt. Each scenario patches one helper
# (get_allotted_resources or get_resource_count) inside
# apps.nxi_termination.optimizers.remote_opt_processor, calls the processor
# with a JSON request loaded from disk, and compares the result with an
# expected-output fixture.
# NOTE(review): this listing has lost its indentation and carries line-number
# residue; several statements are not visible (the method header and dict
# opening that own config_spec, the definition of main_dir, and the trailing
# arguments of some patch(...) calls). `import unittest` is also not visible
# among the imports shown. Restore these from the upstream file before running.
30 class TestRemoteOptProcessor(unittest.TestCase):
# Entries of the mapping that is later expanded as **self.config_spec; the
# statement that opens this mapping is not visible here.
33 "deployment": "config/osdf_config.yaml",
34 "core": "config/common_config.yaml"
# Load every configuration named in config_spec and wrap the result in a
# DotDict. NOTE(review): the assertions below pass the module-level
# osdf_config import, not self.osdf_config -- confirm which one is intended.
36 self.osdf_config = DotDict(config_loader.all_configs(**self.config_spec))
# Single test method covering all scenarios in sequence; request_json and
# nssi_request_json are mutated part-way through, so scenario order matters.
42 def test_process_nxi_termination_opt(self):
# Fixture paths, all built by prefixing main_dir (its definition is not
# visible in this listing).
44 request_file = main_dir + 'test/apps/nxi_termination/nxi_termination.json'
45 nssi_request_file=main_dir + 'test/apps/nxi_termination/nssi_termination.json'
46 service_file = main_dir + 'test/apps/nxi_termination/service_profiles.json'
47 failure_service_file = main_dir + 'test/apps/nxi_termination/failure_service_profiles.json'
48 failure_service_file2 = main_dir + 'test/apps/nxi_termination/failure_service_profiles2.json'
49 nsi_success=main_dir + 'test/apps/nxi_termination/nsi_success_output.json'
50 nxi_failure1 = main_dir + 'test/apps/nxi_termination/nxi_failure_output1.json'
51 nxi_failure2 = main_dir + 'test/apps/nxi_termination/nxi_failure_output2.json'
52 nssi_failure = main_dir + 'test/apps/nxi_termination/nssi_failure_output.json'
53 success_rel_file = main_dir + 'test/apps/nxi_termination/success_relationship_list.json'
54 failure_rel_file1 = main_dir + 'test/apps/nxi_termination/failure_relationship_list.json'
55 failure_rel_file2 = main_dir + 'test/apps/nxi_termination/failure_relationship_list2.json'
56 exception_response_file1 = main_dir + 'test/apps/nxi_termination/exception_response1.json'
# Parse every fixture with json_from_file.
# NOTE(review): service_profile_json, failure_service_profile_json,
# failure_service_profile_json2, failure_rel_json2 and
# exception_response_json1 are not referenced in any visible line below;
# they may be used by the missing patch arguments or may be dead -- verify.
57 request_json=json_from_file(request_file)
58 nssi_request_json = json_from_file(nssi_request_file)
59 service_profile_json = json_from_file(service_file)
60 failure_service_profile_json = json_from_file(failure_service_file)
61 failure_service_profile_json2 = json_from_file(failure_service_file2)
62 success_rel_json=json_from_file(success_rel_file)
63 failure_rel_json = json_from_file(failure_rel_file1)
64 failure_rel_json2 = json_from_file(failure_rel_file2)
65 success_output_json=json_from_file(nsi_success)
66 nxi_failure_output_json1 = json_from_file(nxi_failure1)
67 nxi_failure_output_json2 = json_from_file(nxi_failure2)
68 nssi_failure_output_json = json_from_file(nssi_failure)
69 exception_response_json1 = json_from_file(exception_response_file1)
# Scenario: get_allotted_resources returns success_rel_json; the processor is
# expected to return success_output_json for request_json.
# NOTE(review): assertEquals is a deprecated alias of assertEqual, and
# patcher_req.stop() is skipped if the assertion raises, which would leave the
# patch active -- applies to every scenario below.
72 self.patcher_req = patch('apps.nxi_termination.optimizers.remote_opt_processor.get_allotted_resources', return_value=success_rel_json)
73 self.Mock_req = self.patcher_req.start()
74 self.assertEquals(success_output_json, process_nxi_termination_opt(request_json, osdf_config))
75 self.patcher_req.stop()
# Scenario: get_allotted_resources returns failure_rel_json; expected result
# is nxi_failure_output_json1.
78 self.patcher_req = patch('apps.nxi_termination.optimizers.remote_opt_processor.get_allotted_resources', return_value=failure_rel_json)
79 self.Mock_req = self.patcher_req.start()
80 self.assertEquals(nxi_failure_output_json1, process_nxi_termination_opt(request_json, osdf_config))
81 self.patcher_req.stop()
# From here on request_json carries an empty addtnlArgs mapping.
83 request_json["requestInfo"]["addtnlArgs"] = {}
# Scenario: get_allotted_resources patched again; expected result is
# success_output_json.
# NOTE(review): the remaining arguments of this patch(...) call are not
# visible in this listing.
86 self.patcher_req = patch('apps.nxi_termination.optimizers.remote_opt_processor.get_allotted_resources',
88 self.Mock_req = self.patcher_req.start()
89 self.assertEquals(success_output_json, process_nxi_termination_opt(request_json, osdf_config))
90 self.patcher_req.stop()
# get_resource_count returns 1; the NSSI request is expected to yield
# success_output_json.
93 # nssi success scenario
94 self.patcher_req = patch('apps.nxi_termination.optimizers.remote_opt_processor.get_resource_count', return_value=1)
95 self.Mock_req = self.patcher_req.start()
96 self.assertEquals(success_output_json, process_nxi_termination_opt(nssi_request_json, osdf_config))
97 self.patcher_req.stop()
# Expected result is nssi_failure_output_json.
# NOTE(review): the mocked value passed to this patch(...) call is not visible.
99 # nssi failure scenario
100 self.patcher_req = patch('apps.nxi_termination.optimizers.remote_opt_processor.get_resource_count',
102 self.Mock_req = self.patcher_req.start()
103 self.assertEquals(nssi_failure_output_json, process_nxi_termination_opt(nssi_request_json, osdf_config))
104 self.patcher_req.stop()
# From here on nssi_request_json carries an empty addtnlArgs mapping.
106 nssi_request_json["requestInfo"]["addtnlArgs"] = {}
# Expected result is success_output_json.
# NOTE(review): the mocked value passed to this patch(...) call is not visible.
108 # nssi success scenario
109 self.patcher_req = patch('apps.nxi_termination.optimizers.remote_opt_processor.get_resource_count',
111 self.Mock_req = self.patcher_req.start()
112 self.assertEquals(success_output_json, process_nxi_termination_opt(nssi_request_json, osdf_config))
113 self.patcher_req.stop()
# Expected result is nxi_failure_output_json2.
# NOTE(review): the mocked value passed to this patch(...) call is not visible.
115 # nssi failure scenario
116 self.patcher_req = patch('apps.nxi_termination.optimizers.remote_opt_processor.get_resource_count',
118 self.Mock_req = self.patcher_req.start()
119 self.assertEquals(nxi_failure_output_json2, process_nxi_termination_opt(nssi_request_json, osdf_config))
120 self.patcher_req.stop()
# Scenario: get_resource_count raises AAIException; only the 'requestStatus'
# field of the response is checked and must equal "failure".
122 self.patcher_req = patch('apps.nxi_termination.optimizers.remote_opt_processor.get_resource_count',
123 side_effect=AAIException("Error response recieved from AAI for the request"))
124 self.Mock_req = self.patcher_req.start()
125 self.assertEquals("failure", process_nxi_termination_opt(nssi_request_json, osdf_config).get('requestStatus'))
126 self.patcher_req.stop()
# Same check as above with a different AAIException message.
128 self.patcher_req = patch('apps.nxi_termination.optimizers.remote_opt_processor.get_resource_count',
129 side_effect=AAIException("Request exception was encountered"))
130 self.Mock_req = self.patcher_req.start()
131 self.assertEquals("failure", process_nxi_termination_opt(nssi_request_json, osdf_config).get('requestStatus'))
132 self.patcher_req.stop()
135 if __name__ == "__main__":