Changes for checkstyle 8.32
[policy/apex-pdp.git] / core / core-deployment / src / main / java / org / onap / policy / apex / core / deployment / DeploymentClient.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.core.deployment;
23
24 import com.google.common.eventbus.Subscribe;
25 import java.net.InetAddress;
26 import java.net.URI;
27 import java.net.UnknownHostException;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.LinkedBlockingQueue;
30 import java.util.concurrent.TimeUnit;
31 import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder;
32 import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
33 import org.onap.policy.apex.core.infrastructure.messaging.MessagingService;
34 import org.onap.policy.apex.core.infrastructure.messaging.MessagingServiceFactory;
35 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock;
36 import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils;
37 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
38 import org.onap.policy.apex.core.protocols.Message;
39 import org.slf4j.ext.XLogger;
40 import org.slf4j.ext.XLoggerFactory;
41
42 /**
43  * The Class DeploymentClient handles the client side of an EngDep communication session with an Apex server. It runs a
44  * thread to handle message sending and session monitoring. It uses a sending queue to queue messages for sending by the
45  * client thread and a receiving queue to queue messages received from the Apex engine.
46  *
47  * @author Liam Fallon (liam.fallon@ericsson.com)
48  */
49 public class DeploymentClient implements Runnable {
50     private static final XLogger LOGGER = XLoggerFactory.getXLogger(DeploymentClient.class);
51
52     private static final int CLIENT_STOP_WAIT_INTERVAL = 100;
53     private static final int CLIENT_SEND_QUEUE_TIMEOUT = 50;
54
55     // Host and port to use for EngDep messaging
56     private String host = null;
57     private int port = 0;
58
59     // Messaging service is used to transmit and receive messages over the web socket
60     private static MessagingServiceFactory<Message> factory = new MessagingServiceFactory<>();
61     private MessagingService<Message> service = null;
62
63     // Send and receive queues for message buffering
64     private final BlockingQueue<Message> sendQueue = new LinkedBlockingQueue<>();
65     private final BlockingQueue<Message> receiveQueue = new LinkedBlockingQueue<>();
66
67     // Thread management fields
68     private boolean started = false;
69     private Thread thisThread = null;
70
71     // Number of messages processed
72     private long messagesSent = 0;
73     private long messagesReceived = 0;
74
75     /**
76      * Instantiates a new deployment client.
77      *
78      * @param host the host name that the EngDep server is running on
79      * @param port the port the port the EngDep server is using
80      */
81     public DeploymentClient(final String host, final int port) {
82         this.host = host;
83         this.port = port;
84     }
85
86     /**
87      * {@inheritDoc}.
88      */
89     @Override
90     public void run() {
91         LOGGER.debug("engine<-->deployment to \"ws://{}:{}\" thread starting . . .", host, port);
92
93         // Set up the thread name
94         thisThread = Thread.currentThread();
95         thisThread.setName(DeploymentClient.class.getName() + "-" + host + ":" + port);
96
97         try {
98             // Establish a connection to the Apex server for EngDep message communication over Web
99             // Sockets
100             service = factory.createClient(new URI("ws://" + host + ":" + port));
101             service.addMessageListener(new DeploymentClientListener());
102
103             service.startConnection();
104             started = true;
105             LOGGER.debug("engine<-->deployment client thread started");
106         } catch (final Exception e) {
107             LOGGER.error("engine<-->deployment client thread exception", e);
108             return;
109         }
110         // Loop forever, sending messages as they appear on the queue
111         while (started && !thisThread.isInterrupted()) {
112             started = sendMessages();
113         }
114
115         // Thread has been interrupted
116         thisThread = null;
117         LOGGER.debug("engine<-->deployment client thread finished");
118     }
119
120     /**
121      * Send messages off the queue.
122      */
123     private boolean sendMessages() {
124         try {
125             final Message messageForSending = sendQueue.poll(CLIENT_SEND_QUEUE_TIMEOUT, TimeUnit.MILLISECONDS);
126             if (messageForSending == null) {
127                 return true;
128             }
129
130             // Send the message in its message holder
131             InetAddress local = getLocalAddress();
132             final MessageHolder<Message> messageHolder = new MessageHolder<>(local);
133             messageHolder.addMessage(messageForSending);
134             service.send(messageHolder);
135             messagesSent++;
136         } catch (final InterruptedException e) {
137             // Message sending has been interrupted, we are finished
138             LOGGER.debug("engine<-->deployment client interrupted");
139             // restore the interrupt status
140             thisThread.interrupt();
141             return false;
142         }
143
144         return true;
145     }
146
147     /**
148      * Get the local address for the WS MessageHolder, or null if there is a problem.
149      */
150     private InetAddress getLocalAddress() {
151         try {
152             return MessagingUtils.getLocalHostLanAddress();
153         } catch (UnknownHostException e) {
154             LOGGER.debug("engine<-->deployment client failed to find the localhost address - continuing ...", e);
155             return null;
156         }
157     }
158
159     /**
160      * Gets the host.
161      *
162      * @return the host
163      */
164     public String getHost() {
165         return host;
166     }
167
168     /**
169      * Gets the port.
170      *
171      * @return the port
172      */
173     public int getPort() {
174         return port;
175     }
176
177     /**
178      * Send an EngDep message to the Apex server.
179      *
180      * @param message the message to send to the Apex server
181      */
182     public void sendMessage(final Message message) {
183         sendQueue.add(message);
184     }
185
186     /**
187      * Stop the deployment client.
188      */
189     public void stopClient() {
190         LOGGER.debug("engine<-->deployment test client stopping . . .");
191         thisThread.interrupt();
192
193         // Wait for the thread to stop
194         ThreadUtilities.sleep(CLIENT_STOP_WAIT_INTERVAL);
195
196         // Close the Web Services connection
197         if (service != null) {
198             service.stopConnection();
199         }
200         started = false;
201         LOGGER.debug("engine<-->deployment test client stopped . . .");
202     }
203
204     /**
205      * Checks if the client thread is started.
206      *
207      * @return true, if the client thread is started
208      */
209     public boolean isStarted() {
210         return started;
211     }
212
213     /**
214      * Allows users of this class to get a reference to the receive queue to receove messages.
215      *
216      * @return the receive queue
217      */
218     public BlockingQueue<Message> getReceiveQueue() {
219         return receiveQueue;
220     }
221
222     /**
223      * Get the number of messages received by the client.
224      * @return the number of messages received by the client
225      */
226     public long getMessagesReceived() {
227         return messagesReceived;
228     }
229
230     /**
231      * Get the number of messages sent by the client.
232      * @return the number of messages sent by the client
233      */
234     public long getMessagesSent() {
235         return messagesSent;
236     }
237
238     /**
239      * The listener interface for receiving deploymentClient events. The class that is interested in processing a
240      * deploymentClient event implements this interface, and the object created with that class is registered with a
241      * component using the component's {@code addDeploymentClientListener} method. When the deploymentClient event
242      * occurs, that object's appropriate method is invoked.
243      *
244      * @see DeploymentClientEvent
245      */
246     private class DeploymentClientListener implements MessageListener<Message> {
247         /**
248          * {@inheritDoc}.
249          */
250         @Subscribe
251         @Override
252         public void onMessage(final MessageBlock<Message> messageData) {
253             messagesReceived++;
254             receiveQueue.addAll(messageData.getMessages());
255         }
256
257         /**
258          * {@inheritDoc}.
259          */
260         @Override
261         public void onMessage(final String messageString) {
262             messagesReceived++;
263             throw new UnsupportedOperationException("String mesages are not supported on the EngDep protocol");
264         }
265     }
266 }