2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2019 Nordix Foundation.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.apex.core.deployment;
24 import com.google.common.eventbus.Subscribe;
25 import java.net.InetAddress;
27 import java.net.UnknownHostException;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.LinkedBlockingQueue;
30 import java.util.concurrent.TimeUnit;
31 import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder;
32 import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
33 import org.onap.policy.apex.core.infrastructure.messaging.MessagingService;
34 import org.onap.policy.apex.core.infrastructure.messaging.MessagingServiceFactory;
35 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock;
36 import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils;
37 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
38 import org.onap.policy.apex.core.protocols.Message;
39 import org.slf4j.ext.XLogger;
40 import org.slf4j.ext.XLoggerFactory;
43 * The Class DeploymentClient handles the client side of an EngDep communication session with an Apex server. It runs a
44 * thread to handle message sending and session monitoring. It uses a sending queue to queue messages for sending by the
45 * client thread and a receiving queue to queue messages received from the Apex engine.
47 * @author Liam Fallon (liam.fallon@ericsson.com)
49 public class DeploymentClient implements Runnable {
50 private static final XLogger LOGGER = XLoggerFactory.getXLogger(DeploymentClient.class);
52 private static final int CLIENT_STOP_WAIT_INTERVAL = 100;
53 private static final int CLIENT_SEND_QUEUE_TIMEOUT = 50;
55 // Host and port to use for EngDep messaging
56 private String host = null;
59 // Messaging service is used to transmit and receive messages over the web socket
60 private static MessagingServiceFactory<Message> factory = new MessagingServiceFactory<>();
61 private MessagingService<Message> service = null;
63 // Send and receive queues for message buffering
64 private final BlockingQueue<Message> sendQueue = new LinkedBlockingQueue<>();
65 private final BlockingQueue<Message> receiveQueue = new LinkedBlockingQueue<>();
67 // Thread management fields
68 private boolean started = false;
69 private Thread thisThread = null;
71 // Number of messages processed
72 private long messagesSent = 0;
73 private long messagesReceived = 0;
76 * Instantiates a new deployment client.
78 * @param host the host name that the EngDep server is running on
79 * @param port the port the port the EngDep server is using
81 public DeploymentClient(final String host, final int port) {
91 LOGGER.debug("engine<-->deployment to \"ws://{}:{}\" thread starting . . .", host, port);
93 // Set up the thread name
94 thisThread = Thread.currentThread();
95 thisThread.setName(DeploymentClient.class.getName() + "-" + host + ":" + port);
98 // Establish a connection to the Apex server for EngDep message communication over Web
100 service = factory.createClient(new URI("ws://" + host + ":" + port));
101 service.addMessageListener(new DeploymentClientListener());
103 service.startConnection();
105 LOGGER.debug("engine<-->deployment client thread started");
106 } catch (final Exception e) {
107 LOGGER.error("engine<-->deployment client thread exception", e);
110 // Loop forever, sending messages as they appear on the queue
111 while (started && !thisThread.isInterrupted()) {
112 started = sendMessages();
115 // Thread has been interrupted
117 LOGGER.debug("engine<-->deployment client thread finished");
121 * Send messages off the queue.
123 private boolean sendMessages() {
125 final Message messageForSending = sendQueue.poll(CLIENT_SEND_QUEUE_TIMEOUT, TimeUnit.MILLISECONDS);
126 if (messageForSending == null) {
130 // Send the message in its message holder
131 InetAddress local = getLocalAddress();
132 final MessageHolder<Message> messageHolder = new MessageHolder<>(local);
133 messageHolder.addMessage(messageForSending);
134 service.send(messageHolder);
136 } catch (final InterruptedException e) {
137 // Message sending has been interrupted, we are finished
138 LOGGER.debug("engine<-->deployment client interrupted");
139 // restore the interrupt status
140 thisThread.interrupt();
148 * Get the local address for the WS MessageHolder, or null if there is a problem.
150 private InetAddress getLocalAddress() {
152 return MessagingUtils.getLocalHostLanAddress();
154 catch (UnknownHostException e) {
155 LOGGER.debug("engine<-->deployment client failed to find the localhost address - continuing ...", e);
165 public String getHost() {
174 public int getPort() {
179 * Send an EngDep message to the Apex server.
181 * @param message the message to send to the Apex server
183 public void sendMessage(final Message message) {
184 sendQueue.add(message);
188 * Stop the deployment client.
190 public void stopClient() {
191 LOGGER.debug("engine<-->deployment test client stopping . . .");
192 thisThread.interrupt();
194 // Wait for the thread to stop
195 ThreadUtilities.sleep(CLIENT_STOP_WAIT_INTERVAL);
197 // Close the Web Services connection
198 if (service != null) {
199 service.stopConnection();
202 LOGGER.debug("engine<-->deployment test client stopped . . .");
206 * Checks if the client thread is started.
208 * @return true, if the client thread is started
210 public boolean isStarted() {
215 * Allows users of this class to get a reference to the receive queue to receove messages.
217 * @return the receive queue
219 public BlockingQueue<Message> getReceiveQueue() {
224 * Get the number of messages received by the client.
225 * @return the number of messages received by the client
227 public long getMessagesReceived() {
228 return messagesReceived;
232 * Get the number of messages sent by the client.
233 * @return the number of messages sent by the client
235 public long getMessagesSent() {
240 * The listener interface for receiving deploymentClient events. The class that is interested in processing a
241 * deploymentClient event implements this interface, and the object created with that class is registered with a
242 * component using the component's {@code addDeploymentClientListener} method. When the deploymentClient event
243 * occurs, that object's appropriate method is invoked.
245 * @see DeploymentClientEvent
247 private class DeploymentClientListener implements MessageListener<Message> {
253 public void onMessage(final MessageBlock<Message> messageData) {
255 receiveQueue.addAll(messageData.getMessages());
262 public void onMessage(final String messageString) {
264 throw new UnsupportedOperationException("String mesages are not supported on the EngDep protocol");