Changes for checkstyle 8.32
[policy/apex-pdp.git] / core / core-deployment / src / main / java / org / onap / policy / apex / core / deployment / EngineServiceFacade.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.core.deployment;
22
23 import java.io.ByteArrayInputStream;
24 import java.io.ByteArrayOutputStream;
25 import java.io.InputStream;
26 import java.net.URL;
27 import java.util.concurrent.TimeUnit;
28 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
29 import org.onap.policy.apex.core.protocols.Message;
30 import org.onap.policy.apex.core.protocols.engdep.messages.EngineServiceInfoResponse;
31 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineInfo;
32 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineServiceInfo;
33 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineStatus;
34 import org.onap.policy.apex.core.protocols.engdep.messages.Response;
35 import org.onap.policy.apex.core.protocols.engdep.messages.StartEngine;
36 import org.onap.policy.apex.core.protocols.engdep.messages.StartPeriodicEvents;
37 import org.onap.policy.apex.core.protocols.engdep.messages.StopEngine;
38 import org.onap.policy.apex.core.protocols.engdep.messages.StopPeriodicEvents;
39 import org.onap.policy.apex.core.protocols.engdep.messages.UpdateModel;
40 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
41 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
42 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
43 import org.onap.policy.apex.model.basicmodel.handling.ApexModelWriter;
44 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
45 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
46 import org.onap.policy.common.utils.resources.ResourceUtils;
47 import org.slf4j.ext.XLogger;
48 import org.slf4j.ext.XLoggerFactory;
49
50 /**
51  * The Class Deployer deploys an Apex model held as an XML file onto an Apex engine. It uses the EngDep protocol to
52  * communicate with the engine, with the EngDep protocol being carried on Java web sockets.
53  *
54  * <p>This deployer is a simple command line deployer that reads the communication parameters and the location of
55  * the XML model file as arguments.
56  *
57  * @author Liam Fallon (liam.fallon@ericsson.com)
58  */
59 public class EngineServiceFacade {
60     // Get a reference to the logger
61     private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineServiceFacade.class);
62
63     // Repeated string constants
64     private static final String RECEIVED_FROM_SERVER = " received from server";
65     private static final String FAILED_RESPONSE = "failed response ";
66
67     // The default message timeout and timeout increment (the amount of time between polls) in
68     // milliseconds
69     private static final int CLIENT_START_WAIT_INTERVAL = 100;
70     private static final int REPLY_MESSAGE_TIMEOUT_DEFAULT = 10000;
71     private static final int REPLY_MESSAGE_TIMEOUT_INCREMENT = 100;
72
73     // The Apex engine host and EngDep port
74     private final String hostName;
75     private final int port;
76
77     // The deployment client handles the EngDep communication session towards the Apex server
78     private DeploymentClient client = null;
79     private Thread clientThread = null;
80
81     // Information about the Engine service we are connected to
82     private AxArtifactKey engineServiceKey = null;
83     private AxArtifactKey[] engineKeyArray = null;
84     private AxArtifactKey apexModelKey = null;
85
86     /**
87      * Instantiates a new deployer.
88      *
89      * @param hostName the host name of the host running the Apex Engine
90      * @param port the port to use for EngDep communication with the Apex engine
91      */
92     public EngineServiceFacade(final String hostName, final int port) {
93         this.hostName = hostName;
94         this.port = port;
95
96         // Use the deployment client to handle the EngDep communication towards the Apex server.
97         client = new DeploymentClient(hostName, port);
98     }
99
100     /**
101      * Initializes the facade, opens an EngDep communication session with the Apex engine.
102      *
103      * @throws ApexDeploymentException thrown on deployment and communication errors
104      */
105     public void init() throws ApexDeploymentException {
106         try {
107             LOGGER.debug("handshaking with server {}:{} . . .", hostName, port);
108
109             // Use the deployment client to handle the EngDep communication towards the Apex server.
110             // The deployment client runs a thread to monitor the session and to send messages
111             clientThread = new Thread(client);
112             clientThread.start();
113
114             // Wait for the connection to come up
115             while (!client.isStarted()) {
116                 if (clientThread.isAlive()) {
117                     ThreadUtilities.sleep(CLIENT_START_WAIT_INTERVAL);
118                 } else {
119                     LOGGER.error("cound not handshake with server {}:{}", hostName, port);
120                     throw new ApexDeploymentException("cound not handshake with server " + hostName + ":" + port);
121                 }
122             }
123
124             LOGGER.debug("opened connection to server {}:{} . . .", hostName, port);
125
126             // Get engine service information to see what engines we're dealing with
127             final GetEngineServiceInfo engineServiceInfo = new GetEngineServiceInfo(null);
128             LOGGER.debug("sending get engine service info message {} to server {}:{} . . .", engineServiceInfo,
129                     hostName, port);
130             client.sendMessage(engineServiceInfo);
131             LOGGER.debug("sent get engine service info message to server {}:{} . . .", hostName, port);
132
133             final EngineServiceInfoResponse engineServiceInfoResponse =
134                     (EngineServiceInfoResponse) getResponse(engineServiceInfo);
135             if (engineServiceInfoResponse.isSuccessful()) {
136                 engineServiceKey = engineServiceInfoResponse.getEngineServiceKey();
137                 engineKeyArray = engineServiceInfoResponse.getEngineKeyArray();
138                 apexModelKey = engineServiceInfoResponse.getApexModelKey();
139             }
140         } catch (final Exception e) {
141             LOGGER.error("cound not handshake with server {}:{}", hostName, port, e);
142             client.stopClient();
143             throw new ApexDeploymentException("cound not handshake with server " + hostName + ":" + port, e);
144         }
145
146     }
147
148     /**
149      * Get the engine service key.
150      *
151      * @return the engine service key
152      */
153     public AxArtifactKey getApexModelKey() {
154         return apexModelKey;
155     }
156
157     /**
158      * Get the keys of the engines on this engine service.
159      *
160      * @return the engine key array
161      */
162     public AxArtifactKey[] getEngineKeyArray() {
163         return engineKeyArray;
164     }
165
166     /**
167      * Get the engine service key.
168      *
169      * @return the engine service key
170      */
171     public AxArtifactKey getKey() {
172         return engineServiceKey;
173     }
174
175     /**
176      * Close the EngDep connection to the Apex server.
177      */
178     public void close() {
179         LOGGER.debug("closing connection to server {}:{} . . .", hostName, port);
180
181         client.stopClient();
182
183         LOGGER.debug("closed connection to server {}:{} . . .", hostName, port);
184     }
185
186     /**
187      * Deploy an Apex model on the Apex engine service.
188      *
189      * @param modelFileName the name of the model file containing the model to deploy
190      * @param ignoreConflicts true if conflicts between context in polices is to be ignored
191      * @param force true if the model is to be applied even if it is incompatible with the existing model
192      * @throws ApexException on Apex errors
193      */
194     public void deployModel(final String modelFileName, final boolean ignoreConflicts, final boolean force)
195             throws ApexException {
196         if (engineServiceKey == null || engineKeyArray == null || engineKeyArray.length == 0) {
197             LOGGER.error("cound not deploy apex model, deployer is not initialized");
198             throw new ApexDeploymentException("cound not deploy apex model, deployer is not initialized");
199         }
200
201         // Get the model file as a string
202         URL apexModelUrl = ResourceUtils.getLocalFile(modelFileName);
203         if (apexModelUrl == null) {
204             apexModelUrl = ResourceUtils.getUrlResource(modelFileName);
205             if (apexModelUrl == null) {
206                 LOGGER.error("cound not create apex model, could not read from file {}", modelFileName);
207                 throw new ApexDeploymentException(
208                         "cound not create apex model, could not read from file " + modelFileName);
209             }
210         }
211
212         try {
213             deployModel(modelFileName, apexModelUrl.openStream(), ignoreConflicts, force);
214         } catch (final Exception deployException) {
215             final String errorMessage = "could not deploy apex model from " + modelFileName;
216             LOGGER.error(errorMessage, deployException);
217             throw new ApexDeploymentException(errorMessage, deployException);
218         }
219     }
220
221     /**
222      * Deploy an Apex model on the Apex engine service.
223      *
224      * @param modelFileName the name of the model file containing the model to deploy
225      * @param modelInputStream the stream that holds the Apex model
226      * @param ignoreConflicts true if conflicts between context in polices is to be ignored
227      * @param force true if the model is to be applied even if it is incompatible with the existing model
228      * @throws ApexException on model deployment errors
229      */
230     public void deployModel(final String modelFileName, final InputStream modelInputStream,
231             final boolean ignoreConflicts, final boolean force) throws ApexException {
232         // Read the policy model from the stream
233         final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
234         modelReader.setValidateFlag(!ignoreConflicts);
235         final AxPolicyModel apexPolicyModel = modelReader.read(modelInputStream);
236
237         // Deploy the model
238         deployModel(apexPolicyModel, ignoreConflicts, force);
239     }
240
241     /**
242      * Deploy an Apex model on the Apex engine service.
243      *
244      * @param apexPolicyModel the name of the model to deploy
245      * @param ignoreConflicts true if conflicts between context in polices is to be ignored
246      * @param force true if the model is to be applied even if it is incompatible with the existing model
247      * @throws ApexException on model deployment errors
248      */
249     public void deployModel(final AxPolicyModel apexPolicyModel, final boolean ignoreConflicts, final boolean force)
250             throws ApexException {
251         // Write the model into a byte array
252         final ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream();
253         final ApexModelWriter<AxPolicyModel> modelWriter = new ApexModelWriter<>(AxPolicyModel.class);
254         modelWriter.write(apexPolicyModel, baOutputStream);
255
256         // Create and send Update message
257         final UpdateModel umMessage =
258                 new UpdateModel(engineServiceKey, baOutputStream.toString(), ignoreConflicts, force);
259
260         LOGGER.debug("sending update message {} to server {}:{} . . .", umMessage, hostName, port);
261         client.sendMessage(umMessage);
262         LOGGER.debug("sent update message to server {}:{} . . .", hostName, port);
263
264         // Check if we got a response
265         final Response response = getResponse(umMessage);
266         if (!response.isSuccessful()) {
267             LOGGER.warn(FAILED_RESPONSE + "{} received from server {}:{}", response.getMessageData(), hostName, port);
268             throw new ApexException(
269                     FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port);
270         }
271     }
272
273     /**
274      * Start an Apex engine on the engine service.
275      *
276      * @param engineKey the key of the engine to start
277      * @throws ApexDeploymentException on messaging errors
278      */
279     public void startEngine(final AxArtifactKey engineKey) throws ApexDeploymentException {
280         final StartEngine startEngineMessage = new StartEngine(engineKey);
281         LOGGER.debug("sending start engine {} to server {}:{} . . .", startEngineMessage, hostName, port);
282         client.sendMessage(startEngineMessage);
283         LOGGER.debug("sent start engine message to server {}:{} . . .", hostName, port);
284
285         // Check if we got a response
286         final Response response = getResponse(startEngineMessage);
287         if (!response.isSuccessful()) {
288             final String message =
289                     FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
290             LOGGER.warn(message);
291             throw new ApexDeploymentException(message);
292         }
293     }
294
295     /**
296      * Stop an Apex engine on the engine service.
297      *
298      * @param engineKey the key of the engine to stop
299      * @throws ApexDeploymentException on messaging errors
300      */
301     public void stopEngine(final AxArtifactKey engineKey) throws ApexDeploymentException {
302         final StopEngine stopEngineMessage = new StopEngine(engineKey);
303         LOGGER.debug("sending stop engine {} to server {}:{} . . .", stopEngineMessage, hostName, port);
304         client.sendMessage(stopEngineMessage);
305         LOGGER.debug("sent stop engine message to server {}:{} . . .", hostName, port);
306
307         // Check if we got a response
308         final Response response = getResponse(stopEngineMessage);
309         if (!response.isSuccessful()) {
310             final String message =
311                     FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
312             LOGGER.warn(message);
313             throw new ApexDeploymentException(message);
314         }
315     }
316
317     /**
318      * Start periodic events on an Apex engine on the engine service.
319      *
320      * @param engineKey the key of the engine to start periodic events on
321      * @param period the period in milliseconds between periodic events
322      * @throws ApexDeploymentException on messaging errors
323      */
324     public void startPerioidicEvents(final AxArtifactKey engineKey, final long period) throws ApexDeploymentException {
325         final StartPeriodicEvents startPerioidicEventsMessage = new StartPeriodicEvents(engineKey);
326         startPerioidicEventsMessage.setMessageData(Long.toString(period));
327         LOGGER.debug("sending start perioidic events {} to server {}:{} . . .", startPerioidicEventsMessage, hostName,
328                 port);
329         client.sendMessage(startPerioidicEventsMessage);
330         LOGGER.debug("sent start perioidic events message to server {}:{} . . .", hostName, port);
331
332         // Check if we got a response
333         final Response response = getResponse(startPerioidicEventsMessage);
334         if (!response.isSuccessful()) {
335             final String message =
336                     FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
337             LOGGER.warn(message);
338             throw new ApexDeploymentException(message);
339         }
340     }
341
342     /**
343      * Stop periodic events on an Apex engine on the engine service.
344      *
345      * @param engineKey the key of the engine to stop periodic events on
346      * @throws ApexDeploymentException on messaging errors
347      */
348     public void stopPerioidicEvents(final AxArtifactKey engineKey) throws ApexDeploymentException {
349         final StopPeriodicEvents stopPerioidicEventsMessage = new StopPeriodicEvents(engineKey);
350         LOGGER.debug("sending stop perioidic events {} to server {}:{} . . .", stopPerioidicEventsMessage, hostName,
351                 port);
352         client.sendMessage(stopPerioidicEventsMessage);
353         LOGGER.debug("sent stop perioidic events message to server {}:{} . . .", hostName, port);
354
355         // Check if we got a response
356         final Response response = getResponse(stopPerioidicEventsMessage);
357         if (!response.isSuccessful()) {
358             final String message =
359                     FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
360             LOGGER.warn(message);
361             throw new ApexDeploymentException(message);
362         }
363     }
364
365     /**
366      * Get the status of an Apex engine.
367      *
368      * @param engineKey the key of the engine to get the status of
369      * @return an engine model containing the status of the engine for the given key
370      * @throws ApexException the apex exception
371      */
372     public AxEngineModel getEngineStatus(final AxArtifactKey engineKey) throws ApexException {
373         final GetEngineStatus engineStatusMessage = new GetEngineStatus(engineKey);
374         LOGGER.debug("sending get engine status message {} to server {}:{} . . .", engineStatusMessage, hostName, port);
375         client.sendMessage(engineStatusMessage);
376         LOGGER.debug("sent get engine status message to server {}:{} . . .", hostName, port);
377
378         // Check if we got a response
379         final Response response = getResponse(engineStatusMessage);
380         if (!response.isSuccessful()) {
381             final String message =
382                     FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
383             LOGGER.warn(message);
384             throw new ApexException(message);
385         }
386
387         final ByteArrayInputStream baInputStream = new ByteArrayInputStream(response.getMessageData().getBytes());
388         final ApexModelReader<AxEngineModel> modelReader = new ApexModelReader<>(AxEngineModel.class);
389         modelReader.setValidateFlag(false);
390         return modelReader.read(baInputStream);
391     }
392
393     /**
394      * Get the runtime information of an Apex engine.
395      *
396      * @param engineKey the key of the engine to get information for
397      * @return an engine model containing information on the engine for the given key
398      * @throws ApexException the apex exception
399      */
400     public String getEngineInfo(final AxArtifactKey engineKey) throws ApexException {
401         final GetEngineInfo engineInfoMessage = new GetEngineInfo(engineKey);
402         LOGGER.debug("sending get engine information message {} to server {}:{} . . .", engineInfoMessage, hostName,
403                 port);
404         client.sendMessage(engineInfoMessage);
405         LOGGER.debug("sent get engine information message to server {}:{} . . .", hostName, port);
406
407         // Check if we got a response
408         final Response response = getResponse(engineInfoMessage);
409         if (!response.isSuccessful()) {
410             final String message =
411                     FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
412             LOGGER.warn(message);
413             throw new ApexException(message);
414         }
415
416         return response.getMessageData();
417     }
418
419     /**
420      * Check the response to a model deployment message from the Apex server.
421      *
422      * @param sentMessage the sent message
423      * @return the response message
424      * @throws ApexDeploymentException the apex deployment exception
425      */
426     private Response getResponse(final Message sentMessage) throws ApexDeploymentException {
427         // Get the amount of milliseconds we should wait for a timeout
428         int timeoutTime = sentMessage.getReplyTimeout();
429         if (timeoutTime <= 0) {
430             timeoutTime = REPLY_MESSAGE_TIMEOUT_DEFAULT;
431         }
432
433         // Wait for the required amount of milliseconds for the response from the Apex server
434         Message receivedMessage = null;
435         for (int timeWaitedSoFar = 0; receivedMessage == null && timeWaitedSoFar < timeoutTime; timeWaitedSoFar +=
436                 REPLY_MESSAGE_TIMEOUT_INCREMENT) {
437             try {
438                 receivedMessage = client.getReceiveQueue().poll(REPLY_MESSAGE_TIMEOUT_INCREMENT, TimeUnit.MILLISECONDS);
439             } catch (final InterruptedException e) {
440                 // restore the interrupt status
441                 Thread.currentThread().interrupt();
442                 LOGGER.warn("reception of response from server interrupted {}:{}", hostName, port, e);
443                 throw new ApexDeploymentException(
444                         "reception of response from server interrupted " + hostName + ':' + port, e);
445             }
446         }
447
448         // Check if response to sent message
449         if (receivedMessage == null) {
450             LOGGER.warn("no response received to sent message " + sentMessage.getAction());
451             throw new ApexDeploymentException("no response received to sent message " + sentMessage.getAction());
452         }
453
454         // Check instance is a response message
455         if (!(receivedMessage instanceof Response)) {
456             LOGGER.warn("response received from server is of incorrect type {}, should be of type {}",
457                     receivedMessage.getClass().getName(), Response.class.getName());
458             throw new ApexDeploymentException("response received from server is of incorrect type "
459                     + receivedMessage.getClass().getName() + ", should be of type " + Response.class.getName());
460         }
461
462         // Cast the response message
463         final Response responseMessage = (Response) receivedMessage;
464
465         // Check if response to sent message
466         if (!responseMessage.getResponseTo().equals(sentMessage)) {
467             LOGGER.warn("response received is not response to sent message " + sentMessage.getAction());
468             throw new ApexDeploymentException(
469                     "response received is not correct response to sent message " + sentMessage.getAction());
470         }
471
472         // Check if successful
473         if (responseMessage.isSuccessful()) {
474             LOGGER.debug("response received: {} message was succssful: {}", sentMessage.getAction(),
475                     responseMessage.getMessageData());
476         } else {
477             LOGGER.debug("response received: {} message failed: {}", sentMessage.getAction(),
478                     responseMessage.getMessageData());
479         }
480
481         return responseMessage;
482     }
483
484     /**
485      * Set a deployment client for this facade. This method is for testing.
486      * 
487      * @param deploymentClient the deployment client to set
488      */
489     protected void setDeploymentClient(final DeploymentClient deploymentClient) {
490         this.client = deploymentClient;
491     }
492 }