Merge "Adding plugins-event module to apex-pdp"
[policy/apex-pdp.git] / core / core-deployment / src / main / java / org / onap / policy / apex / core / deployment / DeploymentClient.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.core.deployment;
22
23 import java.net.URI;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.LinkedBlockingQueue;
26
27 import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder;
28 import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
29 import org.onap.policy.apex.core.infrastructure.messaging.MessagingService;
30 import org.onap.policy.apex.core.infrastructure.messaging.MessagingServiceFactory;
31 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock;
32 import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils;
33 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
34 import org.onap.policy.apex.core.protocols.Message;
35 import org.slf4j.ext.XLogger;
36 import org.slf4j.ext.XLoggerFactory;
37
38 import com.google.common.eventbus.Subscribe;
39
40 /**
41  * The Class DeploymentClient handles the client side of an EngDep communication session with an
42  * Apex server. It runs a thread to handle message sending and session monitoring. It uses a sending
43  * queue to queue messages for sending by the client thread and a receiving queue to queue messages
44  * received from the Apex engine.
45  *
46  * @author Liam Fallon (liam.fallon@ericsson.com)
47  */
48 public class DeploymentClient implements Runnable {
49     private static final XLogger LOGGER = XLoggerFactory.getXLogger(DeploymentClient.class);
50
51     private static final int CLIENT_STOP_WAIT_INTERVAL = 100;
52
53     // Host and port to use for EngDep messaging
54     private String host = null;
55     private int port = 0;
56
57     // Messaging service is used to transmit and receive messages over the web socket
58     private static MessagingServiceFactory<Message> factory = new MessagingServiceFactory<>();
59     private MessagingService<Message> service = null;
60
61     // Send and receive queues for message buffering
62     private final BlockingQueue<Message> sendQueue = new LinkedBlockingQueue<>();
63     private final BlockingQueue<Message> receiveQueue = new LinkedBlockingQueue<>();
64
65     // Thread management fields
66     private boolean started = false;
67     private Thread thisThread = null;
68
69     /**
70      * Instantiates a new deployment client.
71      *
72      * @param host the host name that the EngDep server is running on
73      * @param port the port the port the EngDep server is using
74      */
75     public DeploymentClient(final String host, final int port) {
76         this.host = host;
77         this.port = port;
78     }
79
80     /*
81      * (non-Javadoc)
82      *
83      * @see java.lang.Runnable#run()
84      */
85     @Override
86     public void run() {
87         LOGGER.debug("engine<-->deployment to \"ws://" + host + ":" + port + "\" thread starting . . .");
88
89         // Set up the thread name
90         thisThread = Thread.currentThread();
91         thisThread.setName(DeploymentClient.class.getName() + "-" + host + ":" + port);
92
93         try {
94             // Establish a connection to the Apex server for EngDep message communication over Web
95             // Sockets
96             service = factory.createClient(new URI("ws://" + host + ":" + port));
97             service.addMessageListener(new DeploymentClientListener());
98
99             service.startConnection();
100             started = true;
101             LOGGER.debug("engine<-->deployment client thread started");
102         } catch (final Exception e) {
103             LOGGER.error("engine<-->deployment client thread exception", e);
104             return;
105         }
106         // Loop forever, sending messages as they appear on the queue
107         while (true) {
108             try {
109                 final Message messageForSending = sendQueue.take();
110                 sendMessage(messageForSending);
111             } catch (final InterruptedException e) {
112                 // Message sending has been interrupted, we are finished
113                 LOGGER.debug("engine<-->deployment client interrupted");
114                 // restore the interrupt status
115                 thisThread.interrupt();
116                 break;
117             }
118         }
119
120         // Thread has been interrupted
121         thisThread = null;
122         LOGGER.debug("engine<-->deployment client thread finished");
123     }
124
125     /**
126      * Send an EngDep message to the Apex server.
127      *
128      * @param message the message to send to the Apex server
129      */
130     public void sendMessage(final Message message) {
131         final MessageHolder<Message> messageHolder = new MessageHolder<>(MessagingUtils.getHost());
132
133         // Send the message in its message holder
134         messageHolder.addMessage(message);
135         service.send(messageHolder);
136     }
137
138     /**
139      * Stop the deployment client.
140      */
141     public void stopClient() {
142         LOGGER.debug("engine<-->deployment test client stopping . . .");
143         thisThread.interrupt();
144
145         // Wait for the thread to stop
146         while (thisThread != null && thisThread.isAlive()) {
147             ThreadUtilities.sleep(CLIENT_STOP_WAIT_INTERVAL);
148         }
149
150         // Close the Web Services connection
151         service.stopConnection();
152         started = false;
153         LOGGER.debug("engine<-->deployment test client stopped . . .");
154     }
155
156     /**
157      * Checks if the client thread is started.
158      *
159      * @return true, if the client thread is started
160      */
161     public boolean isStarted() {
162         return started;
163     }
164
165     /**
166      * Allows users of this class to get a reference to the receive queue to receove messages.
167      *
168      * @return the receive queue
169      */
170     public BlockingQueue<Message> getReceiveQueue() {
171         return receiveQueue;
172     }
173
174     /**
175      * The listener interface for receiving deploymentClient events. The class that is interested in
176      * processing a deploymentClient event implements this interface, and the object created with
177      * that class is registered with a component using the component's
178      * {@code addDeploymentClientListener} method. When the deploymentClient event occurs, that
179      * object's appropriate method is invoked.
180      *
181      * @see DeploymentClientEvent
182      */
183     private class DeploymentClientListener implements MessageListener<Message> {
184         /*
185          * (non-Javadoc)
186          *
187          * @see
188          * org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap.
189          * policy.apex.core. infrastructure.messaging.impl.ws.messageblock. MessageBlock)
190          */
191         @Subscribe
192         @Override
193         public void onMessage(final MessageBlock<Message> messageData) {
194             receiveQueue.addAll(messageData.getMessages());
195         }
196
197         /*
198          * (non-Javadoc)
199          *
200          * @see
201          * org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang.
202          * String)
203          */
204         @Override
205         public void onMessage(final String messageString) {
206             throw new UnsupportedOperationException("String mesages are not supported on the EngDep protocol");
207         }
208     }
209 }