Merge "Add period after inheritDoc for Sonar"
[policy/apex-pdp.git] / core / core-deployment / src / main / java / org / onap / policy / apex / core / deployment / DeploymentClient.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.core.deployment;
22
23 import com.google.common.eventbus.Subscribe;
24
25 import java.net.URI;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.TimeUnit;
29
30 import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder;
31 import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
32 import org.onap.policy.apex.core.infrastructure.messaging.MessagingService;
33 import org.onap.policy.apex.core.infrastructure.messaging.MessagingServiceFactory;
34 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock;
35 import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils;
36 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
37 import org.onap.policy.apex.core.protocols.Message;
38 import org.slf4j.ext.XLogger;
39 import org.slf4j.ext.XLoggerFactory;
40
41 /**
42  * The Class DeploymentClient handles the client side of an EngDep communication session with an Apex server. It runs a
43  * thread to handle message sending and session monitoring. It uses a sending queue to queue messages for sending by the
44  * client thread and a receiving queue to queue messages received from the Apex engine.
45  *
46  * @author Liam Fallon (liam.fallon@ericsson.com)
47  */
48 public class DeploymentClient implements Runnable {
49     private static final XLogger LOGGER = XLoggerFactory.getXLogger(DeploymentClient.class);
50
51     private static final int CLIENT_STOP_WAIT_INTERVAL = 100;
52     private static final int CLIENT_SEND_QUEUE_TIMEOUT = 50;
53
54     // Host and port to use for EngDep messaging
55     private String host = null;
56     private int port = 0;
57
58     // Messaging service is used to transmit and receive messages over the web socket
59     private static MessagingServiceFactory<Message> factory = new MessagingServiceFactory<>();
60     private MessagingService<Message> service = null;
61
62     // Send and receive queues for message buffering
63     private final BlockingQueue<Message> sendQueue = new LinkedBlockingQueue<>();
64     private final BlockingQueue<Message> receiveQueue = new LinkedBlockingQueue<>();
65
66     // Thread management fields
67     private boolean started = false;
68     private Thread thisThread = null;
69     
70     // Number of messages processed
71     private long messagesSent = 0;
72     private long messagesReceived = 0;
73
74     /**
75      * Instantiates a new deployment client.
76      *
77      * @param host the host name that the EngDep server is running on
78      * @param port the port the port the EngDep server is using
79      */
80     public DeploymentClient(final String host, final int port) {
81         this.host = host;
82         this.port = port;
83     }
84
85     /*
86      * (non-Javadoc)
87      *
88      * @see java.lang.Runnable#run()
89      */
90     @Override
91     public void run() {
92         LOGGER.debug("engine<-->deployment to \"ws://{}:{}\" thread starting . . .", host, port);
93
94         // Set up the thread name
95         thisThread = Thread.currentThread();
96         thisThread.setName(DeploymentClient.class.getName() + "-" + host + ":" + port);
97
98         try {
99             // Establish a connection to the Apex server for EngDep message communication over Web
100             // Sockets
101             service = factory.createClient(new URI("ws://" + host + ":" + port));
102             service.addMessageListener(new DeploymentClientListener());
103
104             service.startConnection();
105             started = true;
106             LOGGER.debug("engine<-->deployment client thread started");
107         } catch (final Exception e) {
108             LOGGER.error("engine<-->deployment client thread exception", e);
109             return;
110         }
111         // Loop forever, sending messages as they appear on the queue
112         while (started && !thisThread.isInterrupted()) {
113             started = sendMessages();
114         }
115
116         // Thread has been interrupted
117         thisThread = null;
118         LOGGER.debug("engine<-->deployment client thread finished");
119     }
120
121     /**
122      * Send messages off the queue.
123      */
124     private boolean sendMessages() {
125         try {
126             final Message messageForSending = sendQueue.poll(CLIENT_SEND_QUEUE_TIMEOUT, TimeUnit.MILLISECONDS);
127             if (messageForSending == null) {
128                 return true;
129             }
130
131             // Send the message in its message holder
132             final MessageHolder<Message> messageHolder = new MessageHolder<>(MessagingUtils.getHost());
133             messageHolder.addMessage(messageForSending);
134             service.send(messageHolder);
135             messagesSent++;
136         } catch (final InterruptedException e) {
137             // Message sending has been interrupted, we are finished
138             LOGGER.debug("engine<-->deployment client interrupted");
139             // restore the interrupt status
140             thisThread.interrupt();
141             return false;
142         }
143         
144         return true;
145     }
146
147     /**
148      * Gets the host.
149      *
150      * @return the host
151      */
152     public String getHost() {
153         return host;
154     }
155
156     /**
157      * Gets the port.
158      *
159      * @return the port
160      */
161     public int getPort() {
162         return port;
163     }
164
165     /**
166      * Send an EngDep message to the Apex server.
167      *
168      * @param message the message to send to the Apex server
169      */
170     public void sendMessage(final Message message) {
171         sendQueue.add(message);
172     }
173
174     /**
175      * Stop the deployment client.
176      */
177     public void stopClient() {
178         LOGGER.debug("engine<-->deployment test client stopping . . .");
179         thisThread.interrupt();
180
181         // Wait for the thread to stop
182         ThreadUtilities.sleep(CLIENT_STOP_WAIT_INTERVAL);
183
184         // Close the Web Services connection
185         if (service != null) {
186             service.stopConnection();
187         }
188         started = false;
189         LOGGER.debug("engine<-->deployment test client stopped . . .");
190     }
191
192     /**
193      * Checks if the client thread is started.
194      *
195      * @return true, if the client thread is started
196      */
197     public boolean isStarted() {
198         return started;
199     }
200
201     /**
202      * Allows users of this class to get a reference to the receive queue to receove messages.
203      *
204      * @return the receive queue
205      */
206     public BlockingQueue<Message> getReceiveQueue() {
207         return receiveQueue;
208     }
209
210     /**
211      * Get the number of messages received by the client.
212      * @return the number of messages received by the client
213      */
214     public long getMessagesReceived() {
215         return messagesReceived;
216     }
217
218     /**
219      * Get the number of messages sent by the client.
220      * @return the number of messages sent by the client
221      */
222     public long getMessagesSent() {
223         return messagesSent;
224     }
225
226     /**
227      * The listener interface for receiving deploymentClient events. The class that is interested in processing a
228      * deploymentClient event implements this interface, and the object created with that class is registered with a
229      * component using the component's {@code addDeploymentClientListener} method. When the deploymentClient event
230      * occurs, that object's appropriate method is invoked.
231      *
232      * @see DeploymentClientEvent
233      */
234     private class DeploymentClientListener implements MessageListener<Message> {
235         /*
236          * (non-Javadoc)
237          *
238          * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap. policy.apex.core.
239          * infrastructure.messaging.impl.ws.messageblock. MessageBlock)
240          */
241         @Subscribe
242         @Override
243         public void onMessage(final MessageBlock<Message> messageData) {
244             messagesReceived++;
245             receiveQueue.addAll(messageData.getMessages());
246         }
247
248         /*
249          * (non-Javadoc)
250          *
251          * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang. String)
252          */
253         @Override
254         public void onMessage(final String messageString) {
255             messagesReceived++;
256             throw new UnsupportedOperationException("String mesages are not supported on the EngDep protocol");
257         }
258     }
259 }