2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.service.engine.engdep;
23 import java.util.Collection;
24 import java.util.List;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.LinkedBlockingDeque;
27 import java.util.concurrent.TimeUnit;
29 import org.java_websocket.WebSocket;
30 import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder;
31 import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
32 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock;
33 import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils;
34 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
35 import org.onap.policy.apex.core.protocols.Message;
36 import org.onap.policy.apex.core.protocols.engdep.EngDepAction;
37 import org.onap.policy.apex.core.protocols.engdep.messages.EngineServiceInfoResponse;
38 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineInfo;
39 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineServiceInfo;
40 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineStatus;
41 import org.onap.policy.apex.core.protocols.engdep.messages.Response;
42 import org.onap.policy.apex.core.protocols.engdep.messages.StartEngine;
43 import org.onap.policy.apex.core.protocols.engdep.messages.StartPeriodicEvents;
44 import org.onap.policy.apex.core.protocols.engdep.messages.StopEngine;
45 import org.onap.policy.apex.core.protocols.engdep.messages.StopPeriodicEvents;
46 import org.onap.policy.apex.core.protocols.engdep.messages.UpdateModel;
47 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
48 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
49 import org.onap.policy.apex.service.engine.runtime.EngineService;
50 import org.slf4j.ext.XLogger;
51 import org.slf4j.ext.XLoggerFactory;
53 import com.google.common.eventbus.Subscribe;
/**
 * The listener interface for receiving engDepMessage events. The class that is interested in
 * processing an engDepMessage event implements this interface, and the object created with that
 * class is registered with a component using the component's <code>addEngDepMessageListener</code>
 * method. When the engDepMessage event occurs, that object's appropriate method is invoked.
 *
 * <p>This class uses a queue to buffer incoming messages. When the listener is called, it places the
 * incoming message on the queue. A thread runs which removes the messages from the queue and
 * forwards them to the Apex engine.
 *
 * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
 */
67 public class EngDepMessageListener implements MessageListener<Message>, Runnable {
68 private static final int LISTENER_STOP_WAIT_INTERVAL = 10;
70 private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngDepMessageListener.class);
72 // The timeout to wait between queue poll timeouts in milliseconds
73 private static final long QUEUE_POLL_TIMEOUT = 50;
75 // The Apex service itself
76 private final EngineService apexService;
78 // The message listener thread and stopping flag
79 private Thread messageListenerThread;
80 private boolean stopOrderedFlag = false;
82 // The message queue is used to hold messages prior to forwarding to Apex
83 private final BlockingQueue<MessageBlock<Message>> messageQueue = new LinkedBlockingDeque<>();
/**
 * Creates an EngDep message listener for messages coming in from the deployment client. The
 * given Apex engine service is the service the incoming messages are applied to.
 *
 * @param apexService the Apex engine service
 */
protected EngDepMessageListener(final EngineService apexService) {
    this.apexService = apexService;
}
97 * This method is an implementation of the message listener. It receives a message and places it
98 * on the queue for processing by the message listening thread.
100 * @param data the data
101 * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage
102 * (org.onap.policy.apex.core.infrastructure.messaging.impl.ws.data.Data)
106 public void onMessage(final MessageBlock<Message> data) {
107 if (LOGGER.isDebugEnabled()) {
108 LOGGER.debug("message received from client application {} port {}",
109 data.getConnection().getRemoteSocketAddress().getAddress(),
110 data.getConnection().getRemoteSocketAddress().getPort());
112 messageQueue.add(data);
118 * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang.
122 public void onMessage(final String messageString) {
123 throw new UnsupportedOperationException("String messages are not supported on the EngDep protocol");
127 * This method gets a new message listening thread from the thread factory and starts it.
129 public void startProcessorThread() {
131 messageListenerThread = new Thread(this);
132 messageListenerThread.setDaemon(true);
133 messageListenerThread.start();
138 * Stops the message listening threads.
140 public void stopProcessorThreads() {
142 stopOrderedFlag = true;
144 while (messageListenerThread.isAlive()) {
145 ThreadUtilities.sleep(LISTENER_STOP_WAIT_INTERVAL);
151 * Runs the message listening thread. Here, the messages come in on the message queue and are
152 * processed one by one
156 // Take messages off the queue and forward them to the Apex engine
157 while (messageListenerThread.isAlive() && !stopOrderedFlag) {
159 final MessageBlock<Message> data = messageQueue.poll(QUEUE_POLL_TIMEOUT, TimeUnit.MILLISECONDS);
161 final List<Message> messages = data.getMessages();
162 for (final Message message : messages) {
163 handleMessage(message, data.getConnection());
166 } catch (final InterruptedException e) {
167 // restore the interrupt status
168 Thread.currentThread().interrupt();
169 LOGGER.debug("message listener execution has been interrupted");
176 * This method handles EngDep messages as they come in. It uses the inevitable switch statement
177 * to handle the messages.
179 * @param message the incoming EngDep message
180 * @param webSocket the web socket on which the message came in
182 private void handleMessage(final Message message, final WebSocket webSocket) {
183 LOGGER.entry(webSocket.getRemoteSocketAddress().toString());
184 if (message.getAction() == null) {
185 // This is a response message
190 LOGGER.debug("Manager action {} being applied to engine", message.getAction());
192 // Get and check the incoming action for validity
193 EngDepAction enDepAction = null;
194 if (message.getAction() instanceof EngDepAction) {
195 enDepAction = (EngDepAction) message.getAction();
197 throw new ApexException(message.getAction().getClass().getName()
198 + "action on received message invalid, action must be of type \"EnDepAction\"");
201 // Handle each incoming message using the inevitable switch statement for the EngDep
203 switch (enDepAction) {
204 case GET_ENGINE_SERVICE_INFO:
205 final GetEngineServiceInfo engineServiceInformationMessage = (GetEngineServiceInfo) message;
206 LOGGER.debug("getting engine service information for engine service " + apexService.getKey().getID()
208 // Send a reply with the engine service information
209 sendServiceInfoReply(webSocket, engineServiceInformationMessage, apexService.getKey(),
210 apexService.getEngineKeys(), apexService.getApexModelKey());
212 "returned engine service information for engine service " + apexService.getKey().getID());
216 final UpdateModel updateModelMessage = (UpdateModel) message;
217 LOGGER.debug("updating model in engine {} . . .", updateModelMessage.getTarget().getID());
219 apexService.updateModel(updateModelMessage.getTarget(), updateModelMessage.getMessageData(),
220 updateModelMessage.isForceInstall());
221 // Send a reply indicating the message action worked
222 sendReply(webSocket, updateModelMessage, true,
223 "updated model in engine " + updateModelMessage.getTarget().getID());
224 LOGGER.debug("updated model in engine service {}", updateModelMessage.getTarget().getID());
228 final StartEngine startEngineMessage = (StartEngine) message;
229 LOGGER.debug("starting engine {} . . .", startEngineMessage.getTarget().getID());
231 apexService.start(startEngineMessage.getTarget());
232 // Send a reply indicating the message action worked
233 sendReply(webSocket, startEngineMessage, true,
234 "started engine " + startEngineMessage.getTarget().getID());
235 LOGGER.debug("started engine {}", startEngineMessage.getTarget().getID());
239 final StopEngine stopEngineMessage = (StopEngine) message;
240 LOGGER.debug("stopping engine {} . . .", stopEngineMessage.getTarget().getID());
242 apexService.stop(stopEngineMessage.getTarget());
243 // Send a reply indicating the message action worked
244 sendReply(webSocket, stopEngineMessage, true,
245 "stopped engine " + stopEngineMessage.getTarget().getID());
246 LOGGER.debug("stopping engine {}", stopEngineMessage.getTarget().getID());
249 case START_PERIODIC_EVENTS:
250 final StartPeriodicEvents startPeriodicEventsMessage = (StartPeriodicEvents) message;
251 LOGGER.debug("starting periodic events on engine {} . . .",
252 startPeriodicEventsMessage.getTarget().getID());
253 // Start periodic events with the period specified in the message
254 final Long period = Long.parseLong(startPeriodicEventsMessage.getMessageData());
255 apexService.startPeriodicEvents(period);
256 // Send a reply indicating the message action worked
257 sendReply(webSocket, startPeriodicEventsMessage, true, "started periodic events on engine "
258 + startPeriodicEventsMessage.getTarget().getID() + " with period " + period);
259 LOGGER.debug("started periodic events on engine " + startPeriodicEventsMessage.getTarget().getID()
260 + " with period " + period);
263 case STOP_PERIODIC_EVENTS:
264 final StopPeriodicEvents stopPeriodicEventsMessage = (StopPeriodicEvents) message;
265 LOGGER.debug("stopping periodic events on engine {} . . .",
266 stopPeriodicEventsMessage.getTarget().getID());
267 // Stop periodic events
268 apexService.stopPeriodicEvents();
269 // Send a reply indicating the message action worked
270 sendReply(webSocket, stopPeriodicEventsMessage, true,
271 "stopped periodic events on engine " + stopPeriodicEventsMessage.getTarget().getID());
272 LOGGER.debug("stopped periodic events on engine " + stopPeriodicEventsMessage.getTarget().getID());
275 case GET_ENGINE_STATUS:
276 final GetEngineStatus getEngineStatusMessage = (GetEngineStatus) message;
277 LOGGER.debug("getting status for engine{} . . .", getEngineStatusMessage.getTarget().getID());
278 // Send a reply with the engine status
279 sendReply(webSocket, getEngineStatusMessage, true,
280 apexService.getStatus(getEngineStatusMessage.getTarget()));
281 LOGGER.debug("returned status for engine {}", getEngineStatusMessage.getTarget().getID());
284 case GET_ENGINE_INFO:
285 final GetEngineInfo getEngineInfo = (GetEngineInfo) message;
286 LOGGER.debug("getting runtime information for engine {} . . .", getEngineInfo.getTarget().getID());
287 // Send a reply with the engine runtime information
288 sendReply(webSocket, getEngineInfo, true, apexService.getRuntimeInfo(getEngineInfo.getTarget()));
289 LOGGER.debug("returned runtime information for engine {}", getEngineInfo.getTarget().getID());
292 throw new ApexException("RESPONSE action on received message not handled by engine");
297 } catch (final ApexException e) {
298 LOGGER.warn("apex failed to execute message", e);
299 sendReply(webSocket, message, false, e.getCascadedMessage());
300 } catch (final Exception e) {
301 LOGGER.warn("system failure executing message", e);
302 sendReply(webSocket, message, false, e.getMessage());
308 * Send the Response message to the client.
310 * @param client the client to which to send the response message
311 * @param requestMessage the message to which we are responding
312 * @param result the result indicating success or failure
313 * @param messageData the message data
315 private void sendReply(final WebSocket client, final Message requestMessage, final boolean result,
316 final String messageData) {
317 LOGGER.entry(result, messageData);
319 if (client == null || !client.isOpen()) {
320 LOGGER.debug("error sending reply {}, client has disconnected", requestMessage.getAction());
324 LOGGER.debug("sending {} to web socket {}", requestMessage.getAction(),
325 client.getRemoteSocketAddress().toString());
327 final Response responseMessage = new Response(requestMessage.getTarget(), result, requestMessage);
328 responseMessage.setMessageData(messageData);
330 final MessageHolder<Message> messageHolder = new MessageHolder<>(MessagingUtils.getHost());
331 messageHolder.addMessage(responseMessage);
332 client.send(MessagingUtils.serializeObject(messageHolder));
338 * Send the EngineServiceInfoResponse message to the client.
340 * @param client the client to which to send the response message
341 * @param requestMessage the message to which we are responding
342 * @param engineServiceKey The key of this engine service
343 * @param engineKeyCollection The keys of the engines in this engine service
344 * @param apexModelKey the apex model key
346 private void sendServiceInfoReply(final WebSocket client, final Message requestMessage,
347 final AxArtifactKey engineServiceKey, final Collection<AxArtifactKey> engineKeyCollection,
348 final AxArtifactKey apexModelKey) {
350 LOGGER.debug("sending {} to web socket {}", requestMessage.getAction(),
351 client.getRemoteSocketAddress().toString());
353 final EngineServiceInfoResponse responseMessage =
354 new EngineServiceInfoResponse(requestMessage.getTarget(), true, requestMessage);
355 responseMessage.setMessageData("engine service information");
356 responseMessage.setEngineServiceKey(engineServiceKey);
357 responseMessage.setEngineKeyArray(engineKeyCollection);
358 responseMessage.setApexModelKey(apexModelKey);
360 final MessageHolder<Message> messageHolder = new MessageHolder<>(MessagingUtils.getHost());
361 messageHolder.addMessage(responseMessage);
362 client.send(MessagingUtils.serializeObject(messageHolder));