1 /*******************************************************************************
 
   2  * ============LICENSE_START========================================================================
 
   3  * ONAP : ccsdk feature sdnr wt
 
   4  * =================================================================================================
 
   5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
 
   6  * =================================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 
   8  * in compliance with the License. You may obtain a copy of the License at
 
  10  * http://www.apache.org/licenses/LICENSE-2.0
 
  12  * Unless required by applicable law or agreed to in writing, software distributed under the License
 
  13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 
  14  * or implied. See the License for the specific language governing permissions and limitations under
 
  16  * ============LICENSE_END==========================================================================
 
  17  ******************************************************************************/
 
  18 package org.onap.ccsdk.features.sdnr.wt.websocketmanager2;
 
  20 import com.google.common.util.concurrent.ListenableFuture;
 
  21 import java.io.IOException;
 
  23 import java.net.URISyntaxException;
 
  24 import java.util.ArrayList;
 
  25 import javax.servlet.ServletException;
 
  26 import javax.servlet.http.HttpServletRequest;
 
  27 import javax.servlet.http.HttpServletResponse;
 
  28 import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
 
  29 import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
 
  30 import org.json.JSONObject;
 
  31 import org.onap.ccsdk.features.sdnr.wt.websocketmanager2.WebSocketManagerSocket.EventInputCallback;
 
  32 import org.onap.ccsdk.features.sdnr.wt.websocketmanager2.utils.AkkaConfig;
 
  33 import org.onap.ccsdk.features.sdnr.wt.websocketmanager2.utils.AkkaConfig.ClusterConfig;
 
  34 import org.onap.ccsdk.features.sdnr.wt.websocketmanager2.utils.AkkaConfig.ClusterNodeInfo;
 
  35 import org.onap.ccsdk.features.sdnr.wt.websocketmanager2.websocket.SyncWebSocketClient;
 
  36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.websocketmanager.rev150105.WebsocketEventInput;
 
  37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.websocketmanager.rev150105.WebsocketEventOutput;
 
  38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.websocketmanager.rev150105.WebsocketEventOutputBuilder;
 
  39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.websocketmanager.rev150105.WebsocketmanagerService;
 
  40 import org.opendaylight.yangtools.yang.common.RpcResult;
 
  41 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 
  42 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 
  43 import org.slf4j.Logger;
 
  44 import org.slf4j.LoggerFactory;
 
  46 public class WebSocketManager extends WebSocketServlet implements WebsocketmanagerService {
 
  48     private static final long serialVersionUID = -681665669062744439L;
 
  50     private static final Logger LOG = LoggerFactory.getLogger(WebSocketManager.class.getName());
 
  51     private static final String APPLICATION_NAME = WebSocketManager.class.getName();
 
  52     private static final int PORT = 8181;
 
  53     private final EventInputCallback rpcEventInputCallback;
 
  54     private final AkkaConfig akkaConfig;
 
  56      * timeout for websocket with no messages in ms
 
  58     //private static final long IDLE_TIMEOUT = 5 * 60 * 1000L;
 
  59     private static final long IDLE_TIMEOUT =0L;
 
  61     private final ArrayList<URI> clusterNodeClients = new ArrayList<>();
 
  63     public WebSocketManager() {
 
  67     public WebSocketManager(AkkaConfig akkaconfig, EventInputCallback cb) {
 
  69         this.akkaConfig = akkaconfig;
 
  71             this.rpcEventInputCallback = cb;
 
  73             this.rpcEventInputCallback = message -> {
 
  74                 LOG.debug("onMessagePushed: " + message);
 
  75                 SyncWebSocketClient client;
 
  76                 for (URI clientURI : WebSocketManager.this.clusterNodeClients) {
 
  77                     client = new SyncWebSocketClient(clientURI);
 
  78                     LOG.debug("try to push message to " + client.getURI());
 
  79                     client.openAndSendAndCloseSync(message);
 
  83         LOG.info("Create servlet for {}", APPLICATION_NAME);
 
  87     public void configure(WebSocketServletFactory factory) {
 
  88         LOG.info("Configure provider for {}", APPLICATION_NAME);
 
  89         // set a second timeout
 
  90         factory.getPolicy().setIdleTimeout(IDLE_TIMEOUT);
 
  91         factory.getPolicy().setMaxBinaryMessageSize(1);
 
  92         factory.getPolicy().setMaxTextMessageSize(64 * 1024);
 
  94         // register Socket as the WebSocket to create on Upgrade
 
  95         factory.register(WebSocketManagerSocket.class);
 
  97         AkkaConfig cfg = this.akkaConfig;
 
 100                 cfg = AkkaConfig.load();
 
 101             } catch (Exception e) {
 
 102                 LOG.warn("problem loading akka config: " + e.getMessage());
 
 105         if (cfg != null && cfg.isCluster()) {
 
 106             this.initWSClients(cfg.getClusterConfig());
 
 110     // ODL in Dublin version generates ListenableFuture that is child of Future.
 
 112     public ListenableFuture<RpcResult<WebsocketEventOutput>> websocketEvent(WebsocketEventInput input) {
 
 113         LOG.debug("Send message '{}'", input);
 
 114         RpcResultBuilder<WebsocketEventOutput> result;
 
 116         final String eventAsXmlString = input.getXmlEvent();
 
 117         if (eventAsXmlString != null) {
 
 118             WebSocketManagerSocket.broadCast(input.getNodeName(), input.getEventType(), eventAsXmlString);
 
 120                 JSONObject o = new JSONObject();
 
 121                 o.put(WebSocketManagerSocket.KEY_NODENAME, input.getNodeName());
 
 122                 o.put(WebSocketManagerSocket.KEY_EVENTTYPE, input.getEventType());
 
 123                 o.put(WebSocketManagerSocket.KEY_XMLEVENT, input.getXmlEvent());
 
 124                 this.rpcEventInputCallback.onMessagePushed(o.toString());
 
 126                 WebsocketEventOutputBuilder outputBuilder = new WebsocketEventOutputBuilder();
 
 127                 outputBuilder.setResponse("OK");
 
 128                 result = RpcResultBuilder.success(outputBuilder);
 
 129             } catch (Exception err) {
 
 130                 LOG.warn("problem pushing messsage to other nodes: " + err.getMessage());
 
 131                 result = RpcResultBuilder.failed();
 
 132                 result.withError(ErrorType.APPLICATION, "Exception", err);
 
 135             String msg = "Emtpy event received";
 
 137             result = RpcResultBuilder.failed();
 
 138             result.withError(ErrorType.APPLICATION, msg);
 
 140         return result.buildFuture();
 
 143     /**********************************************************
 
 148     protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
 
 149         if (req.getHeader("Upgrade") != null) {
 
 150             /* Accept upgrade request */
 
 152             resp.setHeader("Upgrade", "XYZP");
 
 153             resp.setHeader("Connection", "Upgrade");
 
 154             resp.setHeader("OtherHeaderB", "Value");
 
 158     private void initWSClients(ClusterConfig clusterConfig) {
 
 159         for (ClusterNodeInfo nodeConfig : clusterConfig.getSeedNodes()) {
 
 160             if (clusterConfig.isMe(nodeConfig)) {
 
 163             String url = String.format("ws://%s:%d/websocket", nodeConfig.getRemoteAddress(), PORT);
 
 165                 LOG.debug("registering ws client for " + url);
 
 166                 clusterNodeClients.add(new URI(url));
 
 167             } catch (URISyntaxException e) {
 
 168                 LOG.warn("problem instantiating wsclient for url: " + url);