2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.core.deployment;
23 import java.io.ByteArrayInputStream;
24 import java.io.ByteArrayOutputStream;
25 import java.io.InputStream;
27 import java.util.concurrent.TimeUnit;
28 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
29 import org.onap.policy.apex.core.protocols.Message;
30 import org.onap.policy.apex.core.protocols.engdep.messages.EngineServiceInfoResponse;
31 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineInfo;
32 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineServiceInfo;
33 import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineStatus;
34 import org.onap.policy.apex.core.protocols.engdep.messages.Response;
35 import org.onap.policy.apex.core.protocols.engdep.messages.StartEngine;
36 import org.onap.policy.apex.core.protocols.engdep.messages.StartPeriodicEvents;
37 import org.onap.policy.apex.core.protocols.engdep.messages.StopEngine;
38 import org.onap.policy.apex.core.protocols.engdep.messages.StopPeriodicEvents;
39 import org.onap.policy.apex.core.protocols.engdep.messages.UpdateModel;
40 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
41 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
42 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
43 import org.onap.policy.apex.model.basicmodel.handling.ApexModelWriter;
44 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
45 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
46 import org.onap.policy.common.utils.resources.ResourceUtils;
47 import org.slf4j.ext.XLogger;
48 import org.slf4j.ext.XLoggerFactory;
51 * The Class Deployer deploys an Apex model held as an XML file onto an Apex engine. It uses the EngDep protocol to
52 * communicate with the engine, with the EngDep protocol being carried on Java web sockets.
54 * <p>This deployer is a simple command line deployer that reads the communication parameters and the location of
55 * the XML model file as arguments.
57 * @author Liam Fallon (liam.fallon@ericsson.com)
59 public class EngineServiceFacade {
60 // Get a reference to the logger
61 private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineServiceFacade.class);
63 // Repeated string constants
64 private static final String RECEIVED_FROM_SERVER = " received from server";
65 private static final String FAILED_RESPONSE = "failed response ";
67 // The default message timeout and timeout increment (the amount of time between polls) in
69 private static final int CLIENT_START_WAIT_INTERVAL = 100;
70 private static final int REPLY_MESSAGE_TIMEOUT_DEFAULT = 10000;
71 private static final int REPLY_MESSAGE_TIMEOUT_INCREMENT = 100;
73 // The Apex engine host and EngDep port
74 private final String hostName;
75 private final int port;
77 // The deployment client handles the EngDep communication session towards the Apex server
78 private DeploymentClient client = null;
79 private Thread clientThread = null;
81 // Information about the Engine service we are connected to
82 private AxArtifactKey engineServiceKey = null;
83 private AxArtifactKey[] engineKeyArray = null;
84 private AxArtifactKey apexModelKey = null;
87 * Instantiates a new deployer.
89 * @param hostName the host name of the host running the Apex Engine
90 * @param port the port to use for EngDep communication with the Apex engine
92 public EngineServiceFacade(final String hostName, final int port) {
93 this.hostName = hostName;
96 // Use the deployment client to handle the EngDep communication towards the Apex server.
97 client = new DeploymentClient(hostName, port);
101 * Initializes the facade, opens an EngDep communication session with the Apex engine.
103 * @throws ApexDeploymentException thrown on deployment and communication errors
105 public void init() throws ApexDeploymentException {
107 LOGGER.debug("handshaking with server {}:{} . . .", hostName, port);
109 // Use the deployment client to handle the EngDep communication towards the Apex server.
110 // The deployment client runs a thread to monitor the session and to send messages
111 clientThread = new Thread(client);
112 clientThread.start();
114 // Wait for the connection to come up
115 while (!client.isStarted()) {
116 if (clientThread.isAlive()) {
117 ThreadUtilities.sleep(CLIENT_START_WAIT_INTERVAL);
119 LOGGER.error("cound not handshake with server {}:{}", hostName, port);
120 throw new ApexDeploymentException("cound not handshake with server " + hostName + ":" + port);
124 LOGGER.debug("opened connection to server {}:{} . . .", hostName, port);
126 // Get engine service information to see what engines we're dealing with
127 final GetEngineServiceInfo engineServiceInfo = new GetEngineServiceInfo(null);
128 LOGGER.debug("sending get engine service info message {} to server {}:{} . . .", engineServiceInfo,
130 client.sendMessage(engineServiceInfo);
131 LOGGER.debug("sent get engine service info message to server {}:{} . . .", hostName, port);
133 final EngineServiceInfoResponse engineServiceInfoResponse =
134 (EngineServiceInfoResponse) getResponse(engineServiceInfo);
135 if (engineServiceInfoResponse.isSuccessful()) {
136 engineServiceKey = engineServiceInfoResponse.getEngineServiceKey();
137 engineKeyArray = engineServiceInfoResponse.getEngineKeyArray();
138 apexModelKey = engineServiceInfoResponse.getApexModelKey();
140 } catch (final Exception e) {
141 LOGGER.error("cound not handshake with server {}:{}", hostName, port, e);
143 throw new ApexDeploymentException("cound not handshake with server " + hostName + ":" + port, e);
149 * Get the engine service key.
151 * @return the engine service key
153 public AxArtifactKey getApexModelKey() {
158 * Get the keys of the engines on this engine service.
160 * @return the engine key array
162 public AxArtifactKey[] getEngineKeyArray() {
163 return engineKeyArray;
167 * Get the engine service key.
169 * @return the engine service key
171 public AxArtifactKey getKey() {
172 return engineServiceKey;
176 * Close the EngDep connection to the Apex server.
178 public void close() {
179 LOGGER.debug("closing connection to server {}:{} . . .", hostName, port);
183 LOGGER.debug("closed connection to server {}:{} . . .", hostName, port);
187 * Deploy an Apex model on the Apex engine service.
189 * @param modelFileName the name of the model file containing the model to deploy
190 * @param ignoreConflicts true if conflicts between context in polices is to be ignored
191 * @param force true if the model is to be applied even if it is incompatible with the existing model
192 * @throws ApexException on Apex errors
194 public void deployModel(final String modelFileName, final boolean ignoreConflicts, final boolean force)
195 throws ApexException {
196 if (engineServiceKey == null || engineKeyArray == null || engineKeyArray.length == 0) {
197 LOGGER.error("cound not deploy apex model, deployer is not initialized");
198 throw new ApexDeploymentException("cound not deploy apex model, deployer is not initialized");
201 // Get the model file as a string
202 URL apexModelUrl = ResourceUtils.getLocalFile(modelFileName);
203 if (apexModelUrl == null) {
204 apexModelUrl = ResourceUtils.getUrlResource(modelFileName);
205 if (apexModelUrl == null) {
206 LOGGER.error("cound not create apex model, could not read from file {}", modelFileName);
207 throw new ApexDeploymentException(
208 "cound not create apex model, could not read from file " + modelFileName);
213 deployModel(modelFileName, apexModelUrl.openStream(), ignoreConflicts, force);
214 } catch (final Exception deployException) {
215 final String errorMessage = "could not deploy apex model from " + modelFileName;
216 LOGGER.error(errorMessage, deployException);
217 throw new ApexDeploymentException(errorMessage, deployException);
222 * Deploy an Apex model on the Apex engine service.
224 * @param modelFileName the name of the model file containing the model to deploy
225 * @param modelInputStream the stream that holds the Apex model
226 * @param ignoreConflicts true if conflicts between context in polices is to be ignored
227 * @param force true if the model is to be applied even if it is incompatible with the existing model
228 * @throws ApexException on model deployment errors
230 public void deployModel(final String modelFileName, final InputStream modelInputStream,
231 final boolean ignoreConflicts, final boolean force) throws ApexException {
232 // Read the policy model from the stream
233 final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
234 modelReader.setValidateFlag(!ignoreConflicts);
235 final AxPolicyModel apexPolicyModel = modelReader.read(modelInputStream);
238 deployModel(apexPolicyModel, ignoreConflicts, force);
242 * Deploy an Apex model on the Apex engine service.
244 * @param apexPolicyModel the name of the model to deploy
245 * @param ignoreConflicts true if conflicts between context in polices is to be ignored
246 * @param force true if the model is to be applied even if it is incompatible with the existing model
247 * @throws ApexException on model deployment errors
249 public void deployModel(final AxPolicyModel apexPolicyModel, final boolean ignoreConflicts, final boolean force)
250 throws ApexException {
251 // Write the model into a byte array
252 final ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream();
253 final ApexModelWriter<AxPolicyModel> modelWriter = new ApexModelWriter<>(AxPolicyModel.class);
254 modelWriter.write(apexPolicyModel, baOutputStream);
256 // Create and send Update message
257 final UpdateModel umMessage =
258 new UpdateModel(engineServiceKey, baOutputStream.toString(), ignoreConflicts, force);
260 LOGGER.debug("sending update message {} to server {}:{} . . .", umMessage, hostName, port);
261 client.sendMessage(umMessage);
262 LOGGER.debug("sent update message to server {}:{} . . .", hostName, port);
264 // Check if we got a response
265 final Response response = getResponse(umMessage);
266 if (!response.isSuccessful()) {
267 LOGGER.warn(FAILED_RESPONSE + "{} received from server {}:{}", response.getMessageData(), hostName, port);
268 throw new ApexException(
269 FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port);
274 * Start an Apex engine on the engine service.
276 * @param engineKey the key of the engine to start
277 * @throws ApexDeploymentException on messaging errors
279 public void startEngine(final AxArtifactKey engineKey) throws ApexDeploymentException {
280 final StartEngine startEngineMessage = new StartEngine(engineKey);
281 LOGGER.debug("sending start engine {} to server {}:{} . . .", startEngineMessage, hostName, port);
282 client.sendMessage(startEngineMessage);
283 LOGGER.debug("sent start engine message to server {}:{} . . .", hostName, port);
285 // Check if we got a response
286 final Response response = getResponse(startEngineMessage);
287 if (!response.isSuccessful()) {
288 final String message =
289 FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
290 LOGGER.warn(message);
291 throw new ApexDeploymentException(message);
296 * Stop an Apex engine on the engine service.
298 * @param engineKey the key of the engine to stop
299 * @throws ApexDeploymentException on messaging errors
301 public void stopEngine(final AxArtifactKey engineKey) throws ApexDeploymentException {
302 final StopEngine stopEngineMessage = new StopEngine(engineKey);
303 LOGGER.debug("sending stop engine {} to server {}:{} . . .", stopEngineMessage, hostName, port);
304 client.sendMessage(stopEngineMessage);
305 LOGGER.debug("sent stop engine message to server {}:{} . . .", hostName, port);
307 // Check if we got a response
308 final Response response = getResponse(stopEngineMessage);
309 if (!response.isSuccessful()) {
310 final String message =
311 FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
312 LOGGER.warn(message);
313 throw new ApexDeploymentException(message);
318 * Start periodic events on an Apex engine on the engine service.
320 * @param engineKey the key of the engine to start periodic events on
321 * @param period the period in milliseconds between periodic events
322 * @throws ApexDeploymentException on messaging errors
324 public void startPerioidicEvents(final AxArtifactKey engineKey, final long period) throws ApexDeploymentException {
325 final StartPeriodicEvents startPerioidicEventsMessage = new StartPeriodicEvents(engineKey);
326 startPerioidicEventsMessage.setMessageData(Long.toString(period));
327 LOGGER.debug("sending start perioidic events {} to server {}:{} . . .", startPerioidicEventsMessage, hostName,
329 client.sendMessage(startPerioidicEventsMessage);
330 LOGGER.debug("sent start perioidic events message to server {}:{} . . .", hostName, port);
332 // Check if we got a response
333 final Response response = getResponse(startPerioidicEventsMessage);
334 if (!response.isSuccessful()) {
335 final String message =
336 FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
337 LOGGER.warn(message);
338 throw new ApexDeploymentException(message);
343 * Stop periodic events on an Apex engine on the engine service.
345 * @param engineKey the key of the engine to stop periodic events on
346 * @throws ApexDeploymentException on messaging errors
348 public void stopPerioidicEvents(final AxArtifactKey engineKey) throws ApexDeploymentException {
349 final StopPeriodicEvents stopPerioidicEventsMessage = new StopPeriodicEvents(engineKey);
350 LOGGER.debug("sending stop perioidic events {} to server {}:{} . . .", stopPerioidicEventsMessage, hostName,
352 client.sendMessage(stopPerioidicEventsMessage);
353 LOGGER.debug("sent stop perioidic events message to server {}:{} . . .", hostName, port);
355 // Check if we got a response
356 final Response response = getResponse(stopPerioidicEventsMessage);
357 if (!response.isSuccessful()) {
358 final String message =
359 FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
360 LOGGER.warn(message);
361 throw new ApexDeploymentException(message);
366 * Get the status of an Apex engine.
368 * @param engineKey the key of the engine to get the status of
369 * @return an engine model containing the status of the engine for the given key
370 * @throws ApexException the apex exception
372 public AxEngineModel getEngineStatus(final AxArtifactKey engineKey) throws ApexException {
373 final GetEngineStatus engineStatusMessage = new GetEngineStatus(engineKey);
374 LOGGER.debug("sending get engine status message {} to server {}:{} . . .", engineStatusMessage, hostName, port);
375 client.sendMessage(engineStatusMessage);
376 LOGGER.debug("sent get engine status message to server {}:{} . . .", hostName, port);
378 // Check if we got a response
379 final Response response = getResponse(engineStatusMessage);
380 if (!response.isSuccessful()) {
381 final String message =
382 FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
383 LOGGER.warn(message);
384 throw new ApexException(message);
387 final ByteArrayInputStream baInputStream = new ByteArrayInputStream(response.getMessageData().getBytes());
388 final ApexModelReader<AxEngineModel> modelReader = new ApexModelReader<>(AxEngineModel.class);
389 modelReader.setValidateFlag(false);
390 return modelReader.read(baInputStream);
394 * Get the runtime information of an Apex engine.
396 * @param engineKey the key of the engine to get information for
397 * @return an engine model containing information on the engine for the given key
398 * @throws ApexException the apex exception
400 public String getEngineInfo(final AxArtifactKey engineKey) throws ApexException {
401 final GetEngineInfo engineInfoMessage = new GetEngineInfo(engineKey);
402 LOGGER.debug("sending get engine information message {} to server {}:{} . . .", engineInfoMessage, hostName,
404 client.sendMessage(engineInfoMessage);
405 LOGGER.debug("sent get engine information message to server {}:{} . . .", hostName, port);
407 // Check if we got a response
408 final Response response = getResponse(engineInfoMessage);
409 if (!response.isSuccessful()) {
410 final String message =
411 FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
412 LOGGER.warn(message);
413 throw new ApexException(message);
416 return response.getMessageData();
420 * Check the response to a model deployment message from the Apex server.
422 * @param sentMessage the sent message
423 * @return the response message
424 * @throws ApexDeploymentException the apex deployment exception
426 private Response getResponse(final Message sentMessage) throws ApexDeploymentException {
427 // Get the amount of milliseconds we should wait for a timeout
428 int timeoutTime = sentMessage.getReplyTimeout();
429 if (timeoutTime <= 0) {
430 timeoutTime = REPLY_MESSAGE_TIMEOUT_DEFAULT;
433 // Wait for the required amount of milliseconds for the response from the Apex server
434 Message receivedMessage = null;
435 for (int timeWaitedSoFar = 0; receivedMessage == null && timeWaitedSoFar < timeoutTime; timeWaitedSoFar +=
436 REPLY_MESSAGE_TIMEOUT_INCREMENT) {
438 receivedMessage = client.getReceiveQueue().poll(REPLY_MESSAGE_TIMEOUT_INCREMENT, TimeUnit.MILLISECONDS);
439 } catch (final InterruptedException e) {
440 // restore the interrupt status
441 Thread.currentThread().interrupt();
442 LOGGER.warn("reception of response from server interrupted {}:{}", hostName, port, e);
443 throw new ApexDeploymentException(
444 "reception of response from server interrupted " + hostName + ':' + port, e);
448 // Check if response to sent message
449 if (receivedMessage == null) {
450 LOGGER.warn("no response received to sent message " + sentMessage.getAction());
451 throw new ApexDeploymentException("no response received to sent message " + sentMessage.getAction());
454 // Check instance is a response message
455 if (!(receivedMessage instanceof Response)) {
456 LOGGER.warn("response received from server is of incorrect type {}, should be of type {}",
457 receivedMessage.getClass().getName(), Response.class.getName());
458 throw new ApexDeploymentException("response received from server is of incorrect type "
459 + receivedMessage.getClass().getName() + ", should be of type " + Response.class.getName());
462 // Cast the response message
463 final Response responseMessage = (Response) receivedMessage;
465 // Check if response to sent message
466 if (!responseMessage.getResponseTo().equals(sentMessage)) {
467 LOGGER.warn("response received is not response to sent message " + sentMessage.getAction());
468 throw new ApexDeploymentException(
469 "response received is not correct response to sent message " + sentMessage.getAction());
472 // Check if successful
473 if (responseMessage.isSuccessful()) {
474 LOGGER.debug("response received: {} message was succssful: {}", sentMessage.getAction(),
475 responseMessage.getMessageData());
477 LOGGER.debug("response received: {} message failed: {}", sentMessage.getAction(),
478 responseMessage.getMessageData());
481 return responseMessage;
485 * Set a deployment client for this facade. This method is for testing.
487 * @param deploymentClient the deployment client to set
489 protected void setDeploymentClient(final DeploymentClient deploymentClient) {
490 this.client = deploymentClient;