1b84c0a920cec95ad522fec3f18105e17bd6d299
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  * 
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  * 
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * 
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.service.engine.engdep;
22
23 import com.google.common.eventbus.Subscribe;
24
25 import java.util.Collection;
26 import java.util.List;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.LinkedBlockingDeque;
29 import java.util.concurrent.TimeUnit;
30
31 import org.java_websocket.WebSocket;
32 import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder;
33 import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
34 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock;
35 import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils;
36 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
37 import org.onap.policy.apex.core.protocols.Message;
38 import org.onap.policy.apex.core.protocols.engdep.EngDepAction;
39 import org.onap.policy.apex.core.protocols.engdep.messages.EngineServiceInfoResponse;
40 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineInfo;
41 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineServiceInfo;
42 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineStatus;
43 import org.onap.policy.apex.core.protocols.engdep.messages.Response;
44 import org.onap.policy.apex.core.protocols.engdep.messages.StartEngine;
45 import org.onap.policy.apex.core.protocols.engdep.messages.StartPeriodicEvents;
46 import org.onap.policy.apex.core.protocols.engdep.messages.StopEngine;
47 import org.onap.policy.apex.core.protocols.engdep.messages.StopPeriodicEvents;
48 import org.onap.policy.apex.core.protocols.engdep.messages.UpdateModel;
49 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
50 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
51 import org.onap.policy.apex.service.engine.runtime.EngineService;
52 import org.slf4j.ext.XLogger;
53 import org.slf4j.ext.XLoggerFactory;
54
55 /**
56  * The listener interface for receiving engDepMessage events. The class that is interested in processing a engDepMessage
57  * event implements this interface, and the object created with that class is registered with a component using the
58  * component's <code>addEngDepMessageListener</code> method. When the engDepMessage event occurs, that object's
59  * appropriate method is invoked.
60  * 
61  * <p>This class uses a queue to buffer incoming messages. When the listener is called, it places the incoming message
62  * on the queue. A thread runs which removes the messages from the queue and forwards them to the Apex engine.
63  *
64  * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
65  */
66 public class EngDepMessageListener implements MessageListener<Message>, Runnable {
67     private static final int LISTENER_STOP_WAIT_INTERVAL = 10;
68
69     private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngDepMessageListener.class);
70
71     // The timeout to wait between queue poll timeouts in milliseconds
72     private static final long QUEUE_POLL_TIMEOUT = 50;
73
74     // The Apex service itself
75     private final EngineService apexService;
76
77     // The message listener thread and stopping flag
78     private Thread messageListenerThread;
79     private boolean stopOrderedFlag = false;
80
81     // The message queue is used to hold messages prior to forwarding to Apex
82     private final BlockingQueue<MessageBlock<Message>> messageQueue = new LinkedBlockingDeque<>();
83
84     /**
85      * Instantiates a new EngDep message listener for listening for messages coming in from the Deployment client. The
86      * <code>apexService</code> is the Apex service to send the messages onto.
87      *
88      * @param apexService the Apex engine service
89      */
90     protected EngDepMessageListener(final EngineService apexService) {
91         this.apexService = apexService;
92     }
93
94     /**
95      * This method is an implementation of the message listener. It receives a message and places it on the queue for
96      * processing by the message listening thread.
97      *
98      * @param data the data
99      * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage
100      *      (org.onap.policy.apex.core.infrastructure.messaging.impl.ws.data.Data)
101      */
102     @Subscribe
103     @Override
104     public void onMessage(final MessageBlock<Message> data) {
105         if (LOGGER.isDebugEnabled()) {
106             LOGGER.debug("message received from client application {} port {}",
107                             data.getConnection().getRemoteSocketAddress().getAddress(),
108                             data.getConnection().getRemoteSocketAddress().getPort());
109         }
110         messageQueue.add(data);
111     }
112
113     /*
114      * (non-Javadoc)
115      *
116      * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang. String)
117      */
118     @Override
119     public void onMessage(final String messageString) {
120         throw new UnsupportedOperationException("String messages are not supported on the EngDep protocol");
121     }
122
123     /**
124      * This method gets a new message listening thread from the thread factory and starts it.
125      */
126     public void startProcessorThread() {
127         LOGGER.entry();
128         messageListenerThread = new Thread(this);
129         messageListenerThread.setDaemon(true);
130         messageListenerThread.start();
131         LOGGER.exit();
132     }
133
134     /**
135      * Stops the message listening threads.
136      */
137     public void stopProcessorThreads() {
138         LOGGER.entry();
139         stopOrderedFlag = true;
140
141         while (messageListenerThread.isAlive()) {
142             ThreadUtilities.sleep(LISTENER_STOP_WAIT_INTERVAL);
143         }
144         LOGGER.exit();
145     }
146
147     /**
148      * Runs the message listening thread. Here, the messages come in on the message queue and are processed one by one
149      */
150     @Override
151     public void run() {
152         // Take messages off the queue and forward them to the Apex engine
153         while (messageListenerThread.isAlive() && !stopOrderedFlag) {
154             pollAndHandleMessage();
155         }
156     }
157
158     /**
159      * Poll the queue for a message and handle that message.
160      */
161     private void pollAndHandleMessage() {
162         try {
163             final MessageBlock<Message> data = messageQueue.poll(QUEUE_POLL_TIMEOUT, TimeUnit.MILLISECONDS);
164             if (data != null) {
165                 final List<Message> messages = data.getMessages();
166                 for (final Message message : messages) {
167                     handleMessage(message, data.getConnection());
168                 }
169             }
170         } catch (final InterruptedException e) {
171             // restore the interrupt status
172             Thread.currentThread().interrupt();
173             LOGGER.debug("message listener execution has been interrupted");
174         }
175     }
176
177     /**
178      * This method handles EngDep messages as they come in. It uses the inevitable switch statement to handle the
179      * messages.
180      *
181      * @param message the incoming EngDep message
182      * @param webSocket the web socket on which the message came in
183      */
184     private void handleMessage(final Message message, final WebSocket webSocket) {
185         LOGGER.entry(webSocket.getRemoteSocketAddress().toString());
186         if (message.getAction() == null) {
187             // This is a response message
188             return;
189         }
190
191         try {
192             LOGGER.debug("Manager action {} being applied to engine", message.getAction());
193
194             // Get and check the incoming action for validity
195             EngDepAction enDepAction = null;
196             if (message.getAction() instanceof EngDepAction) {
197                 enDepAction = (EngDepAction) message.getAction();
198             } else {
199                 throw new ApexException(message.getAction().getClass().getName()
200                                 + "action on received message invalid, action must be of type \"EnDepAction\"");
201             }
202
203             handleIncomingMessages(message, webSocket, enDepAction);
204         } catch (final ApexException e) {
205             LOGGER.warn("apex failed to execute message", e);
206             sendReply(webSocket, message, false, e.getCascadedMessage());
207         } catch (final Exception e) {
208             LOGGER.warn("system failure executing message", e);
209             sendReply(webSocket, message, false, e.getMessage());
210         }
211         LOGGER.exit();
212     }
213
214     /**
215      * Handle incoming EngDep messages.
216      * 
217      * @param message the incoming message
218      * @param webSocket the web socket the message came in on
219      * @param enDepAction the action from the message
220      * @throws ApexException on message handling errors
221      */
222     private void handleIncomingMessages(final Message message, final WebSocket webSocket, EngDepAction enDepAction)
223                     throws ApexException {
224         // Handle each incoming message using the inevitable switch statement for the EngDep
225         // protocol
226         switch (enDepAction) {
227             case GET_ENGINE_SERVICE_INFO:
228                 handleGetEngineServiceInfoMessage(message, webSocket);
229                 break;
230
231             case UPDATE_MODEL:
232                 handleUpdateModelMessage(message, webSocket);
233                 break;
234
235             case START_ENGINE:
236                 handleStartEngineMessage(message, webSocket);
237                 break;
238
239             case STOP_ENGINE:
240                 handleStopEngineMessage(message, webSocket);
241                 break;
242
243             case START_PERIODIC_EVENTS:
244                 handleStartPeriodicEventsMessage(message, webSocket);
245                 break;
246
247             case STOP_PERIODIC_EVENTS:
248                 handleStopPeriodicEventsMessage(message, webSocket);
249                 break;
250
251             case GET_ENGINE_STATUS:
252                 handleEngineStatusMessage(message, webSocket);
253                 break;
254
255             case GET_ENGINE_INFO:
256                 handleEngineInfoMessage(message, webSocket);
257                 break;
258                 
259             default:
260                 throw new ApexException("action " + enDepAction + " on received message not handled by engine");
261         }
262     }
263
264     /**
265      * Handle the get engine service information message.
266      * 
267      * @param message the message
268      * @param webSocket the web socket that the message came on
269      * @throws ApexException on message handling exceptions
270      */
271     private void handleGetEngineServiceInfoMessage(final Message message, final WebSocket webSocket) {
272         final GetEngineServiceInfo engineServiceInformationMessage = (GetEngineServiceInfo) message;
273         LOGGER.debug("getting engine service information for engine service " + apexService.getKey().getId()
274                         + " . . .");
275         // Send a reply with the engine service information
276         sendServiceInfoReply(webSocket, engineServiceInformationMessage, apexService.getKey(),
277                         apexService.getEngineKeys(), apexService.getApexModelKey());
278         LOGGER.debug("returned engine service information for engine service "
279                         + apexService.getKey().getId());
280     }
281
282     /**
283      * Handle the update model message.
284      * 
285      * @param message the message
286      * @param webSocket the web socket that the message came on
287      * @throws ApexException on message handling exceptions
288      */
289     private void handleUpdateModelMessage(final Message message, final WebSocket webSocket) throws ApexException {
290         final UpdateModel updateModelMessage = (UpdateModel) message;
291         LOGGER.debug("updating model in engine {} . . .", updateModelMessage.getTarget().getId());
292         // Update the model
293         apexService.updateModel(updateModelMessage.getTarget(), updateModelMessage.getMessageData(),
294                         updateModelMessage.isForceInstall());
295         // Send a reply indicating the message action worked
296         sendReply(webSocket, updateModelMessage, true,
297                         "updated model in engine " + updateModelMessage.getTarget().getId());
298         LOGGER.debug("updated model in engine service {}", updateModelMessage.getTarget().getId());
299     }
300
301     /**
302      * Handle the start engine message.
303      * 
304      * @param message the message
305      * @param webSocket the web socket that the message came on
306      * @throws ApexException on message handling exceptions
307      */
308     private void handleStartEngineMessage(final Message message, final WebSocket webSocket) throws ApexException {
309         final StartEngine startEngineMessage = (StartEngine) message;
310         LOGGER.debug("starting engine {} . . .", startEngineMessage.getTarget().getId());
311         // Start the engine
312         apexService.start(startEngineMessage.getTarget());
313         // Send a reply indicating the message action worked
314         sendReply(webSocket, startEngineMessage, true,
315                         "started engine " + startEngineMessage.getTarget().getId());
316         LOGGER.debug("started engine {}", startEngineMessage.getTarget().getId());
317     }
318
319     /**
320      * Handle the stop engine message.
321      * 
322      * @param message the message
323      * @param webSocket the web socket that the message came on
324      * @throws ApexException on message handling exceptions
325      */
326     private void handleStopEngineMessage(final Message message, final WebSocket webSocket) throws ApexException {
327         final StopEngine stopEngineMessage = (StopEngine) message;
328         LOGGER.debug("stopping engine {} . . .", stopEngineMessage.getTarget().getId());
329         // Stop the engine
330         apexService.stop(stopEngineMessage.getTarget());
331         // Send a reply indicating the message action worked
332         sendReply(webSocket, stopEngineMessage, true,
333                         "stopped engine " + stopEngineMessage.getTarget().getId());
334         LOGGER.debug("stopping engine {}", stopEngineMessage.getTarget().getId());
335     }
336
337     /**
338      * Handle the start periodic events message.
339      * 
340      * @param message the message
341      * @param webSocket the web socket that the message came on
342      * @throws ApexException on message handling exceptions
343      */
344     private void handleStartPeriodicEventsMessage(final Message message, final WebSocket webSocket)
345                     throws ApexException {
346         final StartPeriodicEvents startPeriodicEventsMessage = (StartPeriodicEvents) message;
347         LOGGER.debug("starting periodic events on engine {} . . .",
348                         startPeriodicEventsMessage.getTarget().getId());
349         // Start periodic events with the period specified in the message
350         final Long period = Long.parseLong(startPeriodicEventsMessage.getMessageData());
351         apexService.startPeriodicEvents(period);
352         // Send a reply indicating the message action worked
353         String periodicStartedMessage = "started periodic events on engine "
354                         + startPeriodicEventsMessage.getTarget().getId() + " with period " + period;
355         sendReply(webSocket, startPeriodicEventsMessage, true, periodicStartedMessage);
356         LOGGER.debug(periodicStartedMessage);
357     }
358
359     /**
360      * Handle the stop periodic events message.
361      * 
362      * @param message the message
363      * @param webSocket the web socket that the message came on
364      * @throws ApexException on message handling exceptions
365      */
366     private void handleStopPeriodicEventsMessage(final Message message, final WebSocket webSocket)
367                     throws ApexException {
368         final StopPeriodicEvents stopPeriodicEventsMessage = (StopPeriodicEvents) message;
369         LOGGER.debug("stopping periodic events on engine {} . . .",
370                         stopPeriodicEventsMessage.getTarget().getId());
371         // Stop periodic events
372         apexService.stopPeriodicEvents();
373         // Send a reply indicating the message action worked
374         sendReply(webSocket, stopPeriodicEventsMessage, true, "stopped periodic events on engine "
375                         + stopPeriodicEventsMessage.getTarget().getId());
376         LOGGER.debug("stopped periodic events on engine " + stopPeriodicEventsMessage.getTarget().getId());
377     }
378
379     /**
380      * Handle the engine status message.
381      * 
382      * @param message the message
383      * @param webSocket the web socket that the message came on
384      * @throws ApexException on message handling exceptions
385      */
386     private void handleEngineStatusMessage(final Message message, final WebSocket webSocket) throws ApexException {
387         final GetEngineStatus getEngineStatusMessage = (GetEngineStatus) message;
388         LOGGER.debug("getting status for engine{} . . .", getEngineStatusMessage.getTarget().getId());
389         // Send a reply with the engine status
390         sendReply(webSocket, getEngineStatusMessage, true,
391                         apexService.getStatus(getEngineStatusMessage.getTarget()));
392         LOGGER.debug("returned status for engine {}", getEngineStatusMessage.getTarget().getId());
393     }
394
395     /**
396      * Handle the engine information message.
397      * 
398      * @param message the message
399      * @param webSocket the web socket that the message came on
400      * @throws ApexException on message handling exceptions
401      */
402     private void handleEngineInfoMessage(final Message message, final WebSocket webSocket) throws ApexException {
403         final GetEngineInfo getEngineInfo = (GetEngineInfo) message;
404         LOGGER.debug("getting runtime information for engine {} . . .", getEngineInfo.getTarget().getId());
405         // Send a reply with the engine runtime information
406         sendReply(webSocket, getEngineInfo, true, apexService.getRuntimeInfo(getEngineInfo.getTarget()));
407         LOGGER.debug("returned runtime information for engine {}", getEngineInfo.getTarget().getId());
408     }
409
410     /**
411      * Send the Response message to the client.
412      *
413      * @param client the client to which to send the response message
414      * @param requestMessage the message to which we are responding
415      * @param result the result indicating success or failure
416      * @param messageData the message data
417      */
418     private void sendReply(final WebSocket client, final Message requestMessage, final boolean result,
419                     final String messageData) {
420         LOGGER.entry(result, messageData);
421
422         if (client == null || !client.isOpen()) {
423             LOGGER.debug("error sending reply {}, client has disconnected", requestMessage.getAction());
424             return;
425         }
426
427         String replyString = "sending " + requestMessage.getAction() + " to web socket "
428                         + client.getRemoteSocketAddress().toString();
429         LOGGER.debug(replyString);
430
431         final Response responseMessage = new Response(requestMessage.getTarget(), result, requestMessage);
432         responseMessage.setMessageData(messageData);
433
434         final MessageHolder<Message> messageHolder = new MessageHolder<>(MessagingUtils.getHost());
435         messageHolder.addMessage(responseMessage);
436         client.send(MessagingUtils.serializeObject(messageHolder));
437
438         LOGGER.exit();
439     }
440
441     /**
442      * Send the EngineServiceInfoResponse message to the client.
443      *
444      * @param client the client to which to send the response message
445      * @param requestMessage the message to which we are responding
446      * @param engineServiceKey The key of this engine service
447      * @param engineKeyCollection The keys of the engines in this engine service
448      * @param apexModelKey the apex model key
449      */
450     private void sendServiceInfoReply(final WebSocket client, final Message requestMessage,
451                     final AxArtifactKey engineServiceKey, final Collection<AxArtifactKey> engineKeyCollection,
452                     final AxArtifactKey apexModelKey) {
453         LOGGER.entry();
454         String sendingMessage = "sending " + requestMessage.getAction() + " to web socket "
455                         + client.getRemoteSocketAddress().toString();
456         LOGGER.debug(sendingMessage);
457
458         final EngineServiceInfoResponse responseMessage = new EngineServiceInfoResponse(requestMessage.getTarget(),
459                         true, requestMessage);
460         responseMessage.setMessageData("engine service information");
461         responseMessage.setEngineServiceKey(engineServiceKey);
462         responseMessage.setEngineKeyArray(engineKeyCollection);
463         responseMessage.setApexModelKey(apexModelKey);
464
465         final MessageHolder<Message> messageHolder = new MessageHolder<>(MessagingUtils.getHost());
466         messageHolder.addMessage(responseMessage);
467         client.send(MessagingUtils.serializeObject(messageHolder));
468
469         LOGGER.exit();
470     }
471 }