e3b0512651eacdbd09754f52216326e2e980c7ea
[ccsdk/sli.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - CCSDK
4  * ================================================================================
5  * Copyright (C) 2018 Huawei Technologies Co., Ltd. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.ccsdk.sli.plugins.restconfdiscovery;
22
23 import com.google.common.collect.ImmutableMap;
24 import com.google.gson.JsonObject;
25 import com.google.gson.JsonParser;
26 import org.glassfish.jersey.media.sse.EventInput;
27 import org.glassfish.jersey.media.sse.EventSource;
28 import org.glassfish.jersey.media.sse.InboundEvent;
29 import org.glassfish.jersey.media.sse.SseFeature;
30 import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
31 import org.onap.ccsdk.sli.core.sli.SvcLogicException;
32 import org.onap.ccsdk.sli.core.utils.common.AcceptIpAddressHostNameVerifier;
33 import org.onap.ccsdk.sli.plugins.restapicall.Parameters;
34 import org.onap.ccsdk.sli.plugins.restapicall.RestapiCallNode;
35 import org.onap.ccsdk.sli.plugins.restconfapicall.RestconfApiCallNode;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 import javax.net.ssl.SSLContext;
40 import javax.net.ssl.TrustManager;
41 import javax.net.ssl.X509TrustManager;
42 import javax.ws.rs.client.Client;
43 import javax.ws.rs.client.ClientBuilder;
44 import javax.ws.rs.client.WebTarget;
45 import javax.ws.rs.core.Response;
46 import java.net.MalformedURLException;
47 import java.net.URL;
48 import java.security.KeyManagementException;
49 import java.security.NoSuchAlgorithmException;
50 import java.security.cert.CertificateException;
51 import java.security.cert.X509Certificate;
52 import java.util.HashSet;
53 import java.util.Map;
54 import java.util.NoSuchElementException;
55 import java.util.Set;
56 import java.util.concurrent.*;
57
58 import static org.onap.ccsdk.sli.plugins.restapicall.JsonParser.convertToProperties;
59 import static org.slf4j.LoggerFactory.getLogger;
60
61 /**
62  * Representation of a plugin to subscribe for notification and then
63  * to handle the received notifications.
64  */
65 public class RestconfDiscoveryNode implements RestConfSBController, SvcLogicDiscoveryPlugin {
66
    private static final Logger log = getLogger(RestconfDiscoveryNode.class);

    // Parameter-map keys, context-attribute suffixes and URL fragments used by this plugin.
    private static final String ROOT_RESOURCE = "/restconf";
    // Key of the mandatory subscriber identifier in the parameter map.
    private static final String SUBSCRIBER_ID = "subscriberId";
    // Context attribute suffix holding the response code of the last request.
    private static final String RESPONSE_CODE = "response-code";
    // Parameter-map key whose value is prepended (plus ".") to context attribute names.
    private static final String RESPONSE_PREFIX = "responsePrefix";
    // Context attribute suffix holding the subscription identifier (module-qualified form).
    private static final String OUTPUT_IDENTIFIER = "ietf-subscribed-notif" +
            "ications:establish-subscription.output.identifier";
    // Same as above, without the module qualification.
    private static final String OUTPUT_IDENTIFIER_NO_PREFIX = "output.identifier";
    // Response code value that is treated as success.
    private static final String RESPONSE_CODE_200 = "200";
    // Parameter-map key of the SSE stream URL.
    private static final String SSE_URL = "sseConnectURL";
    // Parameter-map key of the REST API URL used for (un)subscribe requests.
    private static final String REST_API_URL = "restapiUrl";
    private static final String RESOURCE_PATH_PREFIX = "/data/";
    // Path segment appended to the root resource when building a notification stream URL.
    private static final String NOTIFICATION_PATH_PREFIX = "/streams/";
    private static final String DEVICE_IP = "deviceIp";
    private static final String DEVICE_PORT = "devicePort";
    private static final String DOUBLESLASH = "//";
    private static final String COLON = ":";

    // Node used by establishSubscription and PersistentConnection.
    private RestconfApiCallNode restconfApiCallNode;
    // Node used by the "subscription only" and "delete subscription and SSE" flows.
    private RestapiCallNode restapiCallNode = new RestapiCallNode();
    // Subscription id -> subscription details (callback graph, subscriber id).
    private volatile Map<String, SubscriptionInfo> subscriptionInfoMap = new ConcurrentHashMap<>();
    // Shared event queue exposed through eventQueue(); presumably consumed by the event handler - verify.
    private volatile LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
    // Device -> listeners registered for its notifications.
    private Map<DeviceId, Set<RestconfNotificationEventListener>>
            restconfNotificationListenerMap = new ConcurrentHashMap<>();
    // Device -> runnable reading its notification stream.
    private Map<DeviceId, GetChunksRunnable>
            runnableTable = new ConcurrentHashMap<>();
    // Device -> subscription id returned when the device was subscribed.
    private Map<DeviceId, String> subscribedDevicesTable = new ConcurrentHashMap<>();
    // Device -> queue of raw event payloads received from its stream.
    private Map<DeviceId, BlockingQueue<String>> eventQMap = new ConcurrentHashMap<>();
    // Device -> runnable created in addDevice to process that device's events.
    private Map<DeviceId, InternalRestconfEventProcessorRunnable>
            processorRunnableTable = new ConcurrentHashMap<>();
    // Subscription id -> persistent SSE connection started for it.
    private Map<String, PersistentConnection> runnableInfo = new ConcurrentHashMap<>();
    private final Map<DeviceId, RestSBDevice> deviceMap = new ConcurrentHashMap<>();
    private final Map<DeviceId, Client> clientMap = new ConcurrentHashMap<>();
    // Runs the persistent connections, stream readers and event processors.
    private ExecutorService executor = Executors.newCachedThreadPool();
102
103     /**
104      * Creates an instance of RestconfDiscoveryNode and starts processing of
105      * event.
106      *
107      * @param r restconf api call node
108      */
109     public RestconfDiscoveryNode(RestconfApiCallNode r) {
110         log.info("inside RestconfDiscoveryNode Constructor");
111         this.restconfApiCallNode = r;
112         this.activate();
113 //        ExecutorService e = Executors.newFixedThreadPool(20);
114 //        EventProcessor p = new EventProcessor(this);
115 //        for (int i = 0; i < 20; ++i) {
116 //            e.execute(p);
117 //        }
118     }
119
120     public void activate() {
121         log.info("RESTCONF SBI Started");
122     }
123
124     public void deactivate() {
125         log.info("RESTCONF SBI Stopped");
126         executor.shutdown();
127         this.getClientMap().clear();
128         this.getDeviceMap().clear();
129     }
130
131     public Map<DeviceId, RestSBDevice> getDeviceMap() {
132         return deviceMap;
133     }
134
135     public Map<DeviceId, Client> getClientMap() {
136         return clientMap;
137     }
138
139     @Override
140     public Map<DeviceId, RestSBDevice> getDevices() {
141         log.trace("RESTCONF SBI::getDevices");
142         return ImmutableMap.copyOf(deviceMap);
143     }
144
145     @Override
146     public RestSBDevice getDevice(DeviceId deviceInfo) {
147         log.trace("RESTCONF SBI::getDevice with deviceId");
148         return deviceMap.get(deviceInfo);
149     }
150
151     @Override
152     public RestSBDevice getDevice(String ip, int port) {
153         log.trace("RESTCONF SBI::getDevice with ip and port");
154         try {
155             if (!deviceMap.isEmpty()) {
156                 return deviceMap.values().stream().filter(v -> v.ip().equals(ip) && v.port() == port).findFirst().get();
157             }
158         } catch (NoSuchElementException noSuchElementException) {
159             log.error("getDevice::device {}:{} does not exist in deviceMap", ip, port);
160         }
161         return null;
162     }
163
164     @Override
165     public void addDevice(RestSBDevice device) {
166         log.trace("RESTCONF SBI::addDevice");
167         if (!deviceMap.containsKey(device.deviceId())) {
168             if (device.username() != null) {
169                 String username = device.username();
170                 String password = device.password() == null ? "" : device.password();
171     //                authenticate(client, username, password);
172             }
173             BlockingQueue<String> newBlockingQueue = new LinkedBlockingQueue<>();
174             eventQMap.put(device.deviceId(), newBlockingQueue);
175             InternalRestconfEventProcessorRunnable eventProcessorRunnable =
176                     new InternalRestconfEventProcessorRunnable(device.deviceId());
177             processorRunnableTable.put(device.deviceId(), eventProcessorRunnable);
178             log.trace("addDevice::restconf event processor runnable is created and is going for execute");
179             executor.execute(eventProcessorRunnable);
180             log.trace("addDevice::restconf event processor runnable was sent for execute");
181             deviceMap.put(device.deviceId(), device);
182         } else {
183             log.warn("addDevice::Trying to add a device which already exists {}", device.deviceId());
184         }
185     }
186
187     @Override
188     public void removeDevice(DeviceId deviceId) {
189         log.trace("RESTCONF SBI::removeDevice");
190         eventQMap.remove(deviceId);
191         clientMap.remove(deviceId);
192         deviceMap.remove(deviceId);
193     }
194
195     @Override
196     public void establishSubscription(Map<String, String> paramMap,
197                                       SvcLogicContext ctx) throws SvcLogicException {
198         String subscriberId = paramMap.get(SUBSCRIBER_ID);
199         if (subscriberId == null) {
200             throw new SvcLogicException("Subscriber Id is null");
201         }
202
203         restconfApiCallNode.sendRequest(paramMap, ctx);
204
205         if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
206             // TODO: save subscription id and subscriber in MYSQL
207
208             establishPersistentConnection(paramMap, ctx, subscriberId);
209         } else {
210             log.info("Failed to subscribe {}", subscriberId);
211             throw new SvcLogicException(ctx.getAttribute(RESPONSE_CODE));
212         }
213     }
214
215     @Override
216     public void establishSubscriptionOnly(Map<String, String> paramMap, SvcLogicContext ctx)
217             throws SvcLogicException {
218         String subscriberId = paramMap.get(SUBSCRIBER_ID);
219         if (subscriberId == null) {
220             throw new SvcLogicException("Subscriber Id is null");
221         }
222
223         String subscribeUrlString = paramMap.get(REST_API_URL);
224         URL subscribeUrl = null;
225         RestSBDevice dev = null;
226         try {
227             subscribeUrl = new URL(subscribeUrlString);
228             dev = getDevice(subscribeUrl.getHost(), subscribeUrl.getPort());
229         } catch (MalformedURLException e) {
230             log.error("establishSubscriptionOnly::MalformedURLException happened. e: {}", e);
231             return;
232         }
233
234         if (dev == null) {
235             log.warn("establishSubscriptionOnly::device does not exist in the map. Trying to create one now.");
236             //FIXME: TODO: create a new RestSBDevice and add it to the map, as well as a client and clientMap
237             dev = new DefaultRestSBDevice(subscribeUrl.getHost(),
238                     subscribeUrl.getPort(), "onos", "rocks", "http",
239                     subscribeUrl.getHost() + ":" + subscribeUrl.getPort(), true);
240             this.addDevice(dev);
241         }
242
243         if (!subscribedDevicesTable.containsKey(dev.deviceId())) {
244             log.info("establishSubscriptionOnly::The device {} has not been subscribed yet. " +
245                     "Trying to subscribe it now...");
246             restapiCallNode.sendRequest(paramMap, ctx);
247             if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
248                 // TODO: save subscription id and subscriber in MYSQL
249                 String id = getOutputIdentifierNoPrefix(paramMap.get(RESPONSE_PREFIX), ctx);
250                 log.info("establishSubscriptionOnly::Subscription is done successfully and " +
251                         "the output.identifier is: {}", id);
252                 log.info("establishSubscriptionOnly::The subscriptionID returned by the server " +
253                         "does not exist in the map. Adding it now...");
254                 subscribedDevicesTable.put(dev.deviceId(), id);
255
256                 SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
257                         paramMap.get("rpc"),
258                         paramMap.get("version"),
259                         paramMap.get("mode"));
260                 SubscriptionInfo info = new SubscriptionInfo();
261                 info.callBackDG(callbackDG);
262                 info.subscriptionId(id);
263                 info.subscriberId(subscriberId);
264                 subscriptionInfoMap.put(id, info);
265
266             }
267         }
268
269     }
270
271     @Override
272     public void modifySubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
273         // TODO: to be implemented
274     }
275
276     @Override
277     public void deleteSubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
278         String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
279         if (id != null) {
280             PersistentConnection conn = runnableInfo.get(id);
281             conn.terminate();
282             runnableInfo.remove(id);
283             subscriptionInfoMap.remove(id);
284         }
285     }
286
287     class PersistentConnection implements Runnable {
288         private String url;
289         private volatile boolean running = true;
290         private Map<String, String> paramMap;
291
292         PersistentConnection(String url, Map<String, String> paramMap) {
293             this.url = url;
294             this.paramMap = paramMap;
295         }
296
297         private void terminate() {
298             running = false;
299         }
300
301         @Override
302         public void run() {
303             Parameters p;
304             WebTarget target = null;
305             try {
306                 RestapiCallNode restapi = restconfApiCallNode.getRestapiCallNode();
307                 p = RestapiCallNode.getParameters(paramMap, new Parameters());
308                 Client client =  ignoreSslClient(p.disableHostVerification).register(SseFeature.class);
309                 target = restapi.addAuthType(client, p).target(url);
310             } catch (SvcLogicException e) {
311                 log.error("Exception occured!", e);
312                 Thread.currentThread().interrupt();
313             }
314
315             target = addToken(target, paramMap.get("customHttpHeaders"));
316             EventSource eventSource = EventSource.target(target).build();
317             eventSource.register(new EventHandler(RestconfDiscoveryNode.this));
318             eventSource.open();
319             log.info("Connected to SSE source");
320             while (running) {
321                 try {
322                     log.info("SSE state " + eventSource.isOpen());
323                     Thread.sleep(5000);
324                 } catch (InterruptedException e) {
325                     log.error("Interrupted!", e);
326                     Thread.currentThread().interrupt();
327                 }
328             }
329             eventSource.close();
330             log.info("Closed connection to SSE source");
331         }
332
333         // Note: Sonar complains about host name verification being 
334         // disabled here.  This is necessary to handle devices using self-signed
335         // certificates (where CA would be unknown) - so we are leaving this code as is.
336         private Client ignoreSslClient(boolean disableHostVerification) {
337             SSLContext sslcontext = null;
338
339             try {
340                 sslcontext = SSLContext.getInstance("TLS");
341                 sslcontext.init(null, new TrustManager[]{new X509TrustManager() {
342                     @Override
343                     public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
344                     }
345
346                     @Override
347                     public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
348                     }
349
350                     @Override
351                     public X509Certificate[] getAcceptedIssuers() {
352                         return new X509Certificate[0];
353                     }
354                 } }, new java.security.SecureRandom());
355             } catch (NoSuchAlgorithmException | KeyManagementException e) {
356                 throw new IllegalStateException(e);
357             }
358
359             return ClientBuilder.newBuilder().sslContext(sslcontext).hostnameVerifier(new AcceptIpAddressHostNameVerifier(disableHostVerification)).build();
360         }
361     }
362
363     protected String getTokenId(String customHttpHeaders) {
364         if (customHttpHeaders.contains("=")) {
365             String[] s = customHttpHeaders.split("=");
366             return s[1];
367         }
368         return customHttpHeaders;
369     }
370
371     protected WebTarget addToken(WebTarget target, String customHttpHeaders) {
372         if (customHttpHeaders == null) {
373             return target;
374         }
375
376         return new AdditionalHeaderWebTarget(
377                 target, getTokenId(customHttpHeaders));
378     }
379
380     /**
381      * Establishes a persistent between the client and server.
382      *
383      * @param paramMap input paramter map
384      * @param ctx service logic context
385      * @param subscriberId subscriber identifier
386      */
387     void establishPersistentConnection(Map<String, String> paramMap, SvcLogicContext ctx,
388                                               String subscriberId) {
389         String id = getOutputIdentifier(paramMap.get(RESPONSE_PREFIX), ctx);
390         SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
391                                                              paramMap.get("rpc"),
392                                                              paramMap.get("version"),
393                                                              paramMap.get("mode"));
394         SubscriptionInfo info = new SubscriptionInfo();
395         info.callBackDG(callbackDG);
396         info.subscriptionId(id);
397         info.subscriberId(subscriberId);
398         subscriptionInfoMap.put(id, info);
399
400         String url = paramMap.get(SSE_URL);
401         PersistentConnection connection = new PersistentConnection(url, paramMap);
402         runnableInfo.put(id, connection);
403         executor.execute(connection);
404     }
405
406     /**
407      * Returns response code.
408      *
409      * @param prefix prefix given in input parameter
410      * @param ctx service logic context
411      * @return response code
412      */
413     String getResponseCode(String prefix, SvcLogicContext ctx) {
414         return ctx.getAttribute(getPrefix(prefix) + RESPONSE_CODE);
415     }
416
417     String getOutputIdentifierNoPrefix(String prefix, SvcLogicContext ctx) {
418         return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER_NO_PREFIX);
419     }
420
421     /**
422      * Returns subscription id from event.
423      *
424      * @param prefix prefix given in input parameter
425      * @param ctx service logic context
426      * @return subscription id from event
427      */
428     String getOutputIdentifier(String prefix, SvcLogicContext ctx) {
429         return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER);
430     }
431
432     private String getPrefix(String prefix) {
433         return prefix != null ? prefix + "." : "";
434     }
435
436     private String getSubscriptionId(String subscriberId) {
437         for (Map.Entry<String,SubscriptionInfo> entry
438                 : subscriptionInfoMap.entrySet()) {
439             if (entry.getValue().subscriberId()
440                     .equals(subscriberId)) {
441                 return entry.getKey();
442             }
443         }
444         return null;
445     }
446
447     private String getUrlString(DeviceId deviceId, String request) {
448         RestSBDevice restSBDevice = deviceMap.get(deviceId);
449         if (restSBDevice == null) {
450             log.warn("getUrlString::restSbDevice cannot be NULL!");
451             return "";
452         }
453         if (restSBDevice.url() != null) {
454             return restSBDevice.protocol() + COLON + DOUBLESLASH + restSBDevice.url() + request;
455         } else {
456             return restSBDevice.protocol() + COLON + DOUBLESLASH + restSBDevice.ip().toString()
457                     + COLON + restSBDevice.port() + request;
458         }
459     }
460
461     private String getSubscriptionIdFromDeviceId(DeviceId deviceId) {
462         if (subscribedDevicesTable.containsKey(deviceId)) {
463             return subscribedDevicesTable.get(deviceId);
464         }
465         return null;
466     }
467
468     private BlockingQueue<String> getEventQ(DeviceId deviceId) {
469         if (eventQMap.containsKey(deviceId)) {
470             return eventQMap.get(deviceId);
471         }
472         return null;
473     }
474
475     /**
476      * Returns restconfApiCallNode.
477      *
478      * @return restconfApiCallNode
479      */
480     protected RestconfApiCallNode restconfapiCallNode() {
481         return restconfApiCallNode;
482     }
483
484     /**
485      * Sets restconfApiCallNode.
486      *
487      * @param node restconfApiCallNode
488      */
489     void restconfapiCallNode(RestconfApiCallNode node) {
490         restconfApiCallNode = node;
491     }
492
493     Map<String, SubscriptionInfo> subscriptionInfoMap() {
494         return subscriptionInfoMap;
495     }
496
497     void subscriptionInfoMap(Map<String, SubscriptionInfo> subscriptionInfoMap) {
498         this.subscriptionInfoMap = subscriptionInfoMap;
499     }
500
501     LinkedBlockingQueue<String> eventQueue() {
502         return eventQueue;
503     }
504
505     void eventQueue(LinkedBlockingQueue<String> eventQueue) {
506         this.eventQueue = eventQueue;
507     }
508
509     /**
510      * Establishes a persistent SSE connection between the client and the server.
511      *
512      * @param paramMap input paramter map
513      * @param ctx service logic context
514      */
515     @Override
516     public void establishPersistentSseConnection(Map<String, String> paramMap, SvcLogicContext ctx) throws SvcLogicException {
517
518         //TODO: FIXME: remove the instantiation of info; not useful
519         String subscriberId = paramMap.get(SUBSCRIBER_ID);
520         SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
521                 paramMap.get("rpc"),
522                 paramMap.get("version"),
523                 paramMap.get("mode"));
524         SubscriptionInfo info = new SubscriptionInfo();
525         info.callBackDG(callbackDG);
526         info.subscriberId(subscriberId);
527
528         String sseUrlString = paramMap.get(SSE_URL);
529         URL sseUrl = null;
530         RestSBDevice dev = null;
531         try {
532             sseUrl = new URL(sseUrlString);
533             dev = getDevice(sseUrl.getHost(), sseUrl.getPort());
534         } catch (MalformedURLException e) {
535             log.error("establishPersistentSseConnection::MalformedURLException happened. e: {}", e);
536             return;
537         }
538
539         if (dev == null) {
540             log.warn("establishPersistentSseConnection::device does not exist in the map. Trying to add one now.");
541             dev = new DefaultRestSBDevice(sseUrl.getHost(),
542                     sseUrl.getPort(), "onos", "rocks", "http",
543                     sseUrl.getHost() + ":" + sseUrl.getPort(), true);
544             this.addDevice(dev);
545         }
546
547         if (isNotificationEnabled(dev.deviceId())) {
548             log.warn("establishPersistentSseConnection::notifications already enabled on device: {}",
549                     dev.deviceId());
550             return;
551         }
552
553         if (getSubscriptionIdFromDeviceId(dev.deviceId()) == null) {
554             log.warn("This device {} has not yet been subscribed to receive notifications.",
555                     dev.deviceId());
556             return;
557         }
558
559         RestconfNotificationEventListenerImpl myListener =
560                 new RestconfNotificationEventListenerImpl(info);
561         enableNotifications(dev.deviceId(), "yang-push-json", "json", myListener);
562     }
563
564     @Override
565     public void enableNotifications(DeviceId device, String request,
566                                     String mediaType,
567                                     RestconfNotificationEventListener listener) {
568         if (isNotificationEnabled(device)) {
569             log.warn("enableNotifications::already enabled on device: {}", device);
570             return;
571         }
572
573         request = discoverRootResource(device) + NOTIFICATION_PATH_PREFIX
574                 + request;
575
576         addNotificationListener(device, listener);
577
578         GetChunksRunnable runnable = new GetChunksRunnable(request, mediaType,
579                 device);
580         runnableTable.put(device, runnable);
581         executor.execute(runnable);
582     }
583
584     public void stopNotifications(DeviceId device) {
585         try {
586             runnableTable.get(device).terminate();
587             processorRunnableTable.get(device).terminate();
588         } catch (Exception ex) {
589             log.error("stopNotifications::Exception happened when terminating, ex: {}", ex);
590         }
591         log.info("stopNotifications::Runnable is now terminated");
592         runnableTable.remove(device);
593         processorRunnableTable.remove(device);
594         restconfNotificationListenerMap.remove(device);
595         log.debug("stopNotifications::Stop sending notifications for device URI: " + device.uri().toString());
596     }
597
598     @Override
599     public void deleteSubscriptionAndSseConnection(Map<String, String> paramMap, SvcLogicContext ctx) {
600         String deleteSubscribeUrlString = paramMap.get(REST_API_URL);
601         URL deleteSubscribeUrl = null;
602         RestSBDevice dev = null;
603         try {
604             deleteSubscribeUrl = new URL(deleteSubscribeUrlString);
605             dev = getDevice(deleteSubscribeUrl.getHost(), deleteSubscribeUrl.getPort());
606         } catch (MalformedURLException e) {
607             log.error("establishSubscriptionOnly::MalformedURLException happened. e: {}", e);
608             return;
609         }
610
611         String deviceIp = deleteSubscribeUrl.getHost();
612         String devicePort = String.valueOf(deleteSubscribeUrl.getPort());
613         log.info("deleteSubscriptionAndSseConnection::Trying to unsubscribe device {}:{}",
614                 deviceIp, devicePort);
615         if (dev == null) {
616             log.error("deleteSubscriptionAndSseConnection::device does not exist in the map");
617             return;
618         }
619         String subscriptionId = getSubscriptionIdFromDeviceId(dev.deviceId());
620
621         if (subscriptionId != null) {
622             log.info("deleteSubscriptionAndSseConnection::SubscriptionID is found {}", subscriptionId);
623             log.info("deleteSubscriptionAndSseConnection::About to send unsubscribe request");
624             try {
625                 ctx.setAttribute("subscriptionId", subscriptionId);
626                 restapiCallNode.sendRequest(paramMap, ctx);
627                 if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
628                     log.info("deleteSubscriptionAndSseConnection::Successfully unsubscribed");
629                     stopNotifications(dev.deviceId());
630                     subscribedDevicesTable.remove(dev.deviceId());
631
632                     String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
633                     if (id != null) {
634                         subscriptionInfoMap.remove(id);
635                     }
636
637                 } else {
638                     log.info("deleteSubscriptionAndSseConnection::Unsubscription was NOT successfull");
639                 }
640             } catch (SvcLogicException e) {
641                 log.error("deleteSubscriptionAndSseConnection::Exception happened ex: {}", e);
642             }
643         } else {
644             log.warn("deleteSubscriptionAndSseConnection::This device has already been unsubscribed");
645         }
646     }
647
648     /**
649      * Notifies providers about incoming RESTCONF notification events.
650      */public class GetChunksRunnable implements Runnable {
651         private String request;
652         private String mediaType;
653         private DeviceId deviceId;
654
655         private volatile boolean running = true;
656
657         public void terminate() {
658             log.info("GetChunksRunnable.terminate()::threadID: {}",
659                     Thread.currentThread().getId());
660             running = false;
661         }
662
663         /**
664          * @param request   request
665          * @param mediaType media type
666          * @param deviceId    device identifier
667          */
668         public GetChunksRunnable(String request, String mediaType,
669                                  DeviceId deviceId) {
670             this.request = request;
671             this.mediaType = mediaType;
672             this.deviceId = deviceId;
673         }
674
675         @Override
676         public void run() {
677             log.trace("GetChunksRunnable.run()::threadID is: {} ...., running is: {}",
678                     Thread.currentThread().getId(), running);
679             try {
680                 Client client = ClientBuilder.newBuilder()
681                         .register(SseFeature.class).build();
682                 WebTarget target = client.target(getUrlString(deviceId, request));
683                 log.trace("GetChunksRunnable.run()::target URI is {}", target.getUri().toString());
684                 Response response = target.request().get();
685                 EventInput eventInput = response.readEntity(EventInput.class);
686                 log.trace("GetChunksRunnable.run()::after eventInput");
687                 String rcvdData = "";
688                 while (!eventInput.isClosed() && running) {
689                     log.trace("GetChunksRunnable.run()::inside while ...");
690                     final InboundEvent inboundEvent = eventInput.read();
691                     log.trace("GetChunksRunnable.run()::after eventInput.read() ...");
692                     if (inboundEvent == null) {
693                         // connection has been closed
694                         log.info("GetChunksRunnable.run()::connection has been closed ...");
695                         break;
696                     }
697                     if (running) {
698                         rcvdData = inboundEvent.readData(String.class);
699                         BlockingQueue<String> eventQ = getEventQ(deviceId);
700                         if (eventQ != null) {
701                             eventQ.add(rcvdData);
702                             eventQMap.put(deviceId, eventQ);
703                             log.trace("GetChunksRunnable.run()::eventQ got filled.");
704                         } else {
705                             log.error("GetChunksRunnable.run()::eventQ has not been initialized for this device {}",
706                                     deviceId);
707                         }
708                     } else {
709                         log.info("GetChunksRunnable.run()::running has changed to false while eventInput.read() " +
710                                 "was blocked to receive new notifications");
711                         log.info("GetChunksRunnable.run()::the client is no longer interested to " +
712                                 "receive notifications.");
713                         break;
714                     }
715                 }
716                 if (!running) {
717                     log.trace("GetChunksRunnable.run()::running is false! " +
718                                     "closing eventInput, threadID: {}", Thread.currentThread().getId());
719                     eventInput.close();
720                     response.close();
721                     client.close();
722                     log.info("GetChunksRunnable.run()::eventInput is closed in run()");
723                 }
724             } catch (Exception ex) {
725                 log.info("GetChunksRunnable.run()::We got some exception: {}, threadID: {} ", ex,
726                         Thread.currentThread().getId());
727             }
728             log.trace("GetChunksRunnable.run()::after Runnable Try Catch. threadID: {} ",
729                     Thread.currentThread().getId());
730         }
731     }
732
733     public class InternalRestconfEventProcessorRunnable implements Runnable {
734
735         private volatile boolean running = true;
736         private DeviceId deviceId;
737
738         public InternalRestconfEventProcessorRunnable(DeviceId deviceId) {
739             this.deviceId = deviceId;
740         }
741
742         public void terminate() {
743             log.info("InternalRestconfEventProcessorRunnable.terminate()::threadID: {}",
744                     Thread.currentThread().getId());
745             running = false;
746         }
747
748         @Override
749         public void run() {
750             log.trace("InternalRestconfEventProcessorRunnable::restconf event processor runnable inside run()");
751             while (running) {
752                 try {
753                     if (eventQMap != null && !eventQMap.isEmpty() && eventQMap.get(deviceId) != null) {
754                         log.trace("InternalRestconfEventProcessorRunnable::waiting for take()");
755                         if (running) {
756                             String eventJsonString = eventQMap.get(deviceId).take();
757                             log.trace("InternalRestconfEventProcessorRunnable::after take()");
758                             log.info("InternalRestconfEventProcessorRunnable::eventJsonString is {}", eventJsonString);
759                             Map<String, String> param = convertToProperties(eventJsonString);
760                             String idString = param.get("push-change-update.subscription-id");
761                             SubscriptionInfo info = subscriptionInfoMap().get(idString);
762                             if (info != null) {
763                                 SvcLogicContext ctx = setContext(param);
764                                 SvcLogicGraphInfo callbackDG = info.callBackDG();
765                                 callbackDG.executeGraph(ctx);
766                             }
767                         } else {
768                             log.info("InternalRestconfEventProcessorRunnable.run()::running has changed to false " +
769                                     "while eventQ was blocked to process new notifications");
770                             log.info("InternalRestconfEventProcessorRunnable.run()::" +
771                                     "the client is no longer interested to receive notifications.");
772                             break;
773                         }
774                     }
775                 } catch (InterruptedException | SvcLogicException e) {
776                     e.printStackTrace();
777                 }
778             }
779         }
780         private SvcLogicContext setContext(Map<String, String> param) {
781             SvcLogicContext ctx = new SvcLogicContext();
782             for (Map.Entry<String, String> entry : param.entrySet()) {
783                 ctx.setAttribute(entry.getKey(), entry.getValue());
784             }
785             return ctx;
786         }
787     }
788
789     public String discoverRootResource(DeviceId device) {
790         return ROOT_RESOURCE;
791     }
792
793     @Override
794     public void addNotificationListener(DeviceId deviceId,
795                                         RestconfNotificationEventListener listener) {
796         Set<RestconfNotificationEventListener> listeners =
797                 restconfNotificationListenerMap.get(deviceId);
798         if (listeners == null) {
799             listeners = new HashSet<>();
800         }
801
802         listeners.add(listener);
803
804         this.restconfNotificationListenerMap.put(deviceId, listeners);
805     }
806
807     @Override
808     public void removeNotificationListener(DeviceId deviceId,
809                                            RestconfNotificationEventListener listener) {
810         Set<RestconfNotificationEventListener> listeners =
811                 restconfNotificationListenerMap.get(deviceId);
812         if (listeners != null) {
813             listeners.remove(listener);
814         }
815     }
816
817     public boolean isNotificationEnabled(DeviceId deviceId) {
818         return runnableTable.containsKey(deviceId);
819     }
820
821 }