1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
16 """ In memory data store for policies which are currently used by a mS """
23 from datetime import datetime
24 from .util import get_module_logger
26 logger = get_module_logger(__name__)
28 ACTION_GATHERED = "gathered"
29 ACTION_UPDATED = "updated"
34 """ In memory data store for policies which are currently used by a mS """
35 def __init__(self, filters, ids, outfile, client):
36 self.policy_filters = filters
38 self.hp_active_inventory = set()
39 self.get_lock = asyncio.Lock()
41 self.queue = asyncio.Queue()
44 async def gather(self):
46 Run at startup to gather an initial inventory of policies
48 return await self._sync_inventory(ACTION_GATHERED)
50 async def update(self):
52 Run to update an inventory of policies on the fly
54 return await self._sync_inventory(ACTION_UPDATED)
56 async def check_and_update(self):
57 """ check and update the policy inventory """
58 return await self.update()
60 async def close(self):
61 """ close the policy inventory and its associated client """
62 await self.client.close()
64 def _atomic_dump(self, data):
65 """ atomically dump the policy content to a file by rename """
67 temp_file = tempfile.NamedTemporaryFile(
69 dir=os.path.dirname(self.file),
70 prefix=os.path.basename(self.file),
76 # fsync the file so its on disk
78 os.fsync(temp_file.fileno())
82 os.rename(temp_file.name, os.path.abspath(self.file))
84 async def get_policy_content(self, action=ACTION_UPDATED):
86 get the policy content off the PDP
87 :param action: what action to present
88 :returns: True/False depending on if update was successful
90 logger.info("Starting policy update process...")
92 policy_bodies = await self.client.get_config(
93 filters=self.policy_filters, ids=self.policy_ids
95 except aiohttp.ClientError:
96 logger.exception('Conncection Error while connecting to PDP')
99 # match the format a bit of the Config Binding Service
101 "policies": {"items": policy_bodies},
104 "timestamp": (datetime.utcnow().isoformat()[:-3] + "Z"),
105 "update_id": str(uuid.uuid4()),
106 "policies_count": len(policy_bodies),
110 # Atomically dump the file to disk
112 x.get("policyName") for x in policy_bodies if "policyName" in x
115 if tmp != self.hp_active_inventory:
116 data = json.dumps(out)
117 loop = asyncio.get_event_loop()
118 await loop.run_in_executor(None, self._atomic_dump, data)
120 "Update complete. Policies dumped to: %s", self.file
122 self.hp_active_inventory = tmp
125 logger.info("No updates needed for now")
128 async def _sync_inventory(self, action):
130 Pull an inventory of policies. Commit changes if there is a change.
131 return: boolean to represent whether changes were commited
134 pdp_inventory = await self.client.list_policies(
135 filters=self.policy_filters, ids=self.policy_ids
137 except aiohttp.ClientError:
138 logger.exception("Inventory sync failed due to a connection error")
141 logger.debug("pdp_inventory -> %s", pdp_inventory)
143 # Below needs to be under a lock because of
144 # the call to getConfig being awaited.
145 async with self.get_lock:
146 if self.hp_active_inventory != pdp_inventory or \
147 pdp_inventory is None:
149 # Log a delta of what has changed related to this policy update
150 if pdp_inventory is not None and \
151 self.hp_active_inventory is not None:
154 self.hp_active_inventory - pdp_inventory
157 pdp_inventory - self.hp_active_inventory
161 "PDP indicates the following changes: %s ", msg
164 return await self.get_policy_content(action)
167 "local matches pdp. no update required for now"