17391fb89ea54851cb2cd3ad9d9c117852e5f38d
[policy/apex-pdp.git] / core / core-infrastructure / src / main / java / org / onap / policy / apex / core / infrastructure / messaging / impl / ws / client / InternalMessageBusClient.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.client;
22
23 import java.net.URI;
24 import java.nio.ByteBuffer;
25
26 import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
27 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.RawMessageHandler;
28 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlockHandler;
29 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.RawMessageBlock;
30 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
31 import org.slf4j.ext.XLogger;
32 import org.slf4j.ext.XLoggerFactory;
33
34 /**
35  * The Class InternalMessageBusClient handles the client side of a web socket and handles the callback mechanism used to
36  * receive messages on the web socket.
37  *
38  * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
39  * @param <M> the generic type of message being handled
40  */
41 abstract class InternalMessageBusClient<M> extends WebSocketClientImpl {
42     private static final int THREAD_FACTORY_STACK_SIZE = 256;
43
44     // The logger for this class
45     private static final XLogger LOGGER = XLoggerFactory.getXLogger(InternalMessageBusClient.class);
46
47     // Name of the event bus.
48     private static final String RAW_EVENT_BUS = "Raw-Event-Bus";
49
50     // This instance handles the raw data received from the web socket
51     private final RawMessageHandler<M> rawMessageHandler = new RawMessageHandler<>();
52
53     // The message block handler to which to pass messages coming in on this client
54     private MessageBlockHandler<M> messageBlockHandler = null;
55
56     // The raw message handler uses a thread to process incoming events off a queue, this class owns and controls that
57     // thread. These fields hold the thread and
58     // the thread factory for creating threads.
59     private ApplicationThreadFactory threadFactory =
60             new ApplicationThreadFactory("ws-client-thread", THREAD_FACTORY_STACK_SIZE);
61     private Thread forwarderThread = null;
62
63     /**
64      * Construct the class and start the forwarding thread for received messages.
65      *
66      * @param serverUri the server URI to connect to
67      */
68     InternalMessageBusClient(final URI serverUri) {
69         // Call the super class to create the web socket
70         super(serverUri);
71         LOGGER.entry(serverUri.toString());
72
73         // Create the data handler for forwarding messages
74         messageBlockHandler = new MessageBlockHandler<>(RAW_EVENT_BUS);
75         messageBlockHandler.registerMessageHandler(rawMessageHandler);
76
77         // Create the thread that manages the queue in the data handler
78         forwarderThread = threadFactory.newThread(rawMessageHandler);
79         forwarderThread.start();
80
81         LOGGER.exit();
82     }
83
84     /**
85      * Callback for binary messages received from the remote host.
86      *
87      * @param rawMessage the received raw message
88      * @see org.java_websocket.client.WebSocketClient#onMessage(java.nio.ByteBuffer)
89      */
90     @Override
91     public void onMessage(final ByteBuffer rawMessage) {
92         // Post the message to the data handler for forwarding to its listeners
93         messageBlockHandler.post(new RawMessageBlock(rawMessage, null));
94     }
95
96     /**
97      * Callback for binary messages received from the remote host.
98      *
99      * @param stringMessage the string message
100      * @see org.java_websocket.client.WebSocketClient#onMessage(java.lang.String)
101      */
102     @Override
103     public final void onMessage(final String stringMessage) {
104         messageBlockHandler.post(stringMessage);
105     }
106
107     /**
108      * Register a subscriber class to the raw message handler.
109      *
110      * @param listener a simple class, that listens for the events from Event
111      */
112     public void addMessageListener(final MessageListener<M> listener) {
113         rawMessageHandler.registerDataForwarder(listener);
114     }
115
116     /**
117      * Removes the message listener.
118      *
119      * @param listener the listener
120      */
121     public void removeMessageListener(final MessageListener<M> listener) {
122         rawMessageHandler.unRegisterDataForwarder(listener);
123     }
124
125     /**
126      * Stop the thread handling message forwarding.
127      */
128     protected void stopListener() {
129         rawMessageHandler.shutdown();
130     }
131 }