2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.core.deployment;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.net.URL;
import java.util.concurrent.TimeUnit;

import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
import org.onap.policy.apex.core.protocols.Message;
import org.onap.policy.apex.core.protocols.engdep.messages.EngineServiceInfoResponse;
import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineInfo;
import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineServiceInfo;
import org.onap.policy.apex.core.protocols.engdep.messages.GetEngineStatus;
import org.onap.policy.apex.core.protocols.engdep.messages.Response;
import org.onap.policy.apex.core.protocols.engdep.messages.StartEngine;
import org.onap.policy.apex.core.protocols.engdep.messages.StartPeriodicEvents;
import org.onap.policy.apex.core.protocols.engdep.messages.StopEngine;
import org.onap.policy.apex.core.protocols.engdep.messages.StopPeriodicEvents;
import org.onap.policy.apex.core.protocols.engdep.messages.UpdateModel;
import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
import org.onap.policy.apex.model.basicmodel.handling.ApexModelWriter;
import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
import org.onap.policy.common.utils.resources.ResourceUtils;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
/**
 * The Class EngineServiceFacade deploys an Apex model held as an XML file onto an Apex engine. It uses the EngDep
 * protocol to communicate with the engine, with the EngDep protocol being carried on Java web sockets.
 *
 * <p>This facade takes the communication parameters (host name and port) as constructor arguments and the location
 * of the model file as an argument to {@code deployModel}.
 *
 * @author Liam Fallon (liam.fallon@ericsson.com)
 */
60 public class EngineServiceFacade {
// Get a reference to the logger
private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineServiceFacade.class);

// Repeated string constants used to build failure messages.
// RECEIVED_FROM_SERVER has a leading space but no trailing space, so anything appended to it follows directly
private static final String RECEIVED_FROM_SERVER = " received from server";
private static final String FAILED_RESPONSE = "failed response ";

// The default message timeout and timeout increment (the amount of time between polls) in
// milliseconds.
// CLIENT_START_WAIT_INTERVAL is the sleep between checks while waiting for the deployment client to start
// (presumably also milliseconds; confirm against ThreadUtilities.sleep)
private static final int CLIENT_START_WAIT_INTERVAL = 100;
// Used when a sent message does not specify a positive reply timeout
private static final int REPLY_MESSAGE_TIMEOUT_DEFAULT = 10000;
// The time a single poll of the receive queue waits for a reply
private static final int REPLY_MESSAGE_TIMEOUT_INCREMENT = 100;

// The Apex engine host and EngDep port
private final String hostName;
private final int port;

// The deployment client handles the EngDep communication session towards the Apex server
private DeploymentClient client = null;
// The thread that runs the deployment client, created when the facade is initialized
private Thread clientThread = null;

// Information about the Engine service we are connected to, populated from the engine service info
// response during initialization and null until then
private AxArtifactKey engineServiceKey = null;
private AxArtifactKey[] engineKeyArray = null;
private AxArtifactKey apexModelKey = null;
/**
 * Instantiates a new engine service facade and creates the deployment client for the given host and port. No
 * communication is started here; call {@link #init()} to open the session.
 *
 * @param hostName the host name of the host running the Apex Engine
 * @param port the port to use for EngDep communication with the Apex engine
 */
public EngineServiceFacade(final String hostName, final int port) {
    this.hostName = hostName;
    // The port is a final field and is used in every log and error message, so it must be set here
    this.port = port;

    // Use the deployment client to handle the EngDep communication towards the Apex server.
    client = new DeploymentClient(hostName, port);
}
102 * Initializes the facade, opens an EngDep communication session with the Apex engine.
104 * @throws ApexDeploymentException thrown on deployment and communication errors
106 public void init() throws ApexDeploymentException {
108 LOGGER.debug("handshaking with server {}:{} . . .", hostName, port);
110 // Use the deployment client to handle the EngDep communication towards the Apex server.
111 // The deployment client runs a thread to monitor the session and to send messages
112 clientThread = new Thread(client);
113 clientThread.start();
115 // Wait for the connection to come up
116 while (!client.isStarted()) {
117 if (clientThread.isAlive()) {
118 ThreadUtilities.sleep(CLIENT_START_WAIT_INTERVAL);
120 LOGGER.error("cound not handshake with server {}:{}", hostName, port);
121 throw new ApexDeploymentException("cound not handshake with server " + hostName + ":" + port);
125 LOGGER.debug("opened connection to server {}:{} . . .", hostName, port);
127 // Get engine service information to see what engines we're dealing with
128 final GetEngineServiceInfo engineServiceInfo = new GetEngineServiceInfo(null);
129 LOGGER.debug("sending get engine service info message {} to server {}:{} . . .", engineServiceInfo,
131 client.sendMessage(engineServiceInfo);
132 LOGGER.debug("sent get engine service info message to server {}:{} . . .", hostName, port);
134 final EngineServiceInfoResponse engineServiceInfoResponse =
135 (EngineServiceInfoResponse) getResponse(engineServiceInfo);
136 if (engineServiceInfoResponse.isSuccessful()) {
137 engineServiceKey = engineServiceInfoResponse.getEngineServiceKey();
138 engineKeyArray = engineServiceInfoResponse.getEngineKeyArray();
139 apexModelKey = engineServiceInfoResponse.getApexModelKey();
141 } catch (final Exception e) {
142 LOGGER.error("cound not handshake with server {}:{}", hostName, port, e);
144 throw new ApexDeploymentException("cound not handshake with server " + hostName + ":" + port, e);
150 * Get the engine service key.
152 * @return the engine service key
154 public AxArtifactKey getApexModelKey() {
159 * Get the keys of the engines on this engine service.
161 * @return the engine key array
163 public AxArtifactKey[] getEngineKeyArray() {
164 return engineKeyArray;
168 * Get the engine service key.
170 * @return the engine service key
172 public AxArtifactKey getKey() {
173 return engineServiceKey;
177 * Close the EngDep connection to the Apex server.
179 public void close() {
180 LOGGER.debug("closing connection to server {}:{} . . .", hostName, port);
184 LOGGER.debug("closed connection to server {}:{} . . .", hostName, port);
188 * Deploy an Apex model on the Apex engine service.
190 * @param modelFileName the name of the model file containing the model to deploy
191 * @param ignoreConflicts true if conflicts between context in polices is to be ignored
192 * @param force true if the model is to be applied even if it is incompatible with the existing model
193 * @throws ApexException on Apex errors
195 public void deployModel(final String modelFileName, final boolean ignoreConflicts, final boolean force)
196 throws ApexException {
197 if (engineServiceKey == null || engineKeyArray == null || engineKeyArray.length == 0) {
198 LOGGER.error("cound not deploy apex model, deployer is not initialized");
199 throw new ApexDeploymentException("cound not deploy apex model, deployer is not initialized");
202 // Get the model file as a string
203 URL apexModelUrl = ResourceUtils.getLocalFile(modelFileName);
204 if (apexModelUrl == null) {
205 apexModelUrl = ResourceUtils.getUrlResource(modelFileName);
206 if (apexModelUrl == null) {
207 LOGGER.error("cound not create apex model, could not read from file {}", modelFileName);
208 throw new ApexDeploymentException(
209 "cound not create apex model, could not read from file " + modelFileName);
214 deployModel(modelFileName, apexModelUrl.openStream(), ignoreConflicts, force);
215 } catch (final Exception deployException) {
216 final String errorMessage = "could not deploy apex model from " + modelFileName;
217 LOGGER.error(errorMessage, deployException);
218 throw new ApexDeploymentException(errorMessage, deployException);
223 * Deploy an Apex model on the Apex engine service.
225 * @param modelFileName the name of the model file containing the model to deploy
226 * @param modelInputStream the stream that holds the Apex model
227 * @param ignoreConflicts true if conflicts between context in polices is to be ignored
228 * @param force true if the model is to be applied even if it is incompatible with the existing model
229 * @throws ApexException on model deployment errors
231 public void deployModel(final String modelFileName, final InputStream modelInputStream,
232 final boolean ignoreConflicts, final boolean force) throws ApexException {
233 // Read the policy model from the stream
234 final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
235 modelReader.setValidateFlag(!ignoreConflicts);
236 final AxPolicyModel apexPolicyModel = modelReader.read(modelInputStream);
239 deployModel(apexPolicyModel, ignoreConflicts, force);
243 * Deploy an Apex model on the Apex engine service.
245 * @param apexPolicyModel the name of the model to deploy
246 * @param ignoreConflicts true if conflicts between context in polices is to be ignored
247 * @param force true if the model is to be applied even if it is incompatible with the existing model
248 * @throws ApexException on model deployment errors
250 public void deployModel(final AxPolicyModel apexPolicyModel, final boolean ignoreConflicts, final boolean force)
251 throws ApexException {
252 // Write the model into a byte array
253 final ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream();
254 final ApexModelWriter<AxPolicyModel> modelWriter = new ApexModelWriter<>(AxPolicyModel.class);
255 modelWriter.write(apexPolicyModel, baOutputStream);
257 // Create and send Update message
258 final UpdateModel umMessage =
259 new UpdateModel(engineServiceKey, baOutputStream.toString(), ignoreConflicts, force);
261 LOGGER.debug("sending update message {} to server {}:{} . . .", umMessage, hostName, port);
262 client.sendMessage(umMessage);
263 LOGGER.debug("sent update message to server {}:{} . . .", hostName, port);
265 // Check if we got a response
266 final Response response = getResponse(umMessage);
267 if (!response.isSuccessful()) {
268 LOGGER.warn(FAILED_RESPONSE + "{} received from server {}:{}", response.getMessageData(), hostName, port);
269 throw new ApexException(
270 FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port);
275 * Start an Apex engine on the engine service.
277 * @param engineKey the key of the engine to start
278 * @throws ApexDeploymentException on messaging errors
280 public void startEngine(final AxArtifactKey engineKey) throws ApexDeploymentException {
281 final StartEngine startEngineMessage = new StartEngine(engineKey);
282 LOGGER.debug("sending start engine {} to server {}:{} . . .", startEngineMessage, hostName, port);
283 client.sendMessage(startEngineMessage);
284 LOGGER.debug("sent start engine message to server {}:{} . . .", hostName, port);
286 // Check if we got a response
287 final Response response = getResponse(startEngineMessage);
288 if (!response.isSuccessful()) {
289 final String message =
290 FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
291 LOGGER.warn(message);
292 throw new ApexDeploymentException(message);
297 * Stop an Apex engine on the engine service.
299 * @param engineKey the key of the engine to stop
300 * @throws ApexDeploymentException on messaging errors
302 public void stopEngine(final AxArtifactKey engineKey) throws ApexDeploymentException {
303 final StopEngine stopEngineMessage = new StopEngine(engineKey);
304 LOGGER.debug("sending stop engine {} to server {}:{} . . .", stopEngineMessage, hostName, port);
305 client.sendMessage(stopEngineMessage);
306 LOGGER.debug("sent stop engine message to server {}:{} . . .", hostName, port);
308 // Check if we got a response
309 final Response response = getResponse(stopEngineMessage);
310 if (!response.isSuccessful()) {
311 final String message =
312 FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
313 LOGGER.warn(message);
314 throw new ApexDeploymentException(message);
319 * Start periodic events on an Apex engine on the engine service.
321 * @param engineKey the key of the engine to start periodic events on
322 * @param period the period in milliseconds between periodic events
323 * @throws ApexDeploymentException on messaging errors
325 public void startPerioidicEvents(final AxArtifactKey engineKey, final long period) throws ApexDeploymentException {
326 final StartPeriodicEvents startPerioidicEventsMessage = new StartPeriodicEvents(engineKey);
327 startPerioidicEventsMessage.setMessageData(Long.toString(period));
328 LOGGER.debug("sending start perioidic events {} to server {}:{} . . .", startPerioidicEventsMessage, hostName,
330 client.sendMessage(startPerioidicEventsMessage);
331 LOGGER.debug("sent start perioidic events message to server {}:{} . . .", hostName, port);
333 // Check if we got a response
334 final Response response = getResponse(startPerioidicEventsMessage);
335 if (!response.isSuccessful()) {
336 final String message =
337 FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
338 LOGGER.warn(message);
339 throw new ApexDeploymentException(message);
344 * Stop periodic events on an Apex engine on the engine service.
346 * @param engineKey the key of the engine to stop periodic events on
347 * @throws ApexDeploymentException on messaging errors
349 public void stopPerioidicEvents(final AxArtifactKey engineKey) throws ApexDeploymentException {
350 final StopPeriodicEvents stopPerioidicEventsMessage = new StopPeriodicEvents(engineKey);
351 LOGGER.debug("sending stop perioidic events {} to server {}:{} . . .", stopPerioidicEventsMessage, hostName,
353 client.sendMessage(stopPerioidicEventsMessage);
354 LOGGER.debug("sent stop perioidic events message to server {}:{} . . .", hostName, port);
356 // Check if we got a response
357 final Response response = getResponse(stopPerioidicEventsMessage);
358 if (!response.isSuccessful()) {
359 final String message =
360 FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
361 LOGGER.warn(message);
362 throw new ApexDeploymentException(message);
367 * Get the status of an Apex engine.
369 * @param engineKey the key of the engine to get the status of
370 * @return an engine model containing the status of the engine for the given key
371 * @throws ApexException the apex exception
373 public AxEngineModel getEngineStatus(final AxArtifactKey engineKey) throws ApexException {
374 final GetEngineStatus engineStatusMessage = new GetEngineStatus(engineKey);
375 LOGGER.debug("sending get engine status message {} to server {}:{} . . .", engineStatusMessage, hostName, port);
376 client.sendMessage(engineStatusMessage);
377 LOGGER.debug("sent get engine status message to server {}:{} . . .", hostName, port);
379 // Check if we got a response
380 final Response response = getResponse(engineStatusMessage);
381 if (!response.isSuccessful()) {
382 final String message =
383 FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
384 LOGGER.warn(message);
385 throw new ApexException(message);
388 final ByteArrayInputStream baInputStream = new ByteArrayInputStream(response.getMessageData().getBytes());
389 final ApexModelReader<AxEngineModel> modelReader = new ApexModelReader<>(AxEngineModel.class);
390 modelReader.setValidateFlag(false);
391 return modelReader.read(baInputStream);
395 * Get the runtime information of an Apex engine.
397 * @param engineKey the key of the engine to get information for
398 * @return an engine model containing information on the engine for the given key
399 * @throws ApexException the apex exception
401 public String getEngineInfo(final AxArtifactKey engineKey) throws ApexException {
402 final GetEngineInfo engineInfoMessage = new GetEngineInfo(engineKey);
403 LOGGER.debug("sending get engine information message {} to server {}:{} . . .", engineInfoMessage, hostName,
405 client.sendMessage(engineInfoMessage);
406 LOGGER.debug("sent get engine information message to server {}:{} . . .", hostName, port);
408 // Check if we got a response
409 final Response response = getResponse(engineInfoMessage);
410 if (!response.isSuccessful()) {
411 final String message =
412 FAILED_RESPONSE + response.getMessageData() + RECEIVED_FROM_SERVER + hostName + ':' + port;
413 LOGGER.warn(message);
414 throw new ApexException(message);
417 return response.getMessageData();
421 * Check the response to a model deployment message from the Apex server.
423 * @param sentMessage the sent message
424 * @return the response message
425 * @throws ApexDeploymentException the apex deployment exception
427 private Response getResponse(final Message sentMessage) throws ApexDeploymentException {
428 // Get the amount of milliseconds we should wait for a timeout
429 int timeoutTime = sentMessage.getReplyTimeout();
430 if (timeoutTime <= 0) {
431 timeoutTime = REPLY_MESSAGE_TIMEOUT_DEFAULT;
434 // Wait for the required amount of milliseconds for the response from the Apex server
435 Message receivedMessage = null;
436 for (int timeWaitedSoFar = 0; receivedMessage == null && timeWaitedSoFar < timeoutTime; timeWaitedSoFar +=
437 REPLY_MESSAGE_TIMEOUT_INCREMENT) {
439 receivedMessage = client.getReceiveQueue().poll(REPLY_MESSAGE_TIMEOUT_INCREMENT, TimeUnit.MILLISECONDS);
440 } catch (final InterruptedException e) {
441 // restore the interrupt status
442 Thread.currentThread().interrupt();
443 LOGGER.warn("reception of response from server interrupted {}:{}", hostName, port, e);
444 throw new ApexDeploymentException(
445 "reception of response from server interrupted " + hostName + ':' + port, e);
449 // Check if response to sent message
450 if (receivedMessage == null) {
451 LOGGER.warn("no response received to sent message " + sentMessage.getAction());
452 throw new ApexDeploymentException("no response received to sent message " + sentMessage.getAction());
455 // Check instance is a response message
456 if (!(receivedMessage instanceof Response)) {
457 LOGGER.warn("response received from server is of incorrect type {}, should be of type {}",
458 receivedMessage.getClass().getName(), Response.class.getName());
459 throw new ApexDeploymentException("response received from server is of incorrect type "
460 + receivedMessage.getClass().getName() + ", should be of type " + Response.class.getName());
463 // Cast the response message
464 final Response responseMessage = (Response) receivedMessage;
466 // Check if response to sent message
467 if (!responseMessage.getResponseTo().equals(sentMessage)) {
468 LOGGER.warn("response received is not response to sent message " + sentMessage.getAction());
469 throw new ApexDeploymentException(
470 "response received is not correct response to sent message " + sentMessage.getAction());
473 // Check if successful
474 if (responseMessage.isSuccessful()) {
475 LOGGER.debug("response received: {} message was succssful: {}", sentMessage.getAction(),
476 responseMessage.getMessageData());
478 LOGGER.debug("response received: {} message failed: {}", sentMessage.getAction(),
479 responseMessage.getMessageData());
482 return responseMessage;
486 * Set a deployment client for this facade. This method is for testing.
488 * @param deploymentClient the deployment client to set
490 protected void setDeploymentClient(final DeploymentClient deploymentClient) {
491 this.client = deploymentClient;