Seed policysync container code
[dcaegen2/deployments.git] / dcae-services-policy-sync / policysync / inventory.py
1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
16 """ In memory data store for policies which are currently used by a mS """
17 import asyncio
18 import json
19 import uuid
20 import os
21 import tempfile
22 import aiohttp
23 from datetime import datetime
24 from .util import get_module_logger
25
26 logger = get_module_logger(__name__)
27
28 ACTION_GATHERED = "gathered"
29 ACTION_UPDATED = "updated"
30 OUTFILE_INDENT = 4
31
32
33 class Inventory:
34     """ In memory data store for policies which are currently used by a mS """
35     def __init__(self, filters, ids, outfile, client):
36         self.policy_filters = filters
37         self.policy_ids = ids
38         self.hp_active_inventory = set()
39         self.get_lock = asyncio.Lock()
40         self.file = outfile
41         self.queue = asyncio.Queue()
42         self.client = client
43
44     async def gather(self):
45         """
46         Run at startup to gather an initial inventory of policies
47         """
48         return await self._sync_inventory(ACTION_GATHERED)
49
50     async def update(self):
51         """
52         Run to update an inventory of policies on the fly
53         """
54         return await self._sync_inventory(ACTION_UPDATED)
55
56     async def check_and_update(self):
57         """ check and update the policy inventory """
58         return await self.update()
59
60     async def close(self):
61         """ close the policy inventory and its associated client """
62         await self.client.close()
63
64     def _atomic_dump(self, data):
65         """ atomically dump the policy content to a file by rename """
66         try:
67             temp_file = tempfile.NamedTemporaryFile(
68                 delete=False,
69                 dir=os.path.dirname(self.file),
70                 prefix=os.path.basename(self.file),
71                 mode="w",
72             )
73             try:
74                 temp_file.write(data)
75             finally:
76                 # fsync the file so its on disk
77                 temp_file.flush()
78                 os.fsync(temp_file.fileno())
79         finally:
80             temp_file.close()
81
82         os.rename(temp_file.name, os.path.abspath(self.file))
83
84     async def get_policy_content(self, action=ACTION_UPDATED):
85         """
86         get the policy content off the PDP
87         :param action: what action to present
88         :returns: True/False depending on if update was successful
89         """
90         logger.info("Starting policy update process...")
91         try:
92             policy_bodies = await self.client.get_config(
93                 filters=self.policy_filters, ids=self.policy_ids
94             )
95         except aiohttp.ClientError:
96             logger.exception('Conncection Error while connecting to PDP')
97             return False
98         
99         # match the format a bit of the Config Binding Service
100         out = {
101             "policies": {"items": policy_bodies},
102             "event": {
103                 "action": action,
104                 "timestamp": (datetime.utcnow().isoformat()[:-3] + "Z"),
105                 "update_id": str(uuid.uuid4()),
106                 "policies_count": len(policy_bodies),
107             },
108         }
109
110         # Atomically dump the file to disk
111         tmp = {
112             x.get("policyName") for x in policy_bodies if "policyName" in x
113         }
114
115         if tmp != self.hp_active_inventory:
116             data = json.dumps(out)
117             loop = asyncio.get_event_loop()
118             await loop.run_in_executor(None, self._atomic_dump, data)
119             logger.info(
120                 "Update complete. Policies dumped to: %s", self.file
121             )
122             self.hp_active_inventory = tmp
123             return True
124         else:
125             logger.info("No updates needed for now")
126             return False
127
128     async def _sync_inventory(self, action):
129         """
130         Pull an inventory of policies. Commit changes if there is a change.
131         return: boolean to represent whether changes were commited
132         """
133         try:
134             pdp_inventory = await self.client.list_policies(
135                 filters=self.policy_filters, ids=self.policy_ids
136             )
137         except aiohttp.ClientError:
138             logger.exception("Inventory sync failed due to a connection error")
139             return False
140
141         logger.debug("pdp_inventory -> %s", pdp_inventory)
142
143         # Below needs to be under a lock because of
144         # the call to getConfig being awaited.
145         async with self.get_lock:
146             if self.hp_active_inventory != pdp_inventory or \
147                  pdp_inventory is None:
148
149                 # Log a delta of what has changed related to this policy update
150                 if pdp_inventory is not None and \
151                      self.hp_active_inventory is not None:
152                     msg = {
153                             "removed": list(
154                                 self.hp_active_inventory - pdp_inventory
155                             ),
156                             "added": list(
157                                 pdp_inventory - self.hp_active_inventory
158                             ),
159                     }
160                     logger.info(
161                         "PDP indicates the following changes: %s ", msg
162                     )
163
164                 return await self.get_policy_content(action)
165
166             logger.info(
167                 "local matches pdp. no update required for now"
168             )
169             return False