555f2e81cfae56d1c01e45f3f74100a669383c10
[optf/osdf.git] / test / apps / nxi_termination / test_remote_opt_processor_termination.py
1 # -------------------------------------------------------------------------
2 #   Copyright (C) 2020 Wipro Limited.
3 #
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #
8 #       http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #   Unless required by applicable law or agreed to in writing, software
11 #   distributed under the License is distributed on an "AS IS" BASIS,
12 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #   See the License for the specific language governing permissions and
14 #   limitations under the License.
15 #
16 # -------------------------------------------------------------------------
17 #
18
19 import unittest
20 from unittest.mock import patch
21 import osdf.config.loader as config_loader
22 import pytest
23 from apps.nxi_termination.optimizers.remote_opt_processor import process_nxi_termination_opt
24 from osdf.adapters.aai.fetch_aai_data import AAIException
25
26 from osdf.config.base import osdf_config
27 from osdf.utils.programming_utils import DotDict
28 from osdf.utils.interfaces import json_from_file
29
30 class TestRemoteOptProcessor(unittest.TestCase):
31     def setUp(self):
32         self.config_spec = {
33             "deployment": "config/osdf_config.yaml",
34             "core": "config/common_config.yaml"
35         }
36         self.osdf_config = DotDict(config_loader.all_configs(**self.config_spec))
37
38     def tearDown(self):
39
40         patch.stopall()
41
42     def test_process_nxi_termination_opt(self):
43         main_dir = ""
44         request_file = main_dir + 'test/apps/nxi_termination/nxi_termination.json'
45         nssi_request_file=main_dir + 'test/apps/nxi_termination/nssi_termination.json'
46         service_file = main_dir + 'test/apps/nxi_termination/service_profiles.json'
47         failure_service_file = main_dir + 'test/apps/nxi_termination/failure_service_profiles.json'
48         failure_service_file2 = main_dir + 'test/apps/nxi_termination/failure_service_profiles2.json'
49         nsi_success=main_dir + 'test/apps/nxi_termination/nsi_success_output.json'
50         nxi_failure1 = main_dir + 'test/apps/nxi_termination/nxi_failure_output1.json'
51         nxi_failure2 = main_dir + 'test/apps/nxi_termination/nxi_failure_output2.json'
52         nssi_failure = main_dir + 'test/apps/nxi_termination/nssi_failure_output.json'
53         success_rel_file = main_dir + 'test/apps/nxi_termination/success_relationship_list.json'
54         failure_rel_file1 = main_dir + 'test/apps/nxi_termination/failure_relationship_list.json'
55         failure_rel_file2 = main_dir + 'test/apps/nxi_termination/failure_relationship_list2.json'
56         exception_response_file1 = main_dir + 'test/apps/nxi_termination/exception_response1.json'
57         request_json=json_from_file(request_file)
58         nssi_request_json = json_from_file(nssi_request_file)
59         service_profile_json = json_from_file(service_file)
60         failure_service_profile_json = json_from_file(failure_service_file)
61         failure_service_profile_json2 = json_from_file(failure_service_file2)
62         success_rel_json=json_from_file(success_rel_file)
63         failure_rel_json = json_from_file(failure_rel_file1)
64         failure_rel_json2 = json_from_file(failure_rel_file2)
65         success_output_json=json_from_file(nsi_success)
66         nxi_failure_output_json1 = json_from_file(nxi_failure1)
67         nxi_failure_output_json2 = json_from_file(nxi_failure2)
68         nssi_failure_output_json = json_from_file(nssi_failure)
69         exception_response_json1 = json_from_file(exception_response_file1)
70
71         #nsi success scenario
72         self.patcher_req = patch('apps.nxi_termination.optimizers.remote_opt_processor.get_allotted_resources', return_value=success_rel_json)
73         self.Mock_req = self.patcher_req.start()
74         self.assertEquals(success_output_json, process_nxi_termination_opt(request_json, osdf_config))
75         self.patcher_req.stop()
76
77         #nsi failure scenario
78         self.patcher_req = patch('apps.nxi_termination.optimizers.remote_opt_processor.get_allotted_resources', return_value=failure_rel_json)
79         self.Mock_req = self.patcher_req.start()
80         self.assertEquals(nxi_failure_output_json1, process_nxi_termination_opt(request_json, osdf_config))
81         self.patcher_req.stop()
82
83         request_json["requestInfo"]["addtnlArgs"] = {}
84
85         #nsi success scenario
86         self.patcher_req = patch('apps.nxi_termination.optimizers.remote_opt_processor.get_allotted_resources',
87                                  return_value=[])
88         self.Mock_req = self.patcher_req.start()
89         self.assertEquals(success_output_json, process_nxi_termination_opt(request_json, osdf_config))
90         self.patcher_req.stop()
91
92         # #
93         # nssi success scenario
94         self.patcher_req = patch('apps.nxi_termination.optimizers.remote_opt_processor.get_resource_count', return_value=1)
95         self.Mock_req = self.patcher_req.start()
96         self.assertEquals(success_output_json, process_nxi_termination_opt(nssi_request_json, osdf_config))
97         self.patcher_req.stop()
98
99         # nssi failure scenario
100         self.patcher_req = patch('apps.nxi_termination.optimizers.remote_opt_processor.get_resource_count',
101                                  return_value=2)
102         self.Mock_req = self.patcher_req.start()
103         self.assertEquals(nssi_failure_output_json, process_nxi_termination_opt(nssi_request_json, osdf_config))
104         self.patcher_req.stop()
105
106         nssi_request_json["requestInfo"]["addtnlArgs"] = {}
107
108         # nssi success scenario
109         self.patcher_req = patch('apps.nxi_termination.optimizers.remote_opt_processor.get_resource_count',
110                                  return_value=0)
111         self.Mock_req = self.patcher_req.start()
112         self.assertEquals(success_output_json, process_nxi_termination_opt(nssi_request_json, osdf_config))
113         self.patcher_req.stop()
114
115         # nssi failure scenario
116         self.patcher_req = patch('apps.nxi_termination.optimizers.remote_opt_processor.get_resource_count',
117                                  return_value=1)
118         self.Mock_req = self.patcher_req.start()
119         self.assertEquals(nxi_failure_output_json2, process_nxi_termination_opt(nssi_request_json, osdf_config))
120         self.patcher_req.stop()
121
122         self.patcher_req = patch('apps.nxi_termination.optimizers.remote_opt_processor.get_resource_count',
123                                  side_effect=AAIException("Error response recieved from AAI for the request"))
124         self.Mock_req = self.patcher_req.start()
125         self.assertEquals("failure", process_nxi_termination_opt(nssi_request_json, osdf_config).get('requestStatus'))
126         self.patcher_req.stop()
127
128         self.patcher_req = patch('apps.nxi_termination.optimizers.remote_opt_processor.get_resource_count',
129                                  side_effect=AAIException("Request exception was encountered"))
130         self.Mock_req = self.patcher_req.start()
131         self.assertEquals("failure", process_nxi_termination_opt(nssi_request_json, osdf_config).get('requestStatus'))
132         self.patcher_req.stop()
133
134
135 if __name__ == "__main__":
136     unittest.main()