Seed policysync container code
[dcaegen2/deployments.git] / dcae-services-policy-sync / tests / mocks.py
1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
16
17 from urllib.parse import urlsplit
18 import asyncio, aiohttp
19
20
21 class MockConfig:
22     def __init__(self):
23         self.check_period = 60
24         self.quiet_period = 0
25         self.bind = urlsplit("//localhost:8080")
26
27
28 class MockFileDumper:
29     def __init__(self):
30         self.closed = False
31
32     async def close(self):
33         self.closed = True
34
35
36 class MockInventory:
37     def __init__(self, queue=None):
38         self.was_updated = False
39         self.was_gathered = False
40         self.client = MockClient()
41         self.queue = queue
42         self.quiet = 0
43         self.updates = []
44         self.policy_filters = []
45         self.policy_ids = []
46
47     async def update(self):
48         self.was_updated = True
49         return True
50
51     async def gather(self):
52         self.was_gathered = True
53         print("got here GATHERED")
54         return True
55
56     async def close(self):
57         self.client.closed = True
58
59     async def check_and_update(self):
60         await self.update()
61
62     async def get_policy_content(self, action="UPDATED"):
63         self.updates.append(action)
64
65
66 class MockClient:
67     def __init__(self, raise_on_listpolicies=False, raise_on_getconfig=False):
68         self.closed = False
69         self.opened = False
70         self.raise_on_listpolicies = raise_on_listpolicies
71         self.raise_on_getconfig = raise_on_getconfig
72
73     async def close(self):
74         self.closed = True
75
76     async def notificationhandler(self, callback, ids=[], filters=[]):
77         await callback()
78
79     def supports_notifications(self):
80         return True
81
82     async def list_policies(self, filters=[], ids=[]):
83         if self.raise_on_listpolicies:
84             raise aiohttp.ClientError 
85
86         return set(
87             [
88                 "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml"
89             ]
90         )
91
92     async def get_config(self, filters=[], ids=[]):
93         if self.raise_on_getconfig:
94             raise aiohttp.ClientError
95
96         return [
97             {
98                 "policyConfigMessage": "Config Retrieved!",
99                 "policyConfigStatus": "CONFIG_RETRIEVED",
100                 "type": "JSON",
101                 "config": {
102                     "service": "DCAE_HighlandPark_AgingConfig",
103                     "location": " Edge",
104                     "uuid": "TestUUID",
105                     "policyName": "DCAE.AGING_UVERS_PROD_Tosca_HP_GOC_Model_cl55973_IT64_testAging",
106                     "configName": "DCAE_HighlandPark_AgingConfig",
107                     "templateVersion": "1607",
108                     "priority": "4",
109                     "version": 11.0,
110                     "policyScope": "resource=Test1,service=vSCP,type=configuration,closedLoopControlName=vSCP_F5_Firewall_d925ed73_7831_4d02_9545_db4e101f88f8",
111                     "riskType": "test",
112                     "riskLevel": "2",
113                     "guard": "False",
114                     "content": {
115                         "signature": {
116                             "filter_clause": "event.faultFields.alarmCondition LIKE('%chrisluckenbaugh%')"
117                         },
118                         "name": "testAging",
119                         "context": ["PROD"],
120                         "priority": 1,
121                         "prePublishAging": 40,
122                         "preCorrelationAging": 20,
123                     },
124                     "policyNameWithPrefix": "DCAE.AGING_UVERSE_PSL_Tosca_HP_GOC_Model_cl55973_IT64_testAging",
125                 },
126                 "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
127                 "policyType": "MicroService",
128                 "policyVersion": "78",
129                 "matchingConditions": {
130                     "ECOMPName": "DCAE",
131                     "ONAPName": "DCAE",
132                     "ConfigName": "DCAE_HighlandPark_AgingConfig",
133                     "service": "DCAE_HighlandPark_AgingConfig",
134                     "uuid": "TestUUID",
135                     "Location": " Edge",
136                 },
137                 "responseAttributes": {},
138                 "property": None,
139             },
140             {
141                 "policyConfigMessage": "Config Retrieved! ",
142                 "policyConfigStatus": "CONFIG_RETRIEVED",
143                 "type": "JSON",
144                 "config": "adlskjfadslkjf",
145                 "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
146                 "policyType": "MicroService",
147                 "policyVersion": "78",
148                 "matchingConditions": {
149                     "ECOMPName": "DCAE",
150                     "ONAPName": "DCAE",
151                     "ConfigName": "DCAE_HighlandPark_AgingConfig",
152                     "service": "DCAE_HighlandPark_AgingConfig",
153                     "uuid": "TestUUID",
154                     "Location": " Edge",
155                 },
156                 "responseAttributes": {},
157                 "property": None,
158             },
159         ]
160
161
162 class MockLoop:
163     def __init__(self):
164         self.stopped = False
165         self.handlers = []
166         self.tasks = []
167
168     def stop(self):
169         self.stopped = True
170
171     def add_signal_handler(self, signal, handler):
172         self.handlers.append(signal)
173
174     def create_task(self, task):
175         self.tasks.append(task)
176
177     def run_until_complete(self, task):
178         loop = asyncio.new_event_loop()
179         asyncio.set_event_loop(loop)
180         loop.run_until_complete(task)
181
182
183 class MockTask:
184     def __init__(self):
185         self.canceled = False
186
187     def cancel(self):
188         self.canceled = True
189
190     def __await__(self):
191         return iter([])