0da554d754ae278440d3df26f298b324b59d6d2f
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.services.onappf;
22
23 import java.util.List;
24 import java.util.Properties;
25
26 import lombok.Getter;
27 import lombok.Setter;
28
29 import org.onap.policy.apex.services.onappf.comm.PdpStateChangeListener;
30 import org.onap.policy.apex.services.onappf.comm.PdpStatusPublisher;
31 import org.onap.policy.apex.services.onappf.comm.PdpUpdateListener;
32 import org.onap.policy.apex.services.onappf.exception.ApexStarterException;
33 import org.onap.policy.apex.services.onappf.exception.ApexStarterRunTimeException;
34 import org.onap.policy.apex.services.onappf.handler.PdpMessageHandler;
35 import org.onap.policy.apex.services.onappf.parameters.ApexStarterParameterGroup;
36 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
37 import org.onap.policy.common.endpoints.event.comm.TopicSink;
38 import org.onap.policy.common.endpoints.event.comm.TopicSource;
39 import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
40 import org.onap.policy.common.utils.services.Registry;
41 import org.onap.policy.common.utils.services.ServiceManager;
42 import org.onap.policy.common.utils.services.ServiceManagerException;
43 import org.onap.policy.models.pdp.enums.PdpMessageType;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 /**
48  * This class activates the ApexStarter as a complete service together with all its handlers.
49  *
50  * @author Ajith Sreekumar (ajith.sreekumar@est.tech)
51  */
52 public class ApexStarterActivator {
53
54     private static final Logger LOGGER = LoggerFactory.getLogger(ApexStarterActivator.class);
55     private final ApexStarterParameterGroup apexStarterParameterGroup;
56     private List<TopicSink> topicSinks;// topics to which apex-pdp sends pdp status
57     private List<TopicSource> topicSources; // topics to which apex-pdp listens to for messages from pap.
58     private static final String[] MSG_TYPE_NAMES = { "messageName" };
59     /**
60      * Listens for messages on the topic, decodes them into a message, and then dispatches them.
61      */
62     private final MessageTypeDispatcher msgDispatcher;
63
64     /**
65      * Used to manage the services.
66      */
67     private ServiceManager manager;
68
69     @Getter
70     @Setter(lombok.AccessLevel.PRIVATE)
71     private volatile boolean alive = false;
72
73     public ApexStarterActivator(final ApexStarterParameterGroup apexStarterParameterGroup,
74             final Properties topicProperties) {
75
76         topicSinks = TopicEndpoint.manager.addTopicSinks(topicProperties);
77         topicSources = TopicEndpoint.manager.addTopicSources(topicProperties);
78
79         // TODO: instanceId currently set as a random string, could be fetched from actual deployment
80         final int random = (int) (Math.random() * 100);
81         final String instanceId = "apex_" + random;
82         LOGGER.debug("ApexStarterActivator initializing with instance id:" + instanceId);
83         try {
84             this.apexStarterParameterGroup = apexStarterParameterGroup;
85             this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
86         } catch (final RuntimeException e) {
87             throw new ApexStarterRunTimeException(e);
88         }
89
90         final PdpUpdateListener pdpUpdateListener = new PdpUpdateListener();
91         final PdpStateChangeListener pdpStateChangeListener = new PdpStateChangeListener();
92         // @formatter:off
93         this.manager = new ServiceManager()
94                 .addAction("topics",
95                         () -> TopicEndpoint.manager.start(),
96                         () -> TopicEndpoint.manager.shutdown())
97                 .addAction("set alive",
98                         () -> setAlive(true),
99                         () -> setAlive(false))
100                 .addAction("register pdp status context object",
101                         () -> Registry.register(ApexStarterConstants.REG_PDP_STATUS_OBJECT,
102                                 new PdpMessageHandler().createPdpStatusFromParameters(instanceId,
103                                         apexStarterParameterGroup.getPdpStatusParameters())),
104                         () -> Registry.unregister(ApexStarterConstants.REG_PDP_STATUS_OBJECT))
105                 .addAction("topic sinks",
106                         () -> Registry.register(ApexStarterConstants.REG_APEX_PDP_TOPIC_SINKS, topicSinks),
107                         () -> Registry.unregister(ApexStarterConstants.REG_APEX_PDP_TOPIC_SINKS))
108                 .addAction("Pdp Status publisher",
109                         () -> Registry.register(ApexStarterConstants.REG_PDP_STATUS_PUBLISHER,
110                                 new PdpStatusPublisher(topicSinks,
111                                         apexStarterParameterGroup.getPdpStatusParameters().getTimeIntervalMs())),
112                         () -> stopAndRemovePdpStatusPublisher())
113                 .addAction("Register pdp update listener",
114                     () -> msgDispatcher.register(PdpMessageType.PDP_UPDATE.name(), pdpUpdateListener),
115                     () -> msgDispatcher.unregister(PdpMessageType.PDP_UPDATE.name()))
116                 .addAction("Register pdp state change request dispatcher",
117                         () -> msgDispatcher.register(PdpMessageType.PDP_STATE_CHANGE.name(), pdpStateChangeListener),
118                         () -> msgDispatcher.unregister(PdpMessageType.PDP_STATE_CHANGE.name()))
119                 .addAction("Message Dispatcher",
120                     () -> registerMsgDispatcher(),
121                     () -> unregisterMsgDispatcher());
122         // @formatter:on
123     }
124
125     /**
126      * Method to stop and unregister the pdp status publisher.
127      */
128     private void stopAndRemovePdpStatusPublisher() {
129         final PdpStatusPublisher pdpStatusPublisher =
130                 Registry.get(ApexStarterConstants.REG_PDP_STATUS_PUBLISHER, PdpStatusPublisher.class);
131         pdpStatusPublisher.terminate();
132         Registry.unregister(ApexStarterConstants.REG_PDP_STATUS_PUBLISHER);
133     }
134
135     /**
136      * Initialize ApexStarter service.
137      *
138      * @throws ApexStarterException on errors in initializing the service
139      */
140     public void initialize() throws ApexStarterException {
141         if (isAlive()) {
142             throw new IllegalStateException("activator already initialized");
143         }
144
145         try {
146             LOGGER.debug("ApexStarter starting as a service . . .");
147             manager.start();
148             LOGGER.debug("ApexStarter started as a service");
149         } catch (final ServiceManagerException exp) {
150             LOGGER.error("ApexStarter service startup failed");
151             throw new ApexStarterException(exp.getMessage(), exp);
152         }
153     }
154
155     /**
156      * Terminate ApexStarter.
157      *
158      * @throws ApexStarterException on errors in terminating the service
159      */
160     public void terminate() throws ApexStarterException {
161         if (!isAlive()) {
162             throw new IllegalStateException("activator is not running");
163         }
164         try {
165             final PdpStatusPublisher pdpStatusPublisher =
166                     Registry.get(ApexStarterConstants.REG_PDP_STATUS_PUBLISHER, PdpStatusPublisher.class);
167             // send a final heartbeat with terminated status
168             pdpStatusPublisher.send(new PdpMessageHandler().getTerminatedPdpStatus());
169             manager.stop();
170             Registry.unregister(ApexStarterConstants.REG_APEX_STARTER_ACTIVATOR);
171         } catch (final ServiceManagerException exp) {
172             LOGGER.error("ApexStarter termination failed");
173             throw new ApexStarterException(exp.getMessage(), exp);
174         }
175     }
176
177     /**
178      * Get the parameters used by the activator.
179      *
180      * @return the parameters of the activator
181      */
182     public ApexStarterParameterGroup getParameterGroup() {
183         return apexStarterParameterGroup;
184     }
185
186     /**
187      * Registers the dispatcher with the topic source(s).
188      */
189     private void registerMsgDispatcher() {
190         for (final TopicSource source : topicSources) {
191             source.register(msgDispatcher);
192         }
193     }
194
195     /**
196      * Unregisters the dispatcher from the topic source(s).
197      */
198     private void unregisterMsgDispatcher() {
199         for (final TopicSource source : topicSources) {
200             source.unregister(msgDispatcher);
201         }
202     }
203 }