1 # ============LICENSE_START=======================================================
2 # Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
17 import pytest, json, sys, asyncio, signal
18 from tests.mocks import (
26 from policysync.coroutines import (
34 import policysync.coroutines as coroutines
37 async def test_shutdownhandler():
41 inventory = MockInventory()
43 await shutdown( loop, tasks, inventory)
45 # Assert that a shutdown results in all tasks in the loop being canceled
49 # ... And the the PDP client is closed
50 assert inventory.client.closed
52 # ... And that the event loop is stopped
56 async def test_periodic():
57 inventory = MockInventory()
58 await periodic_task(inventory, 1)
59 assert inventory.was_updated
63 inventory = MockInventory()
64 await notify_task(inventory, 1)
65 assert inventory.was_updated
68 async def test_task_runner():
70 if should_run.counter == 0:
71 should_run.counter += 1
76 should_run.counter = 0
78 def mocktask(inventory):
81 await task_runner(MockInventory(), 1, mocktask, should_run)
84 async def test_task_runner_cancel():
86 if should_run.counter == 0:
87 should_run.counter += 1
89 elif should_run.counter == 1:
90 # If we get here then fail the test
91 assert False, "Task runner should have broken out of loop before this"
94 should_run.counter = 0
96 # We create a mock task that raises a cancellation error (sent when a asyncio task is canceled)
97 def mocktask(inventory, sleep):
98 raise asyncio.CancelledError
100 await task_runner(MockInventory(), 1, mocktask, should_run)
103 def test_setup_coroutines():
106 def fake_task_runner(inventory, sleep, task, should_run):
109 def fake_shutdown(sig, loop, tasks, client):
112 def fake_metrics_server(port, addr=None):
113 fake_metrics_server.started = True
115 fake_metrics_server.started = False
117 inventory = MockInventory()
118 client = MockClient()
119 config = MockConfig()
126 metrics_server=fake_metrics_server,
127 check_period=config.check_period,
131 # By the end of setup coroutines we should have...
133 # Gathered initial set of policies
134 assert inventory.was_gathered
136 # started the websocket and periodic task running
137 assert (SLEEP_ON_ERROR, notify_task) in loop.tasks
138 assert (config.check_period, periodic_task) in loop.tasks
140 # Signal handlers for SIGINT and SIGTERM
141 assert signal.SIGINT in loop.handlers
142 assert signal.SIGTERM in loop.handlers