Add authorization header in SSE request
[ccsdk/sli/plugins.git] / restconf-client / provider / src / main / java / org / onap / ccsdk / sli / plugins / restconfdiscovery / RestconfDiscoveryNode.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - CCSDK
4  * ================================================================================
5  * Copyright (C) 2018 Huawei Technologies Co., Ltd. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.ccsdk.sli.plugins.restconfdiscovery;
22
23 import org.glassfish.jersey.media.sse.EventSource;
24 import org.glassfish.jersey.media.sse.SseFeature;
25 import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
26 import org.onap.ccsdk.sli.core.sli.SvcLogicException;
27 import org.onap.ccsdk.sli.plugins.restapicall.Parameters;
28 import org.onap.ccsdk.sli.plugins.restapicall.RestapiCallNode;
29 import org.onap.ccsdk.sli.plugins.restconfapicall.RestconfApiCallNode;
30 import org.slf4j.Logger;
31
32 import javax.net.ssl.SSLContext;
33 import javax.net.ssl.TrustManager;
34 import javax.net.ssl.X509TrustManager;
35 import javax.ws.rs.client.Client;
36 import javax.ws.rs.client.ClientBuilder;
37 import javax.ws.rs.client.WebTarget;
38 import java.security.KeyManagementException;
39 import java.security.NoSuchAlgorithmException;
40 import java.security.cert.CertificateException;
41 import java.security.cert.X509Certificate;
42 import java.util.Map;
43 import java.util.concurrent.ConcurrentHashMap;
44 import java.util.concurrent.ExecutorService;
45 import java.util.concurrent.Executors;
46 import java.util.concurrent.LinkedBlockingQueue;
47
48 import static org.slf4j.LoggerFactory.getLogger;
49
50 /**
51  * Representation of a plugin to subscribe for notification and then
52  * to handle the received notifications.
53  */
54 public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin {
55
56     private static final Logger log = getLogger(RestconfDiscoveryNode.class);
57
58     private ExecutorService executor = Executors.newCachedThreadPool();
59     private Map<String, PersistentConnection> runnableInfo = new ConcurrentHashMap<>();
60     private RestconfApiCallNode restconfApiCallNode;
61
62     private volatile Map<String, SubscriptionInfo> subscriptionInfoMap = new ConcurrentHashMap<>();
63     private volatile LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
64
65     private static final String SUBSCRIBER_ID = "subscriberId";
66     private static final String RESPONSE_CODE = "response-code";
67     private static final String RESPONSE_PREFIX = "responsePrefix";
68     private static final String OUTPUT_IDENTIFIER = "ietf-subscribed-notif" +
69             "ications:establish-subscription.output.identifier";
70     private static final String RESPONSE_CODE_200 = "200";
71     private static final String SSE_URL = "sseConnectURL";
72
73     /**
74      * Creates an instance of RestconfDiscoveryNode and starts processing of
75      * event.
76      *
77      * @param r restconf api call node
78      */
79     public RestconfDiscoveryNode(RestconfApiCallNode r) {
80         this.restconfApiCallNode = r;
81         ExecutorService e = Executors.newFixedThreadPool(20);
82         EventProcessor p = new EventProcessor(this);
83         for (int i = 0; i < 20; ++i) {
84             e.execute(p);
85         }
86     }
87
88     @Override
89     public void establishSubscription(Map<String, String> paramMap,
90                                       SvcLogicContext ctx) throws SvcLogicException {
91         String subscriberId = paramMap.get(SUBSCRIBER_ID);
92         if (subscriberId == null) {
93             throw new SvcLogicException("Subscriber Id is null");
94         }
95
96         restconfApiCallNode.sendRequest(paramMap, ctx);
97
98         if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
99             // TODO: save subscription id and subscriber in MYSQL
100
101             establishPersistentConnection(paramMap, ctx, subscriberId);
102         } else {
103             log.info("Failed to subscribe {}", subscriberId);
104             throw new SvcLogicException(ctx.getAttribute(RESPONSE_CODE));
105         }
106     }
107
108     @Override
109     public void modifySubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
110         // TODO: to be implemented
111     }
112
113     @Override
114     public void deleteSubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
115         String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
116         if (id != null) {
117             PersistentConnection conn = runnableInfo.get(id);
118             conn.terminate();
119             runnableInfo.remove(id);
120             subscriptionInfoMap.remove(id);
121         }
122     }
123
124     class PersistentConnection implements Runnable {
125         private String url;
126         private volatile boolean running = true;
127         private Map<String, String> paramMap;
128
129         PersistentConnection(String url, Map<String, String> paramMap) {
130             this.url = url;
131             this.paramMap = paramMap;
132         }
133
134         private void terminate() {
135             running = false;
136         }
137
138         @Override
139         public void run() {
140             Parameters p;
141             WebTarget target = null;
142             try {
143                 RestapiCallNode restapi = restconfApiCallNode.getRestapiCallNode();
144                 p = restapi.getParameters(paramMap, new Parameters());
145                 Client client =  ignoreSslClient().register(SseFeature.class);
146                 target = restapi.addAuthType(client, p).target(url);
147             } catch (SvcLogicException e) {
148                 log.error("Exception occured!", e);
149                 Thread.currentThread().interrupt();
150             }
151
152             target = addToken(target, paramMap.get("customHttpHeaders"));
153             EventSource eventSource = EventSource.target(target).build();
154             eventSource.register(new EventHandler(RestconfDiscoveryNode.this));
155             eventSource.open();
156             log.info("Connected to SSE source");
157             while (running) {
158                 try {
159                     log.info("SSE state " + eventSource.isOpen());
160                     Thread.sleep(5000);
161                 } catch (InterruptedException e) {
162                     log.error("Interrupted!", e);
163                     Thread.currentThread().interrupt();
164                 }
165             }
166             eventSource.close();
167             log.info("Closed connection to SSE source");
168         }
169     }
170
171     private Client ignoreSslClient() {
172         SSLContext sslcontext = null;
173
174         try {
175             sslcontext = SSLContext.getInstance("TLS");
176             sslcontext.init(null, new TrustManager[]{new X509TrustManager() {
177                 @Override
178                 public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
179                 }
180
181                 @Override
182                 public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
183                 }
184
185                 @Override
186                 public X509Certificate[] getAcceptedIssuers() {
187                     return new X509Certificate[0];
188                 }
189             } }, new java.security.SecureRandom());
190         } catch (NoSuchAlgorithmException | KeyManagementException e) {
191             throw new IllegalStateException(e);
192         }
193
194         return ClientBuilder.newBuilder().sslContext(sslcontext).hostnameVerifier((s1, s2) -> true).build();
195     }
196
197     protected String getTokenId(String customHttpHeaders) {
198         if (customHttpHeaders.contains("=")) {
199             String s[] = customHttpHeaders.split("=");
200             return s[1];
201         }
202         return customHttpHeaders;
203     }
204
205     protected WebTarget addToken(WebTarget target, String customHttpHeaders) {
206         if (customHttpHeaders == null) {
207             return target;
208         }
209
210         return new AdditionalHeaderWebTarget(
211                 target, getTokenId(customHttpHeaders));
212     }
213
214     /**
215      * Establishes a persistent between the client and server.
216      *
217      * @param paramMap input paramter map
218      * @param ctx service logic context
219      * @param subscriberId subscriber identifier
220      */
221     void establishPersistentConnection(Map<String, String> paramMap, SvcLogicContext ctx,
222                                               String subscriberId) {
223         String id = getOutputIdentifier(paramMap.get(RESPONSE_PREFIX), ctx);
224         SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
225                                                              paramMap.get("rpc"),
226                                                              paramMap.get("version"),
227                                                              paramMap.get("mode"));
228         SubscriptionInfo info = new SubscriptionInfo();
229         info.callBackDG(callbackDG);
230         info.subscriptionId(id);
231         info.subscriberId(subscriberId);
232         subscriptionInfoMap.put(id, info);
233
234         String url = paramMap.get(SSE_URL);
235         PersistentConnection connection = new PersistentConnection(url, paramMap);
236         runnableInfo.put(id, connection);
237         executor.execute(connection);
238     }
239
240     /**
241      * Returns response code.
242      *
243      * @param prefix prefix given in input parameter
244      * @param ctx service logic context
245      * @return response code
246      */
247     String getResponseCode(String prefix, SvcLogicContext ctx) {
248         return ctx.getAttribute(getPrefix(prefix) + RESPONSE_CODE);
249     }
250
251     /**
252      * Returns subscription id from event.
253      *
254      * @param prefix prefix given in input parameter
255      * @param ctx service logic context
256      * @return subscription id from event
257      */
258     String getOutputIdentifier(String prefix, SvcLogicContext ctx) {
259         return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER);
260     }
261
262     private String getPrefix(String prefix) {
263         return prefix != null ? prefix + "." : "";
264     }
265
266     private String getSubscriptionId(String subscriberId) {
267         for (Map.Entry<String,SubscriptionInfo> entry
268                 : subscriptionInfoMap.entrySet()) {
269             if (entry.getValue().subscriberId()
270                     .equals(subscriberId)) {
271                 return entry.getKey();
272             }
273         }
274         return null;
275     }
276
277     /**
278      * Returns restconfApiCallNode.
279      *
280      * @return restconfApiCallNode
281      */
282     protected RestconfApiCallNode restconfapiCallNode() {
283         return restconfApiCallNode;
284     }
285
286     /**
287      * Sets restconfApiCallNode.
288      *
289      * @param node restconfApiCallNode
290      */
291     void restconfapiCallNode(RestconfApiCallNode node) {
292         restconfApiCallNode = node;
293     }
294
295     Map<String, SubscriptionInfo> subscriptionInfoMap() {
296         return subscriptionInfoMap;
297     }
298
299     void subscriptionInfoMap(Map<String, SubscriptionInfo> subscriptionInfoMap) {
300         this.subscriptionInfoMap = subscriptionInfoMap;
301     }
302
303     LinkedBlockingQueue<String> eventQueue() {
304         return eventQueue;
305     }
306
307     void eventQueue(LinkedBlockingQueue<String> eventQueue) {
308         this.eventQueue = eventQueue;
309     }
310 }