1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
17 from aiohttp import web, WSMsgType
18 import json, pytest, re
19 from policysync.clients import (
20 PolicyClientV0 as PolicyClient,
async def listpolicy(request):
    """Fake listPolicy handler: always answers with the JSON list ``["hello"]``.

    The incoming request is not inspected.
    """
    policy_names = ["hello"]
    return web.json_response(policy_names)
# Fake getConfig handler: replies with the canned payload `j` as a JSON response.
# The incoming request is not inspected by any of the visible lines.
29 async def getconfig(request):
# First group of keys: the "config" value is a JSON document serialized as a string.
32 "policyConfigMessage": "Config Retrieved!",
33 "policyConfigStatus": "CONFIG_RETRIEVED",
35 "config": '{"service":"DCAE_HighlandPark_AgingConfig","location":" Edge","uuid":"TestUUID","policyName":"DCAE.AGING_UVERS_PROD_Tosca_HP_GOC_Model_cl55973_IT64_testAging","configName":"DCAE_HighlandPark_AgingConfig","templateVersion":"1607","priority":"4","version":11.0,"policyScope":"resource=Test1,service=vSCP,type=configuration,closedLoopControlName=vSCP_F5_Firewall_d925ed73_7831_4d02_9545_db4e101f88f8","riskType":"test","riskLevel":"2","guard":"False","content":{"signature":{"filter_clause":"event.faultFields.alarmCondition LIKE(\'%chrisluckenbaugh%\')"},"name":"testAging","context":["PROD"],"priority":1,"prePublishAging":40,"preCorrelationAging":20},"policyNameWithPrefix":"DCAE.AGING_UVERSE_PSL_Tosca_HP_GOC_Model_cl55973_IT64_testAging"}',
36 "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
37 "policyType": "MicroService",
38 "policyVersion": "78",
39 "matchingConditions": {
42 "ConfigName": "DCAE_HighlandPark_AgingConfig",
43 "service": "DCAE_HighlandPark_AgingConfig",
47 "responseAttributes": {},
# Second group of keys: same policy name/type/version, but the "config" value
# is a plain string that is not valid JSON.
51 "policyConfigMessage": "Config Retrieved! ",
52 "policyConfigStatus": "CONFIG_RETRIEVED",
54 "config": "adlskjfadslkjf",
55 "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
56 "policyType": "MicroService",
57 "policyVersion": "78",
58 "matchingConditions": {
61 "ConfigName": "DCAE_HighlandPark_AgingConfig",
62 "service": "DCAE_HighlandPark_AgingConfig",
66 "responseAttributes": {},
# Serialize `j` (presumably the structure holding the entries above - confirm) as the response body.
71 return web.json_response(j)
async def wshandler(request):
    """Fake PDP notification endpoint served over a websocket.

    Upgrades the request to a websocket, sends one text frame announcing
    that policy ``bar`` was loaded, follows it with one binary frame, and
    then closes the connection.

    Args:
        request: the incoming aiohttp request to upgrade.

    Returns:
        The ``web.WebSocketResponse`` used for the exchange, as aiohttp
        expects from a request handler.
    """
    resp = web.WebSocketResponse()
    await resp.prepare(request)

    # Text frame carrying the JSON notification.
    await resp.send_str('{ "loadedPolicies": [{ "policyName": "bar"}] }')
    # Binary frame whose payload is not JSON.
    await resp.send_bytes(b"bar!!!")

    # close() takes keyword-only arguments and a bytes message; passing the
    # reason positionally raises TypeError before the close frame is sent.
    await resp.close(message=b"closed")
    return resp
def policyclient(aiohttp_client, loop):
    """Build a PolicyClient pointed at an in-process fake PDP application.

    Registers the fake listPolicy, getConfig and notification handlers on a
    fresh aiohttp application, starts it through ``aiohttp_client`` on the
    given ``loop``, and returns a client configured with the resulting
    ``http://host:port`` address.
    """
    app = web.Application()
    router = app.router
    router.add_post("/pdp/api/listPolicy", listpolicy)
    router.add_post("/pdp/api/getConfig", getconfig)
    router.add_get("/pdp/notifications", wshandler)

    # aiohttp_client(app) is a coroutine; drive it to completion on the loop.
    test_client = loop.run_until_complete(aiohttp_client(app))
    base_url = "{}://{}:{}".format("http", test_client.host, test_client.port)
    return PolicyClient({}, base_url)
async def test_listpolicies(policyclient):
    """list_policies() yields the fake server's names as a set, and close() closes the session."""
    policies = await policyclient.list_policies(filters=["bar"])
    assert policies == {"hello"}

    await policyclient.close()
    assert policyclient.session.closed
# Calls get_config() with filters=["bar"]; the visible lines below belong to
# the expected value that the result `j` is checked against.
101 async def test_getconfig(policyclient):
102 j = await policyclient.get_config(filters=["bar"])
# Expected first entry: the keys of the JSON "config" string served by the fake
# endpoint appear here as structured data (presumably decoded by the client - confirm).
106 "policyConfigMessage": "Config Retrieved!",
107 "policyConfigStatus": "CONFIG_RETRIEVED",
110 "service": "DCAE_HighlandPark_AgingConfig",
113 "policyName": "DCAE.AGING_UVERS_PROD_Tosca_HP_GOC_Model_cl55973_IT64_testAging",
114 "configName": "DCAE_HighlandPark_AgingConfig",
115 "templateVersion": "1607",
118 "policyScope": "resource=Test1,service=vSCP,type=configuration,closedLoopControlName=vSCP_F5_Firewall_d925ed73_7831_4d02_9545_db4e101f88f8",
124 "filter_clause": "event.faultFields.alarmCondition LIKE('%chrisluckenbaugh%')"
129 "prePublishAging": 40,
130 "preCorrelationAging": 20,
132 "policyNameWithPrefix": "DCAE.AGING_UVERSE_PSL_Tosca_HP_GOC_Model_cl55973_IT64_testAging",
134 "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
135 "policyType": "MicroService",
136 "policyVersion": "78",
137 "matchingConditions": {
140 "ConfigName": "DCAE_HighlandPark_AgingConfig",
141 "service": "DCAE_HighlandPark_AgingConfig",
145 "responseAttributes": {},
# Expected second entry: the "config" value is not JSON and is expected to
# remain the original string.
149 "policyConfigMessage": "Config Retrieved! ",
150 "policyConfigStatus": "CONFIG_RETRIEVED",
152 "config": "adlskjfadslkjf",
153 "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
154 "policyType": "MicroService",
155 "policyVersion": "78",
156 "matchingConditions": {
159 "ConfigName": "DCAE_HighlandPark_AgingConfig",
160 "service": "DCAE_HighlandPark_AgingConfig",
164 "responseAttributes": {},
# Close the client once the comparison is done.
168 await policyclient.close()
async def test_supports_notifications(policyclient):
    """The client under test must report that it supports notifications."""
    supported = policyclient.supports_notifications()
    assert supported
async def test_needs_update(policyclient):
    """_needs_update() is truthy for a matching policy name and falsy otherwise.

    Both calls pass the same notification (loaded policy ``bar``) and an
    empty second argument; only the third argument differs (``["bar"]``
    versus ``["foo"]``).
    """
    matched = policyclient._needs_update(
        {"loadedPolicies": [{"policyName": "bar"}]}, [], ["bar"]
    )
    assert matched

    unmatched = policyclient._needs_update(
        {"loadedPolicies": [{"policyName": "bar"}]}, [], ["foo"]
    )
    assert not unmatched
# Runs the client's notification handler against the fake websocket endpoint,
# then checks that closing the client also closes its websocket session.
184 async def test_ws(policyclient):
# Callback handed to notificationhandler(); it takes no arguments.
185 async def ws_callback():
# Awaiting here means the test only continues once the handler returns.
188 await policyclient.notificationhandler(ws_callback, filters=["bar"])
189 await policyclient.close()
# close() is expected to have closed the websocket session as well.
191 assert policyclient.ws_session.closed