2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018 Huawei Technologies Co., Ltd. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.ccsdk.sli.plugins.restconfdiscovery;
23 import org.glassfish.jersey.media.sse.EventSource;
24 import org.glassfish.jersey.media.sse.SseFeature;
25 import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
26 import org.onap.ccsdk.sli.core.sli.SvcLogicException;
27 import org.onap.ccsdk.sli.core.utils.common.AcceptIpAddressHostNameVerifier;
28 import org.onap.ccsdk.sli.plugins.restapicall.Parameters;
29 import org.onap.ccsdk.sli.plugins.restapicall.RestapiCallNode;
30 import org.onap.ccsdk.sli.plugins.restconfapicall.RestconfApiCallNode;
31 import org.slf4j.Logger;
33 import javax.net.ssl.SSLContext;
34 import javax.net.ssl.TrustManager;
35 import javax.net.ssl.X509TrustManager;
36 import javax.ws.rs.client.Client;
37 import javax.ws.rs.client.ClientBuilder;
38 import javax.ws.rs.client.WebTarget;
39 import java.security.KeyManagementException;
40 import java.security.NoSuchAlgorithmException;
41 import java.security.cert.CertificateException;
42 import java.security.cert.X509Certificate;
44 import java.util.concurrent.ConcurrentHashMap;
45 import java.util.concurrent.ExecutorService;
46 import java.util.concurrent.Executors;
47 import java.util.concurrent.LinkedBlockingQueue;
49 import static org.slf4j.LoggerFactory.getLogger;
52 * Representation of a plugin to subscribe for notification and then
53 * to handle the received notifications.
55 public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin {
57 private static final Logger log = getLogger(RestconfDiscoveryNode.class);
59 private ExecutorService executor = Executors.newCachedThreadPool();
60 private Map<String, PersistentConnection> runnableInfo = new ConcurrentHashMap<>();
61 private RestconfApiCallNode restconfApiCallNode;
63 private volatile Map<String, SubscriptionInfo> subscriptionInfoMap = new ConcurrentHashMap<>();
64 private volatile LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
66 private static final String SUBSCRIBER_ID = "subscriberId";
67 private static final String RESPONSE_CODE = "response-code";
68 private static final String RESPONSE_PREFIX = "responsePrefix";
69 private static final String OUTPUT_IDENTIFIER = "ietf-subscribed-notif" +
70 "ications:establish-subscription.output.identifier";
71 private static final String RESPONSE_CODE_200 = "200";
72 private static final String SSE_URL = "sseConnectURL";
75 * Creates an instance of RestconfDiscoveryNode and starts processing of
78 * @param r restconf api call node
80 public RestconfDiscoveryNode(RestconfApiCallNode r) {
81 this.restconfApiCallNode = r;
82 ExecutorService e = Executors.newFixedThreadPool(20);
83 EventProcessor p = new EventProcessor(this);
84 for (int i = 0; i < 20; ++i) {
90 public void establishSubscription(Map<String, String> paramMap,
91 SvcLogicContext ctx) throws SvcLogicException {
92 String subscriberId = paramMap.get(SUBSCRIBER_ID);
93 if (subscriberId == null) {
94 throw new SvcLogicException("Subscriber Id is null");
97 restconfApiCallNode.sendRequest(paramMap, ctx);
99 if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
100 // TODO: save subscription id and subscriber in MYSQL
102 establishPersistentConnection(paramMap, ctx, subscriberId);
104 log.info("Failed to subscribe {}", subscriberId);
105 throw new SvcLogicException(ctx.getAttribute(RESPONSE_CODE));
110 public void modifySubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
111 // TODO: to be implemented
115 public void deleteSubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
116 String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
118 PersistentConnection conn = runnableInfo.get(id);
120 runnableInfo.remove(id);
121 subscriptionInfoMap.remove(id);
125 class PersistentConnection implements Runnable {
127 private volatile boolean running = true;
128 private Map<String, String> paramMap;
130 PersistentConnection(String url, Map<String, String> paramMap) {
132 this.paramMap = paramMap;
135 private void terminate() {
142 WebTarget target = null;
144 RestapiCallNode restapi = restconfApiCallNode.getRestapiCallNode();
145 p = RestapiCallNode.getParameters(paramMap, new Parameters());
146 Client client = ignoreSslClient(p.disableHostVerification).register(SseFeature.class);
147 target = restapi.addAuthType(client, p).target(url);
148 } catch (SvcLogicException e) {
149 log.error("Exception occured!", e);
150 Thread.currentThread().interrupt();
153 target = addToken(target, paramMap.get("customHttpHeaders"));
154 EventSource eventSource = EventSource.target(target).build();
155 eventSource.register(new EventHandler(RestconfDiscoveryNode.this));
157 log.info("Connected to SSE source");
160 log.info("SSE state " + eventSource.isOpen());
162 } catch (InterruptedException e) {
163 log.error("Interrupted!", e);
164 Thread.currentThread().interrupt();
168 log.info("Closed connection to SSE source");
171 // Note: Sonar complains about host name verification being
172 // disabled here. This is necessary to handle devices using self-signed
173 // certificates (where CA would be unknown) - so we are leaving this code as is.
174 private Client ignoreSslClient(boolean disableHostVerification) {
175 SSLContext sslcontext = null;
178 sslcontext = SSLContext.getInstance("TLS");
179 sslcontext.init(null, new TrustManager[]{new X509TrustManager() {
181 public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
185 public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
189 public X509Certificate[] getAcceptedIssuers() {
190 return new X509Certificate[0];
192 } }, new java.security.SecureRandom());
193 } catch (NoSuchAlgorithmException | KeyManagementException e) {
194 throw new IllegalStateException(e);
197 return ClientBuilder.newBuilder().sslContext(sslcontext).hostnameVerifier(new AcceptIpAddressHostNameVerifier(disableHostVerification)).build();
201 protected String getTokenId(String customHttpHeaders) {
202 if (customHttpHeaders.contains("=")) {
203 String[] s = customHttpHeaders.split("=");
206 return customHttpHeaders;
209 protected WebTarget addToken(WebTarget target, String customHttpHeaders) {
210 if (customHttpHeaders == null) {
214 return new AdditionalHeaderWebTarget(
215 target, getTokenId(customHttpHeaders));
219 * Establishes a persistent between the client and server.
221 * @param paramMap input paramter map
222 * @param ctx service logic context
223 * @param subscriberId subscriber identifier
225 void establishPersistentConnection(Map<String, String> paramMap, SvcLogicContext ctx,
226 String subscriberId) {
227 String id = getOutputIdentifier(paramMap.get(RESPONSE_PREFIX), ctx);
228 SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
230 paramMap.get("version"),
231 paramMap.get("mode"));
232 SubscriptionInfo info = new SubscriptionInfo();
233 info.callBackDG(callbackDG);
234 info.subscriptionId(id);
235 info.subscriberId(subscriberId);
236 subscriptionInfoMap.put(id, info);
238 String url = paramMap.get(SSE_URL);
239 PersistentConnection connection = new PersistentConnection(url, paramMap);
240 runnableInfo.put(id, connection);
241 executor.execute(connection);
245 * Returns response code.
247 * @param prefix prefix given in input parameter
248 * @param ctx service logic context
249 * @return response code
251 String getResponseCode(String prefix, SvcLogicContext ctx) {
252 return ctx.getAttribute(getPrefix(prefix) + RESPONSE_CODE);
256 * Returns subscription id from event.
258 * @param prefix prefix given in input parameter
259 * @param ctx service logic context
260 * @return subscription id from event
262 String getOutputIdentifier(String prefix, SvcLogicContext ctx) {
263 return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER);
266 private String getPrefix(String prefix) {
267 return prefix != null ? prefix + "." : "";
270 private String getSubscriptionId(String subscriberId) {
271 for (Map.Entry<String,SubscriptionInfo> entry
272 : subscriptionInfoMap.entrySet()) {
273 if (entry.getValue().subscriberId()
274 .equals(subscriberId)) {
275 return entry.getKey();
282 * Returns restconfApiCallNode.
284 * @return restconfApiCallNode
286 protected RestconfApiCallNode restconfapiCallNode() {
287 return restconfApiCallNode;
291 * Sets restconfApiCallNode.
293 * @param node restconfApiCallNode
295 void restconfapiCallNode(RestconfApiCallNode node) {
296 restconfApiCallNode = node;
299 Map<String, SubscriptionInfo> subscriptionInfoMap() {
300 return subscriptionInfoMap;
303 void subscriptionInfoMap(Map<String, SubscriptionInfo> subscriptionInfoMap) {
304 this.subscriptionInfoMap = subscriptionInfoMap;
307 LinkedBlockingQueue<String> eventQueue() {
311 void eventQueue(LinkedBlockingQueue<String> eventQueue) {
312 this.eventQueue = eventQueue;