4fa8170e70bdac81040de32274eb7550231ae55c
[optf/osdf.git] / test / apps / nxi_termination / test_remote_opt_processor_termination.py
1 # -------------------------------------------------------------------------
2 #   Copyright (C) 2020 Wipro Limited.
3 #
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #
8 #       http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #   Unless required by applicable law or agreed to in writing, software
11 #   distributed under the License is distributed on an "AS IS" BASIS,
12 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #   See the License for the specific language governing permissions and
14 #   limitations under the License.
15 #
16 # -------------------------------------------------------------------------
17 #
18
19 import unittest
20 from unittest.mock import patch
21 import osdf.config.loader as config_loader
22 import pytest
23 from apps.nxi_termination.optimizers.remote_opt_processor import process_nxi_termination_opt
24 from osdf.adapters.aai.fetch_aai_data import AAIException
25
26 from osdf.config.base import osdf_config
27 from osdf.utils.programming_utils import DotDict
28 from osdf.utils.interfaces import json_from_file
29
class TestRemoteOptProcessor(unittest.TestCase):
    """Exercise ``process_nxi_termination_opt`` with its AAI helpers patched.

    Every scenario replaces one helper of the ``remote_opt_processor`` module
    (``get_service_profiles`` for the NSI request, ``get_relationshiplist``
    for the NSSI request) with a mock and compares the processor's response
    with a JSON fixture.
    """

    # Fixture directory, relative to the directory the tests are started from.
    FIXTURE_DIR = 'test/apps/nxi_termination/'
    # Module whose helper functions are patched in the scenarios.
    PROCESSOR_MODULE = 'apps.nxi_termination.optimizers.remote_opt_processor'

    def setUp(self):
        """Load the deployment and core configuration into ``self.osdf_config``."""
        self.config_spec = {
            "deployment": "config/osdf_config.yaml",
            "core": "config/common_config.yaml"
        }
        self.osdf_config = DotDict(config_loader.all_configs(**self.config_spec))

    def tearDown(self):
        """Stop any patch that was started with ``start()`` and is still active."""
        patch.stopall()

    def _load_fixture(self, file_name):
        """Return the parsed content of a JSON fixture from ``FIXTURE_DIR``."""
        return json_from_file(self.FIXTURE_DIR + file_name)

    def _run_with_patched_helper(self, helper_name, request_json, **mock_kwargs):
        """Call the processor while ``helper_name`` is replaced by a mock.

        :param helper_name: name of the function inside ``PROCESSOR_MODULE``
            to patch.
        :param request_json: request passed to ``process_nxi_termination_opt``.
        :param mock_kwargs: keyword arguments forwarded to ``patch`` to
            configure the mock (``return_value`` or ``side_effect``).
        :return: whatever ``process_nxi_termination_opt`` returns.
        """
        target = '{}.{}'.format(self.PROCESSOR_MODULE, helper_name)
        # The context manager undoes the patch even if the call raises, so a
        # failing scenario cannot leak its mock into later code.
        with patch(target, **mock_kwargs):
            # The processor receives the module-level ``osdf_config`` imported
            # from ``osdf.config.base``.
            return process_nxi_termination_opt(request_json, osdf_config)

    def test_process_nxi_termination_opt(self):
        """Check NSI and NSSI termination responses against the fixtures."""
        nsi_request = self._load_fixture('nxi_termination.json')
        nssi_request = self._load_fixture('nssi_termination.json')

        success_output = self._load_fixture('nsi_success_output.json')
        nxi_failure_output1 = self._load_fixture('nxi_failure_output1.json')
        nxi_failure_output2 = self._load_fixture('nxi_failure_output2.json')
        nssi_failure_output = self._load_fixture('nssi_failure_output.json')
        exception_response1 = self._load_fixture('exception_response1.json')

        # Each entry: (label, patched helper, request, mock configuration,
        # expected response).  The scenarios run in the listed order.
        scenarios = [
            ('nsi success', 'get_service_profiles', nsi_request,
             {'return_value': self._load_fixture('service_profiles.json')},
             success_output),
            ('nsi failure', 'get_service_profiles', nsi_request,
             {'return_value': self._load_fixture('failure_service_profiles.json')},
             nxi_failure_output1),
            ('nsi success, empty service profile list', 'get_service_profiles',
             nsi_request,
             {'return_value': []},
             success_output),
            ('nsi failure 2', 'get_service_profiles', nsi_request,
             {'return_value': self._load_fixture('failure_service_profiles2.json')},
             nxi_failure_output2),
            ('nssi success', 'get_relationshiplist', nssi_request,
             {'return_value': self._load_fixture('success_relationship_list.json')},
             success_output),
            ('nssi success, empty relationship list', 'get_relationshiplist',
             nssi_request,
             {'return_value': []},
             success_output),
            ('nssi failure', 'get_relationshiplist', nssi_request,
             {'return_value': self._load_fixture('failure_relationship_list.json')},
             nssi_failure_output),
            ('nssi failure 2', 'get_relationshiplist', nssi_request,
             {'return_value': self._load_fixture('failure_relationship_list2.json')},
             nxi_failure_output2),
            # The helper raises instead of returning data; the message text is
            # kept verbatim because it is passed through the code under test.
            ('nssi AAI error response', 'get_relationshiplist', nssi_request,
             {'side_effect': AAIException("Error response recieved from AAI for the request")},
             exception_response1),
        ]

        for label, helper_name, request_json, mock_kwargs, expected in scenarios:
            actual = self._run_with_patched_helper(helper_name, request_json, **mock_kwargs)
            # assertEqual replaces the assertEquals alias, which no longer
            # exists on Python 3.12+; the label identifies the failing scenario.
            self.assertEqual(expected, actual, label)

        # For this exception only the overall request status is checked.
        response = self._run_with_patched_helper(
            'get_relationshiplist', nssi_request,
            side_effect=AAIException("Request exception was encountered"))
        self.assertEqual("failure", response.get('requestStatus'))
136
137
138
139
# Entry point for running this test module directly with the Python
# interpreter instead of through an external test runner.
if __name__ == '__main__':
    unittest.main()