2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2019 Nordix Foundation.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.apex.service.engine.engdep;
24 import com.google.common.eventbus.Subscribe;
25 import java.net.InetAddress;
26 import java.net.UnknownHostException;
27 import java.util.Collection;
28 import java.util.List;
29 import java.util.concurrent.BlockingQueue;
30 import java.util.concurrent.LinkedBlockingDeque;
31 import java.util.concurrent.TimeUnit;
32 import org.java_websocket.WebSocket;
33 import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder;
34 import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
35 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock;
36 import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils;
37 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
38 import org.onap.policy.apex.core.protocols.Message;
39 import org.onap.policy.apex.core.protocols.engdep.EngDepAction;
40 import org.onap.policy.apex.core.protocols.engdep.messages.EngineServiceInfoResponse;
41 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineInfo;
42 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineServiceInfo;
43 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineStatus;
44 import org.onap.policy.apex.core.protocols.engdep.messages.Response;
45 import org.onap.policy.apex.core.protocols.engdep.messages.StartEngine;
46 import org.onap.policy.apex.core.protocols.engdep.messages.StartPeriodicEvents;
47 import org.onap.policy.apex.core.protocols.engdep.messages.StopEngine;
48 import org.onap.policy.apex.core.protocols.engdep.messages.StopPeriodicEvents;
49 import org.onap.policy.apex.core.protocols.engdep.messages.UpdateModel;
50 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
51 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
52 import org.onap.policy.apex.service.engine.runtime.EngineService;
53 import org.slf4j.ext.XLogger;
54 import org.slf4j.ext.XLoggerFactory;
57 * The listener interface for receiving engDepMessage events. The class that is interested in processing a engDepMessage
58 * event implements this interface, and the object created with that class is registered with a component using the
59 * component's <code>addEngDepMessageListener</code> method. When the engDepMessage event occurs, that object's
60 * appropriate method is invoked.
62 * <p>This class uses a queue to buffer incoming messages. When the listener is called, it places the incoming message
63 * on the queue. A thread runs which removes the messages from the queue and forwards them to the Apex engine.
65 * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
67 public class EngDepMessageListener implements MessageListener<Message>, Runnable {
68 private static final int LISTENER_STOP_WAIT_INTERVAL = 10;
70 private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngDepMessageListener.class);
72 // The timeout to wait between queue poll timeouts in milliseconds
73 private static final long QUEUE_POLL_TIMEOUT = 50;
75 // The Apex service itself
76 private final EngineService apexService;
78 // The message listener thread and stopping flag
79 private Thread messageListenerThread;
80 private volatile boolean stopOrderedFlag = false;
82 // The message queue is used to hold messages prior to forwarding to Apex
83 private final BlockingQueue<MessageBlock<Message>> messageQueue = new LinkedBlockingDeque<>();
86 * Instantiates a new EngDep message listener for listening for messages coming in from the Deployment client. The
87 * <code>apexService</code> is the Apex service to send the messages onto.
89 * @param apexService the Apex engine service
91 protected EngDepMessageListener(final EngineService apexService) {
92 this.apexService = apexService;
96 * This method is an implementation of the message listener. It receives a message and places it on the queue for
97 * processing by the message listening thread.
99 * @param data the data
100 * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage
101 * (org.onap.policy.apex.core.infrastructure.messaging.impl.ws.data.Data)
105 public void onMessage(final MessageBlock<Message> data) {
106 if (LOGGER.isDebugEnabled()) {
107 LOGGER.debug("message received from client application {} port {}",
108 data.getConnection().getRemoteSocketAddress().getAddress(),
109 data.getConnection().getRemoteSocketAddress().getPort());
111 messageQueue.add(data);
118 public void onMessage(final String messageString) {
119 throw new UnsupportedOperationException("String messages are not supported on the EngDep protocol");
123 * This method gets a new message listening thread from the thread factory and starts it.
125 public void startProcessorThread() {
127 messageListenerThread = new Thread(this);
128 messageListenerThread.setDaemon(true);
129 messageListenerThread.start();
134 * Stops the message listening threads.
136 public void stopProcessorThreads() {
138 stopOrderedFlag = true;
140 while (messageListenerThread.isAlive()) {
141 ThreadUtilities.sleep(LISTENER_STOP_WAIT_INTERVAL);
147 * Runs the message listening thread. Here, the messages come in on the message queue and are processed one by one
151 // Take messages off the queue and forward them to the Apex engine
152 while (!stopOrderedFlag) {
153 pollAndHandleMessage();
158 * Poll the queue for a message and handle that message.
160 private void pollAndHandleMessage() {
162 final MessageBlock<Message> data = messageQueue.poll(QUEUE_POLL_TIMEOUT, TimeUnit.MILLISECONDS);
164 final List<Message> messages = data.getMessages();
165 for (final Message message : messages) {
166 handleMessage(message, data.getConnection());
169 } catch (final InterruptedException e) {
170 // restore the interrupt status
171 Thread.currentThread().interrupt();
172 LOGGER.debug("message listener execution has been interrupted");
177 * This method handles EngDep messages as they come in. It uses the inevitable switch statement to handle the
180 * @param message the incoming EngDep message
181 * @param webSocket the web socket on which the message came in
183 private void handleMessage(final Message message, final WebSocket webSocket) {
184 LOGGER.entry(webSocket.getRemoteSocketAddress().toString());
185 if (message.getAction() == null) {
186 // This is a response message
191 LOGGER.debug("Manager action {} being applied to engine", message.getAction());
193 // Get and check the incoming action for validity
194 EngDepAction enDepAction = null;
195 if (message.getAction() instanceof EngDepAction) {
196 enDepAction = (EngDepAction) message.getAction();
198 throw new ApexException(message.getAction().getClass().getName()
199 + "action on received message invalid, action must be of type \"EnDepAction\"");
202 handleIncomingMessages(message, webSocket, enDepAction);
203 } catch (final ApexException e) {
204 LOGGER.warn("apex failed to execute message", e);
205 sendReply(webSocket, message, false, e.getCascadedMessage());
206 } catch (final Exception e) {
207 LOGGER.warn("system failure executing message", e);
208 sendReply(webSocket, message, false, e.getMessage());
214 * Handle incoming EngDep messages.
216 * @param message the incoming message
217 * @param webSocket the web socket the message came in on
218 * @param enDepAction the action from the message
219 * @throws ApexException on message handling errors
221 private void handleIncomingMessages(final Message message, final WebSocket webSocket, EngDepAction enDepAction)
222 throws ApexException {
223 // Handle each incoming message using the inevitable switch statement for the EngDep
225 switch (enDepAction) {
226 case GET_ENGINE_SERVICE_INFO:
227 handleGetEngineServiceInfoMessage(message, webSocket);
231 handleUpdateModelMessage(message, webSocket);
235 handleStartEngineMessage(message, webSocket);
239 handleStopEngineMessage(message, webSocket);
242 case START_PERIODIC_EVENTS:
243 handleStartPeriodicEventsMessage(message, webSocket);
246 case STOP_PERIODIC_EVENTS:
247 handleStopPeriodicEventsMessage(message, webSocket);
250 case GET_ENGINE_STATUS:
251 handleEngineStatusMessage(message, webSocket);
254 case GET_ENGINE_INFO:
255 handleEngineInfoMessage(message, webSocket);
259 throw new ApexException("action " + enDepAction + " on received message not handled by engine");
264 * Handle the get engine service information message.
266 * @param message the message
267 * @param webSocket the web socket that the message came on
268 * @throws ApexException on message handling exceptions
270 private void handleGetEngineServiceInfoMessage(final Message message, final WebSocket webSocket) {
271 final GetEngineServiceInfo engineServiceInformationMessage = (GetEngineServiceInfo) message;
272 LOGGER.debug("getting engine service information for engine service " + apexService.getKey().getId()
274 // Send a reply with the engine service information
275 sendServiceInfoReply(webSocket, engineServiceInformationMessage, apexService.getKey(),
276 apexService.getEngineKeys(), apexService.getApexModelKey());
277 LOGGER.debug("returned engine service information for engine service "
278 + apexService.getKey().getId());
282 * Handle the update model message.
284 * @param message the message
285 * @param webSocket the web socket that the message came on
286 * @throws ApexException on message handling exceptions
288 private void handleUpdateModelMessage(final Message message, final WebSocket webSocket) throws ApexException {
289 final UpdateModel updateModelMessage = (UpdateModel) message;
290 LOGGER.debug("updating model in engine {} . . .", updateModelMessage.getTarget().getId());
292 apexService.updateModel(updateModelMessage.getTarget(), updateModelMessage.getMessageData(),
293 updateModelMessage.isForceInstall());
294 // Send a reply indicating the message action worked
295 sendReply(webSocket, updateModelMessage, true,
296 "updated model in engine " + updateModelMessage.getTarget().getId());
297 LOGGER.debug("updated model in engine service {}", updateModelMessage.getTarget().getId());
301 * Handle the start engine message.
303 * @param message the message
304 * @param webSocket the web socket that the message came on
305 * @throws ApexException on message handling exceptions
307 private void handleStartEngineMessage(final Message message, final WebSocket webSocket) throws ApexException {
308 final StartEngine startEngineMessage = (StartEngine) message;
309 LOGGER.debug("starting engine {} . . .", startEngineMessage.getTarget().getId());
311 apexService.start(startEngineMessage.getTarget());
312 // Send a reply indicating the message action worked
313 sendReply(webSocket, startEngineMessage, true,
314 "started engine " + startEngineMessage.getTarget().getId());
315 LOGGER.debug("started engine {}", startEngineMessage.getTarget().getId());
319 * Handle the stop engine message.
321 * @param message the message
322 * @param webSocket the web socket that the message came on
323 * @throws ApexException on message handling exceptions
325 private void handleStopEngineMessage(final Message message, final WebSocket webSocket) throws ApexException {
326 final StopEngine stopEngineMessage = (StopEngine) message;
327 LOGGER.debug("stopping engine {} . . .", stopEngineMessage.getTarget().getId());
329 apexService.stop(stopEngineMessage.getTarget());
330 // Send a reply indicating the message action worked
331 sendReply(webSocket, stopEngineMessage, true,
332 "stopped engine " + stopEngineMessage.getTarget().getId());
333 LOGGER.debug("stopping engine {}", stopEngineMessage.getTarget().getId());
337 * Handle the start periodic events message.
339 * @param message the message
340 * @param webSocket the web socket that the message came on
341 * @throws ApexException on message handling exceptions
343 private void handleStartPeriodicEventsMessage(final Message message, final WebSocket webSocket)
344 throws ApexException {
345 final StartPeriodicEvents startPeriodicEventsMessage = (StartPeriodicEvents) message;
346 LOGGER.debug("starting periodic events on engine {} . . .",
347 startPeriodicEventsMessage.getTarget().getId());
348 // Start periodic events with the period specified in the message
349 final Long period = Long.parseLong(startPeriodicEventsMessage.getMessageData());
350 apexService.startPeriodicEvents(period);
351 // Send a reply indicating the message action worked
352 String periodicStartedMessage = "started periodic events on engine "
353 + startPeriodicEventsMessage.getTarget().getId() + " with period " + period;
354 sendReply(webSocket, startPeriodicEventsMessage, true, periodicStartedMessage);
355 LOGGER.debug(periodicStartedMessage);
359 * Handle the stop periodic events message.
361 * @param message the message
362 * @param webSocket the web socket that the message came on
363 * @throws ApexException on message handling exceptions
365 private void handleStopPeriodicEventsMessage(final Message message, final WebSocket webSocket)
366 throws ApexException {
367 final StopPeriodicEvents stopPeriodicEventsMessage = (StopPeriodicEvents) message;
368 LOGGER.debug("stopping periodic events on engine {} . . .",
369 stopPeriodicEventsMessage.getTarget().getId());
370 // Stop periodic events
371 apexService.stopPeriodicEvents();
372 // Send a reply indicating the message action worked
373 sendReply(webSocket, stopPeriodicEventsMessage, true, "stopped periodic events on engine "
374 + stopPeriodicEventsMessage.getTarget().getId());
375 LOGGER.debug("stopped periodic events on engine " + stopPeriodicEventsMessage.getTarget().getId());
379 * Handle the engine status message.
381 * @param message the message
382 * @param webSocket the web socket that the message came on
383 * @throws ApexException on message handling exceptions
385 private void handleEngineStatusMessage(final Message message, final WebSocket webSocket) throws ApexException {
386 final GetEngineStatus getEngineStatusMessage = (GetEngineStatus) message;
387 LOGGER.debug("getting status for engine{} . . .", getEngineStatusMessage.getTarget().getId());
388 // Send a reply with the engine status
389 sendReply(webSocket, getEngineStatusMessage, true,
390 apexService.getStatus(getEngineStatusMessage.getTarget()));
391 LOGGER.debug("returned status for engine {}", getEngineStatusMessage.getTarget().getId());
395 * Handle the engine information message.
397 * @param message the message
398 * @param webSocket the web socket that the message came on
399 * @throws ApexException on message handling exceptions
401 private void handleEngineInfoMessage(final Message message, final WebSocket webSocket) throws ApexException {
402 final GetEngineInfo getEngineInfo = (GetEngineInfo) message;
403 LOGGER.debug("getting runtime information for engine {} . . .", getEngineInfo.getTarget().getId());
404 // Send a reply with the engine runtime information
405 sendReply(webSocket, getEngineInfo, true, apexService.getRuntimeInfo(getEngineInfo.getTarget()));
406 LOGGER.debug("returned runtime information for engine {}", getEngineInfo.getTarget().getId());
410 * Get the local address for the WS MessageHolder, or null if there is a problem.
412 private InetAddress getLocalAddress() {
414 return MessagingUtils.getLocalHostLanAddress();
415 } catch (UnknownHostException e) {
416 LOGGER.debug("failed to find the localhost address - continuing ...", e);
422 * Send the Response message to the client.
424 * @param client the client to which to send the response message
425 * @param requestMessage the message to which we are responding
426 * @param result the result indicating success or failure
427 * @param messageData the message data
429 private void sendReply(final WebSocket client, final Message requestMessage, final boolean result,
430 final String messageData) {
431 LOGGER.entry(result, messageData);
433 if (client == null || !client.isOpen()) {
434 LOGGER.debug("error sending reply {}, client has disconnected", requestMessage.getAction());
438 String replyString = "sending " + requestMessage.getAction() + " to web socket "
439 + client.getRemoteSocketAddress().toString();
440 LOGGER.debug(replyString);
442 final Response responseMessage = new Response(requestMessage.getTarget(), result, requestMessage);
443 responseMessage.setMessageData(messageData);
445 final MessageHolder<Message> messageHolder = new MessageHolder<>(getLocalAddress());
446 messageHolder.addMessage(responseMessage);
447 client.send(MessagingUtils.serializeObject(messageHolder));
453 * Send the EngineServiceInfoResponse message to the client.
455 * @param client the client to which to send the response message
456 * @param requestMessage the message to which we are responding
457 * @param engineServiceKey The key of this engine service
458 * @param engineKeyCollection The keys of the engines in this engine service
459 * @param apexModelKey the apex model key
461 private void sendServiceInfoReply(final WebSocket client, final Message requestMessage,
462 final AxArtifactKey engineServiceKey, final Collection<AxArtifactKey> engineKeyCollection,
463 final AxArtifactKey apexModelKey) {
465 String sendingMessage = "sending " + requestMessage.getAction() + " to web socket "
466 + client.getRemoteSocketAddress().toString();
467 LOGGER.debug(sendingMessage);
469 final EngineServiceInfoResponse responseMessage = new EngineServiceInfoResponse(requestMessage.getTarget(),
470 true, requestMessage);
471 responseMessage.setMessageData("engine service information");
472 responseMessage.setEngineServiceKey(engineServiceKey);
473 responseMessage.setEngineKeyArray(engineKeyCollection);
474 responseMessage.setApexModelKey(apexModelKey);
476 final MessageHolder<Message> messageHolder = new MessageHolder<>(getLocalAddress());
477 messageHolder.addMessage(responseMessage);
478 client.send(MessagingUtils.serializeObject(messageHolder));