X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=sdnr%2Fwt%2Fnetconfnode-state-service%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fccsdk%2Ffeatures%2Fsdnr%2Fwt%2Fnetconfnodestateservice%2Fimpl%2FNetconfNodeStateServiceImpl.java;h=1b1676c42a502e45c908263a201145f3dbb367fc;hb=f3ab13d054a29fd59da14b1a5ef6af4fed024fc9;hp=d3752cdc49e78c539f73452b419715ee9f6356ef;hpb=a252be83694ae33260d99d5371ed48c1558aa2e8;p=ccsdk%2Ffeatures.git diff --git a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/NetconfNodeStateServiceImpl.java b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/NetconfNodeStateServiceImpl.java index d3752cdc4..1b1676c42 100644 --- a/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/NetconfNodeStateServiceImpl.java +++ b/sdnr/wt/netconfnode-state-service/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/netconfnodestateservice/impl/NetconfNodeStateServiceImpl.java @@ -18,18 +18,16 @@ package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl; import java.util.Collection; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import javax.annotation.Nullable; import org.eclipse.jdt.annotation.NonNull; import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener; +import org.onap.ccsdk.features.sdnr.wt.common.threading.GenericRunnableFactory; +import org.onap.ccsdk.features.sdnr.wt.common.threading.KeyBasedThreadpool; import org.onap.ccsdk.features.sdnr.wt.dataprovider.model.IEntityDataProvider; import org.onap.ccsdk.features.sdnr.wt.dataprovider.model.StatusChangedHandler.StatusKey; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfAccessor; @@ -58,6 +56,7 @@ import org.opendaylight.mdsal.binding.api.NotificationPublishService; import org.opendaylight.mdsal.binding.api.RpcProviderService; import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataBroker; import org.opendaylight.mdsal.dom.api.DOMMountPointService; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration; @@ -75,8 +74,7 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology. import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.model.parser.api.YangParserException; -import org.opendaylight.yangtools.yang.model.parser.api.YangParserFactory; +import org.opendaylight.yangtools.yang.parser.api.YangParserFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,9 +99,11 @@ public class NetconfNodeStateServiceImpl // Name of ODL controller NETCONF instance private static final NodeId CONTROLLER = new NodeId("controller-config"); + private static final int ASYNC_EXECUTION_POOLSIZE = 20; // -- OSGi services, provided private DataBroker dataBroker; + private DOMDataBroker domDataBroker; private MountPointService mountPointService; private DOMMountPointService domMountPointService; private RpcProviderService rpcProviderRegistry; @@ -145,7 +145,8 @@ public class NetconfNodeStateServiceImpl private String clusterName; /** nodeId to threadPool (size=1) for datatreechange handling) **/ - private final Map handlingPool; + // private final Map handlingPool; + private KeyBasedThreadpool handlingPool; private boolean handleDataTreeAsync; @@ -159,6 +160,7 @@ public class NetconfNodeStateServiceImpl LOG.info("Creating provider for {}", APPLICATION_NAME); this.dataBroker = null; + this.domDataBroker = null; this.mountPointService = null; this.domMountPointService = null; this.rpcProviderRegistry = null; @@ -174,14 +176,16 @@ public class NetconfNodeStateServiceImpl this.netconfNodeStateListenerList = new CopyOnWriteArrayList<>(); this.vesNotificationListenerList = new CopyOnWriteArrayList<>(); this.accessorManager = null; - this.handlingPool = new HashMap<>(); - + this.handlingPool = null; } - public void setDataBroker(DataBroker dataBroker) { this.dataBroker = dataBroker; } + public void setDomDataBroker(DOMDataBroker domDataBroker) { + this.domDataBroker = domDataBroker; + } + public void setRpcProviderRegistry(RpcProviderService rpcProviderRegistry) { this.rpcProviderRegistry = rpcProviderRegistry; } @@ -214,13 +218,17 @@ public class NetconfNodeStateServiceImpl this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer; } - /** Blueprint initialization - * @throws YangParserException **/ + /** + * Blueprint initialization + * + * @throws YangParserException + **/ public void init() { LOG.info("Session Initiated start {}", APPLICATION_NAME); this.domContext = new DomContext(this.yangParserFactory, this.bindingNormalizedNodeSerializer); - this.netconfCommunicatorManager = new NetconfCommunicatorManager(mountPointService, domMountPointService, domContext); + this.netconfCommunicatorManager = + new NetconfCommunicatorManager(mountPointService, domMountPointService, domContext); this.accessorManager = new NetconfAccessorManager(netconfCommunicatorManager, domContext, this); // Start RPC Service this.rpcApiService = new NetconfnodeStateServiceRpcApiImpl(rpcProviderRegistry, vesNotificationListenerList); @@ -250,7 +258,19 @@ public class NetconfNodeStateServiceImpl listenerL1 = dataBroker.registerDataTreeChangeListener(NETCONF_NODE_TOPO_TREE_ID, new L1()); listenerL2 = dataBroker.registerDataTreeChangeListener(NETCONF_NODE_TOPO_TREE_ID, new L2()); - + this.handlingPool = new KeyBasedThreadpool(this.config.getAsyncHandlingPoolsize(), 1, + new GenericRunnableFactory<>() { + public Runnable create(final NodeId key, final NetconfChangeDataHolder arg) { + return new Runnable() { + + @Override + public void run() { + NetconfNodeStateServiceImpl.this.handleDataTreeChange(arg.root, key, + arg.modificationTyp); + } + }; + }; + }); this.initializationSuccessful = true; LOG.info("Session Initiated end. Initialization done {}", initializationSuccessful); @@ -263,15 +283,19 @@ public class NetconfNodeStateServiceImpl } public DomContext getDomContext() { - return Objects.requireNonNull(domContext, "Initialization not completed for domContext" ); + return Objects.requireNonNull(domContext, "Initialization not completed for domContext"); } public DataBroker getDataBroker() { return dataBroker; } + public DOMDataBroker getDOMDataBroker() { + return domDataBroker; + } + public NetconfnodeStateServiceRpcApiImpl getNetconfnodeStateServiceRpcApiImpl() { - return Objects.requireNonNull(rpcApiService, "Initialization not completed for rpcApiService" ); + return Objects.requireNonNull(rpcApiService, "Initialization not completed for rpcApiService"); } @Override @@ -282,7 +306,7 @@ public class NetconfNodeStateServiceImpl @Override public @NonNull ListenerRegistration registerNetconfNodeConnectListener( final @NonNull L netconfNodeConnectListener) { - LOG.info("Register connect listener {}", netconfNodeConnectListener.getClass().getName()); + LOG.debug("Register connect listener {}", netconfNodeConnectListener.getClass().getName()); netconfNodeConnectListenerList.add(netconfNodeConnectListener); return new ListenerRegistration() { @@ -293,7 +317,7 @@ public class NetconfNodeStateServiceImpl @Override public void close() { - LOG.info("Remove connect listener {}", netconfNodeConnectListener); + LOG.debug("Remove connect listener {}", netconfNodeConnectListener); netconfNodeConnectListenerList.remove(netconfNodeConnectListener); } }; @@ -302,7 +326,7 @@ public class NetconfNodeStateServiceImpl @Override public @NonNull ListenerRegistration registerNetconfNodeStateListener( @NonNull L netconfNodeStateListener) { - LOG.info("Register state listener {}", netconfNodeStateListener.getClass().getName()); + LOG.debug("Register state listener {}", netconfNodeStateListener.getClass().getName()); netconfNodeStateListenerList.add(netconfNodeStateListener); return new ListenerRegistration() { @@ -313,7 +337,7 @@ public class NetconfNodeStateServiceImpl @Override public void close() { - LOG.info("Remove state listener {}", netconfNodeStateListener); + LOG.debug("Remove state listener {}", netconfNodeStateListener); netconfNodeStateListenerList.remove(netconfNodeStateListener); } }; @@ -322,7 +346,7 @@ public class NetconfNodeStateServiceImpl @Override public @NonNull ListenerRegistration registerVesNotifications( @NonNull L vesNotificationListener) { - LOG.info("Register Ves notification listener {}", vesNotificationListener.getClass().getName()); + LOG.debug("Register Ves notification listener {}", vesNotificationListener.getClass().getName()); vesNotificationListenerList.add(vesNotificationListener); return new ListenerRegistration() { @@ -333,7 +357,7 @@ public class NetconfNodeStateServiceImpl @Override public void close() { - LOG.info("Remove Ves notification listener {}", vesNotificationListener); + LOG.debug("Remove Ves notification listener {}", vesNotificationListener); vesNotificationListenerList.remove(vesNotificationListener); } }; @@ -388,7 +412,7 @@ public class NetconfNodeStateServiceImpl private void enterConnectedState(NodeId nNodeId, NetconfNode netconfNode) { String mountPointNodeName = nNodeId.getValue(); - LOG.info("Access connected state for mountpoint {}", mountPointNodeName); + LOG.debug("Access connected state for mountpoint {}", mountPointNodeName); boolean preConditionMissing = false; if (mountPointService == null) { @@ -404,22 +428,22 @@ public class NetconfNodeStateServiceImpl } boolean isNetconfNodeMaster = isNetconfNodeMaster(netconfNode); - LOG.info("isNetconfNodeMaster indication {} for mountpoint {}", isNetconfNodeMaster, mountPointNodeName); + LOG.debug("isNetconfNodeMaster indication {} for mountpoint {}", isNetconfNodeMaster, mountPointNodeName); if (isNetconfNodeMaster) { NetconfAccessor acessor = accessorManager.getAccessor(nNodeId, netconfNode); - /* - * --> Call Listers for onConnect() Indication - for (all) - */ - netconfNodeConnectListenerList.forEach(item -> { - try { - item.onEnterConnected(acessor); - } catch (Exception e) { - LOG.info("Exception during onEnterConnected listener call", e); - } - }); + /* + * --> Call Listers for onConnect() Indication + for (all) + */ + netconfNodeConnectListenerList.forEach(item -> { + try { + item.onEnterConnected(acessor); + } catch (Exception e) { + LOG.debug("Exception during onEnterConnected listener call", e); + } + }); - LOG.info("Connect indication forwarded for {}", mountPointNodeName); + LOG.debug("Connect indication forwarded for {}", mountPointNodeName); } } @@ -432,7 +456,7 @@ public class NetconfNodeStateServiceImpl */ private void leaveConnectedState(NodeId nNodeId, Optional optionalNetconfNode) { String mountPointNodeName = nNodeId.getValue(); - LOG.info("leaveConnectedState id {}", mountPointNodeName); + LOG.debug("leaveConnectedState id {}", mountPointNodeName); if (this.accessorManager.containes(nNodeId)) { netconfNodeConnectListenerList.forEach(item -> { @@ -443,13 +467,13 @@ public class NetconfNodeStateServiceImpl LOG.warn("Unexpeced null item during onleave"); } } catch (Exception e) { - LOG.info("Exception during onLeaveConnected listener call", e); + LOG.debug("Exception during onLeaveConnected listener call", e); } }); - LOG.info("Remove Master mountpoint {}", mountPointNodeName); + LOG.debug("Remove Master mountpoint {}", mountPointNodeName); this.accessorManager.removeAccessor(nNodeId); } else { - LOG.info("Master mountpoint already removed {}", mountPointNodeName); + LOG.debug("Master mountpoint already removed {}", mountPointNodeName); } } @@ -471,7 +495,7 @@ public class NetconfNodeStateServiceImpl connectedBefore = false; created = true; } - LOG.info("L1 NETCONF id:{} t:{} created {} before:{} after:{} akkaIsCluster:{} cl stat:{}", nodeId, + LOG.debug("L1 NETCONF id:{} t:{} created {} before:{} after:{} akkaIsCluster:{} cl stat:{}", nodeId, modificationTyp, created, connectedBefore, connectedAfter, isCluster, getClusteredConnectionStatus(nNodeAfter)); switch (modificationTyp) { @@ -554,18 +578,9 @@ public class NetconfNodeStateServiceImpl if (modificationTyp == null) { LOG.warn("L1 empty modification type"); } else { + LOG.trace("handle data tree change with async={}",this.handleDataTreeAsync); if (this.handleDataTreeAsync) { - ExecutorService executor = this.handlingPool.getOrDefault(nodeId.getValue(), null); - if (executor == null) { - executor = Executors.newFixedThreadPool(5); - this.handlingPool.put(nodeId.getValue(), executor); - } - executor.execute(new Thread() { - @Override - public void run() { - handleDataTreeChange(root, nodeId, modificationTyp); - } - }); + this.handlingPool.execute(nodeId, new NetconfChangeDataHolder(root, modificationTyp)); } else { handleDataTreeChange(root, nodeId, modificationTyp); @@ -574,10 +589,10 @@ public class NetconfNodeStateServiceImpl } } } catch (NullPointerException | IllegalStateException e) { - LOG.info("Data not available at ", e); + LOG.debug("Data not available at ", e); } } //for - LOG.info("datatreechanged handler completed"); + LOG.debug("datatreechanged handler completed"); } // ---- subclasses for listeners @@ -588,10 +603,10 @@ public class NetconfNodeStateServiceImpl private class L1 implements ClusteredDataTreeChangeListener { @Override public void onDataTreeChanged(@NonNull Collection> changes) { - LOG.info("L1 TreeChange enter changes:{}", changes.size()); + LOG.debug("L1 TreeChange enter changes:{}", changes.size()); //Debug AkkTimeout NetconfNodeStateServiceImpl.this.pool.execute(new Thread( () -> onDataTreeChangedHandler(changes))); onDataTreeChangedHandler(changes); - LOG.info("L1 TreeChange leave"); + LOG.debug("L1 TreeChange leave"); } } @@ -602,9 +617,9 @@ public class NetconfNodeStateServiceImpl @Override public void onDataTreeChanged(@NonNull Collection> changes) { - LOG.info("L2 TreeChange enter changes:{}", changes.size()); + LOG.debug("L2 TreeChange enter changes:{}", changes.size()); // Do nothing - LOG.info("L2 TreeChange leave"); + LOG.debug("L2 TreeChange leave"); } } @@ -672,6 +687,20 @@ public class NetconfNodeStateServiceImpl @Override public void onConfigChanged() { this.handleDataTreeAsync = this.config.handleAsync(); + //setting poolsize is not possible atm + //this.handlingPool.setPoolSize(this.config.getAsyncHandlingPoolsize()); + + } + + public class NetconfChangeDataHolder { + + protected final DataObjectModification root; + protected final ModificationType modificationTyp; + + public NetconfChangeDataHolder(DataObjectModification root, ModificationType modificationTyp) { + this.root = root; + this.modificationTyp = modificationTyp; + } } }