57c547b8c3dab89511d66b9e21b65306e631485a
[policy/apex-pdp.git] / services / services-engine / src / main / java / org / onap / policy / apex / service / engine / engdep / EngDepMessageListener.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.service.engine.engdep;
23
24 import com.google.common.eventbus.Subscribe;
25 import java.net.InetAddress;
26 import java.net.UnknownHostException;
27 import java.util.Collection;
28 import java.util.List;
29 import java.util.concurrent.BlockingQueue;
30 import java.util.concurrent.LinkedBlockingDeque;
31 import java.util.concurrent.TimeUnit;
32 import org.java_websocket.WebSocket;
33 import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder;
34 import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
35 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock;
36 import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils;
37 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
38 import org.onap.policy.apex.core.protocols.Message;
39 import org.onap.policy.apex.core.protocols.engdep.EngDepAction;
40 import org.onap.policy.apex.core.protocols.engdep.messages.EngineServiceInfoResponse;
41 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineInfo;
42 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineServiceInfo;
43 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineStatus;
44 import org.onap.policy.apex.core.protocols.engdep.messages.Response;
45 import org.onap.policy.apex.core.protocols.engdep.messages.StartEngine;
46 import org.onap.policy.apex.core.protocols.engdep.messages.StartPeriodicEvents;
47 import org.onap.policy.apex.core.protocols.engdep.messages.StopEngine;
48 import org.onap.policy.apex.core.protocols.engdep.messages.StopPeriodicEvents;
49 import org.onap.policy.apex.core.protocols.engdep.messages.UpdateModel;
50 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
51 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
52 import org.onap.policy.apex.service.engine.runtime.EngineService;
53 import org.slf4j.ext.XLogger;
54 import org.slf4j.ext.XLoggerFactory;
55
56 /**
57  * The listener interface for receiving engDepMessage events. The class that is interested in processing a engDepMessage
58  * event implements this interface, and the object created with that class is registered with a component using the
59  * component's <code>addEngDepMessageListener</code> method. When the engDepMessage event occurs, that object's
60  * appropriate method is invoked.
61  *
62  * <p>This class uses a queue to buffer incoming messages. When the listener is called, it places the incoming message
63  * on the queue. A thread runs which removes the messages from the queue and forwards them to the Apex engine.
64  *
65  * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
66  */
67 public class EngDepMessageListener implements MessageListener<Message>, Runnable {
68     private static final int LISTENER_STOP_WAIT_INTERVAL = 10;
69
70     private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngDepMessageListener.class);
71
72     // The timeout to wait between queue poll timeouts in milliseconds
73     private static final long QUEUE_POLL_TIMEOUT = 50;
74
75     // The Apex service itself
76     private final EngineService apexService;
77
78     // The message listener thread and stopping flag
79     private Thread messageListenerThread;
80     private volatile boolean stopOrderedFlag = false;
81
82     // The message queue is used to hold messages prior to forwarding to Apex
83     private final BlockingQueue<MessageBlock<Message>> messageQueue = new LinkedBlockingDeque<>();
84
85     /**
86      * Instantiates a new EngDep message listener for listening for messages coming in from the Deployment client. The
87      * <code>apexService</code> is the Apex service to send the messages onto.
88      *
89      * @param apexService the Apex engine service
90      */
91     protected EngDepMessageListener(final EngineService apexService) {
92         this.apexService = apexService;
93     }
94
95     /**
96      * This method is an implementation of the message listener. It receives a message and places it on the queue for
97      * processing by the message listening thread.
98      *
99      * @param data the data
100      * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage
101      *      (org.onap.policy.apex.core.infrastructure.messaging.impl.ws.data.Data)
102      */
103     @Subscribe
104     @Override
105     public void onMessage(final MessageBlock<Message> data) {
106         if (LOGGER.isDebugEnabled()) {
107             LOGGER.debug("message received from client application {} port {}",
108                             data.getConnection().getRemoteSocketAddress().getAddress(),
109                             data.getConnection().getRemoteSocketAddress().getPort());
110         }
111         messageQueue.add(data);
112     }
113
114     /**
115      * {@inheritDoc}.
116      */
117     @Override
118     public void onMessage(final String messageString) {
119         throw new UnsupportedOperationException("String messages are not supported on the EngDep protocol");
120     }
121
122     /**
123      * This method gets a new message listening thread from the thread factory and starts it.
124      */
125     public void startProcessorThread() {
126         LOGGER.entry();
127         messageListenerThread = new Thread(this);
128         messageListenerThread.setDaemon(true);
129         messageListenerThread.start();
130         LOGGER.exit();
131     }
132
133     /**
134      * Stops the message listening threads.
135      */
136     public void stopProcessorThreads() {
137         LOGGER.entry();
138         stopOrderedFlag = true;
139
140         while (messageListenerThread.isAlive()) {
141             ThreadUtilities.sleep(LISTENER_STOP_WAIT_INTERVAL);
142         }
143         LOGGER.exit();
144     }
145
146     /**
147      * Runs the message listening thread. Here, the messages come in on the message queue and are processed one by one
148      */
149     @Override
150     public void run() {
151         // Take messages off the queue and forward them to the Apex engine
152         while (!stopOrderedFlag) {
153             pollAndHandleMessage();
154         }
155     }
156
157     /**
158      * Poll the queue for a message and handle that message.
159      */
160     private void pollAndHandleMessage() {
161         try {
162             final MessageBlock<Message> data = messageQueue.poll(QUEUE_POLL_TIMEOUT, TimeUnit.MILLISECONDS);
163             if (data != null) {
164                 final List<Message> messages = data.getMessages();
165                 for (final Message message : messages) {
166                     handleMessage(message, data.getConnection());
167                 }
168             }
169         } catch (final InterruptedException e) {
170             // restore the interrupt status
171             Thread.currentThread().interrupt();
172             LOGGER.debug("message listener execution has been interrupted");
173         }
174     }
175
176     /**
177      * This method handles EngDep messages as they come in. It uses the inevitable switch statement to handle the
178      * messages.
179      *
180      * @param message the incoming EngDep message
181      * @param webSocket the web socket on which the message came in
182      */
183     private void handleMessage(final Message message, final WebSocket webSocket) {
184         LOGGER.entry(webSocket.getRemoteSocketAddress().toString());
185         if (message.getAction() == null) {
186             // This is a response message
187             return;
188         }
189
190         try {
191             LOGGER.debug("Manager action {} being applied to engine", message.getAction());
192
193             // Get and check the incoming action for validity
194             EngDepAction enDepAction = null;
195             if (message.getAction() instanceof EngDepAction) {
196                 enDepAction = (EngDepAction) message.getAction();
197             } else {
198                 throw new ApexException(message.getAction().getClass().getName()
199                                 + "action on received message invalid, action must be of type \"EnDepAction\"");
200             }
201
202             handleIncomingMessages(message, webSocket, enDepAction);
203         } catch (final ApexException e) {
204             LOGGER.warn("apex failed to execute message", e);
205             sendReply(webSocket, message, false, e.getCascadedMessage());
206         } catch (final Exception e) {
207             LOGGER.warn("system failure executing message", e);
208             sendReply(webSocket, message, false, e.getMessage());
209         }
210         LOGGER.exit();
211     }
212
213     /**
214      * Handle incoming EngDep messages.
215      *
216      * @param message the incoming message
217      * @param webSocket the web socket the message came in on
218      * @param enDepAction the action from the message
219      * @throws ApexException on message handling errors
220      */
221     private void handleIncomingMessages(final Message message, final WebSocket webSocket, EngDepAction enDepAction)
222                     throws ApexException {
223         // Handle each incoming message using the inevitable switch statement for the EngDep
224         // protocol
225         switch (enDepAction) {
226             case GET_ENGINE_SERVICE_INFO:
227                 handleGetEngineServiceInfoMessage(message, webSocket);
228                 break;
229
230             case UPDATE_MODEL:
231                 handleUpdateModelMessage(message, webSocket);
232                 break;
233
234             case START_ENGINE:
235                 handleStartEngineMessage(message, webSocket);
236                 break;
237
238             case STOP_ENGINE:
239                 handleStopEngineMessage(message, webSocket);
240                 break;
241
242             case START_PERIODIC_EVENTS:
243                 handleStartPeriodicEventsMessage(message, webSocket);
244                 break;
245
246             case STOP_PERIODIC_EVENTS:
247                 handleStopPeriodicEventsMessage(message, webSocket);
248                 break;
249
250             case GET_ENGINE_STATUS:
251                 handleEngineStatusMessage(message, webSocket);
252                 break;
253
254             case GET_ENGINE_INFO:
255                 handleEngineInfoMessage(message, webSocket);
256                 break;
257
258             default:
259                 throw new ApexException("action " + enDepAction + " on received message not handled by engine");
260         }
261     }
262
263     /**
264      * Handle the get engine service information message.
265      *
266      * @param message the message
267      * @param webSocket the web socket that the message came on
268      * @throws ApexException on message handling exceptions
269      */
270     private void handleGetEngineServiceInfoMessage(final Message message, final WebSocket webSocket) {
271         final GetEngineServiceInfo engineServiceInformationMessage = (GetEngineServiceInfo) message;
272         LOGGER.debug("getting engine service information for engine service " + apexService.getKey().getId()
273                         + " . . .");
274         // Send a reply with the engine service information
275         sendServiceInfoReply(webSocket, engineServiceInformationMessage, apexService.getKey(),
276                         apexService.getEngineKeys(), apexService.getApexModelKey());
277         LOGGER.debug("returned engine service information for engine service "
278                         + apexService.getKey().getId());
279     }
280
281     /**
282      * Handle the update model message.
283      *
284      * @param message the message
285      * @param webSocket the web socket that the message came on
286      * @throws ApexException on message handling exceptions
287      */
288     private void handleUpdateModelMessage(final Message message, final WebSocket webSocket) throws ApexException {
289         final UpdateModel updateModelMessage = (UpdateModel) message;
290         LOGGER.debug("updating model in engine {} . . .", updateModelMessage.getTarget().getId());
291         // Update the model
292         apexService.updateModel(updateModelMessage.getTarget(), updateModelMessage.getMessageData(),
293                         updateModelMessage.isForceInstall());
294         // Send a reply indicating the message action worked
295         sendReply(webSocket, updateModelMessage, true,
296                         "updated model in engine " + updateModelMessage.getTarget().getId());
297         LOGGER.debug("updated model in engine service {}", updateModelMessage.getTarget().getId());
298     }
299
300     /**
301      * Handle the start engine message.
302      *
303      * @param message the message
304      * @param webSocket the web socket that the message came on
305      * @throws ApexException on message handling exceptions
306      */
307     private void handleStartEngineMessage(final Message message, final WebSocket webSocket) throws ApexException {
308         final StartEngine startEngineMessage = (StartEngine) message;
309         LOGGER.debug("starting engine {} . . .", startEngineMessage.getTarget().getId());
310         // Start the engine
311         apexService.start(startEngineMessage.getTarget());
312         // Send a reply indicating the message action worked
313         sendReply(webSocket, startEngineMessage, true,
314                         "started engine " + startEngineMessage.getTarget().getId());
315         LOGGER.debug("started engine {}", startEngineMessage.getTarget().getId());
316     }
317
318     /**
319      * Handle the stop engine message.
320      *
321      * @param message the message
322      * @param webSocket the web socket that the message came on
323      * @throws ApexException on message handling exceptions
324      */
325     private void handleStopEngineMessage(final Message message, final WebSocket webSocket) throws ApexException {
326         final StopEngine stopEngineMessage = (StopEngine) message;
327         LOGGER.debug("stopping engine {} . . .", stopEngineMessage.getTarget().getId());
328         // Stop the engine
329         apexService.stop(stopEngineMessage.getTarget());
330         // Send a reply indicating the message action worked
331         sendReply(webSocket, stopEngineMessage, true,
332                         "stopped engine " + stopEngineMessage.getTarget().getId());
333         LOGGER.debug("stopping engine {}", stopEngineMessage.getTarget().getId());
334     }
335
336     /**
337      * Handle the start periodic events message.
338      *
339      * @param message the message
340      * @param webSocket the web socket that the message came on
341      * @throws ApexException on message handling exceptions
342      */
343     private void handleStartPeriodicEventsMessage(final Message message, final WebSocket webSocket)
344                     throws ApexException {
345         final StartPeriodicEvents startPeriodicEventsMessage = (StartPeriodicEvents) message;
346         LOGGER.debug("starting periodic events on engine {} . . .",
347                         startPeriodicEventsMessage.getTarget().getId());
348         // Start periodic events with the period specified in the message
349         final Long period = Long.parseLong(startPeriodicEventsMessage.getMessageData());
350         apexService.startPeriodicEvents(period);
351         // Send a reply indicating the message action worked
352         String periodicStartedMessage = "started periodic events on engine "
353                         + startPeriodicEventsMessage.getTarget().getId() + " with period " + period;
354         sendReply(webSocket, startPeriodicEventsMessage, true, periodicStartedMessage);
355         LOGGER.debug(periodicStartedMessage);
356     }
357
358     /**
359      * Handle the stop periodic events message.
360      *
361      * @param message the message
362      * @param webSocket the web socket that the message came on
363      * @throws ApexException on message handling exceptions
364      */
365     private void handleStopPeriodicEventsMessage(final Message message, final WebSocket webSocket)
366                     throws ApexException {
367         final StopPeriodicEvents stopPeriodicEventsMessage = (StopPeriodicEvents) message;
368         LOGGER.debug("stopping periodic events on engine {} . . .",
369                         stopPeriodicEventsMessage.getTarget().getId());
370         // Stop periodic events
371         apexService.stopPeriodicEvents();
372         // Send a reply indicating the message action worked
373         sendReply(webSocket, stopPeriodicEventsMessage, true, "stopped periodic events on engine "
374                         + stopPeriodicEventsMessage.getTarget().getId());
375         LOGGER.debug("stopped periodic events on engine " + stopPeriodicEventsMessage.getTarget().getId());
376     }
377
378     /**
379      * Handle the engine status message.
380      *
381      * @param message the message
382      * @param webSocket the web socket that the message came on
383      * @throws ApexException on message handling exceptions
384      */
385     private void handleEngineStatusMessage(final Message message, final WebSocket webSocket) throws ApexException {
386         final GetEngineStatus getEngineStatusMessage = (GetEngineStatus) message;
387         LOGGER.debug("getting status for engine{} . . .", getEngineStatusMessage.getTarget().getId());
388         // Send a reply with the engine status
389         sendReply(webSocket, getEngineStatusMessage, true,
390                         apexService.getStatus(getEngineStatusMessage.getTarget()));
391         LOGGER.debug("returned status for engine {}", getEngineStatusMessage.getTarget().getId());
392     }
393
394     /**
395      * Handle the engine information message.
396      *
397      * @param message the message
398      * @param webSocket the web socket that the message came on
399      * @throws ApexException on message handling exceptions
400      */
401     private void handleEngineInfoMessage(final Message message, final WebSocket webSocket) throws ApexException {
402         final GetEngineInfo getEngineInfo = (GetEngineInfo) message;
403         LOGGER.debug("getting runtime information for engine {} . . .", getEngineInfo.getTarget().getId());
404         // Send a reply with the engine runtime information
405         sendReply(webSocket, getEngineInfo, true, apexService.getRuntimeInfo(getEngineInfo.getTarget()));
406         LOGGER.debug("returned runtime information for engine {}", getEngineInfo.getTarget().getId());
407     }
408
409     /**
410      * Get the local address for the WS MessageHolder, or null if there is a problem.
411      */
412     private InetAddress getLocalAddress() {
413         try {
414             return MessagingUtils.getLocalHostLanAddress();
415         }
416         catch (UnknownHostException e) {
417             LOGGER.debug("failed to find the localhost address - continuing ...", e);
418             return null;
419         }
420     }
421
422     /**
423      * Send the Response message to the client.
424      *
425      * @param client the client to which to send the response message
426      * @param requestMessage the message to which we are responding
427      * @param result the result indicating success or failure
428      * @param messageData the message data
429      */
430     private void sendReply(final WebSocket client, final Message requestMessage, final boolean result,
431                     final String messageData) {
432         LOGGER.entry(result, messageData);
433
434         if (client == null || !client.isOpen()) {
435             LOGGER.debug("error sending reply {}, client has disconnected", requestMessage.getAction());
436             return;
437         }
438
439         String replyString = "sending " + requestMessage.getAction() + " to web socket "
440                         + client.getRemoteSocketAddress().toString();
441         LOGGER.debug(replyString);
442
443         final Response responseMessage = new Response(requestMessage.getTarget(), result, requestMessage);
444         responseMessage.setMessageData(messageData);
445
446         final MessageHolder<Message> messageHolder = new MessageHolder<>(getLocalAddress());
447         messageHolder.addMessage(responseMessage);
448         client.send(MessagingUtils.serializeObject(messageHolder));
449
450         LOGGER.exit();
451     }
452
453     /**
454      * Send the EngineServiceInfoResponse message to the client.
455      *
456      * @param client the client to which to send the response message
457      * @param requestMessage the message to which we are responding
458      * @param engineServiceKey The key of this engine service
459      * @param engineKeyCollection The keys of the engines in this engine service
460      * @param apexModelKey the apex model key
461      */
462     private void sendServiceInfoReply(final WebSocket client, final Message requestMessage,
463                     final AxArtifactKey engineServiceKey, final Collection<AxArtifactKey> engineKeyCollection,
464                     final AxArtifactKey apexModelKey) {
465         LOGGER.entry();
466         String sendingMessage = "sending " + requestMessage.getAction() + " to web socket "
467                         + client.getRemoteSocketAddress().toString();
468         LOGGER.debug(sendingMessage);
469
470         final EngineServiceInfoResponse responseMessage = new EngineServiceInfoResponse(requestMessage.getTarget(),
471                         true, requestMessage);
472         responseMessage.setMessageData("engine service information");
473         responseMessage.setEngineServiceKey(engineServiceKey);
474         responseMessage.setEngineKeyArray(engineKeyCollection);
475         responseMessage.setApexModelKey(apexModelKey);
476
477         final MessageHolder<Message> messageHolder = new MessageHolder<>(getLocalAddress());
478         messageHolder.addMessage(responseMessage);
479         client.send(MessagingUtils.serializeObject(messageHolder));
480
481         LOGGER.exit();
482     }
483 }