Seed policysync container code
[dcaegen2/deployments.git] / dcae-services-policy-sync / policysync / coroutines.py
1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
16 """
17 Asyncio coroutine setup for both periodic and real time notification tasks """
18 import signal
19 import asyncio
20 from prometheus_client import start_http_server
21 from .inventory import Inventory
22 from .util import get_module_logger
23
24 SLEEP_ON_ERROR = 10
25 logger = get_module_logger(__name__)
26
27
28 async def notify_task(inventory, sleep):
29     """
30     start the notification task
31     :param inventory: Inventory
32     :param sleep: how long to wait on error in seconds
33     """
34
35     logger.info("opening notificationhandler for policy...")
36     await inventory.client.notificationhandler(
37         inventory.check_and_update,
38         ids=inventory.policy_ids,
39         filters=inventory.policy_filters,
40     )
41     logger.warning("websocket closed or errored...will attempt reconnection")
42     await asyncio.sleep(sleep)
43
44
45 async def periodic_task(inventory, sleep):
46     """
47     start the periodic task
48     :param inventory: Inventory
49     :param sleep: how long to wait between periodic checks
50     """
51     await asyncio.sleep(sleep)
52     logger.info("Executing periodic check of PDP policies")
53     await inventory.update()
54
55
56 async def task_runner(inventory, sleep, task, should_run):
57     """
58     Runs a task in an event loop
59     :param inventory: Inventory
60     :param sleep: how long to wait between loop iterations
61     :param task: coroutine to run
62     :param should_run: function for should this task continue to run
63     """
64     # pylint: disable=broad-except
65     while should_run():
66         try:
67             await task(inventory, sleep)
68         except asyncio.CancelledError:
69             break
70         except Exception:
71             logger.exception("Received exception")
72
73
74 async def shutdown(loop, tasks, inventory):
75     """
76     shutdown the event loop and cancel all tasks
77     :param loop: Asyncio eventloop
78     :param tasks: list of asyncio tasks
79     :param inventory: the inventory object
80     """
81
82     logger.info("caught signal")
83     # Stop the websocket routines
84     for task in tasks:
85         task.cancel()
86         await task
87
88     # Close the client
89     await inventory.close()
90     loop.stop()
91
92
93 def _setup_coroutines(
94     loop,
95     inventory,
96     shutdown_handler,
97     task_r,
98     **kwargs
99 ):
100     """ sets up the application coroutines"""
101     # Task runner takes a function for stop condition
102     # (for testing purposes) but should always run in practice
103     # pylint: disable=broad-except
104     def infinite_condition():
105         return True
106
107     logger.info("Starting gather of all policies...")
108     try:
109         loop.run_until_complete(inventory.gather())
110     except Exception:
111         logger.exception('received exception on initial gather')
112
113     # websocket and the periodic check of policies
114     tasks = [
115         loop.create_task(
116             task_r(
117                 inventory,
118                 kwargs.get('check_period', 2400),
119                 periodic_task,
120                 infinite_condition
121             )
122         )
123     ]
124
125     if inventory.client.supports_notifications():
126         tasks.append(
127             loop.create_task(
128                 task_r(
129                     inventory,
130                     SLEEP_ON_ERROR,
131                     notify_task,
132                     infinite_condition
133                 )
134             )
135         )
136     else:
137         logger.warning(
138             "Defaulting to polling... Provide a dmaap url to receive faster updates"
139         )
140
141     # Add shutdown handlers for sigint and sigterm
142     for signame in ("SIGINT", "SIGTERM"):
143         sig = getattr(signal, signame)
144         loop.add_signal_handler(
145             sig,
146             lambda: asyncio.ensure_future(
147                 shutdown_handler(loop, tasks, inventory)
148             ),
149         )
150
151     # Start prometheus server daemonthread for metrics/healthchecking
152     if 'bind' in kwargs:
153         metrics_server = kwargs.get('metrics_server', start_http_server)
154         metrics_server(kwargs['bind'].port, addr=kwargs['bind'].hostname)
155
156
157 def start_event_loop(config):
158     """
159     start the event loop that runs the application
160     :param config: Config object for the application
161     """
162     loop = asyncio.get_event_loop()
163     inventory = Inventory(
164         config.filters,
165         config.ids,
166         config.out_file,
167         config.client
168     )
169
170     _setup_coroutines(
171         loop,
172         inventory,
173         shutdown,
174         task_runner,
175         metrics_server=start_http_server,
176         bind=config.bind,
177         check_period=config.check_period
178     )
179
180     loop.run_forever()
181     loop.close()
182     logger.info("shutdown complete")