2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018 Huawei Technologies Co., Ltd. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.ccsdk.sli.plugins.restconfdiscovery;
23 import com.google.common.collect.ImmutableMap;
24 import com.google.gson.JsonObject;
25 import com.google.gson.JsonParser;
26 import org.glassfish.jersey.media.sse.EventInput;
27 import org.glassfish.jersey.media.sse.EventSource;
28 import org.glassfish.jersey.media.sse.InboundEvent;
29 import org.glassfish.jersey.media.sse.SseFeature;
30 import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
31 import org.onap.ccsdk.sli.core.sli.SvcLogicException;
32 import org.onap.ccsdk.sli.core.utils.common.AcceptIpAddressHostNameVerifier;
33 import org.onap.ccsdk.sli.plugins.restapicall.Parameters;
34 import org.onap.ccsdk.sli.plugins.restapicall.RestapiCallNode;
35 import org.onap.ccsdk.sli.plugins.restconfapicall.RestconfApiCallNode;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
39 import javax.net.ssl.SSLContext;
40 import javax.net.ssl.TrustManager;
41 import javax.net.ssl.X509TrustManager;
42 import javax.ws.rs.client.Client;
43 import javax.ws.rs.client.ClientBuilder;
44 import javax.ws.rs.client.WebTarget;
45 import javax.ws.rs.core.Response;
46 import java.net.MalformedURLException;
48 import java.security.KeyManagementException;
49 import java.security.NoSuchAlgorithmException;
50 import java.security.cert.CertificateException;
51 import java.security.cert.X509Certificate;
52 import java.util.HashSet;
54 import java.util.NoSuchElementException;
56 import java.util.concurrent.*;
58 import static org.onap.ccsdk.sli.plugins.restapicall.JsonParser.convertToProperties;
59 import static org.slf4j.LoggerFactory.getLogger;
62 * Representation of a plugin to subscribe for notification and then
63 * to handle the received notifications.
65 public class RestconfDiscoveryNode implements RestConfSBController, SvcLogicDiscoveryPlugin {
67 private static final Logger log = getLogger(RestconfDiscoveryNode.class);
69 private static final String ROOT_RESOURCE = "/restconf";
70 private static final String SUBSCRIBER_ID = "subscriberId";
71 private static final String RESPONSE_CODE = "response-code";
72 private static final String RESPONSE_PREFIX = "responsePrefix";
73 private static final String OUTPUT_IDENTIFIER = "ietf-subscribed-notif" +
74 "ications:establish-subscription.output.identifier";
75 private static final String OUTPUT_IDENTIFIER_NO_PREFIX = "output.identifier";
76 private static final String RESPONSE_CODE_200 = "200";
77 private static final String SSE_URL = "sseConnectURL";
78 private static final String REST_API_URL = "restapiUrl";
79 private static final String RESOURCE_PATH_PREFIX = "/data/";
80 private static final String NOTIFICATION_PATH_PREFIX = "/streams/";
81 private static final String DEVICE_IP = "deviceIp";
82 private static final String DEVICE_PORT = "devicePort";
83 private static final String DOUBLESLASH = "//";
84 private static final String COLON = ":";
86 private RestconfApiCallNode restconfApiCallNode;
87 private RestapiCallNode restapiCallNode = new RestapiCallNode();
88 private volatile Map<String, SubscriptionInfo> subscriptionInfoMap = new ConcurrentHashMap<>();
89 private volatile LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
90 private Map<DeviceId, Set<RestconfNotificationEventListener>>
91 restconfNotificationListenerMap = new ConcurrentHashMap<>();
92 private Map<DeviceId, GetChunksRunnable>
93 runnableTable = new ConcurrentHashMap<>();
94 private Map<DeviceId, String> subscribedDevicesTable = new ConcurrentHashMap<>();
95 private Map<DeviceId, BlockingQueue<String>> eventQMap = new ConcurrentHashMap<>();
96 private Map<DeviceId, InternalRestconfEventProcessorRunnable>
97 processorRunnableTable = new ConcurrentHashMap<>();
98 private Map<String, PersistentConnection> runnableInfo = new ConcurrentHashMap<>();
99 private final Map<DeviceId, RestSBDevice> deviceMap = new ConcurrentHashMap<>();
100 private final Map<DeviceId, Client> clientMap = new ConcurrentHashMap<>();
101 private ExecutorService executor = Executors.newCachedThreadPool();
104 * Creates an instance of RestconfDiscoveryNode and starts processing of
107 * @param r restconf api call node
109 public RestconfDiscoveryNode(RestconfApiCallNode r) {
110 log.info("inside RestconfDiscoveryNode Constructor");
111 this.restconfApiCallNode = r;
113 // ExecutorService e = Executors.newFixedThreadPool(20);
114 // EventProcessor p = new EventProcessor(this);
115 // for (int i = 0; i < 20; ++i) {
120 public void activate() {
121 log.info("RESTCONF SBI Started");
124 public void deactivate() {
125 log.info("RESTCONF SBI Stopped");
127 this.getClientMap().clear();
128 this.getDeviceMap().clear();
131 public Map<DeviceId, RestSBDevice> getDeviceMap() {
135 public Map<DeviceId, Client> getClientMap() {
140 public Map<DeviceId, RestSBDevice> getDevices() {
141 log.trace("RESTCONF SBI::getDevices");
142 return ImmutableMap.copyOf(deviceMap);
146 public RestSBDevice getDevice(DeviceId deviceInfo) {
147 log.trace("RESTCONF SBI::getDevice with deviceId");
148 return deviceMap.get(deviceInfo);
152 public RestSBDevice getDevice(String ip, int port) {
153 log.trace("RESTCONF SBI::getDevice with ip and port");
155 if (!deviceMap.isEmpty()) {
156 return deviceMap.values().stream().filter(v -> v.ip().equals(ip) && v.port() == port).findFirst().get();
158 } catch (NoSuchElementException noSuchElementException) {
159 log.error("getDevice::device {}:{} does not exist in deviceMap", ip, port);
165 public void addDevice(RestSBDevice device) {
166 log.trace("RESTCONF SBI::addDevice");
167 if (!deviceMap.containsKey(device.deviceId())) {
168 if (device.username() != null) {
169 String username = device.username();
170 String password = device.password() == null ? "" : device.password();
171 // authenticate(client, username, password);
173 BlockingQueue<String> newBlockingQueue = new LinkedBlockingQueue<>();
174 eventQMap.put(device.deviceId(), newBlockingQueue);
175 InternalRestconfEventProcessorRunnable eventProcessorRunnable =
176 new InternalRestconfEventProcessorRunnable(device.deviceId());
177 processorRunnableTable.put(device.deviceId(), eventProcessorRunnable);
178 log.trace("addDevice::restconf event processor runnable is created and is going for execute");
179 executor.execute(eventProcessorRunnable);
180 log.trace("addDevice::restconf event processor runnable was sent for execute");
181 deviceMap.put(device.deviceId(), device);
183 log.warn("addDevice::Trying to add a device which already exists {}", device.deviceId());
188 public void removeDevice(DeviceId deviceId) {
189 log.trace("RESTCONF SBI::removeDevice");
190 eventQMap.remove(deviceId);
191 clientMap.remove(deviceId);
192 deviceMap.remove(deviceId);
196 public void establishSubscription(Map<String, String> paramMap,
197 SvcLogicContext ctx) throws SvcLogicException {
198 String subscriberId = paramMap.get(SUBSCRIBER_ID);
199 if (subscriberId == null) {
200 throw new SvcLogicException("Subscriber Id is null");
203 restconfApiCallNode.sendRequest(paramMap, ctx);
205 if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
206 // TODO: save subscription id and subscriber in MYSQL
208 establishPersistentConnection(paramMap, ctx, subscriberId);
210 log.info("Failed to subscribe {}", subscriberId);
211 throw new SvcLogicException(ctx.getAttribute(RESPONSE_CODE));
216 public void establishSubscriptionOnly(Map<String, String> paramMap, SvcLogicContext ctx)
217 throws SvcLogicException {
218 String subscriberId = paramMap.get(SUBSCRIBER_ID);
219 if (subscriberId == null) {
220 throw new SvcLogicException("Subscriber Id is null");
223 String subscribeUrlString = paramMap.get(REST_API_URL);
224 URL subscribeUrl = null;
225 RestSBDevice dev = null;
227 subscribeUrl = new URL(subscribeUrlString);
228 dev = getDevice(subscribeUrl.getHost(), subscribeUrl.getPort());
229 } catch (MalformedURLException e) {
230 log.error("establishSubscriptionOnly::MalformedURLException happened. e: {}", e);
235 log.warn("establishSubscriptionOnly::device does not exist in the map. Trying to create one now.");
236 //FIXME: TODO: create a new RestSBDevice and add it to the map, as well as a client and clientMap
237 dev = new DefaultRestSBDevice(subscribeUrl.getHost(),
238 subscribeUrl.getPort(), "onos", "rocks", "http",
239 subscribeUrl.getHost() + ":" + subscribeUrl.getPort(), true);
243 if (!subscribedDevicesTable.containsKey(dev.deviceId())) {
244 log.info("establishSubscriptionOnly::The device {} has not been subscribed yet. " +
245 "Trying to subscribe it now...");
246 restapiCallNode.sendRequest(paramMap, ctx);
247 if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
248 // TODO: save subscription id and subscriber in MYSQL
249 String id = getOutputIdentifierNoPrefix(paramMap.get(RESPONSE_PREFIX), ctx);
250 log.info("establishSubscriptionOnly::Subscription is done successfully and " +
251 "the output.identifier is: {}", id);
252 log.info("establishSubscriptionOnly::The subscriptionID returned by the server " +
253 "does not exist in the map. Adding it now...");
254 subscribedDevicesTable.put(dev.deviceId(), id);
256 SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
258 paramMap.get("version"),
259 paramMap.get("mode"));
260 SubscriptionInfo info = new SubscriptionInfo();
261 info.callBackDG(callbackDG);
262 info.subscriptionId(id);
263 info.subscriberId(subscriberId);
264 subscriptionInfoMap.put(id, info);
272 public void modifySubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
273 // TODO: to be implemented
277 public void deleteSubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
278 String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
280 PersistentConnection conn = runnableInfo.get(id);
282 runnableInfo.remove(id);
283 subscriptionInfoMap.remove(id);
287 class PersistentConnection implements Runnable {
289 private volatile boolean running = true;
290 private Map<String, String> paramMap;
292 PersistentConnection(String url, Map<String, String> paramMap) {
294 this.paramMap = paramMap;
297 private void terminate() {
304 WebTarget target = null;
306 RestapiCallNode restapi = restconfApiCallNode.getRestapiCallNode();
307 p = RestapiCallNode.getParameters(paramMap, new Parameters());
308 Client client = ignoreSslClient(p.disableHostVerification).register(SseFeature.class);
309 target = restapi.addAuthType(client, p).target(url);
310 } catch (SvcLogicException e) {
311 log.error("Exception occured!", e);
312 Thread.currentThread().interrupt();
315 target = addToken(target, paramMap.get("customHttpHeaders"));
316 EventSource eventSource = EventSource.target(target).build();
317 eventSource.register(new EventHandler(RestconfDiscoveryNode.this));
319 log.info("Connected to SSE source");
322 log.info("SSE state " + eventSource.isOpen());
324 } catch (InterruptedException e) {
325 log.error("Interrupted!", e);
326 Thread.currentThread().interrupt();
330 log.info("Closed connection to SSE source");
333 // Note: Sonar complains about host name verification being
334 // disabled here. This is necessary to handle devices using self-signed
335 // certificates (where CA would be unknown) - so we are leaving this code as is.
336 private Client ignoreSslClient(boolean disableHostVerification) {
337 SSLContext sslcontext = null;
340 sslcontext = SSLContext.getInstance("TLS");
341 sslcontext.init(null, new TrustManager[]{new X509TrustManager() {
343 public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
347 public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
351 public X509Certificate[] getAcceptedIssuers() {
352 return new X509Certificate[0];
354 } }, new java.security.SecureRandom());
355 } catch (NoSuchAlgorithmException | KeyManagementException e) {
356 throw new IllegalStateException(e);
359 return ClientBuilder.newBuilder().sslContext(sslcontext).hostnameVerifier(new AcceptIpAddressHostNameVerifier(disableHostVerification)).build();
363 protected String getTokenId(String customHttpHeaders) {
364 if (customHttpHeaders.contains("=")) {
365 String[] s = customHttpHeaders.split("=");
368 return customHttpHeaders;
371 protected WebTarget addToken(WebTarget target, String customHttpHeaders) {
372 if (customHttpHeaders == null) {
376 return new AdditionalHeaderWebTarget(
377 target, getTokenId(customHttpHeaders));
381 * Establishes a persistent between the client and server.
383 * @param paramMap input paramter map
384 * @param ctx service logic context
385 * @param subscriberId subscriber identifier
387 void establishPersistentConnection(Map<String, String> paramMap, SvcLogicContext ctx,
388 String subscriberId) {
389 String id = getOutputIdentifier(paramMap.get(RESPONSE_PREFIX), ctx);
390 SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
392 paramMap.get("version"),
393 paramMap.get("mode"));
394 SubscriptionInfo info = new SubscriptionInfo();
395 info.callBackDG(callbackDG);
396 info.subscriptionId(id);
397 info.subscriberId(subscriberId);
398 subscriptionInfoMap.put(id, info);
400 String url = paramMap.get(SSE_URL);
401 PersistentConnection connection = new PersistentConnection(url, paramMap);
402 runnableInfo.put(id, connection);
403 executor.execute(connection);
407 * Returns response code.
409 * @param prefix prefix given in input parameter
410 * @param ctx service logic context
411 * @return response code
413 String getResponseCode(String prefix, SvcLogicContext ctx) {
414 return ctx.getAttribute(getPrefix(prefix) + RESPONSE_CODE);
417 String getOutputIdentifierNoPrefix(String prefix, SvcLogicContext ctx) {
418 return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER_NO_PREFIX);
422 * Returns subscription id from event.
424 * @param prefix prefix given in input parameter
425 * @param ctx service logic context
426 * @return subscription id from event
428 String getOutputIdentifier(String prefix, SvcLogicContext ctx) {
429 return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER);
432 private String getPrefix(String prefix) {
433 return prefix != null ? prefix + "." : "";
436 private String getSubscriptionId(String subscriberId) {
437 for (Map.Entry<String,SubscriptionInfo> entry
438 : subscriptionInfoMap.entrySet()) {
439 if (entry.getValue().subscriberId()
440 .equals(subscriberId)) {
441 return entry.getKey();
447 private String getUrlString(DeviceId deviceId, String request) {
448 RestSBDevice restSBDevice = deviceMap.get(deviceId);
449 if (restSBDevice == null) {
450 log.warn("getUrlString::restSbDevice cannot be NULL!");
453 if (restSBDevice.url() != null) {
454 return restSBDevice.protocol() + COLON + DOUBLESLASH + restSBDevice.url() + request;
456 return restSBDevice.protocol() + COLON + DOUBLESLASH + restSBDevice.ip().toString()
457 + COLON + restSBDevice.port() + request;
461 private String getSubscriptionIdFromDeviceId(DeviceId deviceId) {
462 if (subscribedDevicesTable.containsKey(deviceId)) {
463 return subscribedDevicesTable.get(deviceId);
468 private BlockingQueue<String> getEventQ(DeviceId deviceId) {
469 if (eventQMap.containsKey(deviceId)) {
470 return eventQMap.get(deviceId);
476 * Returns restconfApiCallNode.
478 * @return restconfApiCallNode
480 protected RestconfApiCallNode restconfapiCallNode() {
481 return restconfApiCallNode;
485 * Sets restconfApiCallNode.
487 * @param node restconfApiCallNode
489 void restconfapiCallNode(RestconfApiCallNode node) {
490 restconfApiCallNode = node;
493 Map<String, SubscriptionInfo> subscriptionInfoMap() {
494 return subscriptionInfoMap;
497 void subscriptionInfoMap(Map<String, SubscriptionInfo> subscriptionInfoMap) {
498 this.subscriptionInfoMap = subscriptionInfoMap;
501 LinkedBlockingQueue<String> eventQueue() {
505 void eventQueue(LinkedBlockingQueue<String> eventQueue) {
506 this.eventQueue = eventQueue;
510 * Establishes a persistent SSE connection between the client and the server.
512 * @param paramMap input paramter map
513 * @param ctx service logic context
516 public void establishPersistentSseConnection(Map<String, String> paramMap, SvcLogicContext ctx) throws SvcLogicException {
518 //TODO: FIXME: remove the instantiation of info; not useful
519 String subscriberId = paramMap.get(SUBSCRIBER_ID);
520 SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
522 paramMap.get("version"),
523 paramMap.get("mode"));
524 SubscriptionInfo info = new SubscriptionInfo();
525 info.callBackDG(callbackDG);
526 info.subscriberId(subscriberId);
528 String sseUrlString = paramMap.get(SSE_URL);
530 RestSBDevice dev = null;
532 sseUrl = new URL(sseUrlString);
533 dev = getDevice(sseUrl.getHost(), sseUrl.getPort());
534 } catch (MalformedURLException e) {
535 log.error("establishPersistentSseConnection::MalformedURLException happened. e: {}", e);
540 log.warn("establishPersistentSseConnection::device does not exist in the map. Trying to add one now.");
541 dev = new DefaultRestSBDevice(sseUrl.getHost(),
542 sseUrl.getPort(), "onos", "rocks", "http",
543 sseUrl.getHost() + ":" + sseUrl.getPort(), true);
547 if (isNotificationEnabled(dev.deviceId())) {
548 log.warn("establishPersistentSseConnection::notifications already enabled on device: {}",
553 if (getSubscriptionIdFromDeviceId(dev.deviceId()) == null) {
554 log.warn("This device {} has not yet been subscribed to receive notifications.",
559 RestconfNotificationEventListenerImpl myListener =
560 new RestconfNotificationEventListenerImpl(info);
561 enableNotifications(dev.deviceId(), "yang-push-json", "json", myListener);
565 public void enableNotifications(DeviceId device, String request,
567 RestconfNotificationEventListener listener) {
568 if (isNotificationEnabled(device)) {
569 log.warn("enableNotifications::already enabled on device: {}", device);
573 request = discoverRootResource(device) + NOTIFICATION_PATH_PREFIX
576 addNotificationListener(device, listener);
578 GetChunksRunnable runnable = new GetChunksRunnable(request, mediaType,
580 runnableTable.put(device, runnable);
581 executor.execute(runnable);
584 public void stopNotifications(DeviceId device) {
586 runnableTable.get(device).terminate();
587 processorRunnableTable.get(device).terminate();
588 } catch (Exception ex) {
589 log.error("stopNotifications::Exception happened when terminating, ex: {}", ex);
591 log.info("stopNotifications::Runnable is now terminated");
592 runnableTable.remove(device);
593 processorRunnableTable.remove(device);
594 restconfNotificationListenerMap.remove(device);
595 log.debug("stopNotifications::Stop sending notifications for device URI: " + device.uri().toString());
599 public void deleteSubscriptionAndSseConnection(Map<String, String> paramMap, SvcLogicContext ctx) {
600 String deleteSubscribeUrlString = paramMap.get(REST_API_URL);
601 URL deleteSubscribeUrl = null;
602 RestSBDevice dev = null;
604 deleteSubscribeUrl = new URL(deleteSubscribeUrlString);
605 dev = getDevice(deleteSubscribeUrl.getHost(), deleteSubscribeUrl.getPort());
606 } catch (MalformedURLException e) {
607 log.error("establishSubscriptionOnly::MalformedURLException happened. e: {}", e);
611 String deviceIp = deleteSubscribeUrl.getHost();
612 String devicePort = String.valueOf(deleteSubscribeUrl.getPort());
613 log.info("deleteSubscriptionAndSseConnection::Trying to unsubscribe device {}:{}",
614 deviceIp, devicePort);
616 log.error("deleteSubscriptionAndSseConnection::device does not exist in the map");
619 String subscriptionId = getSubscriptionIdFromDeviceId(dev.deviceId());
621 if (subscriptionId != null) {
622 log.info("deleteSubscriptionAndSseConnection::SubscriptionID is found {}", subscriptionId);
623 log.info("deleteSubscriptionAndSseConnection::About to send unsubscribe request");
625 restapiCallNode.sendRequest(paramMap, ctx);
626 if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
627 log.info("deleteSubscriptionAndSseConnection::Successfully unsubscribed");
628 stopNotifications(dev.deviceId());
629 subscribedDevicesTable.remove(dev.deviceId());
631 String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
633 subscriptionInfoMap.remove(id);
637 log.info("deleteSubscriptionAndSseConnection::Unsubscription was NOT successfull");
639 } catch (SvcLogicException e) {
640 log.error("deleteSubscriptionAndSseConnection::Exception happened ex: {}", e);
643 log.warn("deleteSubscriptionAndSseConnection::This device has already been unsubscribed");
648 * Notifies providers about incoming RESTCONF notification events.
649 */public class GetChunksRunnable implements Runnable {
650 private String request;
651 private String mediaType;
652 private DeviceId deviceId;
654 private volatile boolean running = true;
656 public void terminate() {
657 log.info("GetChunksRunnable.terminate()::threadID: {}",
658 Thread.currentThread().getId());
663 * @param request request
664 * @param mediaType media type
665 * @param deviceId device identifier
667 public GetChunksRunnable(String request, String mediaType,
669 this.request = request;
670 this.mediaType = mediaType;
671 this.deviceId = deviceId;
676 log.trace("GetChunksRunnable.run()::threadID is: {} ...., running is: {}",
677 Thread.currentThread().getId(), running);
679 Client client = ClientBuilder.newBuilder()
680 .register(SseFeature.class).build();
681 WebTarget target = client.target(getUrlString(deviceId, request));
682 log.trace("GetChunksRunnable.run()::target URI is {}", target.getUri().toString());
683 Response response = target.request().get();
684 EventInput eventInput = response.readEntity(EventInput.class);
685 log.trace("GetChunksRunnable.run()::after eventInput");
686 String rcvdData = "";
687 while (!eventInput.isClosed() && running) {
688 log.trace("GetChunksRunnable.run()::inside while ...");
689 final InboundEvent inboundEvent = eventInput.read();
690 log.trace("GetChunksRunnable.run()::after eventInput.read() ...");
691 if (inboundEvent == null) {
692 // connection has been closed
693 log.info("GetChunksRunnable.run()::connection has been closed ...");
697 rcvdData = inboundEvent.readData(String.class);
698 BlockingQueue<String> eventQ = getEventQ(deviceId);
699 if (eventQ != null) {
700 eventQ.add(rcvdData);
701 eventQMap.put(deviceId, eventQ);
702 log.trace("GetChunksRunnable.run()::eventQ got filled.");
704 log.error("GetChunksRunnable.run()::eventQ has not been initialized for this device {}",
708 log.info("GetChunksRunnable.run()::running has changed to false while eventInput.read() " +
709 "was blocked to receive new notifications");
710 log.info("GetChunksRunnable.run()::the client is no longer interested to " +
711 "receive notifications.");
716 log.trace("GetChunksRunnable.run()::running is false! " +
717 "closing eventInput, threadID: {}", Thread.currentThread().getId());
721 log.info("GetChunksRunnable.run()::eventInput is closed in run()");
723 } catch (Exception ex) {
724 log.info("GetChunksRunnable.run()::We got some exception: {}, threadID: {} ", ex,
725 Thread.currentThread().getId());
727 log.trace("GetChunksRunnable.run()::after Runnable Try Catch. threadID: {} ",
728 Thread.currentThread().getId());
732 public class InternalRestconfEventProcessorRunnable implements Runnable {
734 private volatile boolean running = true;
735 private DeviceId deviceId;
737 public InternalRestconfEventProcessorRunnable(DeviceId deviceId) {
738 this.deviceId = deviceId;
741 public void terminate() {
742 log.info("InternalRestconfEventProcessorRunnable.terminate()::threadID: {}",
743 Thread.currentThread().getId());
749 log.trace("InternalRestconfEventProcessorRunnable::restconf event processor runnable inside run()");
752 if (eventQMap != null && !eventQMap.isEmpty() && eventQMap.get(deviceId) != null) {
753 log.trace("InternalRestconfEventProcessorRunnable::waiting for take()");
755 String eventJsonString = eventQMap.get(deviceId).take();
756 log.trace("InternalRestconfEventProcessorRunnable::after take()");
757 log.info("InternalRestconfEventProcessorRunnable::eventJsonString is {}", eventJsonString);
758 Map<String, String> param = convertToProperties(eventJsonString);
759 String idString = param.get("push-change-update.subscription-id");
760 SubscriptionInfo info = subscriptionInfoMap().get(idString);
762 SvcLogicContext ctx = setContext(param);
763 SvcLogicGraphInfo callbackDG = info.callBackDG();
764 callbackDG.executeGraph(ctx);
767 log.info("InternalRestconfEventProcessorRunnable.run()::running has changed to false " +
768 "while eventQ was blocked to process new notifications");
769 log.info("InternalRestconfEventProcessorRunnable.run()::" +
770 "the client is no longer interested to receive notifications.");
774 } catch (InterruptedException | SvcLogicException e) {
779 private SvcLogicContext setContext(Map<String, String> param) {
780 SvcLogicContext ctx = new SvcLogicContext();
781 for (Map.Entry<String, String> entry : param.entrySet()) {
782 ctx.setAttribute(entry.getKey(), entry.getValue());
788 public String discoverRootResource(DeviceId device) {
789 return ROOT_RESOURCE;
793 public void addNotificationListener(DeviceId deviceId,
794 RestconfNotificationEventListener listener) {
795 Set<RestconfNotificationEventListener> listeners =
796 restconfNotificationListenerMap.get(deviceId);
797 if (listeners == null) {
798 listeners = new HashSet<>();
801 listeners.add(listener);
803 this.restconfNotificationListenerMap.put(deviceId, listeners);
807 public void removeNotificationListener(DeviceId deviceId,
808 RestconfNotificationEventListener listener) {
809 Set<RestconfNotificationEventListener> listeners =
810 restconfNotificationListenerMap.get(deviceId);
811 if (listeners != null) {
812 listeners.remove(listener);
816 public boolean isNotificationEnabled(DeviceId deviceId) {
817 return runnableTable.containsKey(deviceId);