1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
17 Asyncio coroutine setup for both periodic and real time notification tasks """
20 from prometheus_client import start_http_server
21 from .inventory import Inventory
22 from .util import get_module_logger
25 logger = get_module_logger(__name__)
28 async def notify_task(inventory, sleep):
30 start the notification task
31 :param inventory: Inventory
32 :param sleep: how long to wait on error in seconds
35 logger.info("opening notificationhandler for policy...")
36 await inventory.client.notificationhandler(
37 inventory.check_and_update,
38 ids=inventory.policy_ids,
39 filters=inventory.policy_filters,
41 logger.warning("websocket closed or errored...will attempt reconnection")
42 await asyncio.sleep(sleep)
45 async def periodic_task(inventory, sleep):
47 start the periodic task
48 :param inventory: Inventory
49 :param sleep: how long to wait between periodic checks
51 await asyncio.sleep(sleep)
52 logger.info("Executing periodic check of PDP policies")
53 await inventory.update()
56 async def task_runner(inventory, sleep, task, should_run):
58 Runs a task in an event loop
59 :param inventory: Inventory
60 :param sleep: how long to wait between loop iterations
61 :param task: coroutine to run
62 :param should_run: function for should this task continue to run
64 # pylint: disable=broad-except
67 await task(inventory, sleep)
68 except asyncio.CancelledError:
71 logger.exception("Received exception")
74 async def shutdown(loop, tasks, inventory):
76 shutdown the event loop and cancel all tasks
77 :param loop: Asyncio eventloop
78 :param tasks: list of asyncio tasks
79 :param inventory: the inventory object
82 logger.info("caught signal")
83 # Stop the websocket routines
89 await inventory.close()
93 def _setup_coroutines(
100 """ sets up the application coroutines"""
101 # Task runner takes a function for stop condition
102 # (for testing purposes) but should always run in practice
103 # pylint: disable=broad-except
104 def infinite_condition():
107 logger.info("Starting gather of all policies...")
109 loop.run_until_complete(inventory.gather())
111 logger.exception('received exception on initial gather')
113 # websocket and the periodic check of policies
118 kwargs.get('check_period', 2400),
125 if inventory.client.supports_notifications():
138 "Defaulting to polling... Provide a dmaap url to receive faster updates"
141 # Add shutdown handlers for sigint and sigterm
142 for signame in ("SIGINT", "SIGTERM"):
143 sig = getattr(signal, signame)
144 loop.add_signal_handler(
146 lambda: asyncio.ensure_future(
147 shutdown_handler(loop, tasks, inventory)
151 # Start prometheus server daemonthread for metrics/healthchecking
153 metrics_server = kwargs.get('metrics_server', start_http_server)
154 metrics_server(kwargs['bind'].port, addr=kwargs['bind'].hostname)
157 def start_event_loop(config):
159 start the event loop that runs the application
160 :param config: Config object for the application
162 loop = asyncio.get_event_loop()
163 inventory = Inventory(
175 metrics_server=start_http_server,
177 check_period=config.check_period
182 logger.info("shutdown complete")