Sonar Fixes
[policy/apex-pdp.git] / core / core-infrastructure / src / main / java / org / onap / policy / apex / core / infrastructure / messaging / impl / ws / RawMessageHandler.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.core.infrastructure.messaging.impl.ws;
23
24 import com.google.common.eventbus.Subscribe;
25 import java.io.ByteArrayInputStream;
26 import java.io.IOException;
27 import java.io.ObjectInputStream;
28 import java.nio.ByteBuffer;
29 import java.util.List;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.LinkedBlockingDeque;
32 import java.util.concurrent.TimeUnit;
33 import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder;
34 import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
35 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock;
36 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlockHandler;
37 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.RawMessageBlock;
38 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
39 import org.slf4j.ext.XLogger;
40 import org.slf4j.ext.XLoggerFactory;
41
42 /**
43  * The Class RawMessageHandler handles raw messages being received on a Java web socket and forwards the messages to the
44  * DataHandler instance that has subscribed to the RawMessageHandler instance.
45  *
46  * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
47  * @param <M> the generic type of message being received
48  */
49 public class RawMessageHandler<M> implements WebSocketMessageListener<M>, Runnable {
50     // The logger for this class
51     private static final XLogger LOGGER = XLoggerFactory.getXLogger(RawMessageHandler.class);
52
53     // Repeated string constants
54     private static final String RAW_MESSAGE_LISTENING_INTERRUPTED = "raw message listening has been interrupted";
55
56     // The amount of time to sleep during shutdown for the thread of this message handler to stop
57     private static final int SHUTDOWN_WAIT_TIME = 10;
58
59     // The timeout to wait between queue poll timeouts in milliseconds
60     private static final long QUEUE_POLL_TIMEOUT = 50;
61
62     // A queue that temporarily holds message blocks
63     private final BlockingQueue<MessageBlock<M>> messageBlockQueue = new LinkedBlockingDeque<>();
64
65     // A queue that temporarily holds message blocks
66     private final BlockingQueue<String> stringMessageQueue = new LinkedBlockingDeque<>();
67
68     // Client applications that have subscribed for messages
69     private final MessageBlockHandler<M> dataHandler = new MessageBlockHandler<>("data-processor");
70
71     // The thread that the raw message handler is receiving messages on
72     private Thread thisThread = null;
73
74     /**
75      * This method is called by the class with which this message listener has been registered.
76      *
77      * @param incomingData the data forwarded by the message reception class
78      */
79     @Override
80     @Subscribe
81     public void onMessage(final RawMessageBlock incomingData) {
82         // Sanity check and get incoming data
83         ByteBuffer dataByteBuffer = null;
84         if (incomingData != null && incomingData.getMessage() != null) {
85             dataByteBuffer = incomingData.getMessage();
86         } else {
87             return;
88         }
89
90         // Read the messages from the web socket and place them on the message queue for handling by
91         // the queue
92         // processing thread
93
94         try (final ByteArrayInputStream stream = new ByteArrayInputStream(dataByteBuffer.array());
95                         final ObjectInputStream ois = new ObjectInputStream(stream)) {
96             @SuppressWarnings("unchecked")
97             final MessageHolder<M> messageHolder = (MessageHolder<M>) ois.readObject();
98
99             if (LOGGER.isDebugEnabled()) {
100                 LOGGER.debug("message {} recieved from the client {} ", messageHolder,
101                                 messageHolder == null ? "Apex Engine " : messageHolder.getSenderHostAddress());
102             }
103
104             if (messageHolder != null) {
105                 final List<M> messages = messageHolder.getMessages();
106                 if (messages != null) {
107                     messageBlockQueue.add(new MessageBlock<>(messages, incomingData.getConn()));
108                 }
109             }
110         } catch (final IOException | ClassNotFoundException e) {
111             LOGGER.error("Failed to process message received");
112             LOGGER.catching(e);
113         }
114     }
115
116     /**
117      * This method is called when a string message is received on a web socket and is to be forwarded to a listener.
118      *
119      * @param messageString the message string
120      */
121     @Override
122     @Subscribe
123     public void onMessage(final String messageString) {
124         if (messageString == null) {
125             return;
126         }
127         if (LOGGER.isDebugEnabled()) {
128             LOGGER.debug("message {} recieved from the client {} ", messageString);
129         }
130         stringMessageQueue.add(messageString);
131     }
132
133     /**
134      * This method is called when a message is received on a web socket and is to be forwarded to a listener.
135      *
136      * @param data the message data containing a message
137      */
138     @Override
139     public void onMessage(final MessageBlock<M> data) {
140         throw new UnsupportedOperationException("this operation is not supported");
141     }
142
143     /**
144      * This thread monitors the message queue and processes messages as they appear on the queue.
145      *
146      * @see java.lang.Runnable#run()
147      */
148     @Override
149     public void run() {
150         LOGGER.debug("raw message listening started");
151         thisThread = Thread.currentThread();
152
153         // Run until termination
154         while (thisThread.isAlive() && !thisThread.isInterrupted()) {
155             try {
156                 // Read message block messages from the queue and pass it to the data handler
157                 MessageBlock<M> messageBlock = null;
158                 while ((messageBlock = messageBlockQueue.poll(1, TimeUnit.MILLISECONDS)) != null) {
159                     dataHandler.post(messageBlock);
160                 }
161             } catch (final InterruptedException e) {
162                 // restore the interrupt status
163                 Thread.currentThread().interrupt();
164                 LOGGER.debug(RAW_MESSAGE_LISTENING_INTERRUPTED);
165                 break;
166             }
167
168             try {
169                 // Read string messages from the queue and pass it to the data handler
170                 String stringMessage = null;
171                 while ((stringMessage = stringMessageQueue.poll(1, TimeUnit.MILLISECONDS)) != null) {
172                     dataHandler.post(stringMessage);
173                 }
174             } catch (final InterruptedException e) {
175                 // restore the interrupt status
176                 Thread.currentThread().interrupt();
177                 LOGGER.debug(RAW_MESSAGE_LISTENING_INTERRUPTED);
178                 break;
179             }
180
181             // Wait for new messages
182             try {
183                 Thread.sleep(QUEUE_POLL_TIMEOUT);
184             } catch (final InterruptedException e) {
185                 // restore the interrupt status
186                 Thread.currentThread().interrupt();
187                 LOGGER.debug(RAW_MESSAGE_LISTENING_INTERRUPTED);
188                 break;
189             }
190         }
191
192         LOGGER.debug("raw message listening stopped");
193     }
194
195     /**
196      * Shutdown the message handler.
197      */
198     public void shutdown() {
199         LOGGER.entry("shutting down raw message listening . . .");
200
201         // Interrupt the message handling thread
202         thisThread.interrupt();
203
204         // Wait for thread shutdown
205         while (thisThread.isAlive()) {
206             ThreadUtilities.sleep(SHUTDOWN_WAIT_TIME);
207         }
208
209         LOGGER.exit("shut down raw message listening");
210     }
211
212     /**
213      * Register a data forwarder to which messages coming in on the web socket will be forwarded.
214      *
215      * @param listener The listener to register
216      */
217     @Override
218     public void registerDataForwarder(final MessageListener<M> listener) {
219         stateCheck(listener);
220         dataHandler.registerMessageHandler(listener);
221     }
222
223     /**
224      * Unregister a data forwarder that was previously registered on the web socket listener.
225      *
226      * @param listener The listener to unregister
227      */
228     @Override
229     public void unRegisterDataForwarder(final MessageListener<M> listener) {
230         stateCheck(listener);
231         dataHandler.unRegisterMessageHandler(listener);
232     }
233
234     /**
235      * Sanity check for the listener and data handler.
236      *
237      * @param listener the listener to check
238      */
239     private void stateCheck(final MessageListener<M> listener) {
240         if (listener == null) {
241             throw new IllegalArgumentException("The listener object cannot be null");
242         }
243     }
244 }