1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
17 import pytest, json, aiohttp, asyncio
18 from policysync.inventory import (
23 from tests.mocks import MockClient
27 def __init__(self, type, data):
33 def inventory(request, tmpdir):
34 f1 = tmpdir.mkdir("sub").join("myfile")
36 return Inventory(["DCAE.Config_MS_AGING_UVERSE_PROD_.*"], [], f1, MockClient())
41 async def test_close(self, inventory):
42 await inventory.close()
43 assert inventory.client.closed
46 async def test_get_policy_content(self, inventory):
47 await inventory.get_policy_content()
48 with open(inventory.file) as f:
51 assert data["policies"] == {
54 "policyConfigMessage": "Config Retrieved!",
55 "policyConfigStatus": "CONFIG_RETRIEVED",
58 "service": "DCAE_HighlandPark_AgingConfig",
61 "policyName": "DCAE.AGING_UVERS_PROD_Tosca_HP_GOC_Model_cl55973_IT64_testAging",
62 "configName": "DCAE_HighlandPark_AgingConfig",
63 "templateVersion": "1607",
66 "policyScope": "resource=Test1,service=vSCP,type=configuration,closedLoopControlName=vSCP_F5_Firewall_d925ed73_7831_4d02_9545_db4e101f88f8",
72 "filter_clause": "event.faultFields.alarmCondition LIKE('%chrisluckenbaugh%')"
77 "prePublishAging": 40,
78 "preCorrelationAging": 20,
80 "policyNameWithPrefix": "DCAE.AGING_UVERSE_PSL_Tosca_HP_GOC_Model_cl55973_IT64_testAging",
82 "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
83 "policyType": "MicroService",
84 "policyVersion": "78",
85 "matchingConditions": {
88 "ConfigName": "DCAE_HighlandPark_AgingConfig",
89 "service": "DCAE_HighlandPark_AgingConfig",
93 "responseAttributes": {},
97 "policyConfigMessage": "Config Retrieved! ",
98 "policyConfigStatus": "CONFIG_RETRIEVED",
100 "config": "adlskjfadslkjf",
101 "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
102 "policyType": "MicroService",
103 "policyVersion": "78",
104 "matchingConditions": {
107 "ConfigName": "DCAE_HighlandPark_AgingConfig",
108 "service": "DCAE_HighlandPark_AgingConfig",
112 "responseAttributes": {},
118 assert data["event"]["action"] == ACTION_UPDATED
121 async def test_update(self, inventory):
122 await inventory.update()
123 assert len(inventory.hp_active_inventory) == 1
125 assert not await inventory.update()
128 async def test_update_listpolicies_exception(self, inventory):
129 inventory.client.raise_on_listpolicies = True
130 assert not await inventory.update()
133 async def test_update_getconfig_exception(self, inventory):
134 inventory.client.raise_on_getconfig = True
135 await inventory.get_policy_content()
138 async def test_gather(self, inventory):
139 await inventory.gather()
141 # We should gather one policy
142 assert len(inventory.hp_active_inventory) == 1
144 # type in event should be gather
145 with open(inventory.file) as f:
148 assert data["event"]["action"] == ACTION_GATHERED
151 async def test_ws_text(self, inventory):
152 result = await inventory.check_and_update()
153 assert result == True