Merge "SDN-R ws notifications"
[ccsdk/features.git] / sdnr / wt / devicemanager / provider / src / main / java / org / onap / ccsdk / features / sdnr / wt / devicemanager / impl / DeviceManagerImpl.java
index f0efe7a..97595dd 100644 (file)
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.onap.ccsdk.features.sdnr.wt.devicemanager.aaiconnector.impl.AaiProviderClient;
+import org.onap.ccsdk.features.sdnr.wt.devicemanager.archiveservice.ArchiveCleanService;
 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.database.HtDatabaseNode;
 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.netconf.ONFCoreNetworkElementFactory;
 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.netconf.ONFCoreNetworkElementRepresentation;
@@ -71,6 +72,8 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
 
 @SuppressWarnings("deprecation")
 public class DeviceManagerImpl implements DeviceManagerService, AutoCloseable, ResyncNetworkElementsListener {
@@ -92,6 +95,7 @@ public class DeviceManagerImpl implements DeviceManagerService, AutoCloseable, R
     private RpcProviderRegistry rpcProviderRegistry = null;
     @SuppressWarnings("unused")
     private NotificationPublishService notificationPublishService = null;
+    private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
 
     private final ConcurrentHashMap<String, ONFCoreNetworkElementRepresentation> networkElementRepresentations =
             new ConcurrentHashMap<>();
@@ -116,6 +120,9 @@ public class DeviceManagerImpl implements DeviceManagerService, AutoCloseable, R
     private Thread threadDoClearCurrentFaultByNodename = null;
     private int refreshCounter = 0;
     private AkkaConfig akkaConfig;
+    private ArchiveCleanService archiveCleanService;
+    @SuppressWarnings("unused")
+       private ClusterSingletonServiceRegistration cssRegistration;
 
     // Blueprint 1
     public DeviceManagerImpl() {
@@ -137,7 +144,9 @@ public class DeviceManagerImpl implements DeviceManagerService, AutoCloseable, R
     public void setMountPointService(MountPointService mountPointService) {
         this.mountPointService = mountPointService;
     }
-
+    public void setClusterSingletonService(ClusterSingletonServiceProvider clusterSingletonService) {
+       this.clusterSingletonServiceProvider = clusterSingletonService;
+    }
     public void init() {
 
         LOG.info("Session Initiated start {}", APPLICATION_NAME);
@@ -146,11 +155,11 @@ public class DeviceManagerImpl implements DeviceManagerService, AutoCloseable, R
         this.rpcApiService = new DeviceManagerApiServiceImpl(rpcProviderRegistry);
         // Get configuration
         HtDevicemanagerConfiguration config = HtDevicemanagerConfiguration.getConfiguration();
-        this.akkaConfig = null;
         try {
             this.akkaConfig = AkkaConfig.load();
             LOG.debug("akka.conf loaded: " + akkaConfig.toString());
         } catch (Exception e1) {
+            this.akkaConfig = null;
             LOG.warn("problem loading akka.conf: " + e1.getMessage());
         }
         GeoConfig geoConfig = null;
@@ -180,8 +189,7 @@ public class DeviceManagerImpl implements DeviceManagerService, AutoCloseable, R
             LOG.error("Can only run with local database. Stop initialization of devicemanager.");
         } else {
             // init Database Values only if singleNode or clusterMember=1
-            if (akkaConfig == null || akkaConfig.isSingleNode() || akkaConfig != null && akkaConfig.isCluster()
-                    && akkaConfig.getClusterConfig().getRoleMemberIndex() == 1) {
+            if (akkaConfig == null || akkaConfig.isClusterAndFirstNode()) {
                 // Create DB index if not existing and if database is running
                 try {
                     this.configService = new IndexConfigService(htDatabase);
@@ -192,6 +200,7 @@ public class DeviceManagerImpl implements DeviceManagerService, AutoCloseable, R
             }
             // start service for device maintenance service
             this.maintenanceService = new MaintenanceServiceImpl(htDatabase);
+
             // Websockets
             try {
                 this.webSocketService = new WebSocketServiceClientImpl2(rpcProviderRegistry);
@@ -210,14 +219,17 @@ public class DeviceManagerImpl implements DeviceManagerService, AutoCloseable, R
                 LOG.warn("No configuration available. Don't start event manager");
             } else {
                 this.databaseClientEvents = new HtDatabaseEventsService(htDatabase);
+                //Make sure to start for one cluster node only
+                if (akkaConfig == null || akkaConfig.isClusterAndFirstNode() || akkaConfig.isSingleNode()) {
+                 }
 
                 String myDbKeyNameExtended = MYDBKEYNAMEBASE + "-" + dbConfig.getCluster();
 
-
                 this.odlEventListener = new ODLEventListener(myDbKeyNameExtended, webSocketService,
                         databaseClientEvents, dcaeProviderClient, aotsMProvider, maintenanceService);
             }
-
+            this.archiveCleanService = new ArchiveCleanService(config, databaseClientEvents, mwtnService);
+            this.cssRegistration = this.clusterSingletonServiceProvider.registerClusterSingletonService(this.archiveCleanService);
             // PM
             PmConfig configurationPM = config.getPm();
             LOG.info("Performance manager configuration: {}", configurationPM);
@@ -275,7 +287,7 @@ public class DeviceManagerImpl implements DeviceManagerService, AutoCloseable, R
         close(maintenanceService);
         close(rpcApiService);
         close(notificationDelayService);
-
+        close(archiveCleanService);
         LOG.info("DeviceManagerImpl closing done");
     }
 
@@ -300,8 +312,11 @@ public class DeviceManagerImpl implements DeviceManagerService, AutoCloseable, R
 
     /**
      * For each mounted device a mountpoint is created and this listener is called.
+     * Mountpoint was created or existing. Managed device is now fully connected to node/mountpoint.
+     * @param action provide action
+     * @param nNodeId id of the mountpoint
+     * @param nNode mountpoint contents
      */
-    @Override
     public void startListenerOnNodeForConnectedState(Action action, NodeId nNodeId, NetconfNode nNode) {
 
         String mountPointNodeName = nNodeId.getValue();
@@ -384,7 +399,8 @@ public class DeviceManagerImpl implements DeviceManagerService, AutoCloseable, R
         ne.initSynchronizationExtension();
 
         // Setup Service that monitors registration/ deregistration of session
-        odlEventListener.registration(mountPointNodeName);
+        ConnectionStatus csts = nNode.getConnectionStatus();
+        sendCreateOrUpdateNotification(mountPointNodeName, action, csts);
 
         if (aaiProviderClient != null) {
             aaiProviderClient.onDeviceRegistered(mountPointNodeName);
@@ -399,52 +415,121 @@ public class DeviceManagerImpl implements DeviceManagerService, AutoCloseable, R
         LOG.info("Starting Event listener on Netconf device :: Name : {} finished", mountPointNodeName);
     }
 
-    @Override
-    public void enterNonConnectedState(NodeId nNodeId, NetconfNode nNode) {
+    /**
+     * Mountpoint created or existing. Managed device is actually disconnected from node/ mountpoint.
+     * Origin state: Connecting, Connected
+     * Target state: are UnableToConnect or Connecting
+     * @param action create or update
+     * @param nNodeId id of the mountpoint
+     * @param nNode mountpoint contents
+     */
+    public void enterNonConnectedState(Action action, NodeId nNodeId, NetconfNode nNode) {
+        String mountPointNodeName = nNodeId.getValue();
+        ConnectionStatus csts = nNode.getConnectionStatus();
+
+        sendCreateOrUpdateNotification(mountPointNodeName, action, csts);
+
+        // Handling if mountpoint exist. connected -> connecting/UnableToConnect
+        stopListenerOnNodeForConnectedState(mountPointNodeName);
+
+        if (deviceMonitor != null) {
+            deviceMonitor.deviceDisconnectIndication(mountPointNodeName);
+        }
+
+    }
+
+    /**
+     * Mountpoint removed indication.
+     * @param nNodeId id of the mountpoint
+     */
+    public void removeMountpointState(NodeId nNodeId) {
         String mountPointNodeName = nNodeId.getValue();
-        LOG.info("enter Non ConnectedState for device :: Name : {}", mountPointNodeName);
+        LOG.info("mountpointNodeRemoved {}", nNodeId.getValue());
 
+        stopListenerOnNodeForConnectedState(mountPointNodeName);
+        deviceMonitor.removeMountpointIndication(mountPointNodeName);
+        if (odlEventListener != null) {
+            odlEventListener.deRegistration(mountPointNodeName);
+        }
+    }
+
+    /**
+     * Do all tasks necessary to move from mountpoint state connected -> connecting
+     * @param mountPointNodeName provided
+     * @param ne representing the device connected to mountpoint
+     */
+    private void stopListenerOnNodeForConnectedState( String mountPointNodeName) {
         ONFCoreNetworkElementRepresentation ne = networkElementRepresentations.remove(mountPointNodeName);
         if (ne != null) {
-            // Handling transition mountpoint connected -> connecting
             this.maintenanceService.deleteIfNotRequired(mountPointNodeName);
             int problems = ne.removeAllCurrentProblemsOfNode();
             LOG.debug("Removed all {} problems from database at deregistration for {}", problems, mountPointNodeName);
-            if (odlEventListener != null) {
-                odlEventListener.deRegistration(mountPointNodeName);
-            }
             if (performanceManager != null) {
                 performanceManager.deRegistration(mountPointNodeName);
             }
             if (aaiProviderClient != null) {
                 aaiProviderClient.onDeviceUnregistered(mountPointNodeName);
             }
-        } else {
-            // Handling -> create not connected mountpoint, or change other beside connected.
-            ConnectionStatus csts = nNode.getConnectionStatus();
-            if (csts != null) {
-                odlEventListener.updateRegistration(mountPointNodeName, csts.getClass().getSimpleName(), csts.getName());
-            } else {
-                LOG.info("Unknown connection status");
-            }
-        }
-        if (deviceMonitor != null) {
-            deviceMonitor.deviceDisconnectIndication(mountPointNodeName);
         }
+    }
 
+    private void sendCreateOrUpdateNotification(String mountPointNodeName, Action action, ConnectionStatus csts) {
+        LOG.info("enter Non ConnectedState for device :: Name : {} Action {} ConnectionStatus {}", mountPointNodeName, action, csts);
+        if (action == Action.CREATE) {
+            odlEventListener.registration(mountPointNodeName);
+        } else {
+            odlEventListener.updateRegistration(mountPointNodeName, ConnectionStatus.class.getSimpleName(), csts != null ? csts.getName() : "null");
+        }
     }
 
+    /**
+     * Handle netconf/mountpoint changes
+     */
     @Override
-    public void removeMountpointState(NodeId nNodeId) {
-        String mountPointNodeName = nNodeId.getValue();
-        LOG.info("mountpointNodeRemoved {}", nNodeId.getValue());
-        deviceMonitor.removeMountpointIndication(mountPointNodeName);
+    public void netconfChangeHandler(Action action, @Nullable ConnectionStatus csts, NodeId nodeId, NetconfNode nnode) {
+        switch (action) {
+            case REMOVE:
+                removeMountpointState(nodeId); // Stop Monitor
+                //deviceManagerService.enterNonConnectedState(nodeId, nnode); // Remove Mountpoint handler
+                break;
+
+            case UPDATE:
+            case CREATE:
+                if (csts != null) {
+                    switch (csts) {
+                        case Connected: {
+                            startListenerOnNodeForConnectedState(action, nodeId, nnode);
+                            break;
+                        }
+                        case UnableToConnect:
+                        case Connecting: {
+                            enterNonConnectedState(action, nodeId, nnode);
+                            break;
+                        }
+                    }
+                } else {
+                    LOG.debug("NETCONF Node handled with null status for action", action);
+                }
+                break;
+        }
     }
 
     /*-------------------------------------------------------------------------------------------
      * Functions
      */
 
+    public ArchiveCleanService getArchiveCleanService() {
+        return this.archiveCleanService;
+    }
+
+    public HtDatabaseEventsService getDatabaseClientEvents() {
+        return databaseClientEvents;
+    }
+
+    public IndexMwtnService getMwtnService() {
+        return mwtnService;
+    }
+
     /**
      * Async RPC Interface implementation
      */