Change log levels to reduce the amount of logs generated
[ccsdk/features.git] / sdnr / wt / netconfnode-state-service / provider / src / main / java / org / onap / ccsdk / features / sdnr / wt / netconfnodestateservice / impl / NetconfNodeStateServiceImpl.java
index c190346..1b1676c 100644 (file)
@@ -19,20 +19,26 @@ package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CopyOnWriteArrayList;
-
 import javax.annotation.Nullable;
-
 import org.eclipse.jdt.annotation.NonNull;
+import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
+import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener;
+import org.onap.ccsdk.features.sdnr.wt.common.threading.GenericRunnableFactory;
+import org.onap.ccsdk.features.sdnr.wt.common.threading.KeyBasedThreadpool;
 import org.onap.ccsdk.features.sdnr.wt.dataprovider.model.IEntityDataProvider;
 import org.onap.ccsdk.features.sdnr.wt.dataprovider.model.StatusChangedHandler.StatusKey;
 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfAccessor;
 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeConnectListener;
 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateListener;
 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateService;
-import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.TransactionUtils;
 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.VesNotificationListener;
+import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfAccessorManager;
+import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfCommunicatorManager;
+import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.dom.DomContext;
+import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.NetconfStateConfig;
 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.odlAkka.AkkaConfig;
 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.odlAkka.ClusterConfig;
 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.odlGeo.GeoConfig;
@@ -45,11 +51,13 @@ import org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationTyp
 import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
 import org.opendaylight.mdsal.binding.api.DataTreeModification;
-import org.opendaylight.mdsal.binding.api.MountPoint;
 import org.opendaylight.mdsal.binding.api.MountPointService;
 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
 import org.opendaylight.mdsal.binding.api.RpcProviderService;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
@@ -64,47 +72,48 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.parser.api.YangParserFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class NetconfNodeStateServiceImpl implements NetconfNodeStateService, RpcApigetStateCallback, AutoCloseable {
+public class NetconfNodeStateServiceImpl
+        implements NetconfNodeStateService, RpcApigetStateCallback, AutoCloseable, IConfigChangedListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeStateServiceImpl.class);
     private static final String APPLICATION_NAME = "NetconfNodeStateService";
-    @SuppressWarnings("unused")
     private static final String CONFIGURATIONFILE = "etc/netconfnode-status-service.properties";
 
-
-    @SuppressWarnings("null")
     private static final @NonNull InstanceIdentifier<Topology> NETCONF_TOPO_IID =
             InstanceIdentifier.create(NetworkTopology.class).child(Topology.class,
                     new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())));
 
-    @SuppressWarnings("null")
     private static final @NonNull InstanceIdentifier<Node> NETCONF_NODE_TOPO_IID =
             InstanceIdentifier.create(NetworkTopology.class)
                     .child(Topology.class, new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())))
                     .child(Node.class);
 
-    private static final DataTreeIdentifier<Node> NETCONF_NODE_TOPO_TREE_ID =
+    private static final @NonNull DataTreeIdentifier<Node> NETCONF_NODE_TOPO_TREE_ID =
             DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, NETCONF_NODE_TOPO_IID);
 
     // Name of ODL controller NETCONF instance
     private static final NodeId CONTROLLER = new NodeId("controller-config");
-    private static final TransactionUtils TRANSACTIONUTILS = new GenericTransactionUtils();
+    private static final int ASYNC_EXECUTION_POOLSIZE = 20;
 
     // -- OSGi services, provided
     private DataBroker dataBroker;
+    private DOMDataBroker domDataBroker;
     private MountPointService mountPointService;
+    private DOMMountPointService domMountPointService;
     private RpcProviderService rpcProviderRegistry;
     private IEntityDataProvider iEntityDataProvider;
     @SuppressWarnings("unused")
     private NotificationPublishService notificationPublishService;
     @SuppressWarnings("unused")
     private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
+    private YangParserFactory yangParserFactory;
+    private BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
 
     // -- Parameter
     private ListenerRegistration<L1> listenerL1;
@@ -117,6 +126,9 @@ public class NetconfNodeStateServiceImpl implements NetconfNodeStateService, Rpc
     /** Indication if init() function called and fully executed **/
     private Boolean initializationSuccessful;
 
+    /** Manager accessor objects for connection **/
+    private NetconfAccessorManager accessorManager;
+
     /** List of all registered listeners **/
     private final List<NetconfNodeConnectListener> netconfNodeConnectListenerList;
 
@@ -132,15 +144,30 @@ public class NetconfNodeStateServiceImpl implements NetconfNodeStateService, Rpc
     /** Indicates the name of the cluster **/
     private String clusterName;
 
+    /** nodeId to threadPool (size=1) for datatreechange handling) **/
+    //    private final Map<String, ExecutorService> handlingPool;
+    private KeyBasedThreadpool<NodeId, NetconfChangeDataHolder> handlingPool;
+
+    private boolean handleDataTreeAsync;
+
+    private ConfigurationFileRepresentation configFileRepresentation;
+    private NetconfStateConfig config;
+    private NetconfCommunicatorManager netconfCommunicatorManager;
+    private DomContext domContext;
+
     /** Blueprint **/
     public NetconfNodeStateServiceImpl() {
         LOG.info("Creating provider for {}", APPLICATION_NAME);
 
         this.dataBroker = null;
+        this.domDataBroker = null;
         this.mountPointService = null;
+        this.domMountPointService = null;
         this.rpcProviderRegistry = null;
         this.notificationPublishService = null;
         this.clusterSingletonServiceProvider = null;
+        this.yangParserFactory = null;
+        this.domContext = null;
 
         this.listenerL1 = null;
         this.listenerL2 = null;
@@ -148,12 +175,17 @@ public class NetconfNodeStateServiceImpl implements NetconfNodeStateService, Rpc
         this.netconfNodeConnectListenerList = new CopyOnWriteArrayList<>();
         this.netconfNodeStateListenerList = new CopyOnWriteArrayList<>();
         this.vesNotificationListenerList = new CopyOnWriteArrayList<>();
+        this.accessorManager = null;
+        this.handlingPool = null;
     }
-
     public void setDataBroker(DataBroker dataBroker) {
         this.dataBroker = dataBroker;
     }
 
+    public void setDomDataBroker(DOMDataBroker domDataBroker) {
+        this.domDataBroker = domDataBroker;
+    }
+
     public void setRpcProviderRegistry(RpcProviderService rpcProviderRegistry) {
         this.rpcProviderRegistry = rpcProviderRegistry;
     }
@@ -166,6 +198,10 @@ public class NetconfNodeStateServiceImpl implements NetconfNodeStateService, Rpc
         this.mountPointService = mountPointService;
     }
 
+    public void setDomMountPointService(DOMMountPointService domMountPointService) {
+        this.domMountPointService = domMountPointService;
+    }
+
     public void setClusterSingletonService(ClusterSingletonServiceProvider clusterSingletonService) {
         this.clusterSingletonServiceProvider = clusterSingletonService;
     }
@@ -174,15 +210,34 @@ public class NetconfNodeStateServiceImpl implements NetconfNodeStateService, Rpc
         this.iEntityDataProvider = iEntityDataProvider;
     }
 
-    /** Blueprint initialization **/
+    public void setYangParserFactory(YangParserFactory yangParserFactory) {
+        this.yangParserFactory = yangParserFactory;
+    }
+
+    public void setBindingNormalizedNodeSerializer(BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer) {
+        this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
+    }
+
+    /**
+     * Blueprint initialization
+     *
+     * @throws YangParserException
+     **/
     public void init() {
 
         LOG.info("Session Initiated start {}", APPLICATION_NAME);
-
+        this.domContext = new DomContext(this.yangParserFactory, this.bindingNormalizedNodeSerializer);
+        this.netconfCommunicatorManager =
+                new NetconfCommunicatorManager(mountPointService, domMountPointService, domContext);
+        this.accessorManager = new NetconfAccessorManager(netconfCommunicatorManager, domContext, this);
         // Start RPC Service
         this.rpcApiService = new NetconfnodeStateServiceRpcApiImpl(rpcProviderRegistry, vesNotificationListenerList);
         // Get configuration
-        // ConfigurationFileRepresentation config = new ConfigurationFileRepresentation(CONFIGURATIONFILE);
+        this.configFileRepresentation = new ConfigurationFileRepresentation(CONFIGURATIONFILE);
+        this.config = new NetconfStateConfig(this.configFileRepresentation);
+        this.handleDataTreeAsync = this.config.handleAsync();
+        this.configFileRepresentation.registerConfigChangedListener(this);
+
         // Akka setup
         AkkaConfig akkaConfig = getAkkaConfig();
         this.isCluster = akkaConfig == null ? false : akkaConfig.isCluster();
@@ -203,7 +258,19 @@ public class NetconfNodeStateServiceImpl implements NetconfNodeStateService, Rpc
 
         listenerL1 = dataBroker.registerDataTreeChangeListener(NETCONF_NODE_TOPO_TREE_ID, new L1());
         listenerL2 = dataBroker.registerDataTreeChangeListener(NETCONF_NODE_TOPO_TREE_ID, new L2());
-
+        this.handlingPool = new KeyBasedThreadpool<NodeId, NetconfChangeDataHolder>(this.config.getAsyncHandlingPoolsize(), 1,
+                new GenericRunnableFactory<>() {
+                    public Runnable create(final NodeId key, final NetconfChangeDataHolder arg) {
+                        return new Runnable() {
+
+                            @Override
+                            public void run() {
+                                NetconfNodeStateServiceImpl.this.handleDataTreeChange(arg.root, key,
+                                        arg.modificationTyp);
+                            }
+                        };
+                    };
+                });
         this.initializationSuccessful = true;
 
         LOG.info("Session Initiated end. Initialization done {}", initializationSuccessful);
@@ -215,13 +282,20 @@ public class NetconfNodeStateServiceImpl implements NetconfNodeStateService, Rpc
         close();
     }
 
-    /**
-     * Getter
-     * 
-     * @return NetconfnodeStateServiceRpcApiImpl
-     */
+    public DomContext getDomContext() {
+        return Objects.requireNonNull(domContext, "Initialization not completed for domContext");
+    }
+
+    public DataBroker getDataBroker() {
+        return dataBroker;
+    }
+
+    public DOMDataBroker getDOMDataBroker() {
+        return domDataBroker;
+    }
+
     public NetconfnodeStateServiceRpcApiImpl getNetconfnodeStateServiceRpcApiImpl() {
-        return rpcApiService;
+        return Objects.requireNonNull(rpcApiService, "Initialization not completed for rpcApiService");
     }
 
     @Override
@@ -232,7 +306,7 @@ public class NetconfNodeStateServiceImpl implements NetconfNodeStateService, Rpc
     @Override
     public <L extends NetconfNodeConnectListener> @NonNull ListenerRegistration<L> registerNetconfNodeConnectListener(
             final @NonNull L netconfNodeConnectListener) {
-        LOG.info("Register connect listener {}", netconfNodeConnectListener.getClass().getName());
+        LOG.debug("Register connect listener {}", netconfNodeConnectListener.getClass().getName());
         netconfNodeConnectListenerList.add(netconfNodeConnectListener);
 
         return new ListenerRegistration<L>() {
@@ -243,7 +317,7 @@ public class NetconfNodeStateServiceImpl implements NetconfNodeStateService, Rpc
 
             @Override
             public void close() {
-                LOG.info("Remove connect listener {}", netconfNodeConnectListener);
+                LOG.debug("Remove connect listener {}", netconfNodeConnectListener);
                 netconfNodeConnectListenerList.remove(netconfNodeConnectListener);
             }
         };
@@ -252,7 +326,7 @@ public class NetconfNodeStateServiceImpl implements NetconfNodeStateService, Rpc
     @Override
     public <L extends NetconfNodeStateListener> @NonNull ListenerRegistration<L> registerNetconfNodeStateListener(
             @NonNull L netconfNodeStateListener) {
-        LOG.info("Register state listener {}", netconfNodeStateListener.getClass().getName());
+        LOG.debug("Register state listener {}", netconfNodeStateListener.getClass().getName());
         netconfNodeStateListenerList.add(netconfNodeStateListener);
 
         return new ListenerRegistration<L>() {
@@ -263,7 +337,7 @@ public class NetconfNodeStateServiceImpl implements NetconfNodeStateService, Rpc
 
             @Override
             public void close() {
-                LOG.info("Remove state listener {}", netconfNodeStateListener);
+                LOG.debug("Remove state listener {}", netconfNodeStateListener);
                 netconfNodeStateListenerList.remove(netconfNodeStateListener);
             }
         };
@@ -272,7 +346,7 @@ public class NetconfNodeStateServiceImpl implements NetconfNodeStateService, Rpc
     @Override
     public <L extends VesNotificationListener> @NonNull ListenerRegistration<L> registerVesNotifications(
             @NonNull L vesNotificationListener) {
-        LOG.info("Register Ves notification listener {}", vesNotificationListener.getClass().getName());
+        LOG.debug("Register Ves notification listener {}", vesNotificationListener.getClass().getName());
         vesNotificationListenerList.add(vesNotificationListener);
 
         return new ListenerRegistration<L>() {
@@ -283,7 +357,7 @@ public class NetconfNodeStateServiceImpl implements NetconfNodeStateService, Rpc
 
             @Override
             public void close() {
-                LOG.info("Remove Ves notification listener {}", vesNotificationListener);
+                LOG.debug("Remove Ves notification listener {}", vesNotificationListener);
                 vesNotificationListenerList.remove(vesNotificationListener);
             }
         };
@@ -312,11 +386,12 @@ public class NetconfNodeStateServiceImpl implements NetconfNodeStateService, Rpc
                 element.close();
             }
         }
+        this.configFileRepresentation.unregisterConfigChangedListener(this);
     }
 
     /**
      * Indication if init() of this bundle successfully done.
-     * 
+     *
      * @return true if init() was successful. False if not done or not successful.
      */
     public boolean isInitializationSuccessful() {
@@ -330,14 +405,14 @@ public class NetconfNodeStateServiceImpl implements NetconfNodeStateService, Rpc
     /**
      * For each mounted device a mountpoint is created and this listener is called. Mountpoint was created or existing.
      * Managed device is now fully connected to node/mountpoint.
-     * 
+     *
      * @param nNodeId id of the mountpoint
      * @param netconfNode mountpoint contents
      */
     private void enterConnectedState(NodeId nNodeId, NetconfNode netconfNode) {
 
         String mountPointNodeName = nNodeId.getValue();
-        LOG.info("Access connected state for mountpoint {}", mountPointNodeName);
+        LOG.debug("Access connected state for mountpoint {}", mountPointNodeName);
 
         boolean preConditionMissing = false;
         if (mountPointService == null) {
@@ -353,185 +428,171 @@ public class NetconfNodeStateServiceImpl implements NetconfNodeStateService, Rpc
         }
 
         boolean isNetconfNodeMaster = isNetconfNodeMaster(netconfNode);
-        LOG.info("isNetconfNodeMaster indication {} for mountpoint {}", isNetconfNodeMaster, mountPointNodeName);
+        LOG.debug("isNetconfNodeMaster indication {} for mountpoint {}", isNetconfNodeMaster, mountPointNodeName);
         if (isNetconfNodeMaster) {
-
-            InstanceIdentifier<Node> instanceIdentifier =
-                    NETCONF_TOPO_IID.child(Node.class, new NodeKey(new NodeId(mountPointNodeName)));
-
-            Optional<MountPoint> optionalMountPoint = null;
-            int timeout = 10000;
-            while (!(optionalMountPoint = mountPointService.getMountPoint(instanceIdentifier)).isPresent()
-                    && timeout > 0) {
-                LOG.info("Event listener waiting for mount point for Netconf device :: Name : {}", mountPointNodeName);
-                sleepMs(1000);
-                timeout -= 1000;
-            }
-
-            if (!optionalMountPoint.isPresent()) {
-                LOG.warn("Event listener timeout while waiting for mount point for Netconf device :: Name : {} ",
-                        mountPointNodeName);
-            } else {
-                // Mountpoint is present for sure
-                MountPoint mountPoint = optionalMountPoint.get();
-                // BindingDOMDataBrokerAdapter.BUILDER_FACTORY;
-                LOG.info("Mountpoint with id: {}", mountPoint.getIdentifier());
-
-                Optional<DataBroker> optionalNetconfNodeDatabroker = mountPoint.getService(DataBroker.class);
-
-                if (!optionalNetconfNodeDatabroker.isPresent()) {
-                    LOG.info("Slave mountpoint {} without databroker", mountPointNodeName);
-                } else {
-                    LOG.info("Master mountpoint {}", mountPointNodeName);
-                    DataBroker netconfNodeDataBroker = optionalNetconfNodeDatabroker.get();
-                    NetconfAccessor acessor = new NetconfAccessorImpl(nNodeId, netconfNode, netconfNodeDataBroker,
-                            mountPoint, TRANSACTIONUTILS);
-                    /*
-                     * --> Call Listers for onConnect() Indication
-                       for (all)
-                     */
-                    netconfNodeConnectListenerList.forEach(item -> {
-                        try {
-                            item.onEnterConnected(acessor);
-                        } catch (Exception e) {
-                            LOG.info("Exception during onEnterConnected listener call", e);
-                        }
-                    });
-
-                    LOG.info("Connect indication forwarded for {}", mountPointNodeName);
+            NetconfAccessor acessor = accessorManager.getAccessor(nNodeId, netconfNode);
+            /*
+             * --> Call Listers for onConnect() Indication
+               for (all)
+             */
+            netconfNodeConnectListenerList.forEach(item -> {
+                try {
+                    item.onEnterConnected(acessor);
+                } catch (Exception e) {
+                    LOG.debug("Exception during onEnterConnected listener call", e);
                 }
-            }
+            });
+
+            LOG.debug("Connect indication forwarded for {}", mountPointNodeName);
         }
     }
 
     /**
      * Leave the connected status to a non connected or removed status for master mountpoint
-     * 
+     *
      * @param action that occurred
      * @param nNodeId id of the mountpoint
      * @param netconfNode mountpoint contents or not available on remove
      */
     private void leaveConnectedState(NodeId nNodeId, Optional<NetconfNode> optionalNetconfNode) {
         String mountPointNodeName = nNodeId.getValue();
-        LOG.info("netconfNode id {}", mountPointNodeName);
-
-        InstanceIdentifier<Node> instanceIdentifier =
-                NETCONF_TOPO_IID.child(Node.class, new NodeKey(new NodeId(mountPointNodeName)));
-        Optional<MountPoint> optionalMountPoint = mountPointService.getMountPoint(instanceIdentifier);
-        if (optionalMountPoint.isPresent()) {
-            Optional<DataBroker> optionalNetconfNodeDatabroker = optionalMountPoint.get().getService(DataBroker.class);
-            if (optionalNetconfNodeDatabroker.isPresent()) {
-                LOG.info("Master mountpoint {}", mountPointNodeName);
-                netconfNodeConnectListenerList.forEach(item -> {
-                    try {
-                        if (item != null) {
-                            item.onLeaveConnected(nNodeId, optionalNetconfNode);
-                        } else {
-                            LOG.warn("Unexpeced null item during onleave");
+        LOG.debug("leaveConnectedState id {}", mountPointNodeName);
+
+        if (this.accessorManager.containes(nNodeId)) {
+            netconfNodeConnectListenerList.forEach(item -> {
+                try {
+                    if (item != null) {
+                        item.onLeaveConnected(nNodeId, optionalNetconfNode);
+                    } else {
+                        LOG.warn("Unexpeced null item during onleave");
+                    }
+                } catch (Exception e) {
+                    LOG.debug("Exception during onLeaveConnected listener call", e);
+                }
+            });
+            LOG.debug("Remove Master mountpoint {}", mountPointNodeName);
+            this.accessorManager.removeAccessor(nNodeId);
+        } else {
+            LOG.debug("Master mountpoint already removed {}", mountPointNodeName);
+        }
+    }
+
+    // ---- onDataTreeChangedHandler
+
+    private void handleDataTreeChange(DataObjectModification<Node> root, NodeId nodeId,
+            ModificationType modificationTyp) {
+        // Move status into boolean flags for
+        boolean connectedBefore, connectedAfter, created;
+        NetconfNode nNodeAfter = getNetconfNode(root.getDataAfter());
+        connectedAfter = isConnected(nNodeAfter);
+        if (root.getDataBefore() != null) {
+            // It is an update or delete
+            NetconfNode nodeBefore = getNetconfNode(root.getDataBefore());
+            connectedBefore = isConnected(nodeBefore);
+            created = false;
+        } else {
+            // It is a create
+            connectedBefore = false;
+            created = true;
+        }
+        LOG.debug("L1 NETCONF id:{} t:{} created {} before:{} after:{} akkaIsCluster:{} cl stat:{}", nodeId,
+                modificationTyp, created, connectedBefore, connectedAfter, isCluster,
+                getClusteredConnectionStatus(nNodeAfter));
+        switch (modificationTyp) {
+            case SUBTREE_MODIFIED: // Create or modify sub level node
+            case WRITE: // Create or modify top level node
+                // Treat an overwrite as an update
+                // leaveConnected state.before = connected; state.after != connected
+                // enterConnected state.after == connected
+                // => Here create or update by checking root.getDataBefore() != null
+                boolean handled = false;
+                if (created) {
+                    handled = true;
+                    netconfNodeStateListenerList.forEach(item -> {
+                        try {
+                            item.onCreated(nodeId, nNodeAfter);
+                        } catch (Exception e) {
+                            LOG.info("Exception during onCreated listener call", e);
                         }
+                    });
+                }
+                if (!connectedBefore && connectedAfter) {
+                    handled = true;
+                    enterConnectedState(nodeId, nNodeAfter);
+                }
+                if (connectedBefore && !connectedAfter) {
+                    handled = true;
+                    leaveConnectedState(nodeId, Optional.of(nNodeAfter));
+                }
+                if (!handled) {
+                    //Change if not handled by the messages before
+                    netconfNodeStateListenerList.forEach(item -> {
+                        try {
+                            item.onStateChange(nodeId, nNodeAfter);
+                        } catch (Exception e) {
+                            LOG.info("Exception during onStateChange listener call", e);
+                        }
+                    });
+                }
+                // doProcessing(update ? Action.UPDATE : Action.CREATE, nodeId, root);
+                break;
+            case DELETE:
+                // Node removed
+                // leaveconnected state.before = connected;
+                if (!connectedBefore) {
+                    leaveConnectedState(nodeId, Optional.empty());
+                }
+                netconfNodeStateListenerList.forEach(item -> {
+                    try {
+                        item.onRemoved(nodeId);
                     } catch (Exception e) {
-                        LOG.info("Exception during onLeaveConnected listener call", e);
+                        LOG.info("Exception during onRemoved listener call", e);
                     }
                 });
-            }
+                // doProcessing(Action.REMOVE, nodeId, root);
+                break;
         }
     }
 
-    // ---- onDataTreeChangedHandler
-
     private void onDataTreeChangedHandler(@NonNull Collection<DataTreeModification<Node>> changes) {
         for (final DataTreeModification<Node> change : changes) {
 
             final DataObjectModification<Node> root = change.getRootNode();
-            //if (LOG.isTraceEnabled()) {
-            LOG.info /*trace*/("Handle this modificationType:{} path:{} root:{}", root.getModificationType(),
-                    change.getRootPath(), root);
-            //}
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Handle this modificationType:{} path:{} root:{}", root.getModificationType(),
+                        change.getRootPath(), root);
+            }
 
             // Catch potential nullpointer exceptions ..
             try {
                 ModificationType modificationTyp = root.getModificationType();
                 Node node = modificationTyp == ModificationType.DELETE ? root.getDataBefore() : root.getDataAfter();
                 NodeId nodeId = node != null ? node.getNodeId() : null;
-                if (nodeId != null) {
+                if (nodeId == null) {
+                    LOG.warn("L1 without nodeid.");
+                } else {
                     if (nodeId.equals(CONTROLLER)) {
                         // Do not forward any controller related events to devicemanager
                         LOG.debug("Stop processing for [{}]", nodeId);
                     } else {
-                        if (modificationTyp != null) {
-                            switch (modificationTyp) {
-                                case SUBTREE_MODIFIED: // Create or modify sub level node
-                                case WRITE: // Create or modify top level node
-                                    // Treat an overwrite as an update
-                                    // leaveconnected state.before = connected; state.after != connected
-                                    // enterConnected state.after == connected
-                                    // => Here create or update by checking root.getDataBefore() != null
-
-                                    boolean connectedBefore, connectedAfter, created = false;
-                                    NetconfNode nNodeAfter = getNetconfNode(root.getDataAfter());
-                                    connectedAfter = isConnected(nNodeAfter);
-                                    if (root.getDataBefore() != null) {
-                                        // It is an update
-                                        NetconfNode nodeBefore = getNetconfNode(root.getDataBefore());
-                                        connectedBefore = isConnected(nodeBefore);
-                                    } else {
-                                        // It is a create
-                                        connectedBefore = false;
-                                        created = true;
-                                    }
-
-                                    LOG.info(
-                                            "L1 NETCONF Node change with id:{} ConnectedBefore:{} connectedAfter {}:cluster status:{} akkaIsCluster:{}",
-                                            nodeId, connectedBefore, connectedAfter,
-                                            getClusteredConnectionStatus(nNodeAfter), isCluster);
-
-                                    if (created) {
-                                        netconfNodeStateListenerList.forEach(item -> {
-                                            try {
-                                                item.onCreated(nodeId, nNodeAfter);
-                                            } catch (Exception e) {
-                                                LOG.info("Exception during onCreated listener call", e);
-                                            }
-                                        });
-                                    }
-                                    if (!connectedBefore && connectedAfter) {
-                                        enterConnectedState(nodeId, nNodeAfter);
-                                    } else {
-                                        LOG.debug("State change {} {}", connectedBefore, connectedAfter);
-                                        if (connectedBefore && !connectedAfter) {
-                                            leaveConnectedState(nodeId, Optional.of(nNodeAfter));
-                                        }
-                                        netconfNodeStateListenerList.forEach(item -> {
-                                            try {
-                                                item.onStateChange(nodeId, nNodeAfter);
-                                            } catch (Exception e) {
-                                                LOG.info("Exception during onStateChange listener call", e);
-                                            }
-                                        });
-                                    }
-                                    // doProcessing(update ? Action.UPDATE : Action.CREATE, nodeId, root);
-                                    break;
-                                case DELETE:
-                                    // Node removed
-                                    // leaveconnected state.before = connected;
-                                    leaveConnectedState(nodeId, Optional.empty());
-                                    netconfNodeStateListenerList.forEach(item -> {
-                                        try {
-                                            item.onRemoved(nodeId);
-                                        } catch (Exception e) {
-                                            LOG.info("Exception during onRemoved listener call", e);
-                                        }
-                                    });
-                                    // doProcessing(Action.REMOVE, nodeId, root);
-                                    break;
+                        if (modificationTyp == null) {
+                            LOG.warn("L1 empty modification type");
+                        } else {
+                            LOG.trace("handle data tree change with async={}",this.handleDataTreeAsync);
+                            if (this.handleDataTreeAsync) {
+                                this.handlingPool.execute(nodeId, new NetconfChangeDataHolder(root, modificationTyp));
+
+                            } else {
+                                handleDataTreeChange(root, nodeId, modificationTyp);
                             }
                         }
                     }
                 }
-            } catch (NullPointerException e) {
-                LOG.info("Data not available at ", e);
+            } catch (NullPointerException | IllegalStateException e) {
+                LOG.debug("Data not available at ", e);
             }
         } //for
+        LOG.debug("datatreechanged handler completed");
     }
 
     // ---- subclasses for listeners
@@ -542,9 +603,10 @@ public class NetconfNodeStateServiceImpl implements NetconfNodeStateService, Rpc
     private class L1 implements ClusteredDataTreeChangeListener<Node> {
         @Override
         public void onDataTreeChanged(@NonNull Collection<DataTreeModification<Node>> changes) {
-            LOG.info("L1 TreeChange enter changes:{}", changes.size());
-            new Thread(() -> onDataTreeChangedHandler(changes)).start();
-            LOG.info("L1 TreeChange leave");
+            LOG.debug("L1 TreeChange enter changes:{}", changes.size());
+            //Debug AkkTimeout NetconfNodeStateServiceImpl.this.pool.execute(new Thread( () -> onDataTreeChangedHandler(changes)));
+            onDataTreeChangedHandler(changes);
+            LOG.debug("L1 TreeChange leave");
         }
     }
 
@@ -555,9 +617,9 @@ public class NetconfNodeStateServiceImpl implements NetconfNodeStateService, Rpc
 
         @Override
         public void onDataTreeChanged(@NonNull Collection<DataTreeModification<Node>> changes) {
-            LOG.info("L2 TreeChange enter changes:{}", changes.size());
+            LOG.debug("L2 TreeChange enter changes:{}", changes.size());
             // Do nothing
-            LOG.info("L2 TreeChange leave");
+            LOG.debug("L2 TreeChange leave");
         }
     }
 
@@ -608,12 +670,11 @@ public class NetconfNodeStateServiceImpl implements NetconfNodeStateService, Rpc
         if (this.isCluster) {
             LOG.debug("check if me is responsible for node");
             ClusteredConnectionStatus ccs = nNode.getClusteredConnectionStatus();
-            @SuppressWarnings("null")
             @NonNull
             String masterNodeName =
                     ccs == null || ccs.getNetconfMasterNode() == null ? "null" : ccs.getNetconfMasterNode();
-            LOG.debug("sdnMasterNode=" + masterNodeName + " and sdnMyNode=" + this.clusterName);
-            if (!masterNodeName.equals(this.clusterName)) {
+            LOG.debug("sdnMasterNode=" + masterNodeName + " and sdnMyNode=" + clusterName);
+            if (!masterNodeName.equals(clusterName)) {
                 LOG.debug("netconf change but me is not master for this node");
                 return false;
             }
@@ -622,15 +683,24 @@ public class NetconfNodeStateServiceImpl implements NetconfNodeStateService, Rpc
     }
 
 
-    private void sleepMs(int milliseconds) {
-        try {
-            Thread.sleep(milliseconds);
-        } catch (InterruptedException e) {
-            LOG.debug("Interrupted sleep");
-            // Restore interrupted state...
-            Thread.currentThread().interrupt();
-        }
+
+    @Override
+    public void onConfigChanged() {
+        this.handleDataTreeAsync = this.config.handleAsync();
+        //setting poolsize is not possible atm
+        //this.handlingPool.setPoolSize(this.config.getAsyncHandlingPoolsize());
+
     }
 
+    public class NetconfChangeDataHolder {
+
+        protected final DataObjectModification<Node> root;
+        protected final ModificationType modificationTyp;
 
+        public NetconfChangeDataHolder(DataObjectModification<Node> root, ModificationType modificationTyp) {
+            this.root = root;
+            this.modificationTyp = modificationTyp;
+        }
+
+    }
 }