3840e915d84f5aef61427e5c672e5384de2adcc7
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  * 
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  * 
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * 
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.service.engine.engdep;
22
23 import java.util.Collection;
24 import java.util.List;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.LinkedBlockingDeque;
27 import java.util.concurrent.TimeUnit;
28
29 import org.java_websocket.WebSocket;
30 import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder;
31 import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
32 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock;
33 import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils;
34 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
35 import org.onap.policy.apex.core.protocols.Message;
36 import org.onap.policy.apex.core.protocols.engdep.EngDepAction;
37 import org.onap.policy.apex.core.protocols.engdep.messages.EngineServiceInfoResponse;
38 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineInfo;
39 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineServiceInfo;
40 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineStatus;
41 import org.onap.policy.apex.core.protocols.engdep.messages.Response;
42 import org.onap.policy.apex.core.protocols.engdep.messages.StartEngine;
43 import org.onap.policy.apex.core.protocols.engdep.messages.StartPeriodicEvents;
44 import org.onap.policy.apex.core.protocols.engdep.messages.StopEngine;
45 import org.onap.policy.apex.core.protocols.engdep.messages.StopPeriodicEvents;
46 import org.onap.policy.apex.core.protocols.engdep.messages.UpdateModel;
47 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
48 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
49 import org.onap.policy.apex.service.engine.runtime.EngineService;
50 import org.slf4j.ext.XLogger;
51 import org.slf4j.ext.XLoggerFactory;
52
53 import com.google.common.eventbus.Subscribe;
54
55 /**
56  * The listener interface for receiving engDepMessage events. The class that is interested in
57  * processing a engDepMessage event implements this interface, and the object created with that
58  * class is registered with a component using the component's <code>addEngDepMessageListener</code>
59  * method. When the engDepMessage event occurs, that object's appropriate method is invoked.
60  * 
61  * <p>This class uses a queue to buffer incoming messages. When the listener is called, it places the
62  * incoming message on the queue. A thread runs which removes the messages from the queue and
63  * forwards them to the Apex engine.
64  *
65  * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
66  */
67 public class EngDepMessageListener implements MessageListener<Message>, Runnable {
68     private static final int LISTENER_STOP_WAIT_INTERVAL = 10;
69
70     private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngDepMessageListener.class);
71
72     // The timeout to wait between queue poll timeouts in milliseconds
73     private static final long QUEUE_POLL_TIMEOUT = 50;
74
75     // The Apex service itself
76     private final EngineService apexService;
77
78     // The message listener thread and stopping flag
79     private Thread messageListenerThread;
80     private boolean stopOrderedFlag = false;
81
82     // The message queue is used to hold messages prior to forwarding to Apex
83     private final BlockingQueue<MessageBlock<Message>> messageQueue = new LinkedBlockingDeque<>();
84
85     /**
86      * Instantiates a new EngDep message listener for listening for messages coming in from the
87      * Deployment client. The <code>apexService</code> is the Apex service to send the messages
88      * onto.
89      *
90      * @param apexService the Apex engine service
91      */
92     protected EngDepMessageListener(final EngineService apexService) {
93         this.apexService = apexService;
94     }
95
96     /**
97      * This method is an implementation of the message listener. It receives a message and places it
98      * on the queue for processing by the message listening thread.
99      *
100      * @param data the data
101      * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage
102      *      (org.onap.policy.apex.core.infrastructure.messaging.impl.ws.data.Data)
103      */
104     @Subscribe
105     @Override
106     public void onMessage(final MessageBlock<Message> data) {
107         if (LOGGER.isDebugEnabled()) {
108             LOGGER.debug("message received from client application {} port {}",
109                     data.getConnection().getRemoteSocketAddress().getAddress(),
110                     data.getConnection().getRemoteSocketAddress().getPort());
111         }
112         messageQueue.add(data);
113     }
114
115     /*
116      * (non-Javadoc)
117      *
118      * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang.
119      * String)
120      */
121     @Override
122     public void onMessage(final String messageString) {
123         throw new UnsupportedOperationException("String messages are not supported on the EngDep protocol");
124     }
125
126     /**
127      * This method gets a new message listening thread from the thread factory and starts it.
128      */
129     public void startProcessorThread() {
130         LOGGER.entry();
131         messageListenerThread = new Thread(this);
132         messageListenerThread.setDaemon(true);
133         messageListenerThread.start();
134         LOGGER.exit();
135     }
136
137     /**
138      * Stops the message listening threads.
139      */
140     public void stopProcessorThreads() {
141         LOGGER.entry();
142         stopOrderedFlag = true;
143
144         while (messageListenerThread.isAlive()) {
145             ThreadUtilities.sleep(LISTENER_STOP_WAIT_INTERVAL);
146         }
147         LOGGER.exit();
148     }
149
150     /**
151      * Runs the message listening thread. Here, the messages come in on the message queue and are
152      * processed one by one
153      */
154     @Override
155     public void run() {
156         // Take messages off the queue and forward them to the Apex engine
157         while (messageListenerThread.isAlive() && !stopOrderedFlag) {
158             try {
159                 final MessageBlock<Message> data = messageQueue.poll(QUEUE_POLL_TIMEOUT, TimeUnit.MILLISECONDS);
160                 if (data != null) {
161                     final List<Message> messages = data.getMessages();
162                     for (final Message message : messages) {
163                         handleMessage(message, data.getConnection());
164                     }
165                 }
166             } catch (final InterruptedException e) {
167                 // restore the interrupt status
168                 Thread.currentThread().interrupt();
169                 LOGGER.debug("message listener execution has been interrupted");
170                 break;
171             }
172         }
173     }
174
175     /**
176      * This method handles EngDep messages as they come in. It uses the inevitable switch statement
177      * to handle the messages.
178      *
179      * @param message the incoming EngDep message
180      * @param webSocket the web socket on which the message came in
181      */
182     private void handleMessage(final Message message, final WebSocket webSocket) {
183         LOGGER.entry(webSocket.getRemoteSocketAddress().toString());
184         if (message.getAction() == null) {
185             // This is a response message
186             return;
187         }
188
189         try {
190             LOGGER.debug("Manager action {} being applied to engine", message.getAction());
191
192             // Get and check the incoming action for validity
193             EngDepAction enDepAction = null;
194             if (message.getAction() instanceof EngDepAction) {
195                 enDepAction = (EngDepAction) message.getAction();
196             } else {
197                 throw new ApexException(message.getAction().getClass().getName()
198                         + "action on received message invalid, action must be of type \"EnDepAction\"");
199             }
200
201             // Handle each incoming message using the inevitable switch statement for the EngDep
202             // protocol
203             switch (enDepAction) {
204                 case GET_ENGINE_SERVICE_INFO:
205                     final GetEngineServiceInfo engineServiceInformationMessage = (GetEngineServiceInfo) message;
206                     LOGGER.debug("getting engine service information for engine service " + apexService.getKey().getId()
207                             + " . . .");
208                     // Send a reply with the engine service information
209                     sendServiceInfoReply(webSocket, engineServiceInformationMessage, apexService.getKey(),
210                             apexService.getEngineKeys(), apexService.getApexModelKey());
211                     LOGGER.debug(
212                             "returned engine service information for engine service " + apexService.getKey().getId());
213                     break;
214
215                 case UPDATE_MODEL:
216                     final UpdateModel updateModelMessage = (UpdateModel) message;
217                     LOGGER.debug("updating model in engine {} . . .", updateModelMessage.getTarget().getId());
218                     // Update the model
219                     apexService.updateModel(updateModelMessage.getTarget(), updateModelMessage.getMessageData(),
220                             updateModelMessage.isForceInstall());
221                     // Send a reply indicating the message action worked
222                     sendReply(webSocket, updateModelMessage, true,
223                             "updated model in engine " + updateModelMessage.getTarget().getId());
224                     LOGGER.debug("updated model in engine service {}", updateModelMessage.getTarget().getId());
225                     break;
226
227                 case START_ENGINE:
228                     final StartEngine startEngineMessage = (StartEngine) message;
229                     LOGGER.debug("starting engine {} . . .", startEngineMessage.getTarget().getId());
230                     // Start the engine
231                     apexService.start(startEngineMessage.getTarget());
232                     // Send a reply indicating the message action worked
233                     sendReply(webSocket, startEngineMessage, true,
234                             "started engine " + startEngineMessage.getTarget().getId());
235                     LOGGER.debug("started engine {}", startEngineMessage.getTarget().getId());
236                     break;
237
238                 case STOP_ENGINE:
239                     final StopEngine stopEngineMessage = (StopEngine) message;
240                     LOGGER.debug("stopping engine {} . . .", stopEngineMessage.getTarget().getId());
241                     // Stop the engine
242                     apexService.stop(stopEngineMessage.getTarget());
243                     // Send a reply indicating the message action worked
244                     sendReply(webSocket, stopEngineMessage, true,
245                             "stopped engine " + stopEngineMessage.getTarget().getId());
246                     LOGGER.debug("stopping engine {}", stopEngineMessage.getTarget().getId());
247                     break;
248
249                 case START_PERIODIC_EVENTS:
250                     final StartPeriodicEvents startPeriodicEventsMessage = (StartPeriodicEvents) message;
251                     LOGGER.debug("starting periodic events on engine {} . . .",
252                             startPeriodicEventsMessage.getTarget().getId());
253                     // Start periodic events with the period specified in the message
254                     final Long period = Long.parseLong(startPeriodicEventsMessage.getMessageData());
255                     apexService.startPeriodicEvents(period);
256                     // Send a reply indicating the message action worked
257                     sendReply(webSocket, startPeriodicEventsMessage, true, "started periodic events on engine "
258                             + startPeriodicEventsMessage.getTarget().getId() + " with period " + period);
259                     LOGGER.debug("started periodic events on engine " + startPeriodicEventsMessage.getTarget().getId()
260                             + " with period " + period);
261                     break;
262
263                 case STOP_PERIODIC_EVENTS:
264                     final StopPeriodicEvents stopPeriodicEventsMessage = (StopPeriodicEvents) message;
265                     LOGGER.debug("stopping periodic events on engine {} . . .",
266                             stopPeriodicEventsMessage.getTarget().getId());
267                     // Stop periodic events
268                     apexService.stopPeriodicEvents();
269                     // Send a reply indicating the message action worked
270                     sendReply(webSocket, stopPeriodicEventsMessage, true,
271                             "stopped periodic events on engine " + stopPeriodicEventsMessage.getTarget().getId());
272                     LOGGER.debug("stopped periodic events on engine " + stopPeriodicEventsMessage.getTarget().getId());
273                     break;
274
275                 case GET_ENGINE_STATUS:
276                     final GetEngineStatus getEngineStatusMessage = (GetEngineStatus) message;
277                     LOGGER.debug("getting status for engine{} . . .", getEngineStatusMessage.getTarget().getId());
278                     // Send a reply with the engine status
279                     sendReply(webSocket, getEngineStatusMessage, true,
280                             apexService.getStatus(getEngineStatusMessage.getTarget()));
281                     LOGGER.debug("returned status for engine {}", getEngineStatusMessage.getTarget().getId());
282                     break;
283
284                 case GET_ENGINE_INFO:
285                     final GetEngineInfo getEngineInfo = (GetEngineInfo) message;
286                     LOGGER.debug("getting runtime information for engine {} . . .", getEngineInfo.getTarget().getId());
287                     // Send a reply with the engine runtime information
288                     sendReply(webSocket, getEngineInfo, true, apexService.getRuntimeInfo(getEngineInfo.getTarget()));
289                     LOGGER.debug("returned runtime information for engine {}", getEngineInfo.getTarget().getId());
290                     break;
291                 case RESPONSE:
292                     throw new ApexException("RESPONSE action on received message not handled by engine");
293
294                 default:
295                     break;
296             }
297         } catch (final ApexException e) {
298             LOGGER.warn("apex failed to execute message", e);
299             sendReply(webSocket, message, false, e.getCascadedMessage());
300         } catch (final Exception e) {
301             LOGGER.warn("system failure executing message", e);
302             sendReply(webSocket, message, false, e.getMessage());
303         }
304         LOGGER.exit();
305     }
306
307     /**
308      * Send the Response message to the client.
309      *
310      * @param client the client to which to send the response message
311      * @param requestMessage the message to which we are responding
312      * @param result the result indicating success or failure
313      * @param messageData the message data
314      */
315     private void sendReply(final WebSocket client, final Message requestMessage, final boolean result,
316             final String messageData) {
317         LOGGER.entry(result, messageData);
318
319         if (client == null || !client.isOpen()) {
320             LOGGER.debug("error sending reply {}, client has disconnected", requestMessage.getAction());
321             return;
322         }
323
324         LOGGER.debug("sending {} to web socket {}", requestMessage.getAction(),
325                 client.getRemoteSocketAddress().toString());
326
327         final Response responseMessage = new Response(requestMessage.getTarget(), result, requestMessage);
328         responseMessage.setMessageData(messageData);
329
330         final MessageHolder<Message> messageHolder = new MessageHolder<>(MessagingUtils.getHost());
331         messageHolder.addMessage(responseMessage);
332         client.send(MessagingUtils.serializeObject(messageHolder));
333
334         LOGGER.exit();
335     }
336
337     /**
338      * Send the EngineServiceInfoResponse message to the client.
339      *
340      * @param client the client to which to send the response message
341      * @param requestMessage the message to which we are responding
342      * @param engineServiceKey The key of this engine service
343      * @param engineKeyCollection The keys of the engines in this engine service
344      * @param apexModelKey the apex model key
345      */
346     private void sendServiceInfoReply(final WebSocket client, final Message requestMessage,
347             final AxArtifactKey engineServiceKey, final Collection<AxArtifactKey> engineKeyCollection,
348             final AxArtifactKey apexModelKey) {
349         LOGGER.entry();
350         LOGGER.debug("sending {} to web socket {}", requestMessage.getAction(),
351                 client.getRemoteSocketAddress().toString());
352
353         final EngineServiceInfoResponse responseMessage =
354                 new EngineServiceInfoResponse(requestMessage.getTarget(), true, requestMessage);
355         responseMessage.setMessageData("engine service information");
356         responseMessage.setEngineServiceKey(engineServiceKey);
357         responseMessage.setEngineKeyArray(engineKeyCollection);
358         responseMessage.setApexModelKey(apexModelKey);
359
360         final MessageHolder<Message> messageHolder = new MessageHolder<>(MessagingUtils.getHost());
361         messageHolder.addMessage(responseMessage);
362         client.send(MessagingUtils.serializeObject(messageHolder));
363
364         LOGGER.exit();
365     }
366 }