Bug fixing for the periodic pulling of Performance Monitoring (PM) data from 3rd... 60/131860/1
authorHesam Rahimi <hesam.rahimi@huawei.com>
Fri, 28 Oct 2022 22:18:52 +0000 (18:18 -0400)
committerHesam Rahimi <hesam.rahimi@huawei.com>
Fri, 28 Oct 2022 22:26:25 +0000 (22:26 +0000)
Issue-ID: CCSDK-3752
Signed-off-by: Hesam Rahimi <hesam.rahimi@huawei.com>
Change-Id: Ie5a0b65686a101e7416608fc10135d25a974aedd

plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/PeriodicDiscoveryNode.java

index 220de87..909c643 100644 (file)
@@ -115,8 +115,13 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
     }
 
     public void deactivate() {
-        log.info("RESTCONF SBI Stopped");
-        executor.shutdown();
+        log.info("PeriodicDiscoveryNode::deactivate: Going to shutdown the executors.");
+        if (executor != null) {
+            executor.shutdown();
+        }
+        if (scheduledExecutor != null) {
+            scheduledExecutor.shutdown();
+        }
         this.getClientMap().clear();
         this.getDeviceMap().clear();
     }
@@ -131,19 +136,19 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
 
     @Override
     public Map<DeviceId, RestSBDevice> getDevices() {
-        log.trace("RESTCONF SBI::getDevices");
+        log.info("RESTCONF SBI::getDevices");
         return ImmutableMap.copyOf(deviceMap);
     }
 
     @Override
     public RestSBDevice getDevice(DeviceId deviceInfo) {
-        log.trace("RESTCONF SBI::getDevice with deviceId");
+        log.info("RESTCONF SBI::getDevice with deviceId");
         return deviceMap.get(deviceInfo);
     }
 
     @Override
     public RestSBDevice getDevice(String ip, int port) {
-        log.trace("RESTCONF SBI::getDevice with ip and port");
+        log.info("RESTCONF SBI::getDevice with ip and port");
         try {
             if (!deviceMap.isEmpty()) {
                 return deviceMap.values().stream().filter(v -> v.ip().equals(ip) && v.port() == port).findFirst().get();
@@ -156,7 +161,7 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
 
     @Override
     public void addDevice(RestSBDevice device) {
-        log.trace("RESTCONF SBI::addDevice");
+        log.info("RESTCONF SBI::addDevice");
         if (!deviceMap.containsKey(device.deviceId())) {
             if (device.username() != null) {
                 String username = device.username();
@@ -168,9 +173,13 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
             InternalPeriodicPullingProcessorRunnable eventProcessorRunnable =
                     new InternalPeriodicPullingProcessorRunnable(device.deviceId());
             processorRunnableTable.put(device.deviceId(), eventProcessorRunnable);
-            log.trace("addDevice::restconf event processor runnable is created and is going for execute");
+            log.info("addDevice::restconf event processor runnable is created and is going for execute");
+            if (executor.isShutdown()) {
+                log.info("PeriodicPulDiscoveryNode::addDevice - executor was shutdown. Restarting it.");
+                executor = Executors.newCachedThreadPool();
+            }
             executor.execute(eventProcessorRunnable);
-            log.trace("addDevice::restconf event processor runnable was sent for execute");
+            log.info("addDevice::restconf event processor runnable was sent for execute");
             deviceMap.put(device.deviceId(), device);
         } else {
             log.warn("addDevice::Trying to add a device which already exists {}", device.deviceId());
@@ -179,12 +188,18 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
 
     @Override
     public void removeDevice(DeviceId deviceId) {
-        log.trace("RESTCONF SBI::removeDevice");
+        log.info("RESTCONF SBI::removeDevice");
         eventQMap.remove(deviceId);
         clientMap.remove(deviceId);
         deviceMap.remove(deviceId);
     }
 
+    @Override
+    public void enableNotifications(DeviceId device, String request, String mediaType,
+                                    RestconfNotificationEventListener callBackListener) {
+
+    }
+
     @Override
     public void establishSubscription(Map<String, String> paramMap,
                                       SvcLogicContext ctx) throws SvcLogicException {
@@ -208,6 +223,12 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
     @Override
     public void establishSubscriptionOnly(Map<String, String> paramMap, SvcLogicContext ctx)
             throws SvcLogicException {
+        log.info("establishSubscriptionOnly::Necessary 55 sec. delay for the hardware to finish creating the resource");
+        try {
+            Thread.sleep(55000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
         String subscriberId = paramMap.get(SUBSCRIBER_ID);
         if (subscriberId == null) {
             throw new SvcLogicException("Subscriber Id is null");
@@ -239,9 +260,12 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
             restapiCallNode.sendRequest(paramMap, ctx);
             if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
                 // TODO: save subscription id and subscriber in MYSQL
-                String id = getOutputIdentifierNoPrefix(paramMap.get(RESPONSE_PREFIX), ctx);
+//                String id = getOutputIdentifierNoPrefix(paramMap.get(RESPONSE_PREFIX), ctx);
+//                log.info("establishSubscriptionOnly::Subscription is done successfully and " +
+//                        "the output.identifier is: {}", id);
+                String id = dev.ip();
                 log.info("establishSubscriptionOnly::Subscription is done successfully and " +
-                        "the output.identifier is: {}", id);
+                        "the device ip is: {}", id);
                 log.info("establishSubscriptionOnly::The subscriptionID returned by the server " +
                         "does not exist in the map. Adding it now...");
                 subscribedDevicesTable.put(dev.deviceId(), id);
@@ -268,9 +292,44 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
 
     @Override
     public void deleteSubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
-        String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
-        if (id != null) {
-            subscriptionInfoMap.remove(id);
+//        String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
+//        if (id != null) {
+//            subscriptionInfoMap.remove(id);
+//        }
+
+        String deleteSubscribeUrlString = paramMap.get(REST_API_URL);
+        URL deleteSubscribeUrl = null;
+        RestSBDevice dev = null;
+        try {
+            deleteSubscribeUrl = new URL(deleteSubscribeUrlString);
+            dev = getDevice(deleteSubscribeUrl.getHost(), deleteSubscribeUrl.getPort());
+        } catch (MalformedURLException e) {
+            log.error("establishSubscriptionOnly::MalformedURLException happened. e: {}", e);
+            return;
+        }
+
+        String deviceIp = deleteSubscribeUrl.getHost();
+        String devicePort = String.valueOf(deleteSubscribeUrl.getPort());
+        log.info("deleteSubscriptionAndSseConnection::Trying to unsubscribe device {}:{}",
+                deviceIp, devicePort);
+        if (dev == null) {
+            log.error("deleteSubscriptionAndSseConnection::device does not exist in the map");
+            return;
+        }
+        String subscriptionId = getSubscriptionIdFromDeviceId(dev.deviceId());
+
+        if (subscriptionId != null) {
+            log.info("deleteSubscriptionAndSseConnection::SubscriptionID is found {}", subscriptionId);
+            log.info("deleteSubscriptionAndSseConnection::About to send unsubscribe request");
+            stopNotifications(dev.deviceId());
+            subscribedDevicesTable.remove(dev.deviceId());
+
+//            String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
+            if (subscriptionId != null) {
+                subscriptionInfoMap.remove(subscriptionId);
+            }
+        } else {
+            log.warn("deleteSubscriptionAndSseConnection::This device has already been unsubscribed");
         }
     }
 
@@ -460,13 +519,16 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
 
         RestconfNotificationEventListenerImpl myListener =
                 new RestconfNotificationEventListenerImpl(info);
-        enableNotifications(dev.deviceId(), "ietf-service-pm:performance-monitoring", "json", myListener);
+        enableNotifications(dev.deviceId(),
+                "ietf-service-pm:performance-monitoring/service-pm=" + paramMap.get("ethServiceName"),
+                "json", myListener, paramMap, ctx);
+//        enableNotifications(dev.deviceId(), periodicPullUrlString,
+//                "json", myListener, paramMap, ctx);
     }
 
-    @Override
     public void enableNotifications(DeviceId device, String request,
                                     String mediaType,
-                                    RestconfNotificationEventListener listener) {
+                                    RestconfNotificationEventListener listener, Map<String, String> paramMap, SvcLogicContext ctx) {
         if (isNotificationEnabled(device)) {
             log.warn("enableNotifications::already enabled on device: {}", device);
             return;
@@ -477,9 +539,13 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
 
         addNotificationListener(device, listener);
 
-        PeriodicPullRunnable periodicRunnable = new PeriodicPullRunnable(request, device);
+        PeriodicPullRunnable periodicRunnable = new PeriodicPullRunnable(request, device, paramMap, ctx);
         periodicRunnableTable.put(device, periodicRunnable);
-        scheduledExecutor.scheduleAtFixedRate(periodicRunnable, 0, 60, TimeUnit.SECONDS);
+        if (scheduledExecutor.isShutdown()) {
+            log.info("PeriodicPulDiscoveryNode::enableNotifications - scheduledExecutor was shutdown. Restarting it.");
+            scheduledExecutor = Executors.newScheduledThreadPool(2);
+        }
+        scheduledExecutor.scheduleAtFixedRate(periodicRunnable, 0, 90, TimeUnit.SECONDS);
     }
 
     public void stopNotifications(DeviceId device) {
@@ -492,7 +558,12 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
         log.info("stopNotifications::Runnable is now terminated");
         periodicRunnableTable.remove(device);
         processorRunnableTable.remove(device);
-        log.debug("stopNotifications::Stop sending notifications for device URI: " + device.uri().toString());
+        if (periodicRunnableTable.isEmpty()) {
+            log.info("stopNotifications::periodicRunnableTable is empty. Going to shutdown the executors");
+            this.deactivate();
+            log.info("stopNotifications::Executors are now shutdown.");
+        }
+        log.info("stopNotifications::Stop sending notifications for device URI: " + device.uri().toString());
     }
 
     @Override
@@ -540,6 +611,13 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
             } catch (SvcLogicException e) {
                 log.error("deleteSubscriptionAndSseConnection::Exception happened ex: {}", e);
             }
+            stopNotifications(dev.deviceId());
+            subscribedDevicesTable.remove(dev.deviceId());
+
+            String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
+            if (id != null) {
+                subscriptionInfoMap.remove(id);
+            }
         } else {
             log.warn("deleteSubscriptionAndSseConnection::This device has already been unsubscribed");
         }
@@ -548,6 +626,8 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
     public class PeriodicPullRunnable implements Runnable {
         private String request;
         private DeviceId deviceId;
+        private Map<String, String> paramMap;
+        private SvcLogicContext ctx;
 
         private volatile boolean running = true;
 
@@ -560,36 +640,60 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
         /**
          * @param request   request
          * @param deviceId    device identifier
+         * @param paramMap
+         * @param ctx
          */
-        public PeriodicPullRunnable(String request, DeviceId deviceId) {
+        public PeriodicPullRunnable(String request, DeviceId deviceId, Map<String, String> paramMap, SvcLogicContext ctx) {
             this.request = request;
             this.deviceId = deviceId;
+            this.paramMap = paramMap;
+            this.ctx = ctx;
         }
 
         @Override
         public void run() {
-            log.trace("PeriodicPullRunnable.run()::threadID is: {} ...., running is: {}",
+            Parameters p;
+            WebTarget target = null;
+
+            log.info("PeriodicPullRunnable.run()::threadID is: {} ...., running is: {}",
                     Thread.currentThread().getId(), running);
             try {
+//                    Client client = ClientBuilder.newBuilder().build();
+//                    WebTarget target = client.target(getUrlString(deviceId, request));
+
+
+                    log.info("PeriodicPullRunnable::sending periodic GET pm-data request to hardware");
+                    RestapiCallNode restapi = restconfApiCallNode.getRestapiCallNode();
+                    p = RestapiCallNode.getParameters(paramMap, new Parameters());
+    //                Client client =  ignoreSslClient(p.disableHostVerification).register(SseFeature.class);
                     Client client = ClientBuilder.newBuilder().build();
-                    WebTarget target = client.target(getUrlString(deviceId, request));
-                    log.trace("PeriodicPullRunnable.run()::target URI is {}", target.getUri().toString());
+                    target = restapi.addAuthType(client, p).target(getUrlString(deviceId, request));
+//                    target = restapi.addAuthType(client, p).target(request);
+
+
+                    log.info("PeriodicPullRunnable.run()::target URI is {}", target.getUri().toString());
                     Response response = null;
                     if (running) {
                         response = target.request().get();
                         String rcvdData = response.readEntity(String.class);
-                        log.trace("PeriodicPullRunnable.run()::after readEntity");
-                        BlockingQueue<String> eventQ = getEventQ(deviceId);
-                        if (eventQ != null) {
-                            eventQ.add(rcvdData);
-                            eventQMap.put(deviceId, eventQ);
-                            log.trace("PeriodicPullRunnable.run()::eventQ got filled.");
+                        if (response.getStatus() == 200) {
+                            log.info("PeriodicPullRunnable.run()::after readEntity");
+                            BlockingQueue<String> eventQ = getEventQ(deviceId);
+                            if (eventQ != null) {
+                                eventQ.add(rcvdData);
+                                eventQMap.put(deviceId, eventQ);
+                                log.info("PeriodicPullRunnable.run()::eventQ got filled.");
+                            } else {
+                                log.error("PeriodicPullRunnable.run()::eventQ has not been initialized for this device {}",
+                                        deviceId);
+                            }
                         } else {
-                            log.error("PeriodicPullRunnable.run()::eventQ has not been initialized for this device {}",
-                                    deviceId);
+                            log.info("PeriodicPullRunnable.run():: GET pm-data did NOT return 200: {}", rcvdData);
+                            log.info("PeriodicPullRunnable.run():: Status code is: {}", response.getStatus());
+                            log.info("PeriodicPullRunnable.run():: response is: {}", response.toString());
                         }
                     } else {
-                        log.trace("PeriodicPullRunnable.run()::running is false! " +
+                        log.info("PeriodicPullRunnable.run()::running is false! " +
                                 "closing the client and the response, threadID: {}", Thread.currentThread().getId());
                         response.close();
                         client.close();
@@ -598,10 +702,40 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
             } catch (Exception ex) {
                 log.info("PeriodicPullRunnable.run()::We got some exception: {}, threadID: {} ", ex,
                         Thread.currentThread().getId());
+                executor.shutdown();
+                scheduledExecutor.shutdown();
+                log.info("PeriodicPullRunnable.run():: exceptions happened. So shutting down the executors");
             }
-            log.trace("PeriodicPullRunnable.run()::after Runnable Try Catch. threadID: {} ",
+            log.info("PeriodicPullRunnable.run()::after Runnable Try Catch. threadID: {} ",
                     Thread.currentThread().getId());
         }
+
+        private Client ignoreSslClient(boolean disableHostVerification) {
+            SSLContext sslcontext = null;
+
+            try {
+                sslcontext = SSLContext.getInstance("TLS");
+                sslcontext.init(null, new TrustManager[]{new X509TrustManager() {
+                    @Override
+                    public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
+                    }
+
+                    @Override
+                    public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
+                    }
+
+                    @Override
+                    public X509Certificate[] getAcceptedIssuers() {
+                        return new X509Certificate[0];
+                    }
+                } }, new java.security.SecureRandom());
+            } catch (NoSuchAlgorithmException | KeyManagementException e) {
+                throw new IllegalStateException(e);
+            }
+
+            return ClientBuilder.newBuilder().sslContext(sslcontext).hostnameVerifier(new AcceptIpAddressHostNameVerifier(disableHostVerification)).build();
+        }
+
     }
 
     public class InternalPeriodicPullingProcessorRunnable implements Runnable {
@@ -621,22 +755,26 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
 
         @Override
         public void run() {
-            log.trace("InternalPeriodicPullingProcessorRunnable::restconf event processor runnable inside run()");
+            log.info("InternalPeriodicPullingProcessorRunnable::restconf event processor runnable inside run()");
             while (running) {
                 try {
                     if (eventQMap != null && !eventQMap.isEmpty() && eventQMap.get(deviceId) != null) {
-                        log.trace("InternalPeriodicPullingProcessorRunnable::waiting for take()");
+                        log.info("InternalPeriodicPullingProcessorRunnable::waiting for take()");
                         if (running) {
                             String eventJsonString = eventQMap.get(deviceId).take();
-                            log.trace("InternalPeriodicPullingProcessorRunnable::after take()");
+                            log.info("InternalPeriodicPullingProcessorRunnable::after take()");
                             log.info("InternalPeriodicPullingProcessorRunnable::eventJsonString is {}", eventJsonString);
                             Map<String, String> param = convertToProperties(eventJsonString);
-                            String idString = param.get("push-change-update.subscription-id");
+//                            String idString = param.get("push-change-update.subscription-id");
+                            String idString = getSubscriptionIdFromDeviceId(deviceId);
+                            log.info("InternalPeriodicPullingProcessorRunnable::idString is {}", idString);
                             SubscriptionInfo info = subscriptionInfoMap().get(idString);
                             if (info != null) {
+                                log.info("InternalPeriodicPullingProcessorRunnable::subscriptionInfo is not null; going to call the callback dg");
                                 SvcLogicContext ctx = setContext(param);
                                 SvcLogicGraphInfo callbackDG = info.callBackDG();
                                 callbackDG.executeGraph(ctx);
+                                log.info("InternalPeriodicPullingProcessorRunnable::The callback dg is called");
                             }
                         } else {
                             log.info("InternalPeriodicPullingProcessorRunnable.run()::running has changed to false " +