Add components for PDP communication
[policy/pap.git] / main / src / main / java / org / onap / policy / pap / main / startstop / PapActivator.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019 Nordix Foundation.
4  *  Modifications Copyright (C) 2019 AT&T Intellectual Property.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.pap.main.startstop;
23
24 import java.util.Arrays;
25 import java.util.Properties;
26 import java.util.concurrent.atomic.AtomicReference;
27 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
28 import org.onap.policy.common.endpoints.event.comm.TopicSource;
29 import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
30 import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher;
31 import org.onap.policy.common.parameters.ParameterService;
32 import org.onap.policy.common.utils.services.Registry;
33 import org.onap.policy.common.utils.services.ServiceManagerContainer;
34 import org.onap.policy.models.pdp.concepts.PdpStatus;
35 import org.onap.policy.models.pdp.enums.PdpMessageType;
36 import org.onap.policy.pap.main.PapConstants;
37 import org.onap.policy.pap.main.PolicyPapRuntimeException;
38 import org.onap.policy.pap.main.comm.PdpModifyRequestMap;
39 import org.onap.policy.pap.main.comm.Publisher;
40 import org.onap.policy.pap.main.comm.TimerManager;
41 import org.onap.policy.pap.main.parameters.PapParameterGroup;
42 import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
43 import org.onap.policy.pap.main.parameters.PdpParameters;
44 import org.onap.policy.pap.main.rest.PapRestServer;
45 import org.onap.policy.pap.main.rest.PapStatisticsManager;
46
47 /**
48  * This class wraps a distributor so that it can be activated as a complete service
49  * together with all its pap and forwarding handlers.
50  *
51  * @author Ram Krishna Verma (ram.krishna.verma@est.tech)
52  */
53 public class PapActivator extends ServiceManagerContainer {
54     private static final String[] MSG_TYPE_NAMES = {"messageName"};
55     private static final String[] REQ_ID_NAMES = {"response", "responseTo"};
56
57     private final PapParameterGroup papParameterGroup;
58
59     /**
60      * The PAP REST API server.
61      */
62     private PapRestServer restServer;
63
64     /**
65      * Listens for messages on the topic, decodes them into a {@link PdpStatus} message,
66      * and then dispatches them to {@link #reqIdDispatcher}.
67      */
68     private final MessageTypeDispatcher msgDispatcher;
69
70     /**
71      * Listens for {@link PdpStatus} messages and then routes them to the listener
72      * associated with the ID of the originating request.
73      */
74     private final RequestIdDispatcher<PdpStatus> reqIdDispatcher;
75
76     /**
77      * Instantiate the activator for policy pap as a complete service.
78      *
79      * @param papParameterGroup the parameters for the pap service
80      * @param topicProperties properties used to configure the topics
81      */
82     public PapActivator(final PapParameterGroup papParameterGroup, Properties topicProperties) {
83         super("Policy PAP");
84
85         TopicEndpoint.manager.addTopicSinks(topicProperties);
86         TopicEndpoint.manager.addTopicSources(topicProperties);
87
88         try {
89             this.papParameterGroup = papParameterGroup;
90             this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
91             this.reqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES);
92
93         } catch (RuntimeException e) {
94             throw new PolicyPapRuntimeException(e);
95         }
96
97         papParameterGroup.getRestServerParameters().setName(papParameterGroup.getName());
98
99         final Object pdpUpdateLock = new Object();
100         PdpParameters pdpParams = papParameterGroup.getPdpParameters();
101         AtomicReference<Publisher> pdpPub = new AtomicReference<>();
102         AtomicReference<TimerManager> pdpUpdTimers = new AtomicReference<>();
103         AtomicReference<TimerManager> pdpStChgTimers = new AtomicReference<>();
104
105         // @formatter:off
106         addAction("PAP parameters",
107             () -> ParameterService.register(papParameterGroup),
108             () -> ParameterService.deregister(papParameterGroup.getName()));
109
110         addAction("Request ID Dispatcher",
111             () -> msgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.reqIdDispatcher),
112             () -> msgDispatcher.unregister(PdpMessageType.PDP_STATUS.name()));
113
114         addAction("Message Dispatcher",
115             () -> registerMsgDispatcher(),
116             () -> unregisterMsgDispatcher());
117
118         addAction("topics",
119             () -> TopicEndpoint.manager.start(),
120             () -> TopicEndpoint.manager.shutdown());
121
122         addAction("PAP statistics",
123             () -> Registry.register(PapConstants.REG_STATISTICS_MANAGER, new PapStatisticsManager()),
124             () -> Registry.unregister(PapConstants.REG_STATISTICS_MANAGER));
125
126         addAction("PDP publisher",
127             () -> {
128                 pdpPub.set(new Publisher(PapConstants.TOPIC_POLICY_PDP_PAP));
129                 startThread(pdpPub.get());
130             },
131             () -> pdpPub.get().stop());
132
133         addAction("PDP update timers",
134             () -> {
135                 pdpUpdTimers.set(new TimerManager("update", pdpParams.getUpdateParameters().getMaxWaitMs()));
136                 startThread(pdpUpdTimers.get());
137             },
138             () -> pdpUpdTimers.get().stop());
139
140         addAction("PDP state-change timers",
141             () -> {
142                 pdpStChgTimers.set(new TimerManager("state-change", pdpParams.getUpdateParameters().getMaxWaitMs()));
143                 startThread(pdpStChgTimers.get());
144             },
145             () -> pdpStChgTimers.get().stop());
146
147         addAction("PDP modification lock",
148             () -> Registry.register(PapConstants.REG_PDP_MODIFY_LOCK, pdpUpdateLock),
149             () -> Registry.unregister(PapConstants.REG_PDP_MODIFY_LOCK));
150
151         addAction("PDP modification requests",
152             () -> Registry.register(PapConstants.REG_PDP_MODIFY_MAP, new PdpModifyRequestMap(
153                             new PdpModifyRequestMapParams()
154                                     .setModifyLock(pdpUpdateLock)
155                                     .setParams(pdpParams)
156                                     .setPublisher(pdpPub.get())
157                                     .setResponseDispatcher(reqIdDispatcher)
158                                     .setStateChangeTimers(pdpStChgTimers.get())
159                                     .setUpdateTimers(pdpUpdTimers.get()))),
160             () -> Registry.unregister(PapConstants.REG_PDP_MODIFY_MAP));
161
162         addAction("Create REST server",
163             () -> {
164                 restServer = new PapRestServer(papParameterGroup.getRestServerParameters());
165             },
166             () -> {
167                 restServer = null;
168             });
169
170         addAction("REST server",
171             () -> restServer.start(),
172             () -> restServer.stop());
173         // @formatter:on
174     }
175
176     /**
177      * Starts a background thread.
178      *
179      * @param runner function to run in the background
180      */
181     private void startThread(Runnable runner) {
182         Thread thread = new Thread(runner);
183         thread.setDaemon(true);
184
185         thread.start();
186     }
187
188     /**
189      * Get the parameters used by the activator.
190      *
191      * @return the parameters of the activator
192      */
193     public PapParameterGroup getParameterGroup() {
194         return papParameterGroup;
195     }
196
197     /**
198      * Registers the dispatcher with the topic source(s).
199      */
200     private void registerMsgDispatcher() {
201         for (TopicSource source : TopicEndpoint.manager
202                         .getTopicSources(Arrays.asList(PapConstants.TOPIC_POLICY_PDP_PAP))) {
203             source.register(msgDispatcher);
204         }
205     }
206
207     /**
208      * Unregisters the dispatcher from the topic source(s).
209      */
210     private void unregisterMsgDispatcher() {
211         for (TopicSource source : TopicEndpoint.manager
212                         .getTopicSources(Arrays.asList(PapConstants.TOPIC_POLICY_PDP_PAP))) {
213             source.unregister(msgDispatcher);
214         }
215     }
216 }