caf3da1a76b9610a5d815c1dd80179b5e3fe25c3
[ccsdk/features.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP : ccsdk features
4  * ================================================================================
5  * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property.
6  * All rights reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  */
22 package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.dom;
23
24 import static java.util.stream.Collectors.toList;
25 import com.google.common.util.concurrent.FluentFuture;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import java.time.Instant;
28 import java.time.format.DateTimeFormatter;
29 import java.util.Arrays;
30 import java.util.Collection;
31 import java.util.Collections;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Map.Entry;
35 import java.util.Objects;
36 import java.util.Optional;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.TimeoutException;
40 import org.eclipse.jdt.annotation.NonNull;
41 import org.eclipse.jdt.annotation.Nullable;
42 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfDomAccessor;
43 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfAccessorImpl;
44 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.mdsal.MdsalApi;
45 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
46 //import org.opendaylight.mdsal.binding.dom.codec.impl.BindingCodecContext;
47 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
48 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
49 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
50 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
51 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
52 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
53 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
54 import org.opendaylight.mdsal.dom.api.DOMRpcService;
55 import org.opendaylight.netconf.sal.connect.netconf.util.NetconfMessageTransformUtil;
56 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
57 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder;
58 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.create.subscription.input.Filter;
59 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
60 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
61 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
62 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamKey;
63 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
64 import org.opendaylight.yangtools.concepts.ListenerRegistration;
65 import org.opendaylight.yangtools.yang.binding.DataObject;
66 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
67 import org.opendaylight.yangtools.yang.common.QName;
68 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
69 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
70 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
71 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
72 import org.slf4j.Logger;
73 import org.slf4j.LoggerFactory;
74
75 public class NetconfDomAccessorImpl extends NetconfAccessorImpl implements NetconfDomAccessor {
76
77     private static final Logger LOG = LoggerFactory.getLogger(NetconfDomAccessorImpl.class);
78
79     private static final QName CREATE_SUBSCRIPTION = QName.create(CreateSubscriptionInput.QNAME, "create-subscription");
80     private static final SchemaPath RPC_PATH_CREATE_SUBSCRIPTION =
81             NetconfMessageTransformUtil.toPath(CREATE_SUBSCRIPTION);
82     private static final YangInstanceIdentifier STREAMS_PATH =
83             YangInstanceIdentifier.builder().node(Netconf.QNAME).node(Streams.QNAME).build();
84
85     protected final DOMDataBroker dataBroker;
86     protected final DOMMountPoint mountpoint;
87     protected final DomContext domContext;
88     private final DOMNotificationService notificationService;
89     private final BindingNormalizedNodeSerializer serializer;
90     private final DOMRpcService rpcService;
91
92
93     public NetconfDomAccessorImpl(NetconfAccessorImpl accessor, DOMDataBroker domDataBroker,
94             DOMMountPoint mountpoint, DomContext domContext) {
95         super(accessor);
96         this.dataBroker = Objects.requireNonNull(domDataBroker);
97         this.mountpoint = Objects.requireNonNull(mountpoint);
98         this.domContext = Objects.requireNonNull(domContext);
99         this.serializer = domContext.getBindingNormalizedNodeSerializer();
100         this.rpcService = MdsalApi.getMountpointService(mountpoint, DOMRpcService.class);
101         this.notificationService = MdsalApi.getMountpointService(mountpoint, DOMNotificationService.class);
102     }
103
104     @Override
105     public DOMDataBroker getDataBroker() {
106         return dataBroker;
107     }
108
109     @Override
110     public DOMMountPoint getMountpoint() {
111         return mountpoint;
112     }
113
114     @Override
115     public <T extends DataObject> Optional<T> readData(LogicalDatastoreType dataStoreType, YangInstanceIdentifier path,
116             Class<T> clazz) {
117         LOG.debug("Read to object datastore:{} path:{}", dataStoreType, path);
118
119         try {
120             return convertNormalizedNode(domContext.getBindingNormalizedNodeSerializer(),
121                     readDataNode(dataStoreType, path), path, clazz);
122         } catch (CanNotConvertException e) {
123             LOG.info("Incomplete read to class transaction {} {}", dataStoreType, path, e);
124             return Optional.empty();
125         }
126     }
127
128     @Override
129     public Optional<NormalizedNode<?, ?>> readDataNode(LogicalDatastoreType dataStoreType,
130             YangInstanceIdentifier path) {
131         LOG.debug("Read to node datastore:{} path:{}", dataStoreType, path);
132
133         try (DOMDataTreeReadTransaction readOnlyTransaction = dataBroker.newReadOnlyTransaction()) {
134             FluentFuture<Optional<NormalizedNode<?, ?>>> foData = readOnlyTransaction.read(dataStoreType, path);
135             // RAVI - Add a few debug here, like what ? Speak to Micha....
136
137             Optional<NormalizedNode<?, ?>> data = foData.get(120, TimeUnit.SECONDS);
138             LOG.info("read is done - {} ", foData.isDone());
139             return data;
140
141         } catch (InterruptedException | ExecutionException | TimeoutException e) {
142             LOG.info("Incomplete read to node transaction {} {}", dataStoreType, path, e);
143             return Optional.empty();
144         }
145     }
146
147     @SuppressWarnings("unchecked")
148     private static <T extends DataObject> Optional<T> convertNormalizedNode(BindingNormalizedNodeSerializer serializer,
149             Optional<NormalizedNode<?, ?>> oData, YangInstanceIdentifier path, Class<T> clazz)
150             throws CanNotConvertException {
151         if (oData.isPresent()) {
152             NormalizedNode<?, ?> data = oData.get();
153             LOG.debug("convertNormalizedNode data identifier: {} data nodetype: {}", data.getIdentifier(),
154                     data.getNodeType());
155             @Nullable
156             Entry<InstanceIdentifier<?>, DataObject> entry = serializer.fromNormalizedNode(path, data);
157             if (entry != null) {
158                 LOG.debug("object identifier: {}", entry.getKey());
159                 DataObject value = entry.getValue();
160                 if (clazz.isInstance(value)) {
161                     return Optional.of((T) value);
162                 } else {
163                     throw new CanNotConvertException("Unexpected class. Expected:" + clazz.getName() + " provided:"
164                             + value.getClass().getName() + " Nodetype:" + data.getNodeType());
165                 }
166             } else {
167                 throw new CanNotConvertException(
168                         "No object created for path:" + path + " Nodetype:" + data.getNodeType());
169             }
170         } else {
171             throw new CanNotConvertException("No data received for path:" + path);
172         }
173     }
174
175     @Override
176     public @NonNull <T extends DOMNotificationListener> ListenerRegistration<DOMNotificationListener> doRegisterNotificationListener(
177             @NonNull T listener, Collection<SchemaPath> types) {
178         LOG.info("Begin register listener for Mountpoint {}", mountpoint.getIdentifier().toString());
179
180         final ListenerRegistration<DOMNotificationListener> ranListenerRegistration =
181                 notificationService.registerNotificationListener(listener, types);
182
183         LOG.info("End registration listener for Mountpoint {} Listener: {} Result: {}",
184                 mountpoint.getIdentifier().toString(), notificationService, ranListenerRegistration);
185
186         return ranListenerRegistration;
187     }
188
189     @Override
190     public @NonNull <T extends DOMNotificationListener> ListenerRegistration<DOMNotificationListener> doRegisterNotificationListener(
191             @NonNull T listener, SchemaPath[] types) {
192         return doRegisterNotificationListener(listener, Arrays.asList(types));
193     }
194
195     @Override
196     public @NonNull <T extends DOMNotificationListener> ListenerRegistration<DOMNotificationListener> doRegisterNotificationListener(
197             @NonNull T listener, QName[] types) {
198         List<SchemaPath> schemaPathList = Arrays.stream(types).map(qname -> NetconfMessageTransformUtil.toPath(qname)).collect(toList());
199         return doRegisterNotificationListener(listener, schemaPathList);
200     }
201
202
203     @Override
204     public ListenableFuture<? extends DOMRpcResult> invokeCreateSubscription(CreateSubscriptionInput input) {
205         final ContainerNode nnInput = serializer.toNormalizedNodeRpcData(input);
206         return rpcService.invokeRpc(RPC_PATH_CREATE_SUBSCRIPTION, nnInput);
207     }
208
209     @Override
210     public ListenableFuture<? extends DOMRpcResult> invokeCreateSubscription(Optional<Stream> oStream,
211             Optional<Filter> filter, Optional<Instant> startTime, Optional<Instant> stopTime) {
212
213         CreateSubscriptionInputBuilder inputBuilder = new CreateSubscriptionInputBuilder();
214         boolean replayIsSupported = false;
215         if (oStream.isPresent()) {
216             Stream stream = oStream.get();
217             if (stream.getName() != null) {
218                 inputBuilder.setStream(stream.getName());
219             }
220             replayIsSupported = Boolean.TRUE.equals(stream.isReplaySupport());
221
222         }
223         if (filter.isPresent()) {
224             inputBuilder.setFilter(filter.get());
225         }
226         if (startTime.isPresent()) {
227             if (replayIsSupported) {
228                 inputBuilder.setStartTime(getDateAndTime(startTime.get()));
229                 if (stopTime.isPresent()) {
230                     if (startTime.get().isBefore(stopTime.get())) {
231                         inputBuilder.setStopTime(getDateAndTime(stopTime.get()));
232                     } else {
233                         throw new IllegalArgumentException("stopTime must be later than startTime");
234                     }
235                 }
236             } else {
237                 throw new IllegalArgumentException("Replay not supported by this stream.");
238             }
239         }
240         return invokeCreateSubscription(inputBuilder.build());
241     }
242
243     @Override
244     public ListenableFuture<? extends DOMRpcResult> invokeCreateSubscription(Stream... streams) {
245         ListenableFuture<? extends DOMRpcResult> res;
246         if (streams.length == 0) {
247             return invokeCreateSubscription(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
248         } else if (streams.length == 1) {
249             return invokeCreateSubscription(Optional.of(streams[0]), Optional.empty(), Optional.empty(),
250                     Optional.empty());
251         } else {
252             for (Stream stream : streams) {
253                 res = invokeCreateSubscription(Optional.of(stream), Optional.empty(), Optional.empty(),
254                         Optional.empty());
255                 try {
256                     if (!res.get().getErrors().isEmpty()) {
257                         return res;
258                     }
259                 } catch (InterruptedException | ExecutionException e) {
260                     LOG.warn("Exception during rpc call", e);
261                     return res;
262                 }
263             }
264         }
265         throw new IllegalStateException("Could never be reached"); //avoid eclipse error
266     }
267
268     @Override
269     public @NonNull Map<StreamKey, Stream> getNotificationStreamsAsMap() {
270         Optional<Streams> oStreams = readData(LogicalDatastoreType.OPERATIONAL, STREAMS_PATH, Streams.class);
271         return oStreams.isPresent() ? oStreams.get().nonnullStream() : Collections.emptyMap();
272     }
273
274     @Override
275     public BindingNormalizedNodeSerializer getBindingNormalizedNodeSerializer() {
276         return serializer;
277     }
278
279     private DateAndTime getDateAndTime(Instant dateTime) {
280         final String formattedDate = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(dateTime);
281         return new DateAndTime(formattedDate);
282     }
283 }