5b47cf5b6c3019cbdc83525088b95a8010e9bef8
[ccsdk/sli.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - CCSDK
4  * ================================================================================
5  * Copyright (C) 2018 Huawei Technologies Co., Ltd. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.ccsdk.sli.plugins.restconfdiscovery;
22
23 import org.glassfish.jersey.media.sse.EventSource;
24 import org.glassfish.jersey.media.sse.SseFeature;
25 import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
26 import org.onap.ccsdk.sli.core.sli.SvcLogicException;
27 import org.onap.ccsdk.sli.plugins.restapicall.Parameters;
28 import org.onap.ccsdk.sli.plugins.restapicall.RestapiCallNode;
29 import org.onap.ccsdk.sli.plugins.restconfapicall.RestconfApiCallNode;
30 import org.slf4j.Logger;
31
32 import javax.net.ssl.SSLContext;
33 import javax.net.ssl.TrustManager;
34 import javax.net.ssl.X509TrustManager;
35 import javax.ws.rs.client.Client;
36 import javax.ws.rs.client.ClientBuilder;
37 import javax.ws.rs.client.WebTarget;
38 import java.security.KeyManagementException;
39 import java.security.NoSuchAlgorithmException;
40 import java.security.cert.CertificateException;
41 import java.security.cert.X509Certificate;
42 import java.util.Map;
43 import java.util.concurrent.ConcurrentHashMap;
44 import java.util.concurrent.ExecutorService;
45 import java.util.concurrent.Executors;
46 import java.util.concurrent.LinkedBlockingQueue;
47
48 import static org.slf4j.LoggerFactory.getLogger;
49
50 /**
51  * Representation of a plugin to subscribe for notification and then
52  * to handle the received notifications.
53  */
54 public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin {
55
56     private static final Logger log = getLogger(RestconfDiscoveryNode.class);
57
58     private ExecutorService executor = Executors.newCachedThreadPool();
59     private Map<String, PersistentConnection> runnableInfo = new ConcurrentHashMap<>();
60     private RestconfApiCallNode restconfApiCallNode;
61
62     private volatile Map<String, SubscriptionInfo> subscriptionInfoMap = new ConcurrentHashMap<>();
63     private volatile LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
64
65     private static final String SUBSCRIBER_ID = "subscriberId";
66     private static final String RESPONSE_CODE = "response-code";
67     private static final String RESPONSE_PREFIX = "responsePrefix";
68     private static final String OUTPUT_IDENTIFIER = "ietf-subscribed-notif" +
69             "ications:establish-subscription.output.identifier";
70     private static final String RESPONSE_CODE_200 = "200";
71     private static final String SSE_URL = "sseConnectURL";
72
73     /**
74      * Creates an instance of RestconfDiscoveryNode and starts processing of
75      * event.
76      *
77      * @param r restconf api call node
78      */
79     public RestconfDiscoveryNode(RestconfApiCallNode r) {
80         this.restconfApiCallNode = r;
81         ExecutorService e = Executors.newFixedThreadPool(20);
82         EventProcessor p = new EventProcessor(this);
83         for (int i = 0; i < 20; ++i) {
84             e.execute(p);
85         }
86     }
87
88     @Override
89     public void establishSubscription(Map<String, String> paramMap,
90                                       SvcLogicContext ctx) throws SvcLogicException {
91         String subscriberId = paramMap.get(SUBSCRIBER_ID);
92         if (subscriberId == null) {
93             throw new SvcLogicException("Subscriber Id is null");
94         }
95
96         restconfApiCallNode.sendRequest(paramMap, ctx);
97
98         if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
99             // TODO: save subscription id and subscriber in MYSQL
100
101             establishPersistentConnection(paramMap, ctx, subscriberId);
102         } else {
103             log.info("Failed to subscribe {}", subscriberId);
104             throw new SvcLogicException(ctx.getAttribute(RESPONSE_CODE));
105         }
106     }
107
108     @Override
109     public void modifySubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
110         // TODO: to be implemented
111     }
112
113     @Override
114     public void deleteSubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
115         String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
116         if (id != null) {
117             PersistentConnection conn = runnableInfo.get(id);
118             conn.terminate();
119             runnableInfo.remove(id);
120             subscriptionInfoMap.remove(id);
121         }
122     }
123
124     class PersistentConnection implements Runnable {
125         private String url;
126         private volatile boolean running = true;
127         private Map<String, String> paramMap;
128
129         PersistentConnection(String url, Map<String, String> paramMap) {
130             this.url = url;
131             this.paramMap = paramMap;
132         }
133
134         private void terminate() {
135             running = false;
136         }
137
138         @Override
139         public void run() {
140             Parameters p;
141             WebTarget target = null;
142             try {
143                 RestapiCallNode restapi = restconfApiCallNode.getRestapiCallNode();
144                 p = RestapiCallNode.getParameters(paramMap, new Parameters());
145                 Client client =  ignoreSslClient().register(SseFeature.class);
146                 target = restapi.addAuthType(client, p).target(url);
147             } catch (SvcLogicException e) {
148                 log.error("Exception occured!", e);
149                 Thread.currentThread().interrupt();
150             }
151
152             target = addToken(target, paramMap.get("customHttpHeaders"));
153             EventSource eventSource = EventSource.target(target).build();
154             eventSource.register(new EventHandler(RestconfDiscoveryNode.this));
155             eventSource.open();
156             log.info("Connected to SSE source");
157             while (running) {
158                 try {
159                     log.info("SSE state " + eventSource.isOpen());
160                     Thread.sleep(5000);
161                 } catch (InterruptedException e) {
162                     log.error("Interrupted!", e);
163                     Thread.currentThread().interrupt();
164                 }
165             }
166             eventSource.close();
167             log.info("Closed connection to SSE source");
168         }
169
170         // Note: Sonar complains about host name verification being 
171         // disabled here.  This is necessary to handle devices using self-signed
172         // certificates (where CA would be unknown) - so we are leaving this code as is.
173         private Client ignoreSslClient() {
174             SSLContext sslcontext = null;
175
176             try {
177                 sslcontext = SSLContext.getInstance("TLS");
178                 sslcontext.init(null, new TrustManager[]{new X509TrustManager() {
179                     @Override
180                     public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
181                     }
182
183                     @Override
184                     public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
185                     }
186
187                     @Override
188                     public X509Certificate[] getAcceptedIssuers() {
189                         return new X509Certificate[0];
190                     }
191                 } }, new java.security.SecureRandom());
192             } catch (NoSuchAlgorithmException | KeyManagementException e) {
193                 throw new IllegalStateException(e);
194             }
195
196             return ClientBuilder.newBuilder().sslContext(sslcontext).hostnameVerifier((s1, s2) -> true).build();
197         }
198     }
199
200     protected String getTokenId(String customHttpHeaders) {
201         if (customHttpHeaders.contains("=")) {
202             String[] s = customHttpHeaders.split("=");
203             return s[1];
204         }
205         return customHttpHeaders;
206     }
207
208     protected WebTarget addToken(WebTarget target, String customHttpHeaders) {
209         if (customHttpHeaders == null) {
210             return target;
211         }
212
213         return new AdditionalHeaderWebTarget(
214                 target, getTokenId(customHttpHeaders));
215     }
216
217     /**
218      * Establishes a persistent between the client and server.
219      *
220      * @param paramMap input paramter map
221      * @param ctx service logic context
222      * @param subscriberId subscriber identifier
223      */
224     void establishPersistentConnection(Map<String, String> paramMap, SvcLogicContext ctx,
225                                               String subscriberId) {
226         String id = getOutputIdentifier(paramMap.get(RESPONSE_PREFIX), ctx);
227         SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
228                                                              paramMap.get("rpc"),
229                                                              paramMap.get("version"),
230                                                              paramMap.get("mode"));
231         SubscriptionInfo info = new SubscriptionInfo();
232         info.callBackDG(callbackDG);
233         info.subscriptionId(id);
234         info.subscriberId(subscriberId);
235         subscriptionInfoMap.put(id, info);
236
237         String url = paramMap.get(SSE_URL);
238         PersistentConnection connection = new PersistentConnection(url, paramMap);
239         runnableInfo.put(id, connection);
240         executor.execute(connection);
241     }
242
243     /**
244      * Returns response code.
245      *
246      * @param prefix prefix given in input parameter
247      * @param ctx service logic context
248      * @return response code
249      */
250     String getResponseCode(String prefix, SvcLogicContext ctx) {
251         return ctx.getAttribute(getPrefix(prefix) + RESPONSE_CODE);
252     }
253
254     /**
255      * Returns subscription id from event.
256      *
257      * @param prefix prefix given in input parameter
258      * @param ctx service logic context
259      * @return subscription id from event
260      */
261     String getOutputIdentifier(String prefix, SvcLogicContext ctx) {
262         return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER);
263     }
264
265     private String getPrefix(String prefix) {
266         return prefix != null ? prefix + "." : "";
267     }
268
269     private String getSubscriptionId(String subscriberId) {
270         for (Map.Entry<String,SubscriptionInfo> entry
271                 : subscriptionInfoMap.entrySet()) {
272             if (entry.getValue().subscriberId()
273                     .equals(subscriberId)) {
274                 return entry.getKey();
275             }
276         }
277         return null;
278     }
279
280     /**
281      * Returns restconfApiCallNode.
282      *
283      * @return restconfApiCallNode
284      */
285     protected RestconfApiCallNode restconfapiCallNode() {
286         return restconfApiCallNode;
287     }
288
289     /**
290      * Sets restconfApiCallNode.
291      *
292      * @param node restconfApiCallNode
293      */
294     void restconfapiCallNode(RestconfApiCallNode node) {
295         restconfApiCallNode = node;
296     }
297
298     Map<String, SubscriptionInfo> subscriptionInfoMap() {
299         return subscriptionInfoMap;
300     }
301
302     void subscriptionInfoMap(Map<String, SubscriptionInfo> subscriptionInfoMap) {
303         this.subscriptionInfoMap = subscriptionInfoMap;
304     }
305
306     LinkedBlockingQueue<String> eventQueue() {
307         return eventQueue;
308     }
309
310     void eventQueue(LinkedBlockingQueue<String> eventQueue) {
311         this.eventQueue = eventQueue;
312     }
313 }