2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2018 Huawei Technologies Co., Ltd. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.ccsdk.sli.plugins.restconfdiscovery;
 
  23 import org.glassfish.jersey.media.sse.EventSource;
 
  24 import org.glassfish.jersey.media.sse.SseFeature;
 
  25 import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
 
  26 import org.onap.ccsdk.sli.core.sli.SvcLogicException;
 
  27 import org.onap.ccsdk.sli.core.utils.common.AcceptIpAddressHostNameVerifier;
 
  28 import org.onap.ccsdk.sli.plugins.restapicall.Parameters;
 
  29 import org.onap.ccsdk.sli.plugins.restapicall.RestapiCallNode;
 
  30 import org.onap.ccsdk.sli.plugins.restconfapicall.RestconfApiCallNode;
 
  31 import org.slf4j.Logger;
 
  33 import javax.net.ssl.SSLContext;
 
  34 import javax.net.ssl.TrustManager;
 
  35 import javax.net.ssl.X509TrustManager;
 
  36 import javax.ws.rs.client.Client;
 
  37 import javax.ws.rs.client.ClientBuilder;
 
  38 import javax.ws.rs.client.WebTarget;
 
  39 import java.security.KeyManagementException;
 
  40 import java.security.NoSuchAlgorithmException;
 
  41 import java.security.cert.CertificateException;
 
  42 import java.security.cert.X509Certificate;
 
  44 import java.util.concurrent.ConcurrentHashMap;
 
  45 import java.util.concurrent.ExecutorService;
 
  46 import java.util.concurrent.Executors;
 
  47 import java.util.concurrent.LinkedBlockingQueue;
 
  49 import static org.slf4j.LoggerFactory.getLogger;
 
  52  * Representation of a plugin to subscribe for notification and then
 
  53  * to handle the received notifications.
 
  55 public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin {
 
  57     private static final Logger log = getLogger(RestconfDiscoveryNode.class);
 
  59     private ExecutorService executor = Executors.newCachedThreadPool();
 
  60     private Map<String, PersistentConnection> runnableInfo = new ConcurrentHashMap<>();
 
  61     private RestconfApiCallNode restconfApiCallNode;
 
  63     private volatile Map<String, SubscriptionInfo> subscriptionInfoMap = new ConcurrentHashMap<>();
 
  64     private volatile LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
 
  66     private static final String SUBSCRIBER_ID = "subscriberId";
 
  67     private static final String RESPONSE_CODE = "response-code";
 
  68     private static final String RESPONSE_PREFIX = "responsePrefix";
 
  69     private static final String OUTPUT_IDENTIFIER = "ietf-subscribed-notif" +
 
  70             "ications:establish-subscription.output.identifier";
 
  71     private static final String RESPONSE_CODE_200 = "200";
 
  72     private static final String SSE_URL = "sseConnectURL";
 
  75      * Creates an instance of RestconfDiscoveryNode and starts processing of
 
  78      * @param r restconf api call node
 
  80     public RestconfDiscoveryNode(RestconfApiCallNode r) {
 
  81         this.restconfApiCallNode = r;
 
  82         ExecutorService e = Executors.newFixedThreadPool(20);
 
  83         EventProcessor p = new EventProcessor(this);
 
  84         for (int i = 0; i < 20; ++i) {
 
  90     public void establishSubscription(Map<String, String> paramMap,
 
  91                                       SvcLogicContext ctx) throws SvcLogicException {
 
  92         String subscriberId = paramMap.get(SUBSCRIBER_ID);
 
  93         if (subscriberId == null) {
 
  94             throw new SvcLogicException("Subscriber Id is null");
 
  97         restconfApiCallNode.sendRequest(paramMap, ctx);
 
  99         if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
 
 100             // TODO: save subscription id and subscriber in MYSQL
 
 102             establishPersistentConnection(paramMap, ctx, subscriberId);
 
 104             log.info("Failed to subscribe {}", subscriberId);
 
 105             throw new SvcLogicException(ctx.getAttribute(RESPONSE_CODE));
 
 110     public void modifySubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
 
 111         // TODO: to be implemented
 
 115     public void deleteSubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
 
 116         String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
 
 118             PersistentConnection conn = runnableInfo.get(id);
 
 120             runnableInfo.remove(id);
 
 121             subscriptionInfoMap.remove(id);
 
 125     class PersistentConnection implements Runnable {
 
 127         private volatile boolean running = true;
 
 128         private Map<String, String> paramMap;
 
 130         PersistentConnection(String url, Map<String, String> paramMap) {
 
 132             this.paramMap = paramMap;
 
 135         private void terminate() {
 
 142             WebTarget target = null;
 
 144                 RestapiCallNode restapi = restconfApiCallNode.getRestapiCallNode();
 
 145                 p = RestapiCallNode.getParameters(paramMap, new Parameters());
 
 146                 Client client =  ignoreSslClient(p.disableHostVerification).register(SseFeature.class);
 
 147                 target = restapi.addAuthType(client, p).target(url);
 
 148             } catch (SvcLogicException e) {
 
 149                 log.error("Exception occured!", e);
 
 150                 Thread.currentThread().interrupt();
 
 153             target = addToken(target, paramMap.get("customHttpHeaders"));
 
 154             EventSource eventSource = EventSource.target(target).build();
 
 155             eventSource.register(new EventHandler(RestconfDiscoveryNode.this));
 
 157             log.info("Connected to SSE source");
 
 160                     log.info("SSE state " + eventSource.isOpen());
 
 162                 } catch (InterruptedException e) {
 
 163                     log.error("Interrupted!", e);
 
 164                     Thread.currentThread().interrupt();
 
 168             log.info("Closed connection to SSE source");
 
 171         // Note: Sonar complains about host name verification being 
 
 172         // disabled here.  This is necessary to handle devices using self-signed
 
 173         // certificates (where CA would be unknown) - so we are leaving this code as is.
 
 174         private Client ignoreSslClient(boolean disableHostVerification) {
 
 175             SSLContext sslcontext = null;
 
 178                 sslcontext = SSLContext.getInstance("TLS");
 
 179                 sslcontext.init(null, new TrustManager[]{new X509TrustManager() {
 
 181                     public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
 
 185                     public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
 
 189                     public X509Certificate[] getAcceptedIssuers() {
 
 190                         return new X509Certificate[0];
 
 192                 } }, new java.security.SecureRandom());
 
 193             } catch (NoSuchAlgorithmException | KeyManagementException e) {
 
 194                 throw new IllegalStateException(e);
 
 197             return ClientBuilder.newBuilder().sslContext(sslcontext).hostnameVerifier(new AcceptIpAddressHostNameVerifier(disableHostVerification)).build();
 
 201     protected String getTokenId(String customHttpHeaders) {
 
 202         if (customHttpHeaders.contains("=")) {
 
 203             String[] s = customHttpHeaders.split("=");
 
 206         return customHttpHeaders;
 
 209     protected WebTarget addToken(WebTarget target, String customHttpHeaders) {
 
 210         if (customHttpHeaders == null) {
 
 214         return new AdditionalHeaderWebTarget(
 
 215                 target, getTokenId(customHttpHeaders));
 
 219      * Establishes a persistent connection between the client and server.
 
 221      * @param paramMap input parameter map
 
 222      * @param ctx service logic context
 
 223      * @param subscriberId subscriber identifier
 
 225     void establishPersistentConnection(Map<String, String> paramMap, SvcLogicContext ctx,
 
 226                                               String subscriberId) {
 
 227         String id = getOutputIdentifier(paramMap.get(RESPONSE_PREFIX), ctx);
 
 228         SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
 
 230                                                              paramMap.get("version"),
 
 231                                                              paramMap.get("mode"));
 
 232         SubscriptionInfo info = new SubscriptionInfo();
 
 233         info.callBackDG(callbackDG);
 
 234         info.subscriptionId(id);
 
 235         info.subscriberId(subscriberId);
 
 236         subscriptionInfoMap.put(id, info);
 
 238         String url = paramMap.get(SSE_URL);
 
 239         PersistentConnection connection = new PersistentConnection(url, paramMap);
 
 240         runnableInfo.put(id, connection);
 
 241         executor.execute(connection);
 
 245      * Returns response code.
 
 247      * @param prefix prefix given in input parameter
 
 248      * @param ctx service logic context
 
 249      * @return response code
 
 251     String getResponseCode(String prefix, SvcLogicContext ctx) {
 
 252         return ctx.getAttribute(getPrefix(prefix) + RESPONSE_CODE);
 
 256      * Returns subscription id from event.
 
 258      * @param prefix prefix given in input parameter
 
 259      * @param ctx service logic context
 
 260      * @return subscription id from event
 
 262     String getOutputIdentifier(String prefix, SvcLogicContext ctx) {
 
 263         return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER);
 
 266     private String getPrefix(String prefix) {
 
 267         return prefix != null ? prefix + "." : "";
 
 270     private String getSubscriptionId(String subscriberId) {
 
 271         for (Map.Entry<String,SubscriptionInfo> entry
 
 272                 : subscriptionInfoMap.entrySet()) {
 
 273             if (entry.getValue().subscriberId()
 
 274                     .equals(subscriberId)) {
 
 275                 return entry.getKey();
 
 282      * Returns restconfApiCallNode.
 
 284      * @return restconfApiCallNode
 
 286     protected RestconfApiCallNode restconfapiCallNode() {
 
 287         return restconfApiCallNode;
 
 291      * Sets restconfApiCallNode.
 
 293      * @param node restconfApiCallNode
 
 295     void restconfapiCallNode(RestconfApiCallNode node) {
 
 296         restconfApiCallNode = node;
 
 299     Map<String, SubscriptionInfo> subscriptionInfoMap() {
 
 300         return subscriptionInfoMap;
 
 303     void subscriptionInfoMap(Map<String, SubscriptionInfo> subscriptionInfoMap) {
 
 304         this.subscriptionInfoMap = subscriptionInfoMap;
 
 307     LinkedBlockingQueue<String> eventQueue() {
 
 311     void eventQueue(LinkedBlockingQueue<String> eventQueue) {
 
 312         this.eventQueue = eventQueue;