package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.eclipse.jdt.annotation.NonNull;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener;
+import org.onap.ccsdk.features.sdnr.wt.common.threading.GenericRunnableFactory;
+import org.onap.ccsdk.features.sdnr.wt.common.threading.KeyBasedThreadpool;
import org.onap.ccsdk.features.sdnr.wt.dataprovider.model.IEntityDataProvider;
import org.onap.ccsdk.features.sdnr.wt.dataprovider.model.StatusChangedHandler.StatusKey;
import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfAccessor;
import org.opendaylight.mdsal.binding.api.RpcProviderService;
import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataBroker;
import org.opendaylight.mdsal.dom.api.DOMMountPointService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.model.parser.api.YangParserException;
-import org.opendaylight.yangtools.yang.model.parser.api.YangParserFactory;
+import org.opendaylight.yangtools.yang.parser.api.YangParserFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Name of ODL controller NETCONF instance
private static final NodeId CONTROLLER = new NodeId("controller-config");
+ private static final int ASYNC_EXECUTION_POOLSIZE = 20;
// -- OSGi services, provided
private DataBroker dataBroker;
+ private DOMDataBroker domDataBroker;
private MountPointService mountPointService;
private DOMMountPointService domMountPointService;
private RpcProviderService rpcProviderRegistry;
private String clusterName;
/** nodeId to threadPool (size=1) for datatreechange handling) **/
- private final Map<String, ExecutorService> handlingPool;
+ // private final Map<String, ExecutorService> handlingPool;
+ private KeyBasedThreadpool<NodeId, NetconfChangeDataHolder> handlingPool;
private boolean handleDataTreeAsync;
LOG.info("Creating provider for {}", APPLICATION_NAME);
this.dataBroker = null;
+ this.domDataBroker = null;
this.mountPointService = null;
this.domMountPointService = null;
this.rpcProviderRegistry = null;
this.netconfNodeStateListenerList = new CopyOnWriteArrayList<>();
this.vesNotificationListenerList = new CopyOnWriteArrayList<>();
this.accessorManager = null;
- this.handlingPool = new HashMap<>();
-
+ this.handlingPool = null;
}
-
public void setDataBroker(DataBroker dataBroker) {
this.dataBroker = dataBroker;
}
+ public void setDomDataBroker(DOMDataBroker domDataBroker) {
+ this.domDataBroker = domDataBroker;
+ }
+
public void setRpcProviderRegistry(RpcProviderService rpcProviderRegistry) {
this.rpcProviderRegistry = rpcProviderRegistry;
}
this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
}
- /** Blueprint initialization
- * @throws YangParserException **/
+ /**
+ * Blueprint initialization
+ *
+ * @throws YangParserException
+ **/
public void init() {
LOG.info("Session Initiated start {}", APPLICATION_NAME);
this.domContext = new DomContext(this.yangParserFactory, this.bindingNormalizedNodeSerializer);
- this.netconfCommunicatorManager = new NetconfCommunicatorManager(mountPointService, domMountPointService, domContext);
- this.accessorManager = new NetconfAccessorManager(netconfCommunicatorManager, domContext);
+ this.netconfCommunicatorManager =
+ new NetconfCommunicatorManager(mountPointService, domMountPointService, domContext);
+ this.accessorManager = new NetconfAccessorManager(netconfCommunicatorManager, domContext, this);
// Start RPC Service
this.rpcApiService = new NetconfnodeStateServiceRpcApiImpl(rpcProviderRegistry, vesNotificationListenerList);
// Get configuration
listenerL1 = dataBroker.registerDataTreeChangeListener(NETCONF_NODE_TOPO_TREE_ID, new L1());
listenerL2 = dataBroker.registerDataTreeChangeListener(NETCONF_NODE_TOPO_TREE_ID, new L2());
-
+ this.handlingPool = new KeyBasedThreadpool<NodeId, NetconfChangeDataHolder>(this.config.getAsyncHandlingPoolsize(), 1,
+ new GenericRunnableFactory<>() {
+ public Runnable create(final NodeId key, final NetconfChangeDataHolder arg) {
+ return new Runnable() {
+
+ @Override
+ public void run() {
+ NetconfNodeStateServiceImpl.this.handleDataTreeChange(arg.root, key,
+ arg.modificationTyp);
+ }
+ };
+ };
+ });
this.initializationSuccessful = true;
LOG.info("Session Initiated end. Initialization done {}", initializationSuccessful);
}
public DomContext getDomContext() {
- return Objects.requireNonNull(domContext, "Initialization not completed for domContext" );
+ return Objects.requireNonNull(domContext, "Initialization not completed for domContext");
+ }
+
+ public DataBroker getDataBroker() {
+ return dataBroker;
+ }
+
+ public DOMDataBroker getDOMDataBroker() {
+ return domDataBroker;
}
public NetconfnodeStateServiceRpcApiImpl getNetconfnodeStateServiceRpcApiImpl() {
- return Objects.requireNonNull(rpcApiService, "Initialization not completed for rpcApiService" );
+ return Objects.requireNonNull(rpcApiService, "Initialization not completed for rpcApiService");
}
@Override
@Override
public <L extends NetconfNodeConnectListener> @NonNull ListenerRegistration<L> registerNetconfNodeConnectListener(
final @NonNull L netconfNodeConnectListener) {
- LOG.info("Register connect listener {}", netconfNodeConnectListener.getClass().getName());
+ LOG.debug("Register connect listener {}", netconfNodeConnectListener.getClass().getName());
netconfNodeConnectListenerList.add(netconfNodeConnectListener);
return new ListenerRegistration<L>() {
@Override
public void close() {
- LOG.info("Remove connect listener {}", netconfNodeConnectListener);
+ LOG.debug("Remove connect listener {}", netconfNodeConnectListener);
netconfNodeConnectListenerList.remove(netconfNodeConnectListener);
}
};
@Override
public <L extends NetconfNodeStateListener> @NonNull ListenerRegistration<L> registerNetconfNodeStateListener(
@NonNull L netconfNodeStateListener) {
- LOG.info("Register state listener {}", netconfNodeStateListener.getClass().getName());
+ LOG.debug("Register state listener {}", netconfNodeStateListener.getClass().getName());
netconfNodeStateListenerList.add(netconfNodeStateListener);
return new ListenerRegistration<L>() {
@Override
public void close() {
- LOG.info("Remove state listener {}", netconfNodeStateListener);
+ LOG.debug("Remove state listener {}", netconfNodeStateListener);
netconfNodeStateListenerList.remove(netconfNodeStateListener);
}
};
@Override
public <L extends VesNotificationListener> @NonNull ListenerRegistration<L> registerVesNotifications(
@NonNull L vesNotificationListener) {
- LOG.info("Register Ves notification listener {}", vesNotificationListener.getClass().getName());
+ LOG.debug("Register Ves notification listener {}", vesNotificationListener.getClass().getName());
vesNotificationListenerList.add(vesNotificationListener);
return new ListenerRegistration<L>() {
@Override
public void close() {
- LOG.info("Remove Ves notification listener {}", vesNotificationListener);
+ LOG.debug("Remove Ves notification listener {}", vesNotificationListener);
vesNotificationListenerList.remove(vesNotificationListener);
}
};
private void enterConnectedState(NodeId nNodeId, NetconfNode netconfNode) {
String mountPointNodeName = nNodeId.getValue();
- LOG.info("Access connected state for mountpoint {}", mountPointNodeName);
+ LOG.debug("Access connected state for mountpoint {}", mountPointNodeName);
boolean preConditionMissing = false;
if (mountPointService == null) {
}
boolean isNetconfNodeMaster = isNetconfNodeMaster(netconfNode);
- LOG.info("isNetconfNodeMaster indication {} for mountpoint {}", isNetconfNodeMaster, mountPointNodeName);
+ LOG.debug("isNetconfNodeMaster indication {} for mountpoint {}", isNetconfNodeMaster, mountPointNodeName);
if (isNetconfNodeMaster) {
NetconfAccessor acessor = accessorManager.getAccessor(nNodeId, netconfNode);
- /*
- * --> Call Listers for onConnect() Indication
- for (all)
- */
- netconfNodeConnectListenerList.forEach(item -> {
- try {
- item.onEnterConnected(acessor);
- } catch (Exception e) {
- LOG.info("Exception during onEnterConnected listener call", e);
- }
- });
+ /*
+ * --> Call Listers for onConnect() Indication
+ for (all)
+ */
+ netconfNodeConnectListenerList.forEach(item -> {
+ try {
+ item.onEnterConnected(acessor);
+ } catch (Exception e) {
+ LOG.debug("Exception during onEnterConnected listener call", e);
+ }
+ });
- LOG.info("Connect indication forwarded for {}", mountPointNodeName);
+ LOG.debug("Connect indication forwarded for {}", mountPointNodeName);
}
}
*/
private void leaveConnectedState(NodeId nNodeId, Optional<NetconfNode> optionalNetconfNode) {
String mountPointNodeName = nNodeId.getValue();
- LOG.info("leaveConnectedState id {}", mountPointNodeName);
+ LOG.debug("leaveConnectedState id {}", mountPointNodeName);
if (this.accessorManager.containes(nNodeId)) {
netconfNodeConnectListenerList.forEach(item -> {
LOG.warn("Unexpeced null item during onleave");
}
} catch (Exception e) {
- LOG.info("Exception during onLeaveConnected listener call", e);
+ LOG.debug("Exception during onLeaveConnected listener call", e);
}
});
- LOG.info("Remove Master mountpoint {}", mountPointNodeName);
+ LOG.debug("Remove Master mountpoint {}", mountPointNodeName);
this.accessorManager.removeAccessor(nNodeId);
} else {
- LOG.info("Master mountpoint already removed {}", mountPointNodeName);
+ LOG.debug("Master mountpoint already removed {}", mountPointNodeName);
}
}
connectedBefore = false;
created = true;
}
- LOG.info("L1 NETCONF id:{} t:{} created {} before:{} after:{} akkaIsCluster:{} cl stat:{}", nodeId,
+ LOG.debug("L1 NETCONF id:{} t:{} created {} before:{} after:{} akkaIsCluster:{} cl stat:{}", nodeId,
modificationTyp, created, connectedBefore, connectedAfter, isCluster,
getClusteredConnectionStatus(nNodeAfter));
switch (modificationTyp) {
if (modificationTyp == null) {
LOG.warn("L1 empty modification type");
} else {
+ LOG.trace("handle data tree change with async={}",this.handleDataTreeAsync);
if (this.handleDataTreeAsync) {
- ExecutorService executor = this.handlingPool.getOrDefault(nodeId.getValue(), null);
- if (executor == null) {
- executor = Executors.newFixedThreadPool(5);
- this.handlingPool.put(nodeId.getValue(), executor);
- }
- executor.execute(new Thread() {
- @Override
- public void run() {
- handleDataTreeChange(root, nodeId, modificationTyp);
- }
- });
+ this.handlingPool.execute(nodeId, new NetconfChangeDataHolder(root, modificationTyp));
} else {
handleDataTreeChange(root, nodeId, modificationTyp);
}
}
} catch (NullPointerException | IllegalStateException e) {
- LOG.info("Data not available at ", e);
+ LOG.debug("Data not available at ", e);
}
} //for
- LOG.info("datatreechanged handler completed");
+ LOG.debug("datatreechanged handler completed");
}
// ---- subclasses for listeners
private class L1 implements ClusteredDataTreeChangeListener<Node> {
@Override
public void onDataTreeChanged(@NonNull Collection<DataTreeModification<Node>> changes) {
- LOG.info("L1 TreeChange enter changes:{}", changes.size());
+ LOG.debug("L1 TreeChange enter changes:{}", changes.size());
//Debug AkkTimeout NetconfNodeStateServiceImpl.this.pool.execute(new Thread( () -> onDataTreeChangedHandler(changes)));
onDataTreeChangedHandler(changes);
- LOG.info("L1 TreeChange leave");
+ LOG.debug("L1 TreeChange leave");
}
}
@Override
public void onDataTreeChanged(@NonNull Collection<DataTreeModification<Node>> changes) {
- LOG.info("L2 TreeChange enter changes:{}", changes.size());
+ LOG.debug("L2 TreeChange enter changes:{}", changes.size());
// Do nothing
- LOG.info("L2 TreeChange leave");
+ LOG.debug("L2 TreeChange leave");
}
}
@Override
public void onConfigChanged() {
this.handleDataTreeAsync = this.config.handleAsync();
+ //setting poolsize is not possible atm
+ //this.handlingPool.setPoolSize(this.config.getAsyncHandlingPoolsize());
+
+ }
+
+ public class NetconfChangeDataHolder {
+
+ protected final DataObjectModification<Node> root;
+ protected final ModificationType modificationTyp;
+
+ public NetconfChangeDataHolder(DataObjectModification<Node> root, ModificationType modificationTyp) {
+ this.root = root;
+ this.modificationTyp = modificationTyp;
+ }
}
}