package org.onap.ccsdk.features.sdnr.wt.websocketmanager2;
import com.google.common.util.concurrent.ListenableFuture;
+import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.json.JSONObject;
private static final String APPLICATION_NAME = WebSocketManager.class.getName();
private static final int PORT = 8181;
private final EventInputCallback rpcEventInputCallback;
-
+ private final AkkaConfig akkaConfig;
/**
* timeout for websocket with no messages in ms
*/
private final ArrayList<URI> clusterNodeClients = new ArrayList<>();
public WebSocketManager() {
- super();
- rpcEventInputCallback = message -> {
- LOG.debug("onMessagePushed: " + message);
- SyncWebSocketClient client;
- for (URI clientURI : WebSocketManager.this.clusterNodeClients) {
- client = new SyncWebSocketClient(clientURI);
- LOG.debug("try to push message to " + client.getURI());
- client.openAndSendAndCloseSync(message);
+ this(null, null);
+ }
+
+ public WebSocketManager(AkkaConfig akkaconfig, EventInputCallback cb) {
+ super();
+ this.akkaConfig = akkaconfig;
+ if (cb != null) {
+ this.rpcEventInputCallback = cb;
+ } else {
+ this.rpcEventInputCallback = message -> {
+ LOG.debug("onMessagePushed: " + message);
+ SyncWebSocketClient client;
+ for (URI clientURI : WebSocketManager.this.clusterNodeClients) {
+ client = new SyncWebSocketClient(clientURI);
+ LOG.debug("try to push message to " + client.getURI());
+ client.openAndSendAndCloseSync(message);
+ }
+ };
}
- };
LOG.info("Create servlet for {}", APPLICATION_NAME);
}
// register Socket as the WebSocket to create on Upgrade
factory.register(WebSocketManagerSocket.class);
- AkkaConfig cfg = null;
- try {
- cfg = AkkaConfig.load();
- } catch (Exception e) {
- LOG.warn("problem loading akka config: " + e.getMessage());
+ AkkaConfig cfg = this.akkaConfig;
+ if (cfg == null) {
+ try {
+ cfg = AkkaConfig.load();
+ } catch (Exception e) {
+ LOG.warn("problem loading akka config: " + e.getMessage());
+ }
}
if (cfg != null && cfg.isCluster()) {
this.initWSClients(cfg.getClusterConfig());
public ListenableFuture<RpcResult<WebsocketEventOutput>> websocketEvent(WebsocketEventInput input) {
LOG.debug("Send message '{}'", input);
RpcResultBuilder<WebsocketEventOutput> result;
- try {
- WebsocketEventOutputBuilder outputBuilder = new WebsocketEventOutputBuilder();
- final String s = input.getXmlEvent();
- WebSocketManagerSocket.broadCast(input.getNodeName(), input.getEventType(), s);
- outputBuilder.setResponse("OK");
+
+ final String eventAsXmlString = input.getXmlEvent();
+ if (eventAsXmlString != null) {
+ WebSocketManagerSocket.broadCast(input.getNodeName(), input.getEventType(), eventAsXmlString);
try {
JSONObject o = new JSONObject();
o.put(WebSocketManagerSocket.KEY_NODENAME, input.getNodeName());
o.put(WebSocketManagerSocket.KEY_EVENTTYPE, input.getEventType());
o.put(WebSocketManagerSocket.KEY_XMLEVENT, input.getXmlEvent());
this.rpcEventInputCallback.onMessagePushed(o.toString());
+
+ WebsocketEventOutputBuilder outputBuilder = new WebsocketEventOutputBuilder();
+ outputBuilder.setResponse("OK");
+ result = RpcResultBuilder.success(outputBuilder);
} catch (Exception err) {
LOG.warn("problem pushing messsage to other nodes: " + err.getMessage());
+ result = RpcResultBuilder.failed();
+ result.withError(ErrorType.APPLICATION, "Exception", err);
}
- result = RpcResultBuilder.success(outputBuilder);
- } catch (Exception e) {
- LOG.warn("Socketproblem: {}", e);
+ } else {
+ String msg = "Emtpy event received";
+ LOG.warn(msg);
result = RpcResultBuilder.failed();
- result.withError(ErrorType.APPLICATION, "Exception", e);
+ result.withError(ErrorType.APPLICATION, msg);
}
return result.buildFuture();
}
* Private functions
*/
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ if (req.getHeader("Upgrade") != null) {
+ /* Accept upgrade request */
+ resp.setStatus(101);
+ resp.setHeader("Upgrade", "XYZP");
+ resp.setHeader("Connection", "Upgrade");
+ resp.setHeader("OtherHeaderB", "Value");
+ }
+ }
+
private void initWSClients(ClusterConfig clusterConfig) {
for (ClusterNodeInfo nodeConfig : clusterConfig.getSeedNodes()) {
if (clusterConfig.isMe(nodeConfig)) {