2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.core.deployment;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.LinkedBlockingQueue;
27 import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder;
28 import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
29 import org.onap.policy.apex.core.infrastructure.messaging.MessagingService;
30 import org.onap.policy.apex.core.infrastructure.messaging.MessagingServiceFactory;
31 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock;
32 import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils;
33 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
34 import org.onap.policy.apex.core.protocols.Message;
35 import org.slf4j.ext.XLogger;
36 import org.slf4j.ext.XLoggerFactory;
38 import com.google.common.eventbus.Subscribe;
41 * The Class DeploymentClient handles the client side of an EngDep communication session with an
42 * Apex server. It runs a thread to handle message sending and session monitoring. It uses a sending
43 * queue to queue messages for sending by the client thread and a receiving queue to queue messages
44 * received from the Apex engine.
46 * @author Liam Fallon (liam.fallon@ericsson.com)
48 public class DeploymentClient implements Runnable {
49 private static final XLogger LOGGER = XLoggerFactory.getXLogger(DeploymentClient.class);
51 private static final int CLIENT_STOP_WAIT_INTERVAL = 100;
53 // Host and port to use for EngDep messaging
54 private String host = null;
57 // Messaging service is used to transmit and receive messages over the web socket
58 private static MessagingServiceFactory<Message> factory = new MessagingServiceFactory<>();
59 private MessagingService<Message> service = null;
61 // Send and receive queues for message buffering
62 private final BlockingQueue<Message> sendQueue = new LinkedBlockingQueue<>();
63 private final BlockingQueue<Message> receiveQueue = new LinkedBlockingQueue<>();
65 // Thread management fields
66 private boolean started = false;
67 private Thread thisThread = null;
70 * Instantiates a new deployment client.
72 * @param host the host name that the EngDep server is running on
73 * @param port the port the EngDep server is using
75 public DeploymentClient(final String host, final int port) {
83 * @see java.lang.Runnable#run()
87 LOGGER.debug("engine<-->deployment to \"ws://" + host + ":" + port + "\" thread starting . . .");
89 // Set up the thread name
90 thisThread = Thread.currentThread();
91 thisThread.setName(DeploymentClient.class.getName() + "-" + host + ":" + port);
94 // Establish a connection to the Apex server for EngDep message communication over Web
96 service = factory.createClient(new URI("ws://" + host + ":" + port));
97 service.addMessageListener(new DeploymentClientListener());
99 service.startConnection();
101 LOGGER.debug("engine<-->deployment client thread started");
102 } catch (final Exception e) {
103 LOGGER.error("engine<-->deployment client thread exception", e);
106 // Loop forever, sending messages as they appear on the queue
109 final Message messageForSending = sendQueue.take();
110 sendMessage(messageForSending);
111 } catch (final InterruptedException e) {
112 // Message sending has been interrupted, we are finished
113 LOGGER.debug("engine<-->deployment client interrupted");
114 // restore the interrupt status
115 thisThread.interrupt();
120 // Thread has been interrupted
122 LOGGER.debug("engine<-->deployment client thread finished");
126 * Send an EngDep message to the Apex server.
128 * @param message the message to send to the Apex server
130 public void sendMessage(final Message message) {
131 final MessageHolder<Message> messageHolder = new MessageHolder<>(MessagingUtils.getHost());
133 // Send the message in its message holder
134 messageHolder.addMessage(message);
135 service.send(messageHolder);
139 * Stop the deployment client.
141 public void stopClient() {
142 LOGGER.debug("engine<-->deployment test client stopping . . .");
143 thisThread.interrupt();
145 // Wait for the thread to stop
146 while (thisThread != null && thisThread.isAlive()) {
147 ThreadUtilities.sleep(CLIENT_STOP_WAIT_INTERVAL);
150 // Close the Web Services connection
151 service.stopConnection();
153 LOGGER.debug("engine<-->deployment test client stopped . . .");
157 * Checks if the client thread is started.
159 * @return true, if the client thread is started
161 public boolean isStarted() {
166 * Allows users of this class to get a reference to the receive queue to receive messages.
168 * @return the receive queue
170 public BlockingQueue<Message> getReceiveQueue() {
175 * The listener that handles messages received from the Apex server over the messaging service.
176 * An instance of this class is registered on the messaging service by the client thread. Messages
177 * that arrive in a message block are added to the receive queue of the enclosing
178 * {@code DeploymentClient}. Messages that arrive as plain strings are rejected with an
179 * {@code UnsupportedOperationException}.
181 * @see DeploymentClientEvent
183 private class DeploymentClientListener implements MessageListener<Message> {
188 * org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap.
189 * policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock)
193 public void onMessage(final MessageBlock<Message> messageData) {
194 receiveQueue.addAll(messageData.getMessages());
201 * org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang.
205 public void onMessage(final String messageString) {
206 throw new UnsupportedOperationException("String mesages are not supported on the EngDep protocol");