d6b93f7440c846ae75b71b3dec7bfdafc749a1e6
[ccsdk/sli.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - CCSDK
4  * ================================================================================
5  * Copyright (C) 2018 Huawei Technologies Co., Ltd. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.ccsdk.sli.plugins.restconfdiscovery;
22
23 import org.glassfish.jersey.media.sse.EventSource;
24 import org.glassfish.jersey.media.sse.SseFeature;
25 import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
26 import org.onap.ccsdk.sli.core.sli.SvcLogicException;
27 import org.onap.ccsdk.sli.core.utils.common.AcceptIpAddressHostNameVerifier;
28 import org.onap.ccsdk.sli.plugins.restapicall.Parameters;
29 import org.onap.ccsdk.sli.plugins.restapicall.RestapiCallNode;
30 import org.onap.ccsdk.sli.plugins.restconfapicall.RestconfApiCallNode;
31 import org.slf4j.Logger;
32
33 import javax.net.ssl.SSLContext;
34 import javax.net.ssl.TrustManager;
35 import javax.net.ssl.X509TrustManager;
36 import javax.ws.rs.client.Client;
37 import javax.ws.rs.client.ClientBuilder;
38 import javax.ws.rs.client.WebTarget;
39 import java.security.KeyManagementException;
40 import java.security.NoSuchAlgorithmException;
41 import java.security.cert.CertificateException;
42 import java.security.cert.X509Certificate;
43 import java.util.Map;
44 import java.util.concurrent.ConcurrentHashMap;
45 import java.util.concurrent.ExecutorService;
46 import java.util.concurrent.Executors;
47 import java.util.concurrent.LinkedBlockingQueue;
48
49 import static org.slf4j.LoggerFactory.getLogger;
50
51 /**
52  * Representation of a plugin to subscribe for notification and then
53  * to handle the received notifications.
54  */
55 public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin {
56
57     private static final Logger log = getLogger(RestconfDiscoveryNode.class);
58
59     private ExecutorService executor = Executors.newCachedThreadPool();
60     private Map<String, PersistentConnection> runnableInfo = new ConcurrentHashMap<>();
61     private RestconfApiCallNode restconfApiCallNode;
62
63     private volatile Map<String, SubscriptionInfo> subscriptionInfoMap = new ConcurrentHashMap<>();
64     private volatile LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
65
66     private static final String SUBSCRIBER_ID = "subscriberId";
67     private static final String RESPONSE_CODE = "response-code";
68     private static final String RESPONSE_PREFIX = "responsePrefix";
69     private static final String OUTPUT_IDENTIFIER = "ietf-subscribed-notif" +
70             "ications:establish-subscription.output.identifier";
71     private static final String RESPONSE_CODE_200 = "200";
72     private static final String SSE_URL = "sseConnectURL";
73
74     /**
75      * Creates an instance of RestconfDiscoveryNode and starts processing of
76      * event.
77      *
78      * @param r restconf api call node
79      */
80     public RestconfDiscoveryNode(RestconfApiCallNode r) {
81         this.restconfApiCallNode = r;
82         ExecutorService e = Executors.newFixedThreadPool(20);
83         EventProcessor p = new EventProcessor(this);
84         for (int i = 0; i < 20; ++i) {
85             e.execute(p);
86         }
87     }
88
89     @Override
90     public void establishSubscription(Map<String, String> paramMap,
91                                       SvcLogicContext ctx) throws SvcLogicException {
92         String subscriberId = paramMap.get(SUBSCRIBER_ID);
93         if (subscriberId == null) {
94             throw new SvcLogicException("Subscriber Id is null");
95         }
96
97         restconfApiCallNode.sendRequest(paramMap, ctx);
98
99         if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
100             // TODO: save subscription id and subscriber in MYSQL
101
102             establishPersistentConnection(paramMap, ctx, subscriberId);
103         } else {
104             log.info("Failed to subscribe {}", subscriberId);
105             throw new SvcLogicException(ctx.getAttribute(RESPONSE_CODE));
106         }
107     }
108
109     @Override
110     public void modifySubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
111         // TODO: to be implemented
112     }
113
114     @Override
115     public void deleteSubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
116         String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
117         if (id != null) {
118             PersistentConnection conn = runnableInfo.get(id);
119             conn.terminate();
120             runnableInfo.remove(id);
121             subscriptionInfoMap.remove(id);
122         }
123     }
124
125     class PersistentConnection implements Runnable {
126         private String url;
127         private volatile boolean running = true;
128         private Map<String, String> paramMap;
129
130         PersistentConnection(String url, Map<String, String> paramMap) {
131             this.url = url;
132             this.paramMap = paramMap;
133         }
134
135         private void terminate() {
136             running = false;
137         }
138
139         @Override
140         public void run() {
141             Parameters p;
142             WebTarget target = null;
143             try {
144                 RestapiCallNode restapi = restconfApiCallNode.getRestapiCallNode();
145                 p = RestapiCallNode.getParameters(paramMap, new Parameters());
146                 Client client =  ignoreSslClient(p.disableHostVerification).register(SseFeature.class);
147                 target = restapi.addAuthType(client, p).target(url);
148             } catch (SvcLogicException e) {
149                 log.error("Exception occured!", e);
150                 Thread.currentThread().interrupt();
151             }
152
153             target = addToken(target, paramMap.get("customHttpHeaders"));
154             EventSource eventSource = EventSource.target(target).build();
155             eventSource.register(new EventHandler(RestconfDiscoveryNode.this));
156             eventSource.open();
157             log.info("Connected to SSE source");
158             while (running) {
159                 try {
160                     log.info("SSE state " + eventSource.isOpen());
161                     Thread.sleep(5000);
162                 } catch (InterruptedException e) {
163                     log.error("Interrupted!", e);
164                     Thread.currentThread().interrupt();
165                 }
166             }
167             eventSource.close();
168             log.info("Closed connection to SSE source");
169         }
170
171         // Note: Sonar complains about host name verification being 
172         // disabled here.  This is necessary to handle devices using self-signed
173         // certificates (where CA would be unknown) - so we are leaving this code as is.
174         private Client ignoreSslClient(boolean disableHostVerification) {
175             SSLContext sslcontext = null;
176
177             try {
178                 sslcontext = SSLContext.getInstance("TLS");
179                 sslcontext.init(null, new TrustManager[]{new X509TrustManager() {
180                     @Override
181                     public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
182                     }
183
184                     @Override
185                     public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
186                     }
187
188                     @Override
189                     public X509Certificate[] getAcceptedIssuers() {
190                         return new X509Certificate[0];
191                     }
192                 } }, new java.security.SecureRandom());
193             } catch (NoSuchAlgorithmException | KeyManagementException e) {
194                 throw new IllegalStateException(e);
195             }
196
197             return ClientBuilder.newBuilder().sslContext(sslcontext).hostnameVerifier(new AcceptIpAddressHostNameVerifier(disableHostVerification)).build();
198         }
199     }
200
201     protected String getTokenId(String customHttpHeaders) {
202         if (customHttpHeaders.contains("=")) {
203             String[] s = customHttpHeaders.split("=");
204             return s[1];
205         }
206         return customHttpHeaders;
207     }
208
209     protected WebTarget addToken(WebTarget target, String customHttpHeaders) {
210         if (customHttpHeaders == null) {
211             return target;
212         }
213
214         return new AdditionalHeaderWebTarget(
215                 target, getTokenId(customHttpHeaders));
216     }
217
218     /**
219      * Establishes a persistent between the client and server.
220      *
221      * @param paramMap input paramter map
222      * @param ctx service logic context
223      * @param subscriberId subscriber identifier
224      */
225     void establishPersistentConnection(Map<String, String> paramMap, SvcLogicContext ctx,
226                                               String subscriberId) {
227         String id = getOutputIdentifier(paramMap.get(RESPONSE_PREFIX), ctx);
228         SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
229                                                              paramMap.get("rpc"),
230                                                              paramMap.get("version"),
231                                                              paramMap.get("mode"));
232         SubscriptionInfo info = new SubscriptionInfo();
233         info.callBackDG(callbackDG);
234         info.subscriptionId(id);
235         info.subscriberId(subscriberId);
236         subscriptionInfoMap.put(id, info);
237
238         String url = paramMap.get(SSE_URL);
239         PersistentConnection connection = new PersistentConnection(url, paramMap);
240         runnableInfo.put(id, connection);
241         executor.execute(connection);
242     }
243
244     /**
245      * Returns response code.
246      *
247      * @param prefix prefix given in input parameter
248      * @param ctx service logic context
249      * @return response code
250      */
251     String getResponseCode(String prefix, SvcLogicContext ctx) {
252         return ctx.getAttribute(getPrefix(prefix) + RESPONSE_CODE);
253     }
254
255     /**
256      * Returns subscription id from event.
257      *
258      * @param prefix prefix given in input parameter
259      * @param ctx service logic context
260      * @return subscription id from event
261      */
262     String getOutputIdentifier(String prefix, SvcLogicContext ctx) {
263         return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER);
264     }
265
266     private String getPrefix(String prefix) {
267         return prefix != null ? prefix + "." : "";
268     }
269
270     private String getSubscriptionId(String subscriberId) {
271         for (Map.Entry<String,SubscriptionInfo> entry
272                 : subscriptionInfoMap.entrySet()) {
273             if (entry.getValue().subscriberId()
274                     .equals(subscriberId)) {
275                 return entry.getKey();
276             }
277         }
278         return null;
279     }
280
281     /**
282      * Returns restconfApiCallNode.
283      *
284      * @return restconfApiCallNode
285      */
286     protected RestconfApiCallNode restconfapiCallNode() {
287         return restconfApiCallNode;
288     }
289
290     /**
291      * Sets restconfApiCallNode.
292      *
293      * @param node restconfApiCallNode
294      */
295     void restconfapiCallNode(RestconfApiCallNode node) {
296         restconfApiCallNode = node;
297     }
298
299     Map<String, SubscriptionInfo> subscriptionInfoMap() {
300         return subscriptionInfoMap;
301     }
302
303     void subscriptionInfoMap(Map<String, SubscriptionInfo> subscriptionInfoMap) {
304         this.subscriptionInfoMap = subscriptionInfoMap;
305     }
306
307     LinkedBlockingQueue<String> eventQueue() {
308         return eventQueue;
309     }
310
311     void eventQueue(LinkedBlockingQueue<String> eventQueue) {
312         this.eventQueue = eventQueue;
313     }
314 }