a436bd7e32c62b5daf2fb7092a7251a06184c2f1
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.server;
22
23 import java.net.InetSocketAddress;
24 import java.nio.ByteBuffer;
25
26 import org.java_websocket.WebSocket;
27 import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
28 import org.onap.policy.apex.core.infrastructure.messaging.MessagingService;
29 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.RawMessageHandler;
30 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlockHandler;
31 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.RawMessageBlock;
32 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
33 import org.slf4j.ext.XLogger;
34 import org.slf4j.ext.XLoggerFactory;
35
36 /**
37  * The Class InternalMessageBusServer handles the server side of a web socket and handles the callback mechanism used to
38  * receive messages on the web socket.
39  *
40  * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
41  * @param <M> the generic type
42  */
43 abstract class InternalMessageBusServer<M> extends WebSocketServerImpl implements MessagingService<M> {
44     // Logger for this class
45     private static final XLogger LOGGER = XLoggerFactory.getXLogger(InternalMessageBusServer.class);
46
47     private static final int THREAD_FACTORY_STACK_SIZE = 256;
48
49     // Name of the event bus.
50     private static final String RAW_EVENT_BUS = "Raw-Event-Bus";
51
52     // This instance handles the raw data received from the web socket
53     private final RawMessageHandler<M> rawMessageHandler = new RawMessageHandler<>();
54
55     // The message block handler to which to pass messages coming in on this client
56     private MessageBlockHandler<M> messageBlockHandler = null;
57
58     // The raw message handler uses a thread to process incoming events off a queue, this class owns and controls that
59     // thread. These fields hold the thread and
60     // the thread factory for creating threads.
61     private ApplicationThreadFactory threadFactory =
62             new ApplicationThreadFactory("ws-server-thread", THREAD_FACTORY_STACK_SIZE);
63     private Thread forwarderThread = null;
64
65     /**
66      * Construct the class and start the forwarding thread for received messages.
67      *
68      * @param address the address of the server machine
69      */
70     protected InternalMessageBusServer(final InetSocketAddress address) {
71         // Call the super class to create the web socket
72         super(address);
73         LOGGER.entry(address.getAddress().getHostAddress() + ":" + address.getPort());
74
75         // Create the data handler for forwarding messages
76         messageBlockHandler = new MessageBlockHandler<>(RAW_EVENT_BUS);
77         messageBlockHandler.registerMessageHandler(rawMessageHandler);
78
79         // Create the thread that manages the queue in the data handler
80         forwarderThread = threadFactory.newThread(rawMessageHandler);
81         forwarderThread.start();
82
83         LOGGER.exit();
84     }
85
86     /**
87      * Callback for binary messages received from the remote host.
88      *
89      * @param webSocket the web socket on which the raw message was received
90      * @param rawMessage the received raw message
91      * @see #onMessage(WebSocket, String)
92      */
93     @Override
94     public void onMessage(final WebSocket webSocket, final ByteBuffer rawMessage) {
95         messageBlockHandler.post(new RawMessageBlock(rawMessage, webSocket));
96     }
97
98     /*
99      * (non-Javadoc)
100      *
101      * @see org.java_websocket.server.WebSocketServer#onMessage(org.java_websocket.WebSocket, java.lang.String)
102      */
103     @Override
104     public void onMessage(final WebSocket webSocket, final String stringMessage) {
105         messageBlockHandler.post(stringMessage);
106     }
107
108     /**
109      * Register a subscriber class to the raw message handler.
110      *
111      * @param subscriber the subscriber
112      */
113     @Override
114     public void addMessageListener(final MessageListener<M> subscriber) {
115         rawMessageHandler.registerDataForwarder(subscriber);
116     }
117
118     /**
119      * Removes the message listener.
120      *
121      * @param subscriber the subscriber
122      */
123     @Override
124     public void removeMessageListener(final MessageListener<M> subscriber) {
125         rawMessageHandler.unRegisterDataForwarder(subscriber);
126     }
127
128     /**
129      * Stop the thread handling message forwarding.
130      */
131     protected void stopListener() {
132         rawMessageHandler.shutdown();
133     }
134 }