2  * ============LICENSE_START=======================================================
 
   3  * ONAP : ccsdk features
 
   4  * ================================================================================
 
   5  * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property.
 
   7  * ================================================================================
 
   8  * Licensed under the Apache License, Version 2.0 (the "License");
 
   9  * you may not use this file except in compliance with the License.
 
  10  * You may obtain a copy of the License at
 
  12  *     http://www.apache.org/licenses/LICENSE-2.0
 
  14  * Unless required by applicable law or agreed to in writing, software
 
  15  * distributed under the License is distributed on an "AS IS" BASIS,
 
  16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  17  * See the License for the specific language governing permissions and
 
  18  * limitations under the License.
 
  19  * ============LICENSE_END=========================================================
 
  22 package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.dom;
 
  24 import static java.util.stream.Collectors.toList;
 
  25 import com.google.common.util.concurrent.FluentFuture;
 
  26 import com.google.common.util.concurrent.ListenableFuture;
 
  27 import java.time.Instant;
 
  28 import java.time.format.DateTimeFormatter;
 
  29 import java.util.Arrays;
 
  30 import java.util.Collection;
 
  31 import java.util.Collections;
 
  32 import java.util.List;
 
  34 import java.util.Map.Entry;
 
  35 import java.util.Objects;
 
  36 import java.util.Optional;
 
  37 import java.util.concurrent.ExecutionException;
 
  38 import java.util.concurrent.TimeUnit;
 
  39 import java.util.concurrent.TimeoutException;
 
  40 import org.eclipse.jdt.annotation.NonNull;
 
  41 import org.eclipse.jdt.annotation.Nullable;
 
  42 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfDomAccessor;
 
  43 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfAccessorImpl;
 
  44 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.mdsal.MdsalApi;
 
  45 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
 
  46 //import org.opendaylight.mdsal.binding.dom.codec.impl.BindingCodecContext;
 
  47 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 
  48 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
 
  49 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
 
  50 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
 
  51 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
 
  52 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
 
  53 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
 
  54 import org.opendaylight.mdsal.dom.api.DOMRpcService;
 
  55 import org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil;
 
  56 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
 
  57 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder;
 
  58 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.create.subscription.input.Filter;
 
  59 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
 
  60 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
 
  61 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
 
  62 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamKey;
 
  63 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
 
  64 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 
  65 import org.opendaylight.yangtools.yang.binding.DataObject;
 
  66 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 
  67 import org.opendaylight.yangtools.yang.common.QName;
 
  68 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
  69 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 
  70 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
  71 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
 
  72 import org.slf4j.Logger;
 
  73 import org.slf4j.LoggerFactory;
 
  75 public class NetconfDomAccessorImpl extends NetconfAccessorImpl implements NetconfDomAccessor {
 
  77     private static final Logger LOG = LoggerFactory.getLogger(NetconfDomAccessorImpl.class);
 
  79     private static final QName CREATE_SUBSCRIPTION = QName.create(CreateSubscriptionInput.QNAME, "create-subscription");
 
  80     private static final YangInstanceIdentifier STREAMS_PATH =
 
  81             YangInstanceIdentifier.builder().node(Netconf.QNAME).node(Streams.QNAME).build();
 
  83     protected final DOMDataBroker dataBroker;
 
  84     protected final DOMMountPoint mountpoint;
 
  85     protected final DomContext domContext;
 
  86     private final DOMNotificationService notificationService;
 
  87     private final BindingNormalizedNodeSerializer serializer;
 
  88     private final DOMRpcService rpcService;
 
  91     public NetconfDomAccessorImpl(NetconfAccessorImpl accessor, DOMDataBroker domDataBroker,
 
  92             DOMMountPoint mountpoint, DomContext domContext) {
 
  94         this.dataBroker = Objects.requireNonNull(domDataBroker);
 
  95         this.mountpoint = Objects.requireNonNull(mountpoint);
 
  96         this.domContext = Objects.requireNonNull(domContext);
 
  97         this.serializer = domContext.getBindingNormalizedNodeSerializer();
 
  98         this.rpcService = MdsalApi.getMountpointService(mountpoint, DOMRpcService.class);
 
  99         this.notificationService = MdsalApi.getMountpointService(mountpoint, DOMNotificationService.class);
 
 103     public DOMDataBroker getDataBroker() {
 
 108     public DOMMountPoint getMountpoint() {
 
 113     public <T extends DataObject> Optional<T> readData(LogicalDatastoreType dataStoreType, YangInstanceIdentifier path,
 
 115         LOG.debug("Read to object datastore:{} path:{}", dataStoreType, path);
 
 118             return convertNormalizedNode(domContext.getBindingNormalizedNodeSerializer(),
 
 119                     readDataNode(dataStoreType, path), path, clazz);
 
 120         } catch (CanNotConvertException e) {
 
 121             LOG.info("Incomplete read to class transaction {} {}", dataStoreType, path, e);
 
 122             return Optional.empty();
 
 127     public Optional<NormalizedNode<?, ?>> readDataNode(LogicalDatastoreType dataStoreType,
 
 128             YangInstanceIdentifier path) {
 
 129         LOG.debug("Read to node datastore:{} path:{}", dataStoreType, path);
 
 131         try (DOMDataTreeReadTransaction readOnlyTransaction = dataBroker.newReadOnlyTransaction()) {
 
 132             FluentFuture<Optional<NormalizedNode<?, ?>>> foData = readOnlyTransaction.read(dataStoreType, path);
 
 133             // RAVI - Add a few debug here, like what ? Speak to Micha....
 
 135             Optional<NormalizedNode<?, ?>> data = foData.get(120, TimeUnit.SECONDS);
 
 136             LOG.info("read is done - {} ", foData.isDone());
 
 139         } catch (InterruptedException | ExecutionException | TimeoutException e) {
 
 140             LOG.info("Incomplete read to node transaction {} {}", dataStoreType, path, e);
 
 141             return Optional.empty();
 
 145     @SuppressWarnings("unchecked")
 
 146     private static <T extends DataObject> Optional<T> convertNormalizedNode(BindingNormalizedNodeSerializer serializer,
 
 147             Optional<NormalizedNode<?, ?>> oData, YangInstanceIdentifier path, Class<T> clazz)
 
 148             throws CanNotConvertException {
 
 149         if (oData.isPresent()) {
 
 150             NormalizedNode<?, ?> data = oData.get();
 
 151             LOG.debug("convertNormalizedNode data identifier: {} data nodetype: {}", data.getIdentifier(),
 
 154             Entry<InstanceIdentifier<?>, DataObject> entry = serializer.fromNormalizedNode(path, data);
 
 156                 LOG.debug("object identifier: {}", entry.getKey());
 
 157                 DataObject value = entry.getValue();
 
 158                 if (clazz.isInstance(value)) {
 
 159                     return Optional.of((T) value);
 
 161                     throw new CanNotConvertException("Unexpected class. Expected:" + clazz.getName() + " provided:"
 
 162                             + value.getClass().getName() + " Nodetype:" + data.getNodeType());
 
 165                 throw new CanNotConvertException(
 
 166                         "No object created for path:" + path + " Nodetype:" + data.getNodeType());
 
 169             throw new CanNotConvertException("No data received for path:" + path);
 
 174     public @NonNull <T extends DOMNotificationListener> ListenerRegistration<DOMNotificationListener> doRegisterNotificationListener(
 
 175             @NonNull T listener, Collection<Absolute> types) {
 
 176         LOG.info("Begin register listener for Mountpoint {}", mountpoint.getIdentifier().toString());
 
 178         final ListenerRegistration<DOMNotificationListener> ranListenerRegistration =
 
 179                 notificationService.registerNotificationListener(listener, types);
 
 181         LOG.info("End registration listener for Mountpoint {} Listener: {} Result: {}",
 
 182                 mountpoint.getIdentifier().toString(), notificationService, ranListenerRegistration);
 
 184         return ranListenerRegistration;
 
 188     public @NonNull <T extends DOMNotificationListener> ListenerRegistration<DOMNotificationListener> doRegisterNotificationListener(
 
 189             @NonNull T listener, Absolute[] types) {
 
 190         return doRegisterNotificationListener(listener, Arrays.asList(types));
 
 194     public @NonNull <T extends DOMNotificationListener> ListenerRegistration<DOMNotificationListener> doRegisterNotificationListener(
 
 195             @NonNull T listener, QName[] types) {
 
 196         List<Absolute> schemaPathList = Arrays.stream(types).map(qname -> Absolute.of(qname)).collect(toList());
 
 197         return doRegisterNotificationListener(listener, schemaPathList);
 
 202     public ListenableFuture<? extends DOMRpcResult> invokeCreateSubscription(CreateSubscriptionInput input) {
 
 203         final ContainerNode nnInput = serializer.toNormalizedNodeRpcData(input);
 
 204         return rpcService.invokeRpc(CREATE_SUBSCRIPTION, nnInput);
 
 208     public ListenableFuture<? extends DOMRpcResult> invokeCreateSubscription(Optional<Stream> oStream,
 
 209             Optional<Filter> filter, Optional<Instant> startTime, Optional<Instant> stopTime) {
 
 211         CreateSubscriptionInputBuilder inputBuilder = new CreateSubscriptionInputBuilder();
 
 212         boolean replayIsSupported = false;
 
 213         if (oStream.isPresent()) {
 
 214             Stream stream = oStream.get();
 
 215             if (stream.getName() != null) {
 
 216                 inputBuilder.setStream(stream.getName());
 
 218             replayIsSupported = Boolean.TRUE.equals(stream.isReplaySupport());
 
 221         if (filter.isPresent()) {
 
 222             inputBuilder.setFilter(filter.get());
 
 224         if (startTime.isPresent()) {
 
 225             if (replayIsSupported) {
 
 226                 inputBuilder.setStartTime(getDateAndTime(startTime.get()));
 
 227                 if (stopTime.isPresent()) {
 
 228                     if (startTime.get().isBefore(stopTime.get())) {
 
 229                         inputBuilder.setStopTime(getDateAndTime(stopTime.get()));
 
 231                         throw new IllegalArgumentException("stopTime must be later than startTime");
 
 235                 throw new IllegalArgumentException("Replay not supported by this stream.");
 
 238         return invokeCreateSubscription(inputBuilder.build());
 
 242     public ListenableFuture<? extends DOMRpcResult> invokeCreateSubscription(Stream... streams) {
 
 243         ListenableFuture<? extends DOMRpcResult> res;
 
 244         if (streams.length == 0) {
 
 245             return invokeCreateSubscription(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
 
 246         } else if (streams.length == 1) {
 
 247             return invokeCreateSubscription(Optional.of(streams[0]), Optional.empty(), Optional.empty(),
 
 250             for (Stream stream : streams) {
 
 251                 res = invokeCreateSubscription(Optional.of(stream), Optional.empty(), Optional.empty(),
 
 254                     if (!res.get().getErrors().isEmpty()) {
 
 257                 } catch (InterruptedException | ExecutionException e) {
 
 258                     LOG.warn("Exception during rpc call", e);
 
 263         throw new IllegalStateException("Could never be reached"); //avoid eclipse error
 
 267     public @NonNull Map<StreamKey, Stream> getNotificationStreamsAsMap() {
 
 268         Optional<Streams> oStreams = readData(LogicalDatastoreType.OPERATIONAL, STREAMS_PATH, Streams.class);
 
 269         return oStreams.isPresent() ? oStreams.get().nonnullStream() : Collections.emptyMap();
 
 273     public BindingNormalizedNodeSerializer getBindingNormalizedNodeSerializer() {
 
 277     private DateAndTime getDateAndTime(Instant dateTime) {
 
 278         final String formattedDate = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(dateTime);
 
 279         return new DateAndTime(formattedDate);