f9848534325dff6248067287117fa6e99e644258
[policy/apex-pdp.git] / core / core-deployment / src / main / java / org / onap / policy / apex / core / deployment / EngineServiceFacade.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.core.deployment;
22
23 import java.io.ByteArrayInputStream;
24 import java.io.ByteArrayOutputStream;
25 import java.io.InputStream;
26 import java.net.URL;
27 import java.util.concurrent.TimeUnit;
28
29 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
30 import org.onap.policy.apex.core.protocols.Message;
31 import org.onap.policy.apex.core.protocols.engdep.messages.EngineServiceInfoResponse;
32 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineInfo;
33 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineServiceInfo;
34 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineStatus;
35 import org.onap.policy.apex.core.protocols.engdep.messages.Response;
36 import org.onap.policy.apex.core.protocols.engdep.messages.StartEngine;
37 import org.onap.policy.apex.core.protocols.engdep.messages.StartPeriodicEvents;
38 import org.onap.policy.apex.core.protocols.engdep.messages.StopEngine;
39 import org.onap.policy.apex.core.protocols.engdep.messages.StopPeriodicEvents;
40 import org.onap.policy.apex.core.protocols.engdep.messages.UpdateModel;
41 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
42 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
43 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
44 import org.onap.policy.apex.model.basicmodel.handling.ApexModelWriter;
45 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
46 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
47 import org.onap.policy.common.utils.resources.ResourceUtils;
48 import org.slf4j.ext.XLogger;
49 import org.slf4j.ext.XLoggerFactory;
50
51 /**
52  * The Class Deployer deploys an Apex model held as an XML file onto an Apex engine. It uses the EngDep protocol to
53  * communicate with the engine, with the EngDep protocol being carried on Java web sockets.
54  *
55  * <p>This deployer is a simple command line deployer that reads the communication parameters and the location of
56  * the XML model file as arguments.
57  *
58  * @author Liam Fallon (liam.fallon@ericsson.com)
59  */
60 public class EngineServiceFacade {
61     // Get a reference to the logger
62     private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineServiceFacade.class);
63
64     // Repeated string constants
65     private static final String RECEIVED_FROM_SERVER = " received from server";
66     private static final String FAILED_RESPONSE = "failed response ";
67
68     // The default message timeout and timeout increment (the amount of time between polls) in
69     // milliseconds
70     private static final int CLIENT_START_WAIT_INTERVAL = 100;
71     private static final int REPLY_MESSAGE_TIMEOUT_DEFAULT = 10000;
72     private static final int REPLY_MESSAGE_TIMEOUT_INCREMENT = 100;
73
74     // The Apex engine host and EngDep port
75     private final String hostName;
76     private final int port;
77
78     // The deployment client handles the EngDep communication session towards the Apex server
79     private DeploymentClient client = null;
80     private Thread clientThread = null;
81
82     // Information about the Engine service we are connected to
83     private AxArtifactKey engineServiceKey = null;
84     private AxArtifactKey[] engineKeyArray = null;
85     private AxArtifactKey apexModelKey = null;
86
87     /**
88      * Instantiates a new deployer.
89      *
90      * @param hostName the host name of the host running the Apex Engine
91      * @param port the port to use for EngDep communication with the Apex engine
92      */
93     public EngineServiceFacade(final String hostName, final int port) {
94         this.hostName = hostName;
95         this.port = port;
96
97         // Use the deployment client to handle the EngDep communication towards the Apex server.
98         client = new DeploymentClient(hostName, port);
99     }
100
101     /**
102      * Initializes the facade, opens an EngDep communication session with the Apex engine.
103      *
104      * @throws ApexDeploymentException thrown on deployment and communication errors
105      */
106     public void init() throws ApexDeploymentException {
107         try {
108             LOGGER.debug("handshaking with server {}:{} . . .", hostName, port);
109
110             // Use the deployment client to handle the EngDep communication towards the Apex server.
111             // The deployment client runs a thread to monitor the session and to send messages
112             clientThread = new Thread(client);
113             clientThread.start();
114
115             // Wait for the connection to come up
116             while (!client.isStarted()) {
117                 if (clientThread.isAlive()) {
118                     ThreadUtilities.sleep(CLIENT_START_WAIT_INTERVAL);
119                 } else {
120                     LOGGER.error("cound not handshake with server {}:{}", hostName, port);
121                     throw new ApexDeploymentException("cound not handshake with server " + hostName + ":" + port);
122                 }
123             }
124
125             LOGGER.debug("opened connection to server {}:{} . . .", hostName, port);
126
127             // Get engine service information to see what engines we're dealing with
128             final GetEngineServiceInfo engineServiceInfo = new GetEngineServiceInfo(null);
129             LOGGER.debug("sending get engine service info message {} to server {}:{} . . .", engineServiceInfo,
130                     hostName, port);
131             client.sendMessage(engineServiceInfo);
132             LOGGER.debug("sent get engine service info message to server {}:{} . . .", hostName, port);
133
134             final EngineServiceInfoResponse engineServiceInfoResponse =
135                     (EngineServiceInfoResponse) getResponse(engineServiceInfo);
136             if (engineServiceInfoResponse.isSuccessful()) {
137                 engineServiceKey = engineServiceInfoResponse.getEngineServiceKey();
138                 engineKeyArray = engineServiceInfoResponse.getEngineKeyArray();
139                 apexModelKey = engineServiceInfoResponse.getApexModelKey();
140             }
141         } catch (final Exception e) {
142             LOGGER.error("cound not handshake with server {}:{}", hostName, port, e);
143             client.stopClient();
144             throw new ApexDeploymentException("cound not handshake with server " + hostName + ":" + port, e);
145         }
146
147     }
148
149     /**
150      * Get the engine service key.
151      *
152      * @return the engine service key
153      */
154     public AxArtifactKey getApexModelKey() {
155         return apexModelKey;
156     }
157
158     /**
159      * Get the keys of the engines on this engine service.
160      *
161      * @return the engine key array
162      */
163     public AxArtifactKey[] getEngineKeyArray() {
164         return engineKeyArray;
165     }
166
167     /**
168      * Get the engine service key.
169      *
170      * @return the engine service key
171      */
172     public AxArtifactKey getKey() {
173         return engineServiceKey;
174     }
175
176     /**
177      * Close the EngDep connection to the Apex server.
178      */
179     public void close() {
180         LOGGER.debug("closing connection to server {}:{} . . .", hostName, port);
181
182         client.stopClient();
183
184         LOGGER.debug("closed connection to server {}:{} . . .", hostName, port);
185     }
186
187     /**
188      * Deploy an Apex model on the Apex engine service.
189      *
190      * @param modelFileName the name of the model file containing the model to deploy
191      * @param ignoreConflicts true if conflicts between context in polices is to be ignored
192      * @param force true if the model is to be applied even if it is incompatible with the existing model
193      * @throws ApexException on Apex errors
194      */
195     public void deployModel(final String modelFileName, final boolean ignoreConflicts, final boolean force)
196             throws ApexException {
197         if (engineServiceKey == null || engineKeyArray == null || engineKeyArray.length == 0) {
198             LOGGER.error("cound not deploy apex model, deployer is not initialized");
199             throw new ApexDeploymentException("cound not deploy apex model, deployer is not initialized");
200         }
201
202         // Get the model file as a string
203         URL apexModelUrl = ResourceUtils.getLocalFile(modelFileName);
204         if (apexModelUrl == null) {
205             apexModelUrl = ResourceUtils.getUrlResource(modelFileName);
206             if (apexModelUrl == null) {
207                 LOGGER.error("cound not create apex model, could not read from file {}", modelFileName);
208                 throw new ApexDeploymentException(
209                         "cound not create apex model, could not read from file " + modelFileName);
210             }
211         }
212
213         try {
214             deployModel(modelFileName, apexModelUrl.openStream(), ignoreConflicts, force);
215         } catch (final Exception deployException) {
216             final String errorMessage = "could not deploy apex model from " + modelFileName;
217             LOGGER.error(errorMessage, deployException);
218             throw new ApexDeploymentException(errorMessage, deployException);
219         }
220     }
221
222     /**
223      * Deploy an Apex model on the Apex engine service.
224      *
225      * @param modelFileName the name of the model file containing the model to deploy
226      * @param modelInputStream the stream that holds the Apex model
227      * @param ignoreConflicts true if conflicts between context in polices is to be ignored
228      * @param force true if the model is to be applied even if it is incompatible with the existing model
229      * @throws ApexException on model deployment errors
230      */
231     public void deployModel(final String modelFileName, final InputStream modelInputStream,
232             final boolean ignoreConflicts, final boolean force) throws ApexException {
233         // Read the policy model from the stream
234         final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
235         modelReader.setValidateFlag(!ignoreConflicts);
236         final AxPolicyModel apexPolicyModel = modelReader.read(modelInputStream);
237
238         // Deploy the model
239         deployModel(apexPolicyModel, ignoreConflicts, force);
240     }
241
242     /**
243      * Deploy an Apex model on the Apex engine service.
244      *
245      * @param apexPolicyModel the name of the model to deploy
246      * @param ignoreConflicts true if conflicts between context in polices is to be ignored
247      * @param force true if the model is to be applied even if it is incompatible with the existing model
248      * @throws ApexException on model deployment errors
249      */
250     public void deployModel(final AxPolicyModel apexPolicyModel, final boolean ignoreConflicts, final boolean force)
251             throws ApexException {
252         // Write the model into a byte array
253         final ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream();
254         final ApexModelWriter<AxPolicyModel> modelWriter = new ApexModelWriter<>(AxPolicyModel.class);
255         modelWriter.write(apexPolicyModel, baOutputStream);
256
257         // Create and send Update message
258         final UpdateModel umMessage =
259                 new UpdateModel(engineServiceKey, baOutputStream.toString(), ignoreConflicts, force);
260
261         LOGGER.debug("sending update message {} to server {}:{} . . .", umMessage, hostName, port);
262         client.sendMessage(umMessage);
263         LOGGER.debug("sent update message to server {}:{} . . .", hostName, port);
264
265         // Check if we got a response
266         final Response response = getResponse(umMessage);
267         if (!response.isSuccessful()) {
268             LOGGER.warn(FAILED_RESPONSE + "{} received from server {}:{}", response.getMessageData(), hostName, port);
269             throw new ApexException(
270                     FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port);
271         }
272     }
273
274     /**
275      * Start an Apex engine on the engine service.
276      *
277      * @param engineKey the key of the engine to start
278      * @throws ApexDeploymentException on messaging errors
279      */
280     public void startEngine(final AxArtifactKey engineKey) throws ApexDeploymentException {
281         final StartEngine startEngineMessage = new StartEngine(engineKey);
282         LOGGER.debug("sending start engine {} to server {}:{} . . .", startEngineMessage, hostName, port);
283         client.sendMessage(startEngineMessage);
284         LOGGER.debug("sent start engine message to server {}:{} . . .", hostName, port);
285
286         // Check if we got a response
287         final Response response = getResponse(startEngineMessage);
288         if (!response.isSuccessful()) {
289             final String message =
290                     FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
291             LOGGER.warn(message);
292             throw new ApexDeploymentException(message);
293         }
294     }
295
296     /**
297      * Stop an Apex engine on the engine service.
298      *
299      * @param engineKey the key of the engine to stop
300      * @throws ApexDeploymentException on messaging errors
301      */
302     public void stopEngine(final AxArtifactKey engineKey) throws ApexDeploymentException {
303         final StopEngine stopEngineMessage = new StopEngine(engineKey);
304         LOGGER.debug("sending stop engine {} to server {}:{} . . .", stopEngineMessage, hostName, port);
305         client.sendMessage(stopEngineMessage);
306         LOGGER.debug("sent stop engine message to server {}:{} . . .", hostName, port);
307
308         // Check if we got a response
309         final Response response = getResponse(stopEngineMessage);
310         if (!response.isSuccessful()) {
311             final String message =
312                     FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
313             LOGGER.warn(message);
314             throw new ApexDeploymentException(message);
315         }
316     }
317
318     /**
319      * Start periodic events on an Apex engine on the engine service.
320      *
321      * @param engineKey the key of the engine to start periodic events on
322      * @param period the period in milliseconds between periodic events
323      * @throws ApexDeploymentException on messaging errors
324      */
325     public void startPerioidicEvents(final AxArtifactKey engineKey, final long period) throws ApexDeploymentException {
326         final StartPeriodicEvents startPerioidicEventsMessage = new StartPeriodicEvents(engineKey);
327         startPerioidicEventsMessage.setMessageData(Long.toString(period));
328         LOGGER.debug("sending start perioidic events {} to server {}:{} . . .", startPerioidicEventsMessage, hostName,
329                 port);
330         client.sendMessage(startPerioidicEventsMessage);
331         LOGGER.debug("sent start perioidic events message to server {}:{} . . .", hostName, port);
332
333         // Check if we got a response
334         final Response response = getResponse(startPerioidicEventsMessage);
335         if (!response.isSuccessful()) {
336             final String message =
337                     FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
338             LOGGER.warn(message);
339             throw new ApexDeploymentException(message);
340         }
341     }
342
343     /**
344      * Stop periodic events on an Apex engine on the engine service.
345      *
346      * @param engineKey the key of the engine to stop periodic events on
347      * @throws ApexDeploymentException on messaging errors
348      */
349     public void stopPerioidicEvents(final AxArtifactKey engineKey) throws ApexDeploymentException {
350         final StopPeriodicEvents stopPerioidicEventsMessage = new StopPeriodicEvents(engineKey);
351         LOGGER.debug("sending stop perioidic events {} to server {}:{} . . .", stopPerioidicEventsMessage, hostName,
352                 port);
353         client.sendMessage(stopPerioidicEventsMessage);
354         LOGGER.debug("sent stop perioidic events message to server {}:{} . . .", hostName, port);
355
356         // Check if we got a response
357         final Response response = getResponse(stopPerioidicEventsMessage);
358         if (!response.isSuccessful()) {
359             final String message =
360                     FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
361             LOGGER.warn(message);
362             throw new ApexDeploymentException(message);
363         }
364     }
365
366     /**
367      * Get the status of an Apex engine.
368      *
369      * @param engineKey the key of the engine to get the status of
370      * @return an engine model containing the status of the engine for the given key
371      * @throws ApexException the apex exception
372      */
373     public AxEngineModel getEngineStatus(final AxArtifactKey engineKey) throws ApexException {
374         final GetEngineStatus engineStatusMessage = new GetEngineStatus(engineKey);
375         LOGGER.debug("sending get engine status message {} to server {}:{} . . .", engineStatusMessage, hostName, port);
376         client.sendMessage(engineStatusMessage);
377         LOGGER.debug("sent get engine status message to server {}:{} . . .", hostName, port);
378
379         // Check if we got a response
380         final Response response = getResponse(engineStatusMessage);
381         if (!response.isSuccessful()) {
382             final String message =
383                     FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
384             LOGGER.warn(message);
385             throw new ApexException(message);
386         }
387
388         final ByteArrayInputStream baInputStream = new ByteArrayInputStream(response.getMessageData().getBytes());
389         final ApexModelReader<AxEngineModel> modelReader = new ApexModelReader<>(AxEngineModel.class);
390         modelReader.setValidateFlag(false);
391         return modelReader.read(baInputStream);
392     }
393
394     /**
395      * Get the runtime information of an Apex engine.
396      *
397      * @param engineKey the key of the engine to get information for
398      * @return an engine model containing information on the engine for the given key
399      * @throws ApexException the apex exception
400      */
401     public String getEngineInfo(final AxArtifactKey engineKey) throws ApexException {
402         final GetEngineInfo engineInfoMessage = new GetEngineInfo(engineKey);
403         LOGGER.debug("sending get engine information message {} to server {}:{} . . .", engineInfoMessage, hostName,
404                 port);
405         client.sendMessage(engineInfoMessage);
406         LOGGER.debug("sent get engine information message to server {}:{} . . .", hostName, port);
407
408         // Check if we got a response
409         final Response response = getResponse(engineInfoMessage);
410         if (!response.isSuccessful()) {
411             final String message =
412                     FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
413             LOGGER.warn(message);
414             throw new ApexException(message);
415         }
416
417         return response.getMessageData();
418     }
419
420     /**
421      * Check the response to a model deployment message from the Apex server.
422      *
423      * @param sentMessage the sent message
424      * @return the response message
425      * @throws ApexDeploymentException the apex deployment exception
426      */
427     private Response getResponse(final Message sentMessage) throws ApexDeploymentException {
428         // Get the amount of milliseconds we should wait for a timeout
429         int timeoutTime = sentMessage.getReplyTimeout();
430         if (timeoutTime <= 0) {
431             timeoutTime = REPLY_MESSAGE_TIMEOUT_DEFAULT;
432         }
433
434         // Wait for the required amount of milliseconds for the response from the Apex server
435         Message receivedMessage = null;
436         for (int timeWaitedSoFar = 0; receivedMessage == null && timeWaitedSoFar < timeoutTime; timeWaitedSoFar +=
437                 REPLY_MESSAGE_TIMEOUT_INCREMENT) {
438             try {
439                 receivedMessage = client.getReceiveQueue().poll(REPLY_MESSAGE_TIMEOUT_INCREMENT, TimeUnit.MILLISECONDS);
440             } catch (final InterruptedException e) {
441                 // restore the interrupt status
442                 Thread.currentThread().interrupt();
443                 LOGGER.warn("reception of response from server interrupted {}:{}", hostName, port, e);
444                 throw new ApexDeploymentException(
445                         "reception of response from server interrupted " + hostName + ':' + port, e);
446             }
447         }
448
449         // Check if response to sent message
450         if (receivedMessage == null) {
451             LOGGER.warn("no response received to sent message " + sentMessage.getAction());
452             throw new ApexDeploymentException("no response received to sent message " + sentMessage.getAction());
453         }
454
455         // Check instance is a response message
456         if (!(receivedMessage instanceof Response)) {
457             LOGGER.warn("response received from server is of incorrect type {}, should be of type {}",
458                     receivedMessage.getClass().getName(), Response.class.getName());
459             throw new ApexDeploymentException("response received from server is of incorrect type "
460                     + receivedMessage.getClass().getName() + ", should be of type " + Response.class.getName());
461         }
462
463         // Cast the response message
464         final Response responseMessage = (Response) receivedMessage;
465
466         // Check if response to sent message
467         if (!responseMessage.getResponseTo().equals(sentMessage)) {
468             LOGGER.warn("response received is not response to sent message " + sentMessage.getAction());
469             throw new ApexDeploymentException(
470                     "response received is not correct response to sent message " + sentMessage.getAction());
471         }
472
473         // Check if successful
474         if (responseMessage.isSuccessful()) {
475             LOGGER.debug("response received: {} message was succssful: {}", sentMessage.getAction(),
476                     responseMessage.getMessageData());
477         } else {
478             LOGGER.debug("response received: {} message failed: {}", sentMessage.getAction(),
479                     responseMessage.getMessageData());
480         }
481
482         return responseMessage;
483     }
484
485     /**
486      * Set a deployment client for this facade. This method is for testing.
487      * 
488      * @param deploymentClient the deployment client to set
489      */
490     protected void setDeploymentClient(final DeploymentClient deploymentClient) {
491         this.client = deploymentClient;
492     }
493 }