2 * ============LICENSE_START=======================================================
3 * ONAP : ccsdk features
4 * ================================================================================
5 * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.dom;
24 import static java.util.stream.Collectors.toList;
25 import com.google.common.util.concurrent.FluentFuture;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import java.time.Instant;
28 import java.time.format.DateTimeFormatter;
29 import java.util.Arrays;
30 import java.util.Collection;
31 import java.util.Collections;
32 import java.util.List;
34 import java.util.Map.Entry;
35 import java.util.Objects;
36 import java.util.Optional;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.TimeoutException;
40 import org.eclipse.jdt.annotation.NonNull;
41 import org.eclipse.jdt.annotation.Nullable;
42 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfDomAccessor;
43 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfAccessorImpl;
44 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.mdsal.MdsalApi;
45 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
46 //import org.opendaylight.mdsal.binding.dom.codec.impl.BindingCodecContext;
47 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
48 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
49 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
50 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
51 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
52 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
53 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
54 import org.opendaylight.mdsal.dom.api.DOMRpcService;
55 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
56 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder;
57 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.create.subscription.input.Filter;
58 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
59 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
60 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
61 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamKey;
62 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
63 import org.opendaylight.yangtools.concepts.ListenerRegistration;
64 import org.opendaylight.yangtools.yang.binding.DataObject;
65 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
66 import org.opendaylight.yangtools.yang.common.QName;
67 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
68 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
69 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
70 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
/**
 * {@link NetconfDomAccessor} implementation that works on the DOM-level services of one mountpoint:
 * a {@link DOMDataBroker} for reads, a {@link DOMRpcService} for RPC invocation and a
 * {@link DOMNotificationService} for notification listener registration.
 */
74 public class NetconfDomAccessorImpl extends NetconfAccessorImpl implements NetconfDomAccessor {
// Class-wide SLF4J logger.
76 private static final Logger LOG = LoggerFactory.getLogger(NetconfDomAccessorImpl.class);
// RPC identifier handed to rpcService.invokeRpc(); built from CreateSubscriptionInput.QNAME and the
// local name "create-subscription".
78 private static final QName CREATE_SUBSCRIPTION = QName.create(CreateSubscriptionInput.QNAME, "create-subscription");
// Path Netconf -> Streams; read from the OPERATIONAL datastore by getNotificationStreamsAsMap().
79 private static final YangInstanceIdentifier STREAMS_PATH =
80 YangInstanceIdentifier.builder().node(Netconf.QNAME).node(Streams.QNAME).build();
// Data broker used by readDataNode(); set in the constructor, never null.
82 protected final DOMDataBroker dataBroker;
// Mountpoint this accessor belongs to; set in the constructor, never null.
83 protected final DOMMountPoint mountpoint;
// Context that supplies the BindingNormalizedNodeSerializer; set in the constructor, never null.
84 protected final DomContext domContext;
// Looked up from the mountpoint via MdsalApi.getMountpointService(); used to register listeners.
85 private final DOMNotificationService notificationService;
// Serializer taken from domContext; converts binding RPC input into normalized nodes.
86 private final BindingNormalizedNodeSerializer serializer;
// Looked up from the mountpoint via MdsalApi.getMountpointService(); used to invoke RPCs.
87 private final DOMRpcService rpcService;
/**
 * Creates the DOM accessor for one mountpoint and resolves the mountpoint services it needs.
 *
 * @param accessor base accessor. NOTE(review): not referenced in the lines visible in this extract;
 *        presumably forwarded to the superclass constructor - confirm against the full file.
 * @param domDataBroker data broker stored in {@link #dataBroker}; must not be null
 * @param mountpoint mountpoint stored in {@link #mountpoint}; must not be null. The
 *        {@link DOMRpcService} and {@link DOMNotificationService} are looked up from it.
 * @param domContext context stored in {@link #domContext}; must not be null. Supplies the
 *        {@link BindingNormalizedNodeSerializer}.
 * @throws NullPointerException if {@code domDataBroker}, {@code mountpoint} or {@code domContext} is null
 */
90 public NetconfDomAccessorImpl(NetconfAccessorImpl accessor, DOMDataBroker domDataBroker,
91 DOMMountPoint mountpoint, DomContext domContext) {
// Fail fast on missing collaborators.
93 this.dataBroker = Objects.requireNonNull(domDataBroker);
94 this.mountpoint = Objects.requireNonNull(mountpoint);
95 this.domContext = Objects.requireNonNull(domContext);
96 this.serializer = domContext.getBindingNormalizedNodeSerializer();
// Both services are resolved from the mountpoint through the MdsalApi helper.
97 this.rpcService = MdsalApi.getMountpointService(mountpoint, DOMRpcService.class);
98 this.notificationService = MdsalApi.getMountpointService(mountpoint, DOMNotificationService.class);
/**
 * Accessor for the {@link DOMDataBroker} of this accessor.
 * NOTE(review): the method body is not visible in this extract; presumably it returns the
 * {@code dataBroker} field - confirm against the full file.
 */
102 public DOMDataBroker getDataBroker() {
/**
 * Accessor for the {@link DOMMountPoint} of this accessor.
 * NOTE(review): the method body is not visible in this extract; presumably it returns the
 * {@code mountpoint} field - confirm against the full file.
 */
107 public DOMMountPoint getMountpoint() {
/**
 * Reads the node at {@code path} via {@link #readDataNode} and converts it into a binding object
 * of the requested class using the serializer provided by {@link #domContext}.
 *
 * @param dataStoreType datastore to read from
 * @param path DOM path of the node to read
 * NOTE(review): the remainder of the signature is not visible in this extract; the body uses a
 *        {@code clazz} argument as the expected result class - confirm against the full file.
 * @return the converted object, or {@link Optional#empty()} when the conversion fails with a
 *         {@code CanNotConvertException}
 */
112 public <T extends DataObject> Optional<T> readData(LogicalDatastoreType dataStoreType, YangInstanceIdentifier path,
114 LOG.debug("Read to object datastore:{} path:{}", dataStoreType, path);
117 return convertNormalizedNode(domContext.getBindingNormalizedNodeSerializer(),
118 readDataNode(dataStoreType, path), path, clazz);
// Conversion problems (no data, no object, unexpected class) are logged and mapped to an empty result.
119 } catch (CanNotConvertException e) {
120 LOG.info("Incomplete read to class transaction {} {}", dataStoreType, path, e);
121 return Optional.empty();
/**
 * Reads the normalized node stored at {@code path} in the given datastore, using a new read-only
 * transaction of {@link #dataBroker} and waiting at most 120 seconds for the result.
 *
 * @param dataStoreType datastore to read from
 * @param path DOM path of the node to read
 * @return {@link Optional#empty()} if the read is interrupted, fails, times out or raises an
 *         {@link IllegalArgumentException}. NOTE(review): the success-path return statement is not
 *         visible in this extract; presumably the data that was read is returned - confirm.
 */
126 public Optional<NormalizedNode> readDataNode(LogicalDatastoreType dataStoreType,
127 YangInstanceIdentifier path) {
128 LOG.debug("Read to node datastore:{} path:{}", dataStoreType, path);
130 // Don't use try with resource because the implicit close of this construct is not handled
131 // correctly by underlying opendaylight NETCONF service
132 DOMDataTreeReadTransaction readOnlyTransaction = dataBroker.newReadOnlyTransaction();
134 FluentFuture<Optional<NormalizedNode>> foData = readOnlyTransaction.read(dataStoreType, path);
// Blocks the calling thread until the read completes or 120 seconds have passed.
136 Optional<NormalizedNode> data = foData.get(120, TimeUnit.SECONDS);
137 LOG.trace("read is done - {} ", foData.isDone());
139 } catch (InterruptedException e) {
140 LOG.debug("Incomplete read to node transaction {} {}", dataStoreType, path, e);
// Restore the interrupt flag so callers further up can react to the interruption.
141 Thread.currentThread().interrupt();
142 return Optional.empty();
143 } catch (ExecutionException | TimeoutException e) {
144 LOG.debug("Incomplete read to node transaction {} {}", dataStoreType, path, e);
145 return Optional.empty();
146 } catch (IllegalArgumentException e) {
147 LOG.debug("IllegalArgumentException occurred, Incomplete read to node transaction {} {}", dataStoreType, path, e);
148 return Optional.empty();
/**
 * Reads the normalized node stored at {@code path} in the given datastore, using a new read-only
 * transaction of the broker returned by {@code getControllerDOMDataBroker()} instead of
 * {@link #dataBroker}, and waiting at most 120 seconds for the result.
 *
 * @param dataStoreType datastore to read from
 * @param path DOM path of the node to read
 * @return {@link Optional#empty()} if the read is interrupted, fails or times out.
 *         NOTE(review): the success-path return statement is not visible in this extract;
 *         presumably the data that was read is returned - confirm.
 */
153 public Optional<NormalizedNode> readControllerDataNode(LogicalDatastoreType dataStoreType,
154 YangInstanceIdentifier path) {
155 LOG.debug("Read to controller node datastore:{} path:{}", dataStoreType, path);
157 DOMDataTreeReadTransaction readOnlyTransaction = this.getControllerDOMDataBroker().newReadOnlyTransaction();
159 FluentFuture<Optional<NormalizedNode>> foData = readOnlyTransaction.read(dataStoreType, path);
// Blocks the calling thread until the read completes or 120 seconds have passed.
161 Optional<NormalizedNode> data = foData.get(120, TimeUnit.SECONDS);
162 LOG.trace("read is done - {} ", foData.isDone());
164 } catch (InterruptedException e) {
165 LOG.debug("Incomplete read to node transaction {} {}", dataStoreType, path, e);
// Restore the interrupt flag so callers further up can react to the interruption.
166 Thread.currentThread().interrupt();
167 return Optional.empty();
// NOTE(review): unlike readDataNode(), no IllegalArgumentException handler is visible here.
168 } catch (ExecutionException | TimeoutException e) {
169 LOG.debug("Incomplete read to node transaction {} {}", dataStoreType, path, e);
170 return Optional.empty();
/**
 * Converts a normalized node into a binding object of type {@code clazz}.
 *
 * @param serializer serializer used for {@code fromNormalizedNode(path, data)}
 * @param oData node to convert, possibly empty
 * @param path DOM path the node was read from
 * @param clazz expected class of the resulting binding object
 * @return the converted object when it is an instance of {@code clazz}
 * @throws CanNotConvertException with one of three messages: "Unexpected class...",
 *         "No object created for path..." or "No data received for path...".
 *         NOTE(review): the branch lines that select the second and third message are not
 *         visible in this extract - confirm the exact conditions against the full file.
 */
// The unchecked cast below is guarded by clazz.isInstance(value).
174 @SuppressWarnings("unchecked")
175 private static <T extends DataObject> Optional<T> convertNormalizedNode(BindingNormalizedNodeSerializer serializer,
176 Optional<NormalizedNode> oData, YangInstanceIdentifier path, Class<T> clazz)
177 throws CanNotConvertException {
178 if (oData.isPresent()) {
179 NormalizedNode data = oData.get();
180 LOG.debug("convertNormalizedNode data identifier: {} data nodetype: {}", data.getIdentifier(),
181 data.getIdentifier().getNodeType());
183 Entry<InstanceIdentifier<?>, DataObject> entry = serializer.fromNormalizedNode(path, data);
185 LOG.debug("object identifier: {}", entry.getKey());
186 DataObject value = entry.getValue();
187 if (clazz.isInstance(value)) {
188 return Optional.of((T) value);
// The serializer produced an object, but not of the requested class.
190 throw new CanNotConvertException("Unexpected class. Expected:" + clazz.getName() + " provided:"
191 + value.getClass().getName() + " Nodetype:" + data.getIdentifier().getNodeType());
194 throw new CanNotConvertException(
195 "No object created for path:" + path + " Nodetype:" + data.getIdentifier().getNodeType());
198 throw new CanNotConvertException("No data received for path:" + path);
203 public @NonNull <T extends DOMNotificationListener> ListenerRegistration<DOMNotificationListener> doRegisterNotificationListener(
204 @NonNull T listener, Collection<Absolute> types) {
205 LOG.debug("Begin register listener for Mountpoint {}", mountpoint.getIdentifier().toString());
207 final ListenerRegistration<DOMNotificationListener> ranListenerRegistration =
208 notificationService.registerNotificationListener(listener, types);
210 LOG.debug("End registration listener for Mountpoint {} Listener: {} Result: {}",
211 mountpoint.getIdentifier().toString(), notificationService, ranListenerRegistration);
213 return ranListenerRegistration;
217 public @NonNull <T extends DOMNotificationListener> ListenerRegistration<DOMNotificationListener> doRegisterNotificationListener(
218 @NonNull T listener, Absolute[] types) {
219 return doRegisterNotificationListener(listener, Arrays.asList(types));
223 public @NonNull <T extends DOMNotificationListener> ListenerRegistration<DOMNotificationListener> doRegisterNotificationListener(
224 @NonNull T listener, QName[] types) {
225 List<Absolute> schemaPathList = Arrays.stream(types).map(qname -> Absolute.of(qname)).collect(toList());
226 return doRegisterNotificationListener(listener, schemaPathList);
231 public ListenableFuture<? extends DOMRpcResult> invokeCreateSubscription(CreateSubscriptionInput input) {
232 final ContainerNode nnInput = serializer.toNormalizedNodeRpcData(input);
233 return rpcService.invokeRpc(CREATE_SUBSCRIPTION, nnInput);
/**
 * Builds a create-subscription input from the optional arguments and invokes the RPC.
 *
 * @param oStream stream to subscribe to; when present and named, its name is set on the input
 * @param filter optional filter, set on the input when present
 * @param startTime optional replay start time; only accepted when replay is supported
 * @param stopTime optional replay stop time; compared against {@code startTime}
 * @return future with the RPC result
 * @throws IllegalArgumentException with "stopTime must be later than startTime" or
 *         "Replay not supported by this stream."
 * NOTE(review): closing braces and else lines are not visible in this extract, so the exact
 * nesting of the statements below must be confirmed against the full file.
 */
237 public ListenableFuture<? extends DOMRpcResult> invokeCreateSubscription(Optional<Stream> oStream,
238 Optional<Filter> filter, Optional<Instant> startTime, Optional<Instant> stopTime) {
240 CreateSubscriptionInputBuilder inputBuilder = new CreateSubscriptionInputBuilder();
// Stays false unless a stream is given and reports replay support.
241 boolean replayIsSupported = false;
242 if (oStream.isPresent()) {
243 Stream stream = oStream.get();
244 if (stream.getName() != null) {
245 inputBuilder.setStream(stream.getName());
247 replayIsSupported = Boolean.TRUE.equals(stream.requireReplaySupport());
250 filter.ifPresent(inputBuilder::setFilter);
251 if (startTime.isPresent()) {
252 if (replayIsSupported) {
253 inputBuilder.setStartTime(getDateAndTime(startTime.get()));
254 if (stopTime.isPresent()) {
// A stop time is only set when it lies strictly after the start time.
255 if (startTime.get().isBefore(stopTime.get())) {
256 inputBuilder.setStopTime(getDateAndTime(stopTime.get()));
258 throw new IllegalArgumentException("stopTime must be later than startTime");
262 throw new IllegalArgumentException("Replay not supported by this stream.");
265 return invokeCreateSubscription(inputBuilder.build());
/**
 * Creates subscriptions for the given streams without filter, start time or stop time.
 * With no stream, a single subscription without stream is requested; with one stream, a single
 * subscription for that stream; with several streams, one RPC per stream is invoked in a loop
 * and each result is awaited with a blocking {@code get()} before its errors are checked.
 *
 * @param streams streams to subscribe to
 * @return future with an RPC result.
 * NOTE(review): several lines of the loop (continuation lines, the try line and the statements
 * that follow the error check and the catch handlers) are not visible in this extract, so which
 * future is returned in the multi-stream case must be confirmed against the full file.
 */
269 public ListenableFuture<? extends DOMRpcResult> invokeCreateSubscription(Stream... streams) {
270 ListenableFuture<? extends DOMRpcResult> res;
271 if (streams.length == 0) {
272 return invokeCreateSubscription(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
273 } else if (streams.length == 1) {
274 return invokeCreateSubscription(Optional.of(streams[0]), Optional.empty(), Optional.empty(),
277 for (Stream stream : streams) {
278 res = invokeCreateSubscription(Optional.of(stream), Optional.empty(), Optional.empty(),
// Waits for this stream's RPC to finish and inspects the reported errors.
281 if (!res.get().getErrors().isEmpty()) {
284 } catch (InterruptedException e) {
285 LOG.warn("InterruptedException during rpc call", e);
// Restore the interrupt flag so callers further up can react to the interruption.
286 Thread.currentThread().interrupt();
288 } catch (ExecutionException e) {
289 LOG.warn("ExecutionException during rpc call", e);
294 throw new IllegalStateException("Could never be reached"); //avoid eclipse error
298 public @NonNull Map<StreamKey, Stream> getNotificationStreamsAsMap() {
299 Optional<Streams> oStreams = readData(LogicalDatastoreType.OPERATIONAL, STREAMS_PATH, Streams.class);
300 return oStreams.map(Streams::nonnullStream).orElse(Collections.emptyMap());
304 * @Override public BindingNormalizedNodeSerializer
305 * getBindingNormalizedNodeSerializer() { return serializer; }
307 private DateAndTime getDateAndTime(Instant dateTime) {
308 final String formattedDate = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(dateTime);
309 return new DateAndTime(formattedDate);