220de8790e7fe7e20f00efb83c4219a059b1cce5
[ccsdk/sli.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - CCSDK
4  * ================================================================================
5  * Copyright (C) 2018 Huawei Technologies Co., Ltd. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.ccsdk.sli.plugins.restconfdiscovery;
22
23 import com.google.common.collect.ImmutableMap;
24 import org.glassfish.jersey.media.sse.EventInput;
25 import org.glassfish.jersey.media.sse.EventSource;
26 import org.glassfish.jersey.media.sse.InboundEvent;
27 import org.glassfish.jersey.media.sse.SseFeature;
28 import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
29 import org.onap.ccsdk.sli.core.sli.SvcLogicException;
30 import org.onap.ccsdk.sli.core.utils.common.AcceptIpAddressHostNameVerifier;
31 import org.onap.ccsdk.sli.plugins.restapicall.Parameters;
32 import org.onap.ccsdk.sli.plugins.restapicall.RestapiCallNode;
33 import org.onap.ccsdk.sli.plugins.restconfapicall.RestconfApiCallNode;
34 import org.slf4j.Logger;
35
36 import javax.net.ssl.SSLContext;
37 import javax.net.ssl.TrustManager;
38 import javax.net.ssl.X509TrustManager;
39 import javax.ws.rs.client.Client;
40 import javax.ws.rs.client.ClientBuilder;
41 import javax.ws.rs.client.WebTarget;
42 import javax.ws.rs.core.Response;
43 import java.net.MalformedURLException;
44 import java.net.URL;
45 import java.security.KeyManagementException;
46 import java.security.NoSuchAlgorithmException;
47 import java.security.cert.CertificateException;
48 import java.security.cert.X509Certificate;
49 import java.util.Map;
50 import java.util.NoSuchElementException;
51 import java.util.concurrent.*;
52
53 import static org.onap.ccsdk.sli.plugins.restapicall.JsonParser.convertToProperties;
54 import static org.slf4j.LoggerFactory.getLogger;
55
56 /**
57  * Representation of a plugin to subscribe for notification and then
58  * to handle the received notifications.
59  */
60 public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDiscoveryPlugin {
61
    /** Logger shared by this class and its inner runnables. */
    private static final Logger log = getLogger(PeriodicDiscoveryNode.class);

    // Value returned by discoverRootResource().
    private static final String ROOT_RESOURCE = "/restconf";
    // paramMap key holding the subscriber identifier.
    private static final String SUBSCRIBER_ID = "subscriberId";
    // Context attribute name (appended to the optional prefix) holding the response code.
    private static final String RESPONSE_CODE = "response-code";
    // paramMap key holding the prefix under which response attributes are read from the context.
    private static final String RESPONSE_PREFIX = "responsePrefix";
    // Context attribute name read by getOutputIdentifier().
    private static final String OUTPUT_IDENTIFIER = "ietf-subscribed-notif" +
            "ications:establish-subscription.output.identifier";
    // Context attribute name read by getOutputIdentifierNoPrefix().
    private static final String OUTPUT_IDENTIFIER_NO_PREFIX = "output.identifier";
    // Response code value that the subscribe/unsubscribe paths treat as success.
    private static final String RESPONSE_CODE_200 = "200";
    // Not referenced anywhere in this class.
    private static final String SSE_URL = "sseConnectURL";
    // paramMap key read by establishPeriodicPullConnection().
    private static final String PERIODIC_PUL_URL = "periodicPullURL";
    // paramMap key read by establishSubscriptionOnly() and deleteSubscriptionAndSseConnection().
    private static final String REST_API_URL = "restapiUrl";
    // Inserted between the root resource and the request in enableNotifications().
    private static final String RESOURCE_PATH_PREFIX = "/data/";
    // Not referenced anywhere in this class.
    private static final String NOTIFICATION_PATH_PREFIX = "/streams/";
    // Not referenced anywhere in this class.
    private static final String DEVICE_IP = "deviceIp";
    // Not referenced anywhere in this class.
    private static final String DEVICE_PORT = "devicePort";
    // URL building pieces used by getUrlString().
    private static final String DOUBLESLASH = "//";
    private static final String COLON = ":";

    // Used by establishSubscription(); replaceable through restconfapiCallNode(RestconfApiCallNode).
    private RestconfApiCallNode restconfApiCallNode;
    // Used by establishSubscriptionOnly() and deleteSubscriptionAndSseConnection().
    private RestapiCallNode restapiCallNode = new RestapiCallNode();
    // Subscription id -> subscription details (subscriber id and callback graph).
    private volatile Map<String, SubscriptionInfo> subscriptionInfoMap = new ConcurrentHashMap<>();
    // Only exposed through the eventQueue() accessors; not otherwise used in this class.
    private volatile LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
    // Device -> periodic pull task; presence of a key is what isNotificationEnabled() reports.
    private Map<DeviceId, PeriodicPullRunnable> periodicRunnableTable = new ConcurrentHashMap<>();
    // Device -> subscription id recorded by establishSubscriptionOnly().
    private Map<DeviceId, String> subscribedDevicesTable = new ConcurrentHashMap<>();
    // Device -> queue of pulled payloads, filled by PeriodicPullRunnable and drained by the processor.
    private Map<DeviceId, BlockingQueue<String>> eventQMap = new ConcurrentHashMap<>();
    // Device -> processor task created in addDevice().
    private Map<DeviceId, InternalPeriodicPullingProcessorRunnable>
            processorRunnableTable = new ConcurrentHashMap<>();
    // Known devices keyed by their device id.
    private final Map<DeviceId, RestSBDevice> deviceMap = new ConcurrentHashMap<>();
    // Never populated inside this class; entries are only removed or cleared here.
    private final Map<DeviceId, Client> clientMap = new ConcurrentHashMap<>();
    // Runs the per-device processor runnables started by addDevice().
    private ExecutorService executor = Executors.newCachedThreadPool();
    // Runs the periodic pull tasks scheduled by enableNotifications().
    private ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
95
96     /**
97      * Creates an instance of RestconfDiscoveryNode and starts processing of
98      * event.
99      *
100      * @param r restconf api call node
101      */
102     public PeriodicDiscoveryNode(RestconfApiCallNode r) {
103         log.info("inside RestconfDiscoveryNode Constructor");
104         this.restconfApiCallNode = r;
105         this.activate();
106 //        ExecutorService e = Executors.newFixedThreadPool(20);
107 //        EventProcessor p = new EventProcessor(this);
108 //        for (int i = 0; i < 20; ++i) {
109 //            e.execute(p);
110 //        }
111     }
112
113     public void activate() {
114         log.info("RESTCONF SBI Started");
115     }
116
117     public void deactivate() {
118         log.info("RESTCONF SBI Stopped");
119         executor.shutdown();
120         this.getClientMap().clear();
121         this.getDeviceMap().clear();
122     }
123
124     public Map<DeviceId, RestSBDevice> getDeviceMap() {
125         return deviceMap;
126     }
127
128     public Map<DeviceId, Client> getClientMap() {
129         return clientMap;
130     }
131
132     @Override
133     public Map<DeviceId, RestSBDevice> getDevices() {
134         log.trace("RESTCONF SBI::getDevices");
135         return ImmutableMap.copyOf(deviceMap);
136     }
137
138     @Override
139     public RestSBDevice getDevice(DeviceId deviceInfo) {
140         log.trace("RESTCONF SBI::getDevice with deviceId");
141         return deviceMap.get(deviceInfo);
142     }
143
144     @Override
145     public RestSBDevice getDevice(String ip, int port) {
146         log.trace("RESTCONF SBI::getDevice with ip and port");
147         try {
148             if (!deviceMap.isEmpty()) {
149                 return deviceMap.values().stream().filter(v -> v.ip().equals(ip) && v.port() == port).findFirst().get();
150             }
151         } catch (NoSuchElementException noSuchElementException) {
152             log.error("getDevice::device {}:{} does not exist in deviceMap", ip, port);
153         }
154         return null;
155     }
156
157     @Override
158     public void addDevice(RestSBDevice device) {
159         log.trace("RESTCONF SBI::addDevice");
160         if (!deviceMap.containsKey(device.deviceId())) {
161             if (device.username() != null) {
162                 String username = device.username();
163                 String password = device.password() == null ? "" : device.password();
164     //                authenticate(client, username, password);
165             }
166             BlockingQueue<String> newBlockingQueue = new LinkedBlockingQueue<>();
167             eventQMap.put(device.deviceId(), newBlockingQueue);
168             InternalPeriodicPullingProcessorRunnable eventProcessorRunnable =
169                     new InternalPeriodicPullingProcessorRunnable(device.deviceId());
170             processorRunnableTable.put(device.deviceId(), eventProcessorRunnable);
171             log.trace("addDevice::restconf event processor runnable is created and is going for execute");
172             executor.execute(eventProcessorRunnable);
173             log.trace("addDevice::restconf event processor runnable was sent for execute");
174             deviceMap.put(device.deviceId(), device);
175         } else {
176             log.warn("addDevice::Trying to add a device which already exists {}", device.deviceId());
177         }
178     }
179
180     @Override
181     public void removeDevice(DeviceId deviceId) {
182         log.trace("RESTCONF SBI::removeDevice");
183         eventQMap.remove(deviceId);
184         clientMap.remove(deviceId);
185         deviceMap.remove(deviceId);
186     }
187
188     @Override
189     public void establishSubscription(Map<String, String> paramMap,
190                                       SvcLogicContext ctx) throws SvcLogicException {
191         String subscriberId = paramMap.get(SUBSCRIBER_ID);
192         if (subscriberId == null) {
193             throw new SvcLogicException("Subscriber Id is null");
194         }
195
196         restconfApiCallNode.sendRequest(paramMap, ctx);
197
198         if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
199             // TODO: save subscription id and subscriber in MYSQL
200
201             establishPersistentConnection(paramMap, ctx, subscriberId);
202         } else {
203             log.info("Failed to subscribe {}", subscriberId);
204             throw new SvcLogicException(ctx.getAttribute(RESPONSE_CODE));
205         }
206     }
207
208     @Override
209     public void establishSubscriptionOnly(Map<String, String> paramMap, SvcLogicContext ctx)
210             throws SvcLogicException {
211         String subscriberId = paramMap.get(SUBSCRIBER_ID);
212         if (subscriberId == null) {
213             throw new SvcLogicException("Subscriber Id is null");
214         }
215
216         String subscribeUrlString = paramMap.get(REST_API_URL);
217         URL subscribeUrl = null;
218         RestSBDevice dev = null;
219         try {
220             subscribeUrl = new URL(subscribeUrlString);
221             dev = getDevice(subscribeUrl.getHost(), subscribeUrl.getPort());
222         } catch (MalformedURLException e) {
223             log.error("establishSubscriptionOnly::MalformedURLException happened. e: {}", e);
224             return;
225         }
226
227         if (dev == null) {
228             log.warn("establishSubscriptionOnly::device does not exist in the map. Trying to create one now.");
229             //FIXME: TODO: create a new RestSBDevice and add it to the map, as well as a client and clientMap
230             dev = new DefaultRestSBDevice(subscribeUrl.getHost(),
231                     subscribeUrl.getPort(), "onos", "rocks", "http",
232                     subscribeUrl.getHost() + ":" + subscribeUrl.getPort(), true);
233             this.addDevice(dev);
234         }
235
236         if (!subscribedDevicesTable.containsKey(dev.deviceId())) {
237             log.info("establishSubscriptionOnly::The device {} has not been subscribed yet. " +
238                     "Trying to subscribe it now...");
239             restapiCallNode.sendRequest(paramMap, ctx);
240             if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
241                 // TODO: save subscription id and subscriber in MYSQL
242                 String id = getOutputIdentifierNoPrefix(paramMap.get(RESPONSE_PREFIX), ctx);
243                 log.info("establishSubscriptionOnly::Subscription is done successfully and " +
244                         "the output.identifier is: {}", id);
245                 log.info("establishSubscriptionOnly::The subscriptionID returned by the server " +
246                         "does not exist in the map. Adding it now...");
247                 subscribedDevicesTable.put(dev.deviceId(), id);
248
249                 SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
250                         paramMap.get("rpc"),
251                         paramMap.get("version"),
252                         paramMap.get("mode"));
253                 SubscriptionInfo info = new SubscriptionInfo();
254                 info.callBackDG(callbackDG);
255                 info.subscriptionId(id);
256                 info.subscriberId(subscriberId);
257                 subscriptionInfoMap.put(id, info);
258
259             }
260         }
261
262     }
263
264     @Override
265     public void modifySubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
266         // TODO: to be implemented
267     }
268
269     @Override
270     public void deleteSubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
271         String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
272         if (id != null) {
273             subscriptionInfoMap.remove(id);
274         }
275     }
276
277     protected String getTokenId(String customHttpHeaders) {
278         if (customHttpHeaders.contains("=")) {
279             String[] s = customHttpHeaders.split("=");
280             return s[1];
281         }
282         return customHttpHeaders;
283     }
284
285     protected WebTarget addToken(WebTarget target, String customHttpHeaders) {
286         if (customHttpHeaders == null) {
287             return target;
288         }
289
290         return new AdditionalHeaderWebTarget(
291                 target, getTokenId(customHttpHeaders));
292     }
293
294     /**
295      * Establishes a persistent between the client and server.
296      *
297      * @param paramMap input paramter map
298      * @param ctx service logic context
299      * @param subscriberId subscriber identifier
300      */
301     void establishPersistentConnection(Map<String, String> paramMap, SvcLogicContext ctx,
302                                               String subscriberId) {
303     }
304
305     /**
306      * Returns response code.
307      *
308      * @param prefix prefix given in input parameter
309      * @param ctx service logic context
310      * @return response code
311      */
312     String getResponseCode(String prefix, SvcLogicContext ctx) {
313         return ctx.getAttribute(getPrefix(prefix) + RESPONSE_CODE);
314     }
315
316     String getOutputIdentifierNoPrefix(String prefix, SvcLogicContext ctx) {
317         return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER_NO_PREFIX);
318     }
319
320     /**
321      * Returns subscription id from event.
322      *
323      * @param prefix prefix given in input parameter
324      * @param ctx service logic context
325      * @return subscription id from event
326      */
327     String getOutputIdentifier(String prefix, SvcLogicContext ctx) {
328         return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER);
329     }
330
331     private String getPrefix(String prefix) {
332         return prefix != null ? prefix + "." : "";
333     }
334
335     private String getSubscriptionId(String subscriberId) {
336         for (Map.Entry<String,SubscriptionInfo> entry
337                 : subscriptionInfoMap.entrySet()) {
338             if (entry.getValue().subscriberId()
339                     .equals(subscriberId)) {
340                 return entry.getKey();
341             }
342         }
343         return null;
344     }
345
346     private String getUrlString(DeviceId deviceId, String request) {
347         RestSBDevice restSBDevice = deviceMap.get(deviceId);
348         if (restSBDevice == null) {
349             log.warn("getUrlString::restSbDevice cannot be NULL!");
350             return "";
351         }
352         if (restSBDevice.url() != null) {
353             return restSBDevice.protocol() + COLON + DOUBLESLASH + restSBDevice.url() + request;
354         } else {
355             return restSBDevice.protocol() + COLON + DOUBLESLASH + restSBDevice.ip().toString()
356                     + COLON + restSBDevice.port() + request;
357         }
358     }
359
360     private String getSubscriptionIdFromDeviceId(DeviceId deviceId) {
361         if (subscribedDevicesTable.containsKey(deviceId)) {
362             return subscribedDevicesTable.get(deviceId);
363         }
364         return null;
365     }
366
367     private BlockingQueue<String> getEventQ(DeviceId deviceId) {
368         if (eventQMap.containsKey(deviceId)) {
369             return eventQMap.get(deviceId);
370         }
371         return null;
372     }
373
374     /**
375      * Returns restconfApiCallNode.
376      *
377      * @return restconfApiCallNode
378      */
379     protected RestconfApiCallNode restconfapiCallNode() {
380         return restconfApiCallNode;
381     }
382
383     /**
384      * Sets restconfApiCallNode.
385      *
386      * @param node restconfApiCallNode
387      */
388     void restconfapiCallNode(RestconfApiCallNode node) {
389         restconfApiCallNode = node;
390     }
391
392     Map<String, SubscriptionInfo> subscriptionInfoMap() {
393         return subscriptionInfoMap;
394     }
395
396     void subscriptionInfoMap(Map<String, SubscriptionInfo> subscriptionInfoMap) {
397         this.subscriptionInfoMap = subscriptionInfoMap;
398     }
399
400     LinkedBlockingQueue<String> eventQueue() {
401         return eventQueue;
402     }
403
404     void eventQueue(LinkedBlockingQueue<String> eventQueue) {
405         this.eventQueue = eventQueue;
406     }
407
408     /**
409      * Establishes a persistent SSE connection between the client and the server.
410      *
411      * @param paramMap input paramter map
412      * @param ctx service logic context
413      */
414     @Override
415     public void establishPersistentSseConnection(Map<String, String> paramMap, SvcLogicContext ctx) throws SvcLogicException {
416
417     }
418
419     @Override
420     public void establishPeriodicPullConnection(Map<String, String> paramMap, SvcLogicContext ctx) throws SvcLogicException {
421         String subscriberId = paramMap.get(SUBSCRIBER_ID);
422         SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
423                 paramMap.get("rpc"),
424                 paramMap.get("version"),
425                 paramMap.get("mode"));
426         SubscriptionInfo info = new SubscriptionInfo();
427         info.callBackDG(callbackDG);
428         info.subscriberId(subscriberId);
429
430         String periodicPullUrlString = paramMap.get(PERIODIC_PUL_URL);
431         URL periodicPullUrl = null;
432         RestSBDevice dev = null;
433         try {
434             periodicPullUrl = new URL(periodicPullUrlString);
435             dev = getDevice(periodicPullUrl.getHost(), periodicPullUrl.getPort());
436         } catch (MalformedURLException e) {
437             log.error("establishPersistentSseConnection::MalformedURLException happened. e: {}", e);
438             return;
439         }
440
441         if (dev == null) {
442             log.warn("establishPeriodicPullConnection::device does not exist in the map. Trying to add one now.");
443             dev = new DefaultRestSBDevice(periodicPullUrl.getHost(),
444                     periodicPullUrl.getPort(), "onos", "rocks", "http",
445                     periodicPullUrl.getHost() + ":" + periodicPullUrl.getPort(), true);
446             this.addDevice(dev);
447         }
448
449         if (isNotificationEnabled(dev.deviceId())) {
450             log.warn("establishPeriodicPullConnection::notifications already enabled on device: {}",
451                     dev.deviceId());
452             return;
453         }
454
455         if (getSubscriptionIdFromDeviceId(dev.deviceId()) == null) {
456             log.warn("This device {} has not yet been subscribed to receive notifications.",
457                     dev.deviceId());
458             return;
459         }
460
461         RestconfNotificationEventListenerImpl myListener =
462                 new RestconfNotificationEventListenerImpl(info);
463         enableNotifications(dev.deviceId(), "ietf-service-pm:performance-monitoring", "json", myListener);
464     }
465
466     @Override
467     public void enableNotifications(DeviceId device, String request,
468                                     String mediaType,
469                                     RestconfNotificationEventListener listener) {
470         if (isNotificationEnabled(device)) {
471             log.warn("enableNotifications::already enabled on device: {}", device);
472             return;
473         }
474
475         request = discoverRootResource(device) + RESOURCE_PATH_PREFIX
476                 + request;
477
478         addNotificationListener(device, listener);
479
480         PeriodicPullRunnable periodicRunnable = new PeriodicPullRunnable(request, device);
481         periodicRunnableTable.put(device, periodicRunnable);
482         scheduledExecutor.scheduleAtFixedRate(periodicRunnable, 0, 60, TimeUnit.SECONDS);
483     }
484
485     public void stopNotifications(DeviceId device) {
486         try {
487             periodicRunnableTable.get(device).terminate();
488             processorRunnableTable.get(device).terminate();
489         } catch (Exception ex) {
490             log.error("stopNotifications::Exception happened when terminating, ex: {}", ex);
491         }
492         log.info("stopNotifications::Runnable is now terminated");
493         periodicRunnableTable.remove(device);
494         processorRunnableTable.remove(device);
495         log.debug("stopNotifications::Stop sending notifications for device URI: " + device.uri().toString());
496     }
497
498     @Override
499     public void deleteSubscriptionAndSseConnection(Map<String, String> paramMap, SvcLogicContext ctx) {
500         String deleteSubscribeUrlString = paramMap.get(REST_API_URL);
501         URL deleteSubscribeUrl = null;
502         RestSBDevice dev = null;
503         try {
504             deleteSubscribeUrl = new URL(deleteSubscribeUrlString);
505             dev = getDevice(deleteSubscribeUrl.getHost(), deleteSubscribeUrl.getPort());
506         } catch (MalformedURLException e) {
507             log.error("establishSubscriptionOnly::MalformedURLException happened. e: {}", e);
508             return;
509         }
510
511         String deviceIp = deleteSubscribeUrl.getHost();
512         String devicePort = String.valueOf(deleteSubscribeUrl.getPort());
513         log.info("deleteSubscriptionAndSseConnection::Trying to unsubscribe device {}:{}",
514                 deviceIp, devicePort);
515         if (dev == null) {
516             log.error("deleteSubscriptionAndSseConnection::device does not exist in the map");
517             return;
518         }
519         String subscriptionId = getSubscriptionIdFromDeviceId(dev.deviceId());
520
521         if (subscriptionId != null) {
522             log.info("deleteSubscriptionAndSseConnection::SubscriptionID is found {}", subscriptionId);
523             log.info("deleteSubscriptionAndSseConnection::About to send unsubscribe request");
524             try {
525                 ctx.setAttribute("subscriptionId", subscriptionId);
526                 restapiCallNode.sendRequest(paramMap, ctx);
527                 if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
528                     log.info("deleteSubscriptionAndSseConnection::Successfully unsubscribed");
529                     stopNotifications(dev.deviceId());
530                     subscribedDevicesTable.remove(dev.deviceId());
531
532                     String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
533                     if (id != null) {
534                         subscriptionInfoMap.remove(id);
535                     }
536
537                 } else {
538                     log.info("deleteSubscriptionAndSseConnection::Unsubscription was NOT successfull");
539                 }
540             } catch (SvcLogicException e) {
541                 log.error("deleteSubscriptionAndSseConnection::Exception happened ex: {}", e);
542             }
543         } else {
544             log.warn("deleteSubscriptionAndSseConnection::This device has already been unsubscribed");
545         }
546     }
547
548     public class PeriodicPullRunnable implements Runnable {
549         private String request;
550         private DeviceId deviceId;
551
552         private volatile boolean running = true;
553
554         public void terminate() {
555             log.info("PeriodicPullRunnable.terminate()::threadID: {}",
556                     Thread.currentThread().getId());
557             running = false;
558         }
559
560         /**
561          * @param request   request
562          * @param deviceId    device identifier
563          */
564         public PeriodicPullRunnable(String request, DeviceId deviceId) {
565             this.request = request;
566             this.deviceId = deviceId;
567         }
568
569         @Override
570         public void run() {
571             log.trace("PeriodicPullRunnable.run()::threadID is: {} ...., running is: {}",
572                     Thread.currentThread().getId(), running);
573             try {
574                     Client client = ClientBuilder.newBuilder().build();
575                     WebTarget target = client.target(getUrlString(deviceId, request));
576                     log.trace("PeriodicPullRunnable.run()::target URI is {}", target.getUri().toString());
577                     Response response = null;
578                     if (running) {
579                         response = target.request().get();
580                         String rcvdData = response.readEntity(String.class);
581                         log.trace("PeriodicPullRunnable.run()::after readEntity");
582                         BlockingQueue<String> eventQ = getEventQ(deviceId);
583                         if (eventQ != null) {
584                             eventQ.add(rcvdData);
585                             eventQMap.put(deviceId, eventQ);
586                             log.trace("PeriodicPullRunnable.run()::eventQ got filled.");
587                         } else {
588                             log.error("PeriodicPullRunnable.run()::eventQ has not been initialized for this device {}",
589                                     deviceId);
590                         }
591                     } else {
592                         log.trace("PeriodicPullRunnable.run()::running is false! " +
593                                 "closing the client and the response, threadID: {}", Thread.currentThread().getId());
594                         response.close();
595                         client.close();
596                         log.info("PeriodicPullRunnable.run()::eventInput is closed in run()");
597                     }
598             } catch (Exception ex) {
599                 log.info("PeriodicPullRunnable.run()::We got some exception: {}, threadID: {} ", ex,
600                         Thread.currentThread().getId());
601             }
602             log.trace("PeriodicPullRunnable.run()::after Runnable Try Catch. threadID: {} ",
603                     Thread.currentThread().getId());
604         }
605     }
606
607     public class InternalPeriodicPullingProcessorRunnable implements Runnable {
608
609         private volatile boolean running = true;
610         private DeviceId deviceId;
611
612         public InternalPeriodicPullingProcessorRunnable(DeviceId deviceId) {
613             this.deviceId = deviceId;
614         }
615
616         public void terminate() {
617             log.info("InternalPeriodicPullingProcessorRunnable.terminate()::threadID: {}",
618                     Thread.currentThread().getId());
619             running = false;
620         }
621
622         @Override
623         public void run() {
624             log.trace("InternalPeriodicPullingProcessorRunnable::restconf event processor runnable inside run()");
625             while (running) {
626                 try {
627                     if (eventQMap != null && !eventQMap.isEmpty() && eventQMap.get(deviceId) != null) {
628                         log.trace("InternalPeriodicPullingProcessorRunnable::waiting for take()");
629                         if (running) {
630                             String eventJsonString = eventQMap.get(deviceId).take();
631                             log.trace("InternalPeriodicPullingProcessorRunnable::after take()");
632                             log.info("InternalPeriodicPullingProcessorRunnable::eventJsonString is {}", eventJsonString);
633                             Map<String, String> param = convertToProperties(eventJsonString);
634                             String idString = param.get("push-change-update.subscription-id");
635                             SubscriptionInfo info = subscriptionInfoMap().get(idString);
636                             if (info != null) {
637                                 SvcLogicContext ctx = setContext(param);
638                                 SvcLogicGraphInfo callbackDG = info.callBackDG();
639                                 callbackDG.executeGraph(ctx);
640                             }
641                         } else {
642                             log.info("InternalPeriodicPullingProcessorRunnable.run()::running has changed to false " +
643                                     "while eventQ was blocked to process new notifications");
644                             log.info("InternalPeriodicPullingProcessorRunnable.run()::" +
645                                     "the client is no longer interested to receive notifications.");
646                             break;
647                         }
648                     }
649                 } catch (InterruptedException | SvcLogicException e) {
650                     e.printStackTrace();
651                 }
652             }
653         }
654         private SvcLogicContext setContext(Map<String, String> param) {
655             SvcLogicContext ctx = new SvcLogicContext();
656             for (Map.Entry<String, String> entry : param.entrySet()) {
657                 ctx.setAttribute(entry.getKey(), entry.getValue());
658             }
659             return ctx;
660         }
661     }
662
663     public String discoverRootResource(DeviceId device) {
664         return ROOT_RESOURCE;
665     }
666
667     @Override
668     public void addNotificationListener(DeviceId deviceId,
669                                         RestconfNotificationEventListener listener) {
670     }
671
672     @Override
673     public void removeNotificationListener(DeviceId deviceId,
674                                            RestconfNotificationEventListener listener) {
675     }
676
677     public boolean isNotificationEnabled(DeviceId deviceId) {
678         return periodicRunnableTable.containsKey(deviceId);
679     }
680
681 }