2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.core.deployment;
23 import com.google.common.eventbus.Subscribe;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.TimeUnit;
30 import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder;
31 import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
32 import org.onap.policy.apex.core.infrastructure.messaging.MessagingService;
33 import org.onap.policy.apex.core.infrastructure.messaging.MessagingServiceFactory;
34 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock;
35 import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils;
36 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
37 import org.onap.policy.apex.core.protocols.Message;
38 import org.slf4j.ext.XLogger;
39 import org.slf4j.ext.XLoggerFactory;
42 * The Class DeploymentClient handles the client side of an EngDep communication session with an Apex server. It runs a
43 * thread to handle message sending and session monitoring. It uses a sending queue to queue messages for sending by the
44 * client thread and a receiving queue to queue messages received from the Apex engine.
46 * @author Liam Fallon (liam.fallon@ericsson.com)
48 public class DeploymentClient implements Runnable {
49 private static final XLogger LOGGER = XLoggerFactory.getXLogger(DeploymentClient.class);
51 private static final int CLIENT_STOP_WAIT_INTERVAL = 100;
52 private static final int CLIENT_SEND_QUEUE_TIMEOUT = 50;
54 // Host and port to use for EngDep messaging
55 private String host = null;
58 // Messaging service is used to transmit and receive messages over the web socket
59 private static MessagingServiceFactory<Message> factory = new MessagingServiceFactory<>();
60 private MessagingService<Message> service = null;
62 // Send and receive queues for message buffering
63 private final BlockingQueue<Message> sendQueue = new LinkedBlockingQueue<>();
64 private final BlockingQueue<Message> receiveQueue = new LinkedBlockingQueue<>();
66 // Thread management fields
67 private boolean started = false;
68 private Thread thisThread = null;
70 // Number of messages processed
71 private long messagesSent = 0;
72 private long messagesReceived = 0;
75 * Instantiates a new deployment client.
77 * @param host the host name that the EngDep server is running on
78 * @param port the port the EngDep server is using
80 public DeploymentClient(final String host, final int port) {
88 * @see java.lang.Runnable#run()
92 LOGGER.debug("engine<-->deployment to \"ws://{}:{}\" thread starting . . .", host, port);
94 // Set up the thread name
95 thisThread = Thread.currentThread();
96 thisThread.setName(DeploymentClient.class.getName() + "-" + host + ":" + port);
99 // Establish a connection to the Apex server for EngDep message communication over Web
101 service = factory.createClient(new URI("ws://" + host + ":" + port));
102 service.addMessageListener(new DeploymentClientListener());
104 service.startConnection();
106 LOGGER.debug("engine<-->deployment client thread started");
107 } catch (final Exception e) {
108 LOGGER.error("engine<-->deployment client thread exception", e);
111 // Loop forever, sending messages as they appear on the queue
112 while (started && !thisThread.isInterrupted()) {
113 started = sendMessages();
116 // Thread has been interrupted
118 LOGGER.debug("engine<-->deployment client thread finished");
122 * Send messages off the queue.
124 private boolean sendMessages() {
// Takes at most one message off sendQueue per call and hands it to the messaging service.
// The boolean result is assigned to the `started` flag by the send loop that calls this method,
// so it decides whether that loop keeps running. (The return statements themselves are not
// visible in this excerpt.)
//
// Wait up to CLIENT_SEND_QUEUE_TIMEOUT (50) milliseconds for a message; poll() returns null when
// the wait elapses with nothing queued.
126 final Message messageForSending = sendQueue.poll(CLIENT_SEND_QUEUE_TIMEOUT, TimeUnit.MILLISECONDS);
// Nothing arrived within the timeout (the handling of this case is not visible in this excerpt).
127 if (messageForSending == null) {
131 // Send the message in its message holder
132 final MessageHolder<Message> messageHolder = new MessageHolder<>(MessagingUtils.getHost());
133 messageHolder.addMessage(messageForSending);
// NOTE(review): `service` starts as null and is assigned in the thread start-up code inside a try
// whose catch block only logs the exception. If that set-up can fall through to the send loop
// after a failure, this call would dereference null - confirm against the full file.
134 service.send(messageHolder);
136 } catch (final InterruptedException e) {
137 // Message sending has been interrupted, we are finished
138 LOGGER.debug("engine<-->deployment client interrupted");
139 // restore the interrupt status
// poll() clears the interrupt flag when it throws; thisThread is the thread captured with
// Thread.currentThread() at client start-up, so re-interrupting it lets the send loop's
// isInterrupted() check observe the interruption.
140 thisThread.interrupt();
152 public String getHost() {
161 public int getPort() {
166 * Send an EngDep message to the Apex server.
168 * @param message the message to send to the Apex server
170 public void sendMessage(final Message message) {
// Only enqueues the message; the actual transmission is done by sendMessages(), which polls this
// same queue. sendQueue is a LinkedBlockingQueue created without a capacity argument, so add()
// does not block. A null message is rejected by the queue with a NullPointerException.
171 sendQueue.add(message);
175 * Stop the deployment client.
177 public void stopClient() {
// Asks the client thread to stop by interrupting it, pauses for a fixed interval, then closes the
// messaging connection if one was created.
178 LOGGER.debug("engine<-->deployment test client stopping . . .");
// NOTE(review): thisThread is initialised to null and is only assigned (from
// Thread.currentThread()) once the client thread is running. Calling stopClient() before that
// point would throw NullPointerException here - confirm callers always start the thread first.
179 thisThread.interrupt();
181 // Wait for the thread to stop
// This is a fixed pause of CLIENT_STOP_WAIT_INTERVAL (100; presumably milliseconds - confirm
// against ThreadUtilities.sleep), not a join, so the client thread has not necessarily exited
// by the time the connection is closed below.
182 ThreadUtilities.sleep(CLIENT_STOP_WAIT_INTERVAL);
184 // Close the Web Services connection
// service stays null until the client thread has created it, hence the guard.
185 if (service != null) {
186 service.stopConnection();
189 LOGGER.debug("engine<-->deployment test client stopped . . .");
193 * Checks if the client thread is started.
195 * @return true, if the client thread is started
197 public boolean isStarted() {
202 * Allows users of this class to get a reference to the receive queue to receive messages.
204 * @return the receive queue
206 public BlockingQueue<Message> getReceiveQueue() {
211 * Get the number of messages received by the client.
212 * @return the number of messages received by the client
214 public long getMessagesReceived() {
// Plain read of the messagesReceived counter, a non-volatile long field. The code that increments
// it is not visible in this excerpt.
// NOTE(review): if the counter is updated on a messaging/listener thread, a caller on another
// thread may observe a stale value - confirm whether that matters to callers.
215 return messagesReceived;
219 * Get the number of messages sent by the client.
220 * @return the number of messages sent by the client
222 public long getMessagesSent() {
227 * The listener that receives EngDep messages from the messaging service on behalf of the deployment client. An
228 * instance of this class is registered on the messaging service with {@code addMessageListener} when the client
229 * thread sets up its connection. Messages that arrive in a message block are added to the client's receive queue;
230 * string messages are not supported.
232 * @see DeploymentClientEvent
234 private class DeploymentClientListener implements MessageListener<Message> {
238 * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap.policy.apex.core.
239 * infrastructure.messaging.impl.ws.messageblock.MessageBlock)
243 public void onMessage(final MessageBlock<Message> messageData) {
// Appends every message carried by the received block to receiveQueue, the queue that callers
// obtain through getReceiveQueue() (per that method's Javadoc). receiveQueue is a
// LinkedBlockingQueue created without a capacity argument, so addAll() does not block.
245 receiveQueue.addAll(messageData.getMessages());
251 * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang.String)
254 public void onMessage(final String messageString) {
// String payloads are rejected: as far as this excerpt shows, this overload throws
// UnsupportedOperationException instead of queueing anything.
256 throw new UnsupportedOperationException("String mesages are not supported on the EngDep protocol");