Seed policysync container code
[dcaegen2/deployments.git] / dcae-services-policy-sync / tests / test_client_v0.py
1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
16
17 from aiohttp import web, WSMsgType
18 import json, pytest, re
19 from policysync.clients import (
20     PolicyClientV0 as PolicyClient,
21     WS_HEARTBEAT
22 )
23
24
25 async def listpolicy(request):
26     return web.json_response(["hello"])
27
28
29 async def getconfig(request):
30     j = [
31         {
32             "policyConfigMessage": "Config Retrieved!",
33             "policyConfigStatus": "CONFIG_RETRIEVED",
34             "type": "JSON",
35             "config": '{"service":"DCAE_HighlandPark_AgingConfig","location":" Edge","uuid":"TestUUID","policyName":"DCAE.AGING_UVERS_PROD_Tosca_HP_GOC_Model_cl55973_IT64_testAging","configName":"DCAE_HighlandPark_AgingConfig","templateVersion":"1607","priority":"4","version":11.0,"policyScope":"resource=Test1,service=vSCP,type=configuration,closedLoopControlName=vSCP_F5_Firewall_d925ed73_7831_4d02_9545_db4e101f88f8","riskType":"test","riskLevel":"2","guard":"False","content":{"signature":{"filter_clause":"event.faultFields.alarmCondition LIKE(\'%chrisluckenbaugh%\')"},"name":"testAging","context":["PROD"],"priority":1,"prePublishAging":40,"preCorrelationAging":20},"policyNameWithPrefix":"DCAE.AGING_UVERSE_PSL_Tosca_HP_GOC_Model_cl55973_IT64_testAging"}',
36             "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
37             "policyType": "MicroService",
38             "policyVersion": "78",
39             "matchingConditions": {
40                 "ECOMPName": "DCAE",
41                 "ONAPName": "DCAE",
42                 "ConfigName": "DCAE_HighlandPark_AgingConfig",
43                 "service": "DCAE_HighlandPark_AgingConfig",
44                 "uuid": "TestUUID",
45                 "Location": " Edge",
46             },
47             "responseAttributes": {},
48             "property": None,
49         },
50         {
51             "policyConfigMessage": "Config Retrieved! ",
52             "policyConfigStatus": "CONFIG_RETRIEVED",
53             "type": "JSON",
54             "config": "adlskjfadslkjf",
55             "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
56             "policyType": "MicroService",
57             "policyVersion": "78",
58             "matchingConditions": {
59                 "ECOMPName": "DCAE",
60                 "ONAPName": "DCAE",
61                 "ConfigName": "DCAE_HighlandPark_AgingConfig",
62                 "service": "DCAE_HighlandPark_AgingConfig",
63                 "uuid": "TestUUID",
64                 "Location": " Edge",
65             },
66             "responseAttributes": {},
67             "property": None,
68         },
69     ]
70
71     return web.json_response(j)
72
73
74 async def wshandler(request):
75     resp = web.WebSocketResponse()
76     available = resp.can_prepare(request)
77     await resp.prepare(request)
78     await resp.send_str('{ "loadedPolicies": [{ "policyName": "bar"}] }')
79     await resp.send_bytes(b"bar!!!")
80     await resp.close("closed")
81
82
83 @pytest.fixture
84 def policyclient(aiohttp_client, loop):
85     app = web.Application()
86     app.router.add_route("POST", "/pdp/api/listPolicy", listpolicy)
87     app.router.add_route("POST", "/pdp/api/getConfig", getconfig)
88     app.router.add_get("/pdp/notifications", wshandler)
89     fake_client = loop.run_until_complete(aiohttp_client(app))
90     server = "{}://{}:{}".format("http", fake_client.host, fake_client.port)
91     return PolicyClient({}, server)
92
93
94 async def test_listpolicies(policyclient):
95     j = await policyclient.list_policies(filters=["bar"])
96     assert j == set(["hello"])
97     await policyclient.close()
98     assert policyclient.session.closed
99
100
101 async def test_getconfig(policyclient):
102     j = await policyclient.get_config(filters=["bar"])
103
104     assert j == [
105         {
106             "policyConfigMessage": "Config Retrieved!",
107             "policyConfigStatus": "CONFIG_RETRIEVED",
108             "type": "JSON",
109             "config": {
110                 "service": "DCAE_HighlandPark_AgingConfig",
111                 "location": " Edge",
112                 "uuid": "TestUUID",
113                 "policyName": "DCAE.AGING_UVERS_PROD_Tosca_HP_GOC_Model_cl55973_IT64_testAging",
114                 "configName": "DCAE_HighlandPark_AgingConfig",
115                 "templateVersion": "1607",
116                 "priority": "4",
117                 "version": 11.0,
118                 "policyScope": "resource=Test1,service=vSCP,type=configuration,closedLoopControlName=vSCP_F5_Firewall_d925ed73_7831_4d02_9545_db4e101f88f8",
119                 "riskType": "test",
120                 "riskLevel": "2",
121                 "guard": "False",
122                 "content": {
123                     "signature": {
124                         "filter_clause": "event.faultFields.alarmCondition LIKE('%chrisluckenbaugh%')"
125                     },
126                     "name": "testAging",
127                     "context": ["PROD"],
128                     "priority": 1,
129                     "prePublishAging": 40,
130                     "preCorrelationAging": 20,
131                 },
132                 "policyNameWithPrefix": "DCAE.AGING_UVERSE_PSL_Tosca_HP_GOC_Model_cl55973_IT64_testAging",
133             },
134             "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
135             "policyType": "MicroService",
136             "policyVersion": "78",
137             "matchingConditions": {
138                 "ECOMPName": "DCAE",
139                 "ONAPName": "DCAE",
140                 "ConfigName": "DCAE_HighlandPark_AgingConfig",
141                 "service": "DCAE_HighlandPark_AgingConfig",
142                 "uuid": "TestUUID",
143                 "Location": " Edge",
144             },
145             "responseAttributes": {},
146             "property": None,
147         },
148         {
149             "policyConfigMessage": "Config Retrieved! ",
150             "policyConfigStatus": "CONFIG_RETRIEVED",
151             "type": "JSON",
152             "config": "adlskjfadslkjf",
153             "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
154             "policyType": "MicroService",
155             "policyVersion": "78",
156             "matchingConditions": {
157                 "ECOMPName": "DCAE",
158                 "ONAPName": "DCAE",
159                 "ConfigName": "DCAE_HighlandPark_AgingConfig",
160                 "service": "DCAE_HighlandPark_AgingConfig",
161                 "uuid": "TestUUID",
162                 "Location": " Edge",
163             },
164             "responseAttributes": {},
165             "property": None,
166         },
167     ]
168     await policyclient.close()
169
170
171 async def test_supports_notifications(policyclient):
172     assert policyclient.supports_notifications()
173
174
175 async def test_needs_update(policyclient):
176     assert policyclient._needs_update(
177         {"loadedPolicies": [{"policyName": "bar"}]}, [], ["bar"] 
178     )
179     assert not policyclient._needs_update(
180         {"loadedPolicies": [{"policyName": "bar"}]}, [], ["foo"]
181     )
182
183
184 async def test_ws(policyclient):
185     async def ws_callback():
186         assert True
187
188     await policyclient.notificationhandler(ws_callback, filters=["bar"])
189     await policyclient.close()
190
191     assert policyclient.ws_session.closed