1 # -------------------------------------------------------------------------
2 # Copyright (C) 2020 Wipro Limited.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 # -------------------------------------------------------------------------
20 from unittest.mock import patch
21 import osdf.config.loader as config_loader
23 from apps.nxi_termination.optimizers.remote_opt_processor import process_nxi_termination_opt
24 from osdf.adapters.aai.fetch_aai_data import AAIException
26 from osdf.config.base import osdf_config
27 from osdf.utils.programming_utils import DotDict
28 from osdf.utils.interfaces import json_from_file
class TestRemoteOptProcessor(unittest.TestCase):
    """Check process_nxi_termination_opt against canned JSON fixtures.

    Every scenario patches one helper of the remote_opt_processor module
    (get_service_profiles or get_relationshiplist), calls
    process_nxi_termination_opt and compares the response with an expected
    JSON document loaded from FIXTURE_DIR.
    """

    # Module whose helpers are replaced by mocks in each scenario.
    PATCH_MODULE = 'apps.nxi_termination.optimizers.remote_opt_processor'
    # Directory (relative to main_dir) holding requests, mock data and
    # expected responses.
    FIXTURE_DIR = 'test/apps/nxi_termination/'

    def setUp(self):
        """Load the OSDF configuration files into self.osdf_config."""
        # NOTE(review): the method header and the opening of this dict were
        # not visible in the reviewed source; they are reconstructed from the
        # dict entries and the **self.config_spec usage below -- confirm.
        self.config_spec = {
            "deployment": "config/osdf_config.yaml",
            "core": "config/common_config.yaml"
        }
        self.osdf_config = DotDict(config_loader.all_configs(**self.config_spec))

    def _call_with_mock(self, helper, request_json, **mock_kwargs):
        """Call process_nxi_termination_opt while one helper is patched.

        :param helper: name of the function in PATCH_MODULE to replace
        :param request_json: request payload handed to the optimizer
        :param mock_kwargs: mock configuration forwarded to patch(), e.g.
            return_value=... or side_effect=...
        :return: whatever process_nxi_termination_opt returns
        """
        # The context manager undoes the patch even when the call raises, so
        # a failing scenario cannot leak its mock into the next one.
        with patch(self.PATCH_MODULE + '.' + helper, **mock_kwargs):
            # The module-level osdf_config (from osdf.config.base) is passed
            # here, not the self.osdf_config built in setUp.
            return process_nxi_termination_opt(request_json, osdf_config)

    def test_process_nxi_termination_opt(self):
        """Run the NSI and NSSI termination scenarios one by one."""
        # NOTE(review): main_dir is used but not defined in the visible
        # source; an empty prefix (paths relative to the working directory)
        # is assumed -- confirm.
        main_dir = ""

        def load(name):
            # Read one JSON fixture from the nxi_termination test directory.
            return json_from_file(main_dir + self.FIXTURE_DIR + name)

        # All fixtures are loaded up front so that a missing file fails the
        # test before any scenario runs.
        request_json = load('nxi_termination.json')
        nssi_request_json = load('nssi_termination.json')

        service_profile_json = load('service_profiles.json')
        failure_service_profile_json = load('failure_service_profiles.json')
        failure_service_profile_json2 = load('failure_service_profiles2.json')

        success_rel_json = load('success_relationship_list.json')
        failure_rel_json = load('failure_relationship_list.json')
        failure_rel_json2 = load('failure_relationship_list2.json')

        success_output_json = load('nsi_success_output.json')
        nxi_failure_output_json1 = load('nxi_failure_output1.json')
        nxi_failure_output_json2 = load('nxi_failure_output2.json')
        nssi_failure_output_json = load('nssi_failure_output.json')
        exception_response_json1 = load('exception_response1.json')

        nsi_helper = 'get_service_profiles'
        nssi_helper = 'get_relationshiplist'

        # (label, patched helper, mock configuration, request, expected response)
        scenarios = [
            ('nsi: service profiles -> success',
             nsi_helper, {'return_value': service_profile_json},
             request_json, success_output_json),
            ('nsi: failure service profiles -> failure output 1',
             nsi_helper, {'return_value': failure_service_profile_json},
             request_json, nxi_failure_output_json1),
            # NOTE(review): the return_value of this patch was not visible in
            # the reviewed source; an empty list is assumed -- confirm.
            ('nsi: no service profiles -> success',
             nsi_helper, {'return_value': []},
             request_json, success_output_json),
            ('nsi: failure service profiles 2 -> failure output 2',
             nsi_helper, {'return_value': failure_service_profile_json2},
             request_json, nxi_failure_output_json2),
            ('nssi: success relationship list -> success',
             nssi_helper, {'return_value': success_rel_json},
             nssi_request_json, success_output_json),
            # NOTE(review): the return_value of this patch was not visible in
            # the reviewed source; an empty list is assumed -- confirm.
            ('nssi: no relationships -> success',
             nssi_helper, {'return_value': []},
             nssi_request_json, success_output_json),
            ('nssi: failure relationship list -> nssi failure output',
             nssi_helper, {'return_value': failure_rel_json},
             nssi_request_json, nssi_failure_output_json),
            ('nssi: failure relationship list 2 -> failure output 2',
             nssi_helper, {'return_value': failure_rel_json2},
             nssi_request_json, nxi_failure_output_json2),
            # The message text (spelling included) is kept verbatim; the
            # expected fixture may embed it.
            ('nssi: AAI error response -> exception response',
             nssi_helper,
             {'side_effect': AAIException("Error response recieved from AAI for the request")},
             nssi_request_json, exception_response_json1),
        ]

        for label, helper, mock_kwargs, request, expected in scenarios:
            # subTest reports each failing scenario by name and keeps going
            # instead of aborting at the first mismatch.
            with self.subTest(scenario=label):
                self.assertEqual(
                    expected,
                    self._call_with_mock(helper, request, **mock_kwargs))

        # For this exception only the status field of the response is checked.
        with self.subTest(scenario='nssi: request exception -> failure status'):
            response = self._call_with_mock(
                nssi_helper, nssi_request_json,
                side_effect=AAIException("Request exception was encountered"))
            self.assertEqual("failure", response.get('requestStatus'))
140 if __name__ == "__main__":