Move PAP database provider to spring boot default
[policy/pap.git] / main / src / main / java / org / onap / policy / pap / main / startstop / PapActivator.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019 Nordix Foundation.
4  *  Modifications Copyright (C) 2019-2021 AT&T Intellectual Property.
5  *  Modifications Copyright (C) 2021-2022 Bell Canada. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.pap.main.startstop;
24
25 import java.util.List;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.ScheduledExecutorService;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicReference;
30 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
31 import org.onap.policy.common.endpoints.event.comm.TopicListener;
32 import org.onap.policy.common.endpoints.event.comm.TopicSource;
33 import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
34 import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher;
35 import org.onap.policy.common.parameters.ParameterService;
36 import org.onap.policy.common.utils.services.Registry;
37 import org.onap.policy.common.utils.services.ServiceManagerContainer;
38 import org.onap.policy.models.pap.concepts.PolicyNotification;
39 import org.onap.policy.models.pdp.concepts.PdpMessage;
40 import org.onap.policy.models.pdp.concepts.PdpStatus;
41 import org.onap.policy.models.pdp.enums.PdpMessageType;
42 import org.onap.policy.pap.main.PapConstants;
43 import org.onap.policy.pap.main.PolicyPapRuntimeException;
44 import org.onap.policy.pap.main.comm.PdpHeartbeatListener;
45 import org.onap.policy.pap.main.comm.PdpModifyRequestMap;
46 import org.onap.policy.pap.main.comm.Publisher;
47 import org.onap.policy.pap.main.comm.TimerManager;
48 import org.onap.policy.pap.main.notification.PolicyNotifier;
49 import org.onap.policy.pap.main.parameters.PapParameterGroup;
50 import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
51 import org.onap.policy.pap.main.rest.PapStatisticsManager;
52 import org.springframework.context.event.ContextClosedEvent;
53 import org.springframework.context.event.ContextRefreshedEvent;
54 import org.springframework.context.event.EventListener;
55 import org.springframework.stereotype.Component;
56
57 /**
58  * This class activates Policy Administration (PAP) as a complete service together with all its controllers, listeners &
59  * handlers.
60  *
61  * @author Ram Krishna Verma (ram.krishna.verma@est.tech)
62  */
63 @Component
64 public class PapActivator extends ServiceManagerContainer {
65     private static final String[] MSG_TYPE_NAMES = { "messageName" };
66     private static final String[] REQ_ID_NAMES = { "response", "responseTo" };
67
68     /**
69      * Max number of heat beats that can be missed before PAP removes a PDP.
70      */
71     private static final int MAX_MISSED_HEARTBEATS = 3;
72
73     private PapParameterGroup papParameterGroup;
74
75     /**
76      * Listens for messages on the topic, decodes them into a {@link PdpStatus} message, and then dispatches them to
77      * {@link #responseReqIdDispatcher}.
78      */
79     private final MessageTypeDispatcher responseMsgDispatcher;
80     private final MessageTypeDispatcher heartbeatMsgDispatcher;
81
82     /**
83      * Listens for {@link PdpStatus} messages and then routes them to the listener associated with the ID of the
84      * originating request.
85      */
86     private final RequestIdDispatcher<PdpStatus> responseReqIdDispatcher;
87     private final RequestIdDispatcher<PdpStatus> heartbeatReqIdDispatcher;
88
89     /**
90      * Instantiate the activator for policy pap as a complete service.
91      *
92      * @param papParameterGroup the parameters for the pap service
93      */
94     public PapActivator(PapParameterGroup papParameterGroup, PolicyNotifier policyNotifier,
95         PdpHeartbeatListener pdpHeartbeatListener, PdpModifyRequestMap pdpModifyRequestMap) {
96         super("Policy PAP");
97         this.papParameterGroup = papParameterGroup;
98         TopicEndpointManager.getManager().addTopics(papParameterGroup.getTopicParameterGroup());
99
100         try {
101             this.responseMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
102             this.heartbeatMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
103             this.responseReqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES);
104             this.heartbeatReqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES);
105
106         } catch (final RuntimeException e) {
107             throw new PolicyPapRuntimeException(e);
108         }
109
110
111         final var pdpUpdateLock = new Object();
112         final var pdpParams = papParameterGroup.getPdpParameters();
113         final AtomicReference<Publisher<PdpMessage>> pdpPub = new AtomicReference<>();
114         final AtomicReference<Publisher<PolicyNotification>> notifyPub = new AtomicReference<>();
115         final AtomicReference<TimerManager> pdpUpdTimers = new AtomicReference<>();
116         final AtomicReference<TimerManager> pdpStChgTimers = new AtomicReference<>();
117         final AtomicReference<ScheduledExecutorService> pdpExpirationTimer = new AtomicReference<>();
118         final AtomicReference<PdpModifyRequestMap> requestMap = new AtomicReference<>();
119
120         // @formatter:off
121         addAction("PAP parameters",
122             () -> ParameterService.register(papParameterGroup),
123             () -> ParameterService.deregister(papParameterGroup.getName()));
124
125         addAction("Pdp Heartbeat Listener",
126             () -> heartbeatReqIdDispatcher.register(pdpHeartbeatListener),
127             () -> heartbeatReqIdDispatcher.unregister(pdpHeartbeatListener));
128
129         addAction("Response Request ID Dispatcher",
130             () -> responseMsgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.responseReqIdDispatcher),
131             () -> responseMsgDispatcher.unregister(PdpMessageType.PDP_STATUS.name()));
132
133         addAction("Heartbeat Request ID Dispatcher",
134             () -> heartbeatMsgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.heartbeatReqIdDispatcher),
135             () -> heartbeatMsgDispatcher.unregister(PdpMessageType.PDP_STATUS.name()));
136
137         addAction("Response Message Dispatcher",
138             () -> registerMsgDispatcher(responseMsgDispatcher, PapConstants.TOPIC_POLICY_PDP_PAP),
139             () -> unregisterMsgDispatcher(responseMsgDispatcher, PapConstants.TOPIC_POLICY_PDP_PAP));
140
141         addAction("Heartbeat Message Dispatcher",
142             () -> registerMsgDispatcher(heartbeatMsgDispatcher, PapConstants.TOPIC_POLICY_HEARTBEAT),
143             () -> unregisterMsgDispatcher(heartbeatMsgDispatcher, PapConstants.TOPIC_POLICY_HEARTBEAT));
144
145         addAction("topics",
146             TopicEndpointManager.getManager()::start,
147             TopicEndpointManager.getManager()::shutdown);
148
149         addAction("PAP statistics",
150             () -> Registry.register(PapConstants.REG_STATISTICS_MANAGER, new PapStatisticsManager()),
151             () -> Registry.unregister(PapConstants.REG_STATISTICS_MANAGER));
152
153         addAction("PAP Activator",
154             () -> Registry.register(PapConstants.REG_PAP_ACTIVATOR, this),
155             () -> Registry.unregister(PapConstants.REG_PAP_ACTIVATOR));
156
157         addAction("PDP publisher",
158             () -> {
159                 pdpPub.set(new Publisher<>(PapConstants.TOPIC_POLICY_PDP_PAP));
160                 startThread(pdpPub.get());
161             },
162             () -> pdpPub.get().stop());
163
164         addAction("Policy Notification publisher",
165             () -> {
166                 notifyPub.set(new Publisher<>(PapConstants.TOPIC_POLICY_NOTIFICATION));
167                 startThread(notifyPub.get());
168                 policyNotifier.setPublisher(notifyPub.get());
169             },
170             () -> notifyPub.get().stop());
171
172         addAction("PDP update timers",
173             () -> {
174                 pdpUpdTimers.set(new TimerManager("update", pdpParams.getUpdateParameters().getMaxWaitMs()));
175                 startThread(pdpUpdTimers.get());
176             },
177             () -> pdpUpdTimers.get().stop());
178
179         addAction("PDP state-change timers",
180             () -> {
181                 pdpStChgTimers.set(new TimerManager("state-change", pdpParams.getUpdateParameters().getMaxWaitMs()));
182                 startThread(pdpStChgTimers.get());
183             },
184             () -> pdpStChgTimers.get().stop());
185
186         addAction("PDP modification lock",
187             () -> Registry.register(PapConstants.REG_PDP_MODIFY_LOCK, pdpUpdateLock),
188             () -> Registry.unregister(PapConstants.REG_PDP_MODIFY_LOCK));
189
190         addAction("PDP modification requests",
191             () -> {
192                 pdpModifyRequestMap.initialize(
193                     PdpModifyRequestMapParams.builder()
194                     .maxPdpAgeMs(MAX_MISSED_HEARTBEATS * pdpParams.getHeartBeatMs())
195                     .modifyLock(pdpUpdateLock)
196                     .params(pdpParams)
197                     .pdpPublisher(pdpPub.get())
198                     .responseDispatcher(responseReqIdDispatcher)
199                     .stateChangeTimers(pdpStChgTimers.get())
200                     .updateTimers(pdpUpdTimers.get())
201                     .savePdpStatistics(papParameterGroup.isSavePdpStatisticsInDb())
202                     .build());
203                 requestMap.set(pdpModifyRequestMap);
204                 Registry.register(PapConstants.REG_PDP_MODIFY_MAP, requestMap.get());
205             },
206             () -> Registry.unregister(PapConstants.REG_PDP_MODIFY_MAP));
207
208         addAction("PDP expiration timer",
209             () -> {
210                 long frequencyMs = pdpParams.getHeartBeatMs();
211                 pdpExpirationTimer.set(Executors.newScheduledThreadPool(1));
212                 pdpExpirationTimer.get().scheduleWithFixedDelay(
213                     requestMap.get()::removeExpiredPdps,
214                     frequencyMs,
215                     frequencyMs,
216                     TimeUnit.MILLISECONDS);
217             },
218             () -> pdpExpirationTimer.get().shutdown());
219
220         // @formatter:on
221     }
222
223     /**
224      * Starts a background thread.
225      *
226      * @param runner function to run in the background
227      */
228     private void startThread(final Runnable runner) {
229         final var thread = new Thread(runner);
230         thread.setDaemon(true);
231
232         thread.start();
233     }
234
235     /**
236      * Get the parameters used by the activator.
237      *
238      * @return the parameters of the activator
239      */
240     public PapParameterGroup getParameterGroup() {
241         return papParameterGroup;
242     }
243
244     /**
245      * Registers the dispatcher with the topic source(s).
246      * @param dispatcher dispatcher to register
247      * @param topic topic of interest
248      */
249     private void registerMsgDispatcher(TopicListener dispatcher, String topic) {
250         for (final TopicSource source : TopicEndpointManager.getManager().getTopicSources(List.of(topic))) {
251             source.register(dispatcher);
252         }
253     }
254
255     /**
256      * Unregisters the dispatcher from the topic source(s).
257      * @param dispatcher dispatcher to unregister
258      * @param topic topic of interest
259      */
260     private void unregisterMsgDispatcher(TopicListener dispatcher, String topic) {
261         for (final TopicSource source : TopicEndpointManager.getManager().getTopicSources(List.of(topic))) {
262             source.unregister(dispatcher);
263         }
264     }
265
266     /**
267      * Handle ContextRefreshEvent.
268      *
269      * @param ctxRefreshedEvent the ContextRefreshedEvent
270      */
271     @EventListener
272     public void handleContextRefreshEvent(ContextRefreshedEvent ctxRefreshedEvent) {
273         if (!isAlive()) {
274             start();
275         }
276     }
277
278     /**
279      * Handle ContextClosedEvent.
280      *
281      * @param ctxClosedEvent the ContextClosedEvent
282      */
283     @EventListener
284     public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) {
285         if (isAlive()) {
286             stop();
287         }
288     }
289 }