f862a77bf265c06381d2a0c36dafd7434a11978d
[ccsdk/features.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP : ccsdk features
4  * ================================================================================
5  * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property.
6  * All rights reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  */
22 package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.dom;
23
24 import static java.util.stream.Collectors.toList;
25 import com.google.common.util.concurrent.FluentFuture;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import java.time.Instant;
28 import java.time.format.DateTimeFormatter;
29 import java.util.Arrays;
30 import java.util.Collection;
31 import java.util.Collections;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Map.Entry;
35 import java.util.Objects;
36 import java.util.Optional;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.TimeoutException;
40 import org.eclipse.jdt.annotation.NonNull;
41 import org.eclipse.jdt.annotation.Nullable;
42 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.DomContext;
43 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfDomAccessor;
44 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfAccessorImpl;
45 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.mdsal.MdsalApi;
46 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
47 //import org.opendaylight.mdsal.binding.dom.codec.impl.BindingCodecContext;
48 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
49 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
50 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
51 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
52 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
53 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
54 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
55 import org.opendaylight.mdsal.dom.api.DOMRpcService;
56 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
57 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder;
58 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.create.subscription.input.Filter;
59 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
60 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
61 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
62 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamKey;
63 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
64 import org.opendaylight.yangtools.concepts.ListenerRegistration;
65 import org.opendaylight.yangtools.yang.binding.DataObject;
66 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
67 import org.opendaylight.yangtools.yang.common.QName;
68 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
69 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
70 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
71 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
72 import org.slf4j.Logger;
73 import org.slf4j.LoggerFactory;
74
75 public class NetconfDomAccessorImpl extends NetconfAccessorImpl implements NetconfDomAccessor {
76
77     private static final Logger LOG = LoggerFactory.getLogger(NetconfDomAccessorImpl.class);
78
79     private static final QName CREATE_SUBSCRIPTION = QName.create(CreateSubscriptionInput.QNAME, "create-subscription");
80     private static final YangInstanceIdentifier STREAMS_PATH =
81             YangInstanceIdentifier.builder().node(Netconf.QNAME).node(Streams.QNAME).build();
82
83     protected final DOMDataBroker dataBroker;
84     protected final DOMMountPoint mountpoint;
85     protected final DomContext domContext;
86     private final DOMNotificationService notificationService;
87     private final BindingNormalizedNodeSerializer serializer;
88     private final DOMRpcService rpcService;
89
90
91     public NetconfDomAccessorImpl(NetconfAccessorImpl accessor, DOMDataBroker domDataBroker,
92             DOMMountPoint mountpoint, DomContext domContext) {
93         super(accessor);
94         this.dataBroker = Objects.requireNonNull(domDataBroker);
95         this.mountpoint = Objects.requireNonNull(mountpoint);
96         this.domContext = Objects.requireNonNull(domContext);
97         this.serializer = domContext.getBindingNormalizedNodeSerializer();
98         this.rpcService = MdsalApi.getMountpointService(mountpoint, DOMRpcService.class);
99         this.notificationService = MdsalApi.getMountpointService(mountpoint, DOMNotificationService.class);
100     }
101
102     @Override
103     public DOMDataBroker getDataBroker() {
104         return dataBroker;
105     }
106
107     @Override
108     public DOMMountPoint getMountpoint() {
109         return mountpoint;
110     }
111
112     @Override
113     public DOMRpcService getRpcService() {
114         return rpcService;
115     }
116
117     @Override
118     public DomContext getDomContext() {
119         return domContext;
120     }
121
122     @Override
123     public <T extends DataObject> Optional<T> readData(LogicalDatastoreType dataStoreType, YangInstanceIdentifier path,
124             Class<T> clazz) {
125         LOG.debug("Read to object datastore:{} path:{}", dataStoreType, path);
126
127         try {
128             return convertNormalizedNode(domContext.getBindingNormalizedNodeSerializer(),
129                     readDataNode(dataStoreType, path), path, clazz);
130         } catch (CanNotConvertException e) {
131             LOG.info("Incomplete read to class transaction {} {}", dataStoreType, path, e);
132             return Optional.empty();
133         }
134     }
135
136     @Override
137     public Optional<NormalizedNode> readDataNode(LogicalDatastoreType dataStoreType,
138             YangInstanceIdentifier path) {
139         LOG.debug("Read to node datastore:{} path:{}", dataStoreType, path);
140
141         // Don't use try with resource because the implicit close of this construct is not handled
142         // correctly by underlying opendaylight NETCONF service
143         DOMDataTreeReadTransaction readOnlyTransaction = dataBroker.newReadOnlyTransaction();
144         try {
145             FluentFuture<Optional<NormalizedNode>> foData = readOnlyTransaction.read(dataStoreType, path);
146
147             Optional<NormalizedNode> data = foData.get(120, TimeUnit.SECONDS);
148             LOG.trace("read is done - {} ", foData.isDone());
149             return data;
150         } catch (InterruptedException e) {
151             LOG.debug("Incomplete read to node transaction {} {}", dataStoreType, path, e);
152             Thread.currentThread().interrupt();
153             return Optional.empty();
154         } catch (ExecutionException | TimeoutException e) {
155             LOG.debug("Incomplete read to node transaction {} {}", dataStoreType, path, e);
156             return Optional.empty();
157         } catch (IllegalArgumentException e) {
158                 LOG.debug("IllegalArgumentException occurred, Incomplete read to node transaction {} {}", dataStoreType, path, e);
159                 return Optional.empty();
160         }
161     }
162
163     @Override
164     public Optional<NormalizedNode> readControllerDataNode(LogicalDatastoreType dataStoreType,
165             YangInstanceIdentifier path) {
166         LOG.debug("Read to controller node datastore:{} path:{}", dataStoreType, path);
167
168         DOMDataTreeReadTransaction readOnlyTransaction = this.getControllerDOMDataBroker().newReadOnlyTransaction();
169         try {
170             FluentFuture<Optional<NormalizedNode>> foData = readOnlyTransaction.read(dataStoreType, path);
171
172             Optional<NormalizedNode> data = foData.get(120, TimeUnit.SECONDS);
173             LOG.trace("read is done - {} ", foData.isDone());
174             return data;
175         } catch (InterruptedException e) {
176             LOG.debug("Incomplete read to node transaction {} {}", dataStoreType, path, e);
177             Thread.currentThread().interrupt();
178             return Optional.empty();
179         } catch (ExecutionException | TimeoutException e) {
180             LOG.debug("Incomplete read to node transaction {} {}", dataStoreType, path, e);
181             return Optional.empty();
182         }
183     }
184
185     @SuppressWarnings("unchecked")
186     private static <T extends DataObject> Optional<T> convertNormalizedNode(BindingNormalizedNodeSerializer serializer,
187             Optional<NormalizedNode> oData, YangInstanceIdentifier path, Class<T> clazz)
188             throws CanNotConvertException {
189         if (oData.isPresent()) {
190             NormalizedNode data = oData.get();
191             LOG.debug("convertNormalizedNode data identifier: {} data nodetype: {}", data.getIdentifier(),
192                     data.getIdentifier().getNodeType());
193             @Nullable
194             Entry<InstanceIdentifier<?>, DataObject> entry = serializer.fromNormalizedNode(path, data);
195             if (entry != null) {
196                 LOG.debug("object identifier: {}", entry.getKey());
197                 DataObject value = entry.getValue();
198                 if (clazz.isInstance(value)) {
199                     return Optional.of((T) value);
200                 } else {
201                     throw new CanNotConvertException("Unexpected class. Expected:" + clazz.getName() + " provided:"
202                             + value.getClass().getName() + " Nodetype:" + data.getIdentifier().getNodeType());
203                 }
204             } else {
205                 throw new CanNotConvertException(
206                         "No object created for path:" + path + " Nodetype:" + data.getIdentifier().getNodeType());
207             }
208         } else {
209             throw new CanNotConvertException("No data received for path:" + path);
210         }
211     }
212
213     @Override
214     public @NonNull <T extends DOMNotificationListener> ListenerRegistration<DOMNotificationListener> doRegisterNotificationListener(
215             @NonNull T listener, Collection<Absolute> types) {
216         LOG.debug("Begin register listener for Mountpoint {}", mountpoint.getIdentifier().toString());
217
218         final ListenerRegistration<DOMNotificationListener> ranListenerRegistration =
219                 notificationService.registerNotificationListener(listener, types);
220
221         LOG.debug("End registration listener for Mountpoint {} Listener: {} Result: {}",
222                 mountpoint.getIdentifier().toString(), notificationService, ranListenerRegistration);
223
224         return ranListenerRegistration;
225     }
226
227     @Override
228     public @NonNull <T extends DOMNotificationListener> ListenerRegistration<DOMNotificationListener> doRegisterNotificationListener(
229             @NonNull T listener, Absolute[] types) {
230         return doRegisterNotificationListener(listener, Arrays.asList(types));
231     }
232
233     @Override
234     public @NonNull <T extends DOMNotificationListener> ListenerRegistration<DOMNotificationListener> doRegisterNotificationListener(
235             @NonNull T listener, QName[] types) {
236         List<Absolute> schemaPathList = Arrays.stream(types).map(qname -> Absolute.of(qname)).collect(toList());
237         return doRegisterNotificationListener(listener, schemaPathList);
238     }
239
240
241     @Override
242     public ListenableFuture<? extends DOMRpcResult> invokeCreateSubscription(CreateSubscriptionInput input) {
243         final ContainerNode nnInput = serializer.toNormalizedNodeRpcData(input);
244         return rpcService.invokeRpc(CREATE_SUBSCRIPTION, nnInput);
245     }
246
247     @Override
248     public ListenableFuture<? extends DOMRpcResult> invokeCreateSubscription(Optional<Stream> oStream,
249             Optional<Filter> filter, Optional<Instant> startTime, Optional<Instant> stopTime) {
250
251         CreateSubscriptionInputBuilder inputBuilder = new CreateSubscriptionInputBuilder();
252         boolean replayIsSupported = false;
253         if (oStream.isPresent()) {
254             Stream stream = oStream.get();
255             if (stream.getName() != null) {
256                 inputBuilder.setStream(stream.getName());
257             }
258             replayIsSupported = Boolean.TRUE.equals(stream.requireReplaySupport());
259
260         }
261         filter.ifPresent(inputBuilder::setFilter);
262         if (startTime.isPresent()) {
263             if (replayIsSupported) {
264                 inputBuilder.setStartTime(getDateAndTime(startTime.get()));
265                 if (stopTime.isPresent()) {
266                     if (startTime.get().isBefore(stopTime.get())) {
267                         inputBuilder.setStopTime(getDateAndTime(stopTime.get()));
268                     } else {
269                         throw new IllegalArgumentException("stopTime must be later than startTime");
270                     }
271                 }
272             } else {
273                 throw new IllegalArgumentException("Replay not supported by this stream.");
274             }
275         }
276         return invokeCreateSubscription(inputBuilder.build());
277     }
278
279     @Override
280     public ListenableFuture<? extends DOMRpcResult> invokeCreateSubscription(Stream... streams) {
281         ListenableFuture<? extends DOMRpcResult> res;
282         if (streams.length == 0) {
283             return invokeCreateSubscription(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
284         } else if (streams.length == 1) {
285             return invokeCreateSubscription(Optional.of(streams[0]), Optional.empty(), Optional.empty(),
286                     Optional.empty());
287         } else {
288             for (Stream stream : streams) {
289                 res = invokeCreateSubscription(Optional.of(stream), Optional.empty(), Optional.empty(),
290                         Optional.empty());
291                 try {
292                     if (!res.get().getErrors().isEmpty()) {
293                         return res;
294                     }
295                 } catch (InterruptedException e) {
296                     LOG.warn("InterruptedException during rpc call", e);
297                     Thread.currentThread().interrupt();
298                     return res;
299                 } catch (ExecutionException e) {
300                     LOG.warn("ExecutionException during rpc call", e);
301                     return res;
302                 }
303             }
304         }
305         throw new IllegalStateException("Could never be reached"); //avoid eclipse error
306     }
307
308     @Override
309     public @NonNull Map<StreamKey, Stream> getNotificationStreamsAsMap() {
310         Optional<Streams> oStreams = readData(LogicalDatastoreType.OPERATIONAL, STREAMS_PATH, Streams.class);
311         return oStreams.map(Streams::nonnullStream).orElse(Collections.emptyMap());
312     }
313
314         /*
315          * @Override public BindingNormalizedNodeSerializer
316          * getBindingNormalizedNodeSerializer() { return serializer; }
317          */
318     private DateAndTime getDateAndTime(Instant dateTime) {
319         final String formattedDate = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(dateTime);
320         return new DateAndTime(formattedDate);
321     }
322 }