/*
* ============LICENSE_START=======================================================
- * Copyright (c) 2021 Bell Canada.
+ * Copyright (c) 2021-2022 Bell Canada.
+ * Modifications Copyright (C) 2022 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.cps.notification;
+import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import javax.annotation.PostConstruct;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.api.CpsAdminService;
+import org.onap.cps.spi.model.Anchor;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@Service
@Slf4j
+@RequiredArgsConstructor
public class NotificationService {
- private NotificationProperties notificationProperties;
- private NotificationPublisher notificationPublisher;
- private CpsDataUpdatedEventFactory cpsDataUpdatedEventFactory;
- private NotificationErrorHandler notificationErrorHandler;
+ private final NotificationProperties notificationProperties;
+ private final NotificationPublisher notificationPublisher;
+ private final CpsDataUpdatedEventFactory cpsDataUpdatedEventFactory;
+ private final NotificationErrorHandler notificationErrorHandler;
+ private final CpsAdminService cpsAdminService;
private List<Pattern> dataspacePatterns;
- /**
- * Create an instance of Notification Subscriber.
- *
- * @param notificationProperties properties for notification
- * @param notificationPublisher notification Publisher
- * @param cpsDataUpdatedEventFactory to create CPSDataUpdatedEvent
- * @param notificationErrorHandler error handler
- */
- public NotificationService(
- final NotificationProperties notificationProperties,
- final NotificationPublisher notificationPublisher,
- final CpsDataUpdatedEventFactory cpsDataUpdatedEventFactory,
- final NotificationErrorHandler notificationErrorHandler) {
+ @PostConstruct
+ public void init() {
log.info("Notification Properties {}", notificationProperties);
- this.notificationProperties = notificationProperties;
- this.notificationPublisher = notificationPublisher;
- this.cpsDataUpdatedEventFactory = cpsDataUpdatedEventFactory;
- this.notificationErrorHandler = notificationErrorHandler;
this.dataspacePatterns = getDataspaceFilterPatterns(notificationProperties);
}
/**
* Process Data Updated Event and publishes the notification.
*
- * @param dataspaceName dataspace name
- * @param anchorName anchor name
+ * @param dataspaceName dataspaceName
+ * @param anchorName anchorName
+ * @param xpath xpath of changed data node
+ * @param operation operation
+ * @param observedTimestamp observedTimestamp
* @return future
*/
@Async("notificationExecutor")
- public Future<Void> processDataUpdatedEvent(final String dataspaceName, final String anchorName) {
- log.debug("process data updated event for dataspace '{}' & anchor '{}'", dataspaceName, anchorName);
+ public Future<Void> processDataUpdatedEvent(final String dataspaceName, final String anchorName,
+ final String xpath, final Operation operation, final OffsetDateTime observedTimestamp) {
+
+ final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
+ log.debug("process data updated event for anchor '{}'", anchor);
try {
if (shouldSendNotification(dataspaceName)) {
final var cpsDataUpdatedEvent =
- cpsDataUpdatedEventFactory.createCpsDataUpdatedEvent(dataspaceName, anchorName);
+ cpsDataUpdatedEventFactory.createCpsDataUpdatedEvent(anchor,
+ observedTimestamp, getRootNodeOperation(xpath, operation));
log.debug("data updated event to be published {}", cpsDataUpdatedEvent);
notificationPublisher.sendNotification(cpsDataUpdatedEvent);
}
CPS operation should not fail if sending event fails for any reason.
*/
notificationErrorHandler.onException("Failed to process cps-data-updated-event.",
- exception, dataspaceName, anchorName);
+ exception, anchor, xpath, operation);
}
return CompletableFuture.completedFuture(null);
}
.anyMatch(pattern -> pattern.matcher(dataspaceName).find());
}
+ private Operation getRootNodeOperation(final String xpath, final Operation operation) {
+ return isRootXpath(xpath) || isRootContainerNodeXpath(xpath) ? operation : Operation.UPDATE;
+ }
+
+ private static boolean isRootXpath(final String xpath) {
+ return "/".equals(xpath) || "".equals(xpath);
+ }
+
+ private static boolean isRootContainerNodeXpath(final String xpath) {
+ return 0 == xpath.lastIndexOf('/');
+ }
+
}