2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.server;
23 import java.net.InetSocketAddress;
24 import java.nio.ByteBuffer;
25 import org.java_websocket.WebSocket;
26 import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
27 import org.onap.policy.apex.core.infrastructure.messaging.MessagingService;
28 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.RawMessageHandler;
29 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlockHandler;
30 import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.RawMessageBlock;
31 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
32 import org.slf4j.ext.XLogger;
33 import org.slf4j.ext.XLoggerFactory;
36 * The Class InternalMessageBusServer handles the server side of a web socket and handles the callback mechanism used to
37 * receive messages on the web socket.
39 * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
 * @param <M> the message type used by the {@link MessagingService} and its {@link MessageListener}s
42 abstract class InternalMessageBusServer<M> extends WebSocketServerImpl implements MessagingService<M> {
// Logger for this class
private static final XLogger LOGGER = XLoggerFactory.getXLogger(InternalMessageBusServer.class);

// Second argument passed to the ApplicationThreadFactory constructor below; presumably the stack size for the
// threads it creates (units are not visible here -- TODO confirm against ApplicationThreadFactory)
private static final int THREAD_FACTORY_STACK_SIZE = 256;

// Name of the event bus; handed to the MessageBlockHandler when it is created in the constructor
private static final String RAW_EVENT_BUS = "Raw-Event-Bus";

// This instance handles the raw data received from the web socket. It is registered with the message block
// handler, run on the forwarder thread, and is where message listeners are registered and unregistered
private final RawMessageHandler<M> rawMessageHandler = new RawMessageHandler<>();

// The message block handler to which incoming web socket messages are posted; created in the constructor
private MessageBlockHandler<M> messageBlockHandler = null;

// The raw message handler uses a thread to process incoming events off a queue. This class owns and controls
// that thread: the fields below hold the thread factory used to create it and the thread itself, which is
// created and started in the constructor.
private ApplicationThreadFactory threadFactory =
        new ApplicationThreadFactory("ws-server-thread", THREAD_FACTORY_STACK_SIZE);
private Thread forwarderThread = null;
/**
 * Construct the class and start the forwarding thread for received messages.
 *
 * <p>The server address is passed to the super class, a message block handler is created using the raw event bus
 * name, the raw message handler is registered with it, and a thread obtained from the thread factory is started
 * to run the raw message handler.
 *
 * @param address the address of the server machine
 */
protected InternalMessageBusServer(final InetSocketAddress address) {
    // Call the super class to create the web socket; without this call the address would only be logged and
    // never reach the underlying server
    super(address);
    LOGGER.entry(address.getAddress().getHostAddress() + ":" + address.getPort());

    // Create the data handler for forwarding messages
    messageBlockHandler = new MessageBlockHandler<>(RAW_EVENT_BUS);
    messageBlockHandler.registerMessageHandler(rawMessageHandler);

    // Create the thread that manages the queue in the data handler
    forwarderThread = threadFactory.newThread(rawMessageHandler);
    forwarderThread.start();

    // Pair the entry trace above with an exit trace
    LOGGER.exit();
}
86 * Callback for binary messages received from the remote host.
88 * @param webSocket the web socket on which the raw message was received
89 * @param rawMessage the received raw message
90 * @see #onMessage(WebSocket, String)
93 public void onMessage(final WebSocket webSocket, final ByteBuffer rawMessage) {
94 messageBlockHandler.post(new RawMessageBlock(rawMessage, webSocket));
101 public void onMessage(final WebSocket webSocket, final String stringMessage) {
102 messageBlockHandler.post(stringMessage);
106 * Register a subscriber class to the raw message handler.
108 * @param subscriber the subscriber
111 public void addMessageListener(final MessageListener<M> subscriber) {
112 rawMessageHandler.registerDataForwarder(subscriber);
116 * Removes the message listener.
118 * @param subscriber the subscriber
121 public void removeMessageListener(final MessageListener<M> subscriber) {
122 rawMessageHandler.unRegisterDataForwarder(subscriber);
126 * Stop the thread handling message forwarding.
128 protected void stopListener() {
129 rawMessageHandler.shutdown();