Add kafka listener for data updated events 00/121700/10
author Bruno Sakoto <bruno.sakoto@bell.ca>
Fri, 4 Jun 2021 11:49:14 +0000 (07:49 -0400)
committer Bruno Sakoto <bruno.sakoto@bell.ca>
Mon, 5 Jul 2021 22:37:15 +0000 (18:37 -0400)
See the "Running via Docker Compose" section of the README.md file for an
example of event processing.

Issue-ID: CPS-371
Signed-off-by: Bruno Sakoto <bruno.sakoto@bell.ca>
Change-Id: Id3abfa32fb04e07102a5f28e6e43a9b533391188

25 files changed:
README.md
docker-compose.yml
pom.xml
src/main/java/org/onap/cps/temporal/controller/event/listener/exception/EventListenerException.java [new file with mode: 0644]
src/main/java/org/onap/cps/temporal/controller/event/listener/exception/InvalidEventEnvelopException.java [new file with mode: 0644]
src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java [new file with mode: 0644]
src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerErrorHandler.java [new file with mode: 0644]
src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java [new file with mode: 0644]
src/main/java/org/onap/cps/temporal/controller/web/QueryController.java [moved from src/main/java/org/onap/cps/temporal/controller/QueryController.java with 96% similarity]
src/main/java/org/onap/cps/temporal/domain/NetworkData.java
src/main/java/org/onap/cps/temporal/domain/NetworkDataId.java
src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java
src/main/java/org/onap/cps/temporal/service/ServiceException.java [new file with mode: 0644]
src/main/resources/application-sasl-ssl-kafka.yml [new file with mode: 0644]
src/main/resources/application.yml
src/main/resources/logback.xml [new file with mode: 0644]
src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerIntegrationSpec.groovy [new file with mode: 0644]
src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy [new file with mode: 0644]
src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy [new file with mode: 0644]
src/test/groovy/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapperSpec.groovy [new file with mode: 0644]
src/test/groovy/org/onap/cps/temporal/controller/web/QueryControllerSpec.groovy [moved from src/test/groovy/org/onap/cps/temporal/controller/QueryControllerSpec.groovy with 96% similarity]
src/test/groovy/org/onap/cps/temporal/repository/NetworkDataRepositorySpec.groovy
src/test/groovy/org/onap/cps/temporal/service/NetworkDataServiceImplSpec.groovy
src/test/java/org/onap/cps/temporal/architecture/LayeredArchitectureTest.java
src/test/resources/application.yml

index d7d200e..42e1d5f 100755 (executable)
--- a/README.md
+++ b/README.md
@@ -20,12 +20,48 @@ mvn clean install -Pcps-temporal-docker -Ddocker.repository.push=
 ## Running via Docker Compose
 
 `docker-compose.yml` file is provided to be run with `docker-compose` tool and local image previously built.
-It starts both Postgres Timescale database and CPS Temporal service.
+It starts the following services:
 
-Execute following command from project root folder:
+* CPS Temporal service (cps-temporal)
+* Postgres Timescale database (timescaledb)
+* Kafka broker (zookeeper and kafka)
+
+Execute the following command from the project root folder to start all services:
+
+```bash
+docker-compose up
+```
+
+Then, use the `kafkacat` tool to produce a data updated event to the Kafka topic:
+
+```bash
+docker run -i --rm --network=host edenhill/kafkacat:1.6.0 -b localhost:19092 -t cps.cfg-state-events -D/ -P <<EOF
+{
+    "schema": "urn:cps:org.onap.cps:data-updated-event-schema:1.1.0-SNAPSHOT",
+    "id": "38aa6cc6-264d-4ede-b534-18f5c1f403ea",
+    "source": "urn:cps:org.onap.cps",
+    "type": "org.onap.cps.data-updated-event",
+    "content": {
+        "observedTimestamp": "2021-06-09T13:00:00.123-0400",
+        "dataspaceName": "my-dataspace",
+        "schemaSetName": "my-schema-set",
+        "anchorName": "my-anchor",
+        "data": {
+            "interface": {
+                "name": "itf-1",
+                "status": "up"
+            }
+        }
+    }
+}
+EOF
+```
+
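+Optionally, the same `kafkacat` image can be used in consumer mode to check
+that the event has reached the topic (it reads the topic from the beginning
+and exits once all available messages have been consumed):
+
+```bash
+docker run -i --rm --network=host edenhill/kafkacat:1.6.0 \
+  -b localhost:19092 -t cps.cfg-state-events -C -o beginning -e
+```
+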
+Finally, verify that CPS Temporal data is persisted as expected:
 
 ```bash
-docker-compose up -d
+psql -h localhost -p 5433 -d cpstemporaldb -U cpstemporal -c \
+  "select * from network_data order by created_timestamp desc limit 1"
 ```
 
 ## Alternative local db setup
diff --git a/docker-compose.yml b/docker-compose.yml
index fae1cbc..fe863fd 100755 (executable)
@@ -1,5 +1,6 @@
 # ============LICENSE_START=======================================================
 # Copyright (C) 2021 Nordix Foundation.
+# Modifications Copyright (C) 2021 Bell Canada.
 # ================================================================================
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -28,16 +29,39 @@ services:
       DB_PORT: 5432
       DB_USERNAME: cpstemporal
       DB_PASSWORD: cpstemporal
+      KAFKA_BOOTSTRAP_SERVER: kafka:9092
+
     restart: unless-stopped
     depends_on:
       - timescaledb
+      - kafka
 
   timescaledb:
     container_name: timescaledb
     image: timescale/timescaledb:2.1.1-pg13
     ports:
-      - '5432:5432'
+      - '5433:5432'
     environment:
       POSTGRES_DB: cpstemporaldb
-      POSTGRES_USER: ${DB_USERNAME}
-      POSTGRES_PASSWORD: ${DB_PASSWORD}
+      POSTGRES_USER: cpstemporal
+      POSTGRES_PASSWORD: cpstemporal
+
+  zookeeper:
+    image: confluentinc/cp-zookeeper:6.1.1
+    container_name: zookeeper
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+
+  kafka:
+    image: confluentinc/cp-kafka:6.1.1
+    container_name: kafka
+    ports:
+      - "19092:19092"
+    depends_on:
+      - zookeeper
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
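+      # kafka:9092 is advertised to clients inside the compose network; localhost:19092 is advertised for tools running on the host (e.g. kafkacat)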
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,CONNECTIONS_FROM_HOST://localhost:19092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
diff --git a/pom.xml b/pom.xml
index c38b565..7f2318b 100755 (executable)
--- a/pom.xml
+++ b/pom.xml
         <maven.build.timestamp.format>yyyyMMdd'T'HHmmss'Z'</maven.build.timestamp.format>
         <minimum-coverage>0.8</minimum-coverage>
         <!-- Application dependencies versions -->
-        <spring-boot-dependencies.version>2.3.8.RELEASE</spring-boot-dependencies.version>
+        <cps.events.version>1.1.0-SNAPSHOT</cps.events.version>
         <hibernate-types.version>2.10.0</hibernate-types.version>
         <liquibase-core.version>4.3.2</liquibase-core.version>
+        <lombok.version>1.18.20</lombok.version>
+        <mapstruct.version>1.4.2.Final</mapstruct.version>
+        <spring-boot-dependencies.version>2.3.8.RELEASE</spring-boot-dependencies.version>
         <!-- Tests dependencies versions -->
-        <spock-bom.version>2.0-M4-groovy-3.0</spock-bom.version>
+        <archunit-junit5.version>0.18.0</archunit-junit5.version>
         <groovy.version>3.0.7</groovy.version>
         <junit-jupiter.version>1.15.2</junit-jupiter.version>
+        <spock-bom.version>2.0-M4-groovy-3.0</spock-bom.version>
         <testcontainers-postgresql.version>1.15.2</testcontainers-postgresql.version>
-        <archunit-junit5.version>0.18.0</archunit-junit5.version>
         <!-- Plugins and plugins dependencies versions -->
-        <spring-boot-maven-plugin.version>2.3.3.RELEASE</spring-boot-maven-plugin.version>
+        <bug-pattern.version>1.5.0</bug-pattern.version>
+        <cps.checkstyle.version>1.0.1</cps.checkstyle.version>
+        <cps.spotbugs.version>1.0.1</cps.spotbugs.version>
         <gmavenplus-plugin.version>1.12.1</gmavenplus-plugin.version>
         <jib-maven-plugin.version>3.0.0</jib-maven-plugin.version>
         <oparent.version>3.2.0</oparent.version>
-        <cps.checkstyle.version>1.0.1</cps.checkstyle.version>
-        <cps.spotbugs.version>1.0.1</cps.spotbugs.version>
         <spotbugs-maven-plugin.version>4.1.3</spotbugs-maven-plugin.version>
-        <spotbugs.version>4.2.0</spotbugs.version>
         <spotbugs.slf4j.version>1.8.0-beta4</spotbugs.slf4j.version>
-        <bug-pattern.version>1.5.0</bug-pattern.version>
+        <spotbugs.version>4.2.0</spotbugs.version>
+        <spring-boot-maven-plugin.version>2.3.3.RELEASE</spring-boot-maven-plugin.version>
     </properties>
 
     <dependencyManagement>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-validation</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.mapstruct</groupId>
+            <artifactId>mapstruct</artifactId>
+            <version>${mapstruct.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.onap.cps</groupId>
+            <artifactId>cps-events</artifactId>
+            <version>${cps.events.version}</version>
+        </dependency>
         <!-- Runtime dependencies-->
         <dependency>
             <groupId>org.postgresql</groupId>
             <version>${testcontainers-postgresql.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <version>1.15.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka-test</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>com.tngtech.archunit</groupId>
             <artifactId>archunit-junit5</artifactId>
 
     <build>
         <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.8.1</version>
+                <configuration>
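+                    <!-- Compile-time annotation processors: Lombok for generated boilerplate and MapStruct for the generated mapper implementations -->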
+                    <annotationProcessorPaths>
+                        <path>
+                            <groupId>org.projectlombok</groupId>
+                            <artifactId>lombok</artifactId>
+                            <version>${lombok.version}</version>
+                        </path>
+                        <path>
+                            <groupId>org.mapstruct</groupId>
+                            <artifactId>mapstruct-processor</artifactId>
+                            <version>${mapstruct.version}</version>
+                        </path>
+                    </annotationProcessorPaths>
+                </configuration>
+            </plugin>
             <plugin>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-maven-plugin</artifactId>
diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/EventListenerException.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/EventListenerException.java
new file mode 100644 (file)
index 0000000..a9d1ce2
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.controller.event.listener.exception;
+
+/**
+ * Class representing a listener exception related to a system error during event processing.
+ */
+public class EventListenerException extends RuntimeException {
+
+    public EventListenerException(final String message) {
+        super(message);
+    }
+
+    public EventListenerException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+}
diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/InvalidEventEnvelopException.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/exception/InvalidEventEnvelopException.java
new file mode 100644 (file)
index 0000000..df4e756
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.controller.event.listener.exception;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * Class representing an invalid event envelop exception.
+ */
+@Getter
+public class InvalidEventEnvelopException extends EventListenerException {
+
+    private final List<InvalidField> invalidFields = new ArrayList<>();
+
+    public InvalidEventEnvelopException(final String message) {
+        super(message);
+    }
+
+    public void addInvalidField(final InvalidField invalidField) {
+        this.invalidFields.add(invalidField);
+    }
+
+    public boolean hasInvalidFields() {
+        return ! this.invalidFields.isEmpty();
+    }
+
+    @Override
+    public String getMessage() {
+        return String.format("%s. invalidFields: %s", super.getMessage(), this.invalidFields.toString());
+    }
+
+    @AllArgsConstructor
+    @Getter
+    @EqualsAndHashCode
+    @ToString
+    public static class InvalidField implements Serializable {
+
+        private static final long serialVersionUID = -7118283787669377391L;
+
+        private final ErrorType errorType;
+        private final String fieldName;
+        private final String actualValue;
+        private final String expectedValue;
+
+        public enum ErrorType {
+            UNEXPECTED, MISSING
+        }
+
+    }
+
+}
diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java
new file mode 100644 (file)
index 0000000..79c9d92
--- /dev/null
@@ -0,0 +1,129 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.controller.event.listener.kafka;
+
+import static org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException.InvalidField.ErrorType.MISSING;
+import static org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException.InvalidField.ErrorType.UNEXPECTED;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.event.model.CpsDataUpdatedEvent;
+import org.onap.cps.temporal.controller.event.listener.exception.EventListenerException;
+import org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException;
+import org.onap.cps.temporal.controller.event.model.CpsDataUpdatedEventMapper;
+import org.onap.cps.temporal.service.NetworkDataService;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+
+/**
+ * Listener for data updated events.
+ */
+@Component
+@Slf4j
+public class DataUpdatedEventListener {
+
+    private static final URI EVENT_SOURCE;
+
+    static {
+        try {
+            EVENT_SOURCE = new URI("urn:cps:org.onap.cps");
+        } catch (final URISyntaxException e) {
+            throw new EventListenerException("Invalid URI for event source.", e);
+        }
+    }
+
+    private static final String EVENT_TYPE = "org.onap.cps.data-updated-event";
+
+    private final NetworkDataService networkDataService;
+    private final CpsDataUpdatedEventMapper cpsDataUpdatedEventMapper;
+
+    /**
+     * Constructor.
+     */
+    public DataUpdatedEventListener(
+            final NetworkDataService networkDataService, final CpsDataUpdatedEventMapper cpsDataUpdatedEventMapper) {
+        this.networkDataService = networkDataService;
+        this.cpsDataUpdatedEventMapper = cpsDataUpdatedEventMapper;
+    }
+
+    /**
+     * Consume the specified event.
+     *
+     * @param cpsDataUpdatedEvent the data updated event to be consumed and persisted.
+     */
+    @KafkaListener(topics = "${app.listener.data-updated.topic}", errorHandler = "dataUpdatedEventListenerErrorHandler")
+    public void consume(final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
+
+        log.debug("Receiving {} ...", cpsDataUpdatedEvent);
+
+        // Validate event envelop
+        validateEventEnvelop(cpsDataUpdatedEvent);
+
+        // Map event to entity
+        final var networkData = this.cpsDataUpdatedEventMapper.eventToEntity(cpsDataUpdatedEvent);
+        log.debug("Persisting {} ...", networkData);
+
+        // Persist entity
+        final var persistedNetworkData = this.networkDataService.addNetworkData(networkData);
+        log.debug("Persisted {}", persistedNetworkData);
+
+    }
+
+    private void validateEventEnvelop(final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
+
+        final var invalidEventEnvelopException = new InvalidEventEnvelopException("Validation failure");
+
+        // Schema
+        if (cpsDataUpdatedEvent.getSchema() == null) {
+            invalidEventEnvelopException.addInvalidField(
+                    new InvalidEventEnvelopException.InvalidField(
+                            MISSING, "schema", null,
+                            CpsDataUpdatedEvent.Schema.URN_CPS_ORG_ONAP_CPS_DATA_UPDATED_EVENT_SCHEMA_1_1_0_SNAPSHOT
+                                    .value()));
+        }
+        // Id
+        if (!StringUtils.hasText(cpsDataUpdatedEvent.getId())) {
+            invalidEventEnvelopException.addInvalidField(
+                    new InvalidEventEnvelopException.InvalidField(
+                            MISSING, "id", null, null));
+        }
+        // Source
+        if (cpsDataUpdatedEvent.getSource() == null || !cpsDataUpdatedEvent.getSource().equals(EVENT_SOURCE)) {
+            invalidEventEnvelopException.addInvalidField(
+                    new InvalidEventEnvelopException.InvalidField(
+                            UNEXPECTED, "source",
+                            cpsDataUpdatedEvent.getSource() != null
+                                    ? cpsDataUpdatedEvent.getSource().toString() : null, EVENT_SOURCE.toString()));
+        }
+        // Type
+        if (cpsDataUpdatedEvent.getType() == null || !cpsDataUpdatedEvent.getType().equals(EVENT_TYPE)) {
+            invalidEventEnvelopException.addInvalidField(
+                    new InvalidEventEnvelopException.InvalidField(
+                            UNEXPECTED, "type", cpsDataUpdatedEvent.getType(), EVENT_TYPE));
+        }
+
+        if (invalidEventEnvelopException.hasInvalidFields()) {
+            throw invalidEventEnvelopException;
+        }
+
+    }
+
+}
diff --git a/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerErrorHandler.java b/src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerErrorHandler.java
new file mode 100644 (file)
index 0000000..7a4ee7f
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.controller.event.listener.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.listener.KafkaListenerErrorHandler;
+import org.springframework.kafka.listener.ListenerExecutionFailedException;
+import org.springframework.messaging.Message;
+import org.springframework.stereotype.Component;
+
+/**
+ * Class responsible for handling errors from the data updated event listener.
+ */
+@Component
+@Slf4j
+class DataUpdatedEventListenerErrorHandler implements KafkaListenerErrorHandler {
+
+    @Override
+    public Object handleError(final Message<?> message, final ListenerExecutionFailedException exception) {
+        log.error(
+                "Failed to process message {}. Error cause is {}.",
+                message,
+                exception.getCause() != null ? exception.getCause().toString() : null,
+                exception);
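+        // Returning normally (instead of rethrowing) marks the record as handled, so it is not redelivered; the failure is only logged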
+        return exception;
+    }
+
+}
diff --git a/src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java b/src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java
new file mode 100644 (file)
index 0000000..9ef25d5
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.controller.event.model;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import org.mapstruct.Mapper;
+import org.mapstruct.Mapping;
+import org.onap.cps.event.model.CpsDataUpdatedEvent;
+import org.onap.cps.event.model.Data;
+import org.onap.cps.temporal.domain.NetworkData;
+
+/**
+ * Mapper for data updated event schema.
+ */
+@Mapper(componentModel = "spring")
+public abstract class CpsDataUpdatedEventMapper {
+
+    private static final DateTimeFormatter ISO_TIMESTAMP_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+
+    @Mapping(source = "content.observedTimestamp", target = "observedTimestamp")
+    @Mapping(source = "content.dataspaceName", target = "dataspace")
+    @Mapping(source = "content.schemaSetName", target = "schemaSet")
+    @Mapping(source = "content.anchorName", target = "anchor")
+    @Mapping(source = "content.data", target = "payload")
+    @Mapping(expression = "java(null)", target = "createdTimestamp")
+    public abstract NetworkData eventToEntity(CpsDataUpdatedEvent cpsDataUpdatedEvent);
+
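+    // Used by MapStruct to convert the event data content into the JSON string stored as payload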
+    String map(final Data data) throws JsonProcessingException {
+        return data != null ? new ObjectMapper().writeValueAsString(data) : null;
+    }
+
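+    // Used by MapStruct to parse the ISO formatted observed timestamp into an OffsetDateTime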
+    OffsetDateTime map(final String timestamp) {
+        return timestamp != null ? OffsetDateTime.parse(timestamp, ISO_TIMESTAMP_FORMATTER) : null;
+    }
+
+}
diff --git a/src/main/java/org/onap/cps/temporal/controller/QueryController.java b/src/main/java/org/onap/cps/temporal/controller/web/QueryController.java
@@ -16,7 +16,7 @@
  * ============LICENSE_END=========================================================
  */
 
-package org.onap.cps.temporal.controller;
+package org.onap.cps.temporal.controller.web;
 
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RestController;
diff --git a/src/main/java/org/onap/cps/temporal/domain/NetworkData.java b/src/main/java/org/onap/cps/temporal/domain/NetworkData.java
index aa2ce95..1537e4a 100644 (file)
@@ -48,27 +48,30 @@ import org.hibernate.annotations.TypeDef;
 @TypeDef(name = "jsonb", typeClass = JsonBinaryType.class)
 public class NetworkData implements Serializable {
 
-    private static final long serialVersionUID = -8032810412816532433L;
+    private static final long serialVersionUID = 8886477871334560919L;
 
     @Id
+    @NotNull
     @Column
     private OffsetDateTime observedTimestamp;
 
     @Id
+    @NotNull
     @Column
     private String dataspace;
 
     @Id
+    @NotNull
     @Column
     private String anchor;
 
     @NotNull
-    @Column
+    @Column(updatable = false)
     private String schemaSet;
 
     @NotNull
     @Type(type = "jsonb")
-    @Column(columnDefinition = "jsonb")
+    @Column(columnDefinition = "jsonb", updatable = false)
     private String payload;
 
     @CreationTimestamp
diff --git a/src/main/java/org/onap/cps/temporal/domain/NetworkDataId.java b/src/main/java/org/onap/cps/temporal/domain/NetworkDataId.java
index e9742e2..18c4dcf 100644 (file)
@@ -32,10 +32,10 @@ import lombok.NoArgsConstructor;
 @EqualsAndHashCode
 public class NetworkDataId implements Serializable {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = -1039604338648260766L;
 
+    private OffsetDateTime observedTimestamp;
     private String dataspace;
     private String anchor;
-    private OffsetDateTime observedTimestamp;
 
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java b/src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java
index 2e7afb2..687ba85 100644 (file)
 
 package org.onap.cps.temporal.service;
 
+import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.temporal.domain.NetworkData;
+import org.onap.cps.temporal.domain.NetworkDataId;
 import org.onap.cps.temporal.repository.NetworkDataRepository;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
 
 /**
  * Service implementation for Network Data.
  */
-@Component
+@Service
 @Slf4j
 public class NetworkDataServiceImpl implements NetworkDataService {
 
-    @Autowired
-    NetworkDataRepository networkDataRepository;
+    private final NetworkDataRepository networkDataRepository;
+
+    public NetworkDataServiceImpl(final NetworkDataRepository networkDataRepository) {
+        this.networkDataRepository = networkDataRepository;
+    }
 
     @Override
     public NetworkData addNetworkData(final NetworkData networkData) {
-        return networkDataRepository.save(networkData);
+        final var savedNetworkData = networkDataRepository.save(networkData);
+        if (savedNetworkData.getCreatedTimestamp() == null) {
+            // Data already exists and can not be inserted
+            final var id =
+                    new NetworkDataId(
+                            networkData.getObservedTimestamp(), networkData.getDataspace(), networkData.getAnchor());
+            final Optional<NetworkData> existingNetworkData = networkDataRepository.findById(id);
+            throw new ServiceException(
+                    "Failed to create network data. It already exists: " + (existingNetworkData.orElse(null)));
+        }
+        return savedNetworkData;
     }
+
 }
diff --git a/src/main/java/org/onap/cps/temporal/service/ServiceException.java b/src/main/java/org/onap/cps/temporal/service/ServiceException.java
new file mode 100644 (file)
index 0000000..b9d7184
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.service;
+
+/**
+ * Class representing a service exception related to a business error.
+ */
+public class ServiceException extends RuntimeException {
+
+    /**
+     * Instantiate a service exception with the specified message.
+     * @param message the exception message
+     */
+    public ServiceException(final String message) {
+        super(message);
+    }
+
+}
diff --git a/src/main/resources/application-sasl-ssl-kafka.yml b/src/main/resources/application-sasl-ssl-kafka.yml
new file mode 100644 (file)
index 0000000..fdc6458
--- /dev/null
@@ -0,0 +1,31 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 Bell Canada.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+# Spring profile configuration for sasl ssl Kafka
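+# Activate it with the 'sasl-ssl-kafka' Spring profile (e.g. SPRING_PROFILES_ACTIVE=sasl-ssl-kafka)
+# and provide the KAFKA_* environment variables referenced below.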
+
+spring:
+    kafka:
+        bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER}
+        security:
+            protocol: SASL_SSL
+        ssl:
+            trust-store-type: JKS
+            trust-store-location: ${KAFKA_SSL_TRUST_STORE_LOCATION}
+            trust-store-password: ${KAFKA_SSL_TRUST_STORE_PASSWORD}
+        properties:
+            sasl.mechanism: SCRAM-SHA-512
+            sasl.jaas.config: ${KAFKA_SASL_JAAS_CONFIG}
+            ssl.endpoint.identification.algorithm:
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index d4c799c..5fe30b0 100755 (executable)
@@ -27,10 +27,26 @@ spring:
         change-log: classpath:/db/changelog/changelog-master.xml
     jpa:
         properties:
-            hibernate:
-                dialect: org.hibernate.dialect.PostgreSQLDialect
+            hibernate.dialect: org.hibernate.dialect.PostgreSQLDialect
+            hibernate.format_sql: true
+            hibernate.generate_statistics: false
+    kafka:
+        bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER}
+        security:
+            protocol: PLAINTEXT
+        consumer:
+            group-id: ${KAFKA_CONSUMER_GROUP_ID:cps-temporal-group}
+            # Configures the Spring Kafka ErrorHandlingDeserializer that delegates to the 'real' deserializers
+            # See https://docs.spring.io/spring-kafka/docs/2.5.11.RELEASE/reference/html/#error-handling-deserializer
+            # and https://www.confluent.io/blog/spring-kafka-can-your-kafka-consumers-handle-a-poison-pill/
+            key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
+            value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
+            properties:
+                spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
+                spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
+                spring.json.value.default.type: org.onap.cps.event.model.CpsDataUpdatedEvent
 
-logging:
-    level:
-        org:
-            springframework: INFO
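+# Application specific properties: Kafka topic consumed by the data updated event listener
+# (can be overridden with the CPS_CHANGE_EVENT_TOPIC environment variable)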
+app:
+    listener:
+        data-updated:
+            topic: ${CPS_CHANGE_EVENT_TOPIC:cps.cfg-state-events}
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
new file mode 100644 (file)
index 0000000..a75b7aa
--- /dev/null
@@ -0,0 +1,43 @@
+<!--
+  ============LICENSE_START=======================================================
+  Copyright (c) 2021 Bell Canada.
+  ================================================================================
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  ============LICENSE_END=========================================================
+-->
+
+<configuration>
+
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d - %highlight(%-5level) [%-20.20thread] %cyan(%logger{36}) - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <!-- Logger for cps classes -->
+    <logger name="org.onap.cps" level="info"/>
+
+    <!-- Logger for sql statements. Set to info to disable, debug to enable -->
+    <logger name="org.hibernate.SQL" level="info"/>
+
+    <!-- Logger for sql bindings. Set to info to disable, to trace to enable -->
+    <logger name="org.hibernate.type.descriptor.sql.BasicBinder" level="info"/>
+
+    <!-- Logger for hibernate statistics. Set to warn to disable, to info to enable -->
+    <logger name="org.hibernate.engine.internal.StatisticalLoggingSessionEventListener" level="warn"/>
+
+    <root level="info">
+        <appender-ref ref="STDOUT" />
+    </root>
+
+</configuration>
diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerIntegrationSpec.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerIntegrationSpec.groovy
new file mode 100644 (file)
index 0000000..4c362ad
--- /dev/null
@@ -0,0 +1,124 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+*/
+
+package org.onap.cps.temporal.controller.event.listener.kafka
+
+import groovy.util.logging.Slf4j
+import org.onap.cps.event.model.CpsDataUpdatedEvent
+import org.onap.cps.temporal.repository.containers.TimescaleContainer
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.beans.factory.annotation.Value
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.jdbc.core.JdbcTemplate
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.test.context.DynamicPropertyRegistry
+import org.springframework.test.context.DynamicPropertySource
+import org.testcontainers.containers.KafkaContainer
+import spock.lang.Shared
+import spock.lang.Specification
+import spock.util.concurrent.PollingConditions
+
+import java.util.concurrent.TimeUnit
+
+/**
+ * Integration test specification for data updated event listener.
+ * This integration test runs the database and Kafka dependencies as docker containers.
+ */
+@SpringBootTest
+@Slf4j
+class DataUpdatedEventListenerIntegrationSpec extends Specification {
+
+    @Shared
+    def databaseTestContainer = TimescaleContainer.getInstance()
+
+    static kafkaTestContainer = new KafkaContainer()
+    static {
+        Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop))
+    }
+
+    def setupSpec() {
+        databaseTestContainer.start()
+        kafkaTestContainer.start()
+    }
+
+    @Autowired
+    KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate
+
+    @Autowired
+    JdbcTemplate jdbcTemplate
+
+    @Value('${app.listener.data-updated.topic}')
+    String topic
+
+    // Define event data
+    def aTimestamp = EventFixtures.currentIsoTimestamp()
+    def aDataspace = 'my-dataspace'
+    def aSchemaSet = 'my-schema-set'
+    def anAnchor = 'my-anchor'
+
+    // Define sql queries for data validation
+    def sqlCount = "select count(*) from network_data"
+    def sqlSelect =  "select * from network_data"
+    def sqlWhereClause =
+            ' where observed_timestamp = to_timestamp(?, \'YYYY-MM-DD"T"HH24:MI:SS.USTZHTZM\') ' +
+                    'and dataspace = ? ' +
+                    'and schema_set = ? ' +
+                    'and anchor = ?'
+    def sqlCountWithConditions = sqlCount + sqlWhereClause
+    def sqlSelectWithConditions = sqlSelect + sqlWhereClause
+
+    def 'Processing a valid event'() {
+        given: "no event has been processed yet"
+            def initialRecordsCount =
+                    jdbcTemplate.queryForObject(sqlCountWithConditions, Integer.class,
+                            aTimestamp, aDataspace, aSchemaSet, anAnchor)
+            assert (initialRecordsCount == 0)
+        when: 'an event is produced'
+            def event =
+                    EventFixtures.buildEvent(
+                            timestamp: aTimestamp, dataspace: aDataspace, schemaSet: aSchemaSet, anchor: anAnchor)
+            this.kafkaTemplate.send(topic, event)
+        then: 'the event is processed'
+            def pollingCondition = new PollingConditions(timeout: 10, initialDelay: 1, factor: 2)
+            pollingCondition.eventually {
+                def finalRecordsCount =
+                        jdbcTemplate.queryForObject(
+                                sqlCountWithConditions, Integer.class, aTimestamp, aDataspace, aSchemaSet, anAnchor)
+                assert (finalRecordsCount == 1)
+            }
+            Map<String, Object> result =
+                    jdbcTemplate.queryForMap(sqlSelectWithConditions, aTimestamp, aDataspace, aSchemaSet, anAnchor)
+            log.debug("Data retrieved from db: {}", result)
+    }
+
+    def 'Processing an invalid event'() {
+        given: 'the number of network data records is known'
+            def initialRecordsCount = jdbcTemplate.queryForObject(sqlCount, Integer.class)
+        when: 'an invalid event is produced'
+            this.kafkaTemplate.send(topic, (CpsDataUpdatedEvent) null)
+        then: 'the event is not processed and no additional network data record is created'
+            TimeUnit.SECONDS.sleep(3)
+            assert (jdbcTemplate.queryForObject(sqlCount, Integer.class) == initialRecordsCount)
+    }
+
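+    // Point the Spring Kafka client at the broker started by Testcontainers for this specification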
+    @DynamicPropertySource
+    static void registerKafkaProperties(DynamicPropertyRegistry registry) {
+        registry.add("spring.kafka.bootstrap-servers", kafkaTestContainer::getBootstrapServers)
+    }
+
+}
diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy
new file mode 100644 (file)
index 0000000..d3a407c
--- /dev/null
@@ -0,0 +1,117 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.controller.event.listener.kafka
+
+import org.mapstruct.factory.Mappers
+import org.onap.cps.event.model.CpsDataUpdatedEvent
+import org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException
+import org.onap.cps.temporal.controller.event.model.CpsDataUpdatedEventMapper
+import org.onap.cps.temporal.service.NetworkDataService
+import spock.lang.Specification
+
+import static org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException.InvalidField.ErrorType.MISSING
+import static org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException.InvalidField.ErrorType.UNEXPECTED
+
+/**
+ * Test specification for data updated event listener.
+ */
+class DataUpdatedEventListenerSpec extends Specification {
+
+    // Define event data
+    def anEventType = 'my-event-type'
+    def anEventSource = new URI('my-event-source')
+    def aTimestamp = EventFixtures.currentIsoTimestamp()
+    def aDataspace = 'my-dataspace'
+    def aSchemaSet = 'my-schema-set'
+    def anAnchor = 'my-anchor'
+    def aDataName = 'my-data-name'
+    def aDataValue = 'my-data-value'
+
+    // Define service mock
+    def mockService = Mock(NetworkDataService)
+
+    // Define mapper
+    def mapper = Mappers.getMapper(CpsDataUpdatedEventMapper.class)
+
+    // Define listener under test
+    def objectUnderTest = new DataUpdatedEventListener(mockService, mapper)
+
+    def 'Event message consumption'() {
+        when: 'an event is received'
+            def event =
+                    EventFixtures.buildEvent(
+                            timestamp: aTimestamp, dataspace: aDataspace, schemaSet: aSchemaSet, anchor: anAnchor,
+                            dataName: aDataName, dataValue: aDataValue)
+            objectUnderTest.consume(event)
+        then: 'network data service is requested to persist the data change'
+            1 * mockService.addNetworkData(
+                    {
+                        it.getObservedTimestamp() == EventFixtures.toOffsetDateTime(aTimestamp)
+                        && it.getDataspace() == aDataspace
+                        && it.getSchemaSet() == aSchemaSet
+                        && it.getAnchor() == anAnchor
+                        && it.getPayload() == String.format('{"%s":"%s"}', aDataName, aDataValue)
+                        && it.getCreatedTimestamp() == null
+                    }
+            )
+    }
+
+    def 'Event message consumption fails because of missing envelop'() {
+        when: 'an event without envelop information is received'
+            def invalidEvent = new CpsDataUpdatedEvent().withSchema(null)
+            objectUnderTest.consume(invalidEvent)
+        then: 'an exception is thrown with 4 invalid fields'
+            def e = thrown(InvalidEventEnvelopException)
+            e.getInvalidFields().size() == 4
+            e.getInvalidFields().contains(
+                    new InvalidEventEnvelopException.InvalidField(
+                            MISSING,"schema", null,
+                            CpsDataUpdatedEvent.Schema.URN_CPS_ORG_ONAP_CPS_DATA_UPDATED_EVENT_SCHEMA_1_1_0_SNAPSHOT
+                                    .value()))
+            e.getInvalidFields().contains(
+                    new InvalidEventEnvelopException.InvalidField(
+                            MISSING, "id", null, null))
+            e.getInvalidFields().contains(
+                    new InvalidEventEnvelopException.InvalidField(
+                            UNEXPECTED, "source", null, EventFixtures.defaultEventSource.toString()))
+            e.getInvalidFields().contains(
+                    new InvalidEventEnvelopException.InvalidField(
+                            UNEXPECTED, "type", null, EventFixtures.defaultEventType))
+            e.getMessage().contains(e.getInvalidFields().toString())
+    }
+
+    def 'Event message consumption fails because of invalid envelop'() {
+        when: 'an event with an invalid envelop is received'
+            def invalidEvent =
+                    new CpsDataUpdatedEvent()
+                            .withId('my-id').withSource(anEventSource).withType(anEventType)
+            objectUnderTest.consume(invalidEvent)
+        then: 'an exception is thrown with 2 invalid fields'
+            def e = thrown(InvalidEventEnvelopException)
+            e.getInvalidFields().size() == 2
+            e.getInvalidFields().contains(
+                    new InvalidEventEnvelopException.InvalidField(
+                            UNEXPECTED, "type", anEventType, EventFixtures.defaultEventType))
+            e.getInvalidFields().contains(
+                    new InvalidEventEnvelopException.InvalidField(
+                            UNEXPECTED, "source", anEventSource.toString(),
+                            EventFixtures.defaultEventSource.toString()))
+    }
+
+}
diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy
new file mode 100644 (file)
index 0000000..44a28de
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.controller.event.listener.kafka
+
+import org.onap.cps.event.model.Content
+import org.onap.cps.event.model.CpsDataUpdatedEvent
+import org.onap.cps.event.model.Data
+
+import java.time.OffsetDateTime
+import java.time.format.DateTimeFormatter
+
+/**
+ * This class contains utility fixtures methods for building and manipulating event data.
+ */
+class EventFixtures {
+
+    static DateTimeFormatter isoTimestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
+    static String defaultEventType = 'org.onap.cps.data-updated-event'
+    static URI defaultEventSource = new URI('urn:cps:org.onap.cps')
+
+    static CpsDataUpdatedEvent buildEvent(final Map map) {
+        CpsDataUpdatedEvent event =
+                new CpsDataUpdatedEvent()
+                        .withId(
+                                map.id != null ? map.id.toString() : UUID.randomUUID().toString())
+                        .withType(
+                                map.eventType != null ? map.eventType.toString() : defaultEventType)
+                        .withSource(
+                                map.eventSource != null ? new URI(map.eventSource.toString()) : defaultEventSource)
+                        .withContent(
+                                new Content()
+                                        .withObservedTimestamp(
+                                                map.timestamp != null ? map.timestamp.toString() : currentTimestamp())
+                                        .withDataspaceName(
+                                                map.dataspace != null ? map.dataspace.toString() : 'a-dataspace')
+                                        .withSchemaSetName(
+                                                map.schemaSet != null ? map.schemaSet.toString() : 'a-schema-set')
+                                        .withAnchorName(
+                                                map.anchor != null ? map.anchor.toString() : 'an-anchor')
+                                        .withData(
+                                                new Data().withAdditionalProperty(
+                                                        map.dataName != null ? map.dataName.toString() : 'a-data-name',
+                                                        map.dataValue != null ? map.dataValue : 'a-data-value')))
+
+        return event
+    }
+
+    static String currentIsoTimestamp() {
+        return isoTimestampFormatter.format(OffsetDateTime.now())
+    }
+
+    static OffsetDateTime toOffsetDateTime(String timestamp) {
+        return OffsetDateTime.parse(timestamp, isoTimestampFormatter)
+    }
+
+}
diff --git a/src/test/groovy/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapperSpec.groovy b/src/test/groovy/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapperSpec.groovy
new file mode 100644 (file)
index 0000000..132ff6d
--- /dev/null
@@ -0,0 +1,131 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.controller.event.model
+
+import com.fasterxml.jackson.core.JsonProcessingException
+import org.mapstruct.factory.Mappers
+import org.onap.cps.event.model.Content
+import org.onap.cps.event.model.CpsDataUpdatedEvent
+import org.onap.cps.event.model.Data
+import org.onap.cps.temporal.domain.NetworkData
+import spock.lang.Specification
+
+import java.time.OffsetDateTime
+import java.time.format.DateTimeFormatter
+
+/**
+ * Test specification for data updated event mapper.
+ */
+class CpsDataUpdatedEventMapperSpec extends Specification {
+
+    def objectUnderTest = Mappers.getMapper(CpsDataUpdatedEventMapper.class);
+
+    def 'Mapping a null event'() {
+        given: 'a null event'
+            def event = null
+        when: 'the event is mapped to an entity'
+            NetworkData result = objectUnderTest.eventToEntity(event)
+        then: 'the result entity is null'
+            result == null
+    }
+
+    def 'Mapping an event whose properties are null'() {
+        given: 'an event whose properties are null'
+            def event = new CpsDataUpdatedEvent()
+        when: 'the event is mapped to an entity'
+            NetworkData result = objectUnderTest.eventToEntity(event)
+        then: 'the result entity is not null'
+            result != null
+        and: 'all result entity properties are null'
+            assertEntityPropertiesAreNull(result)
+    }
+
+    def 'Mapping an event whose content properties are null'() {
+        given: 'an event whose content properties are null'
+            def event = new CpsDataUpdatedEvent().withContent(new Content())
+        when: 'the event is mapped to an entity'
+            NetworkData result = objectUnderTest.eventToEntity(event)
+        then: 'the result entity is not null'
+            result != null
+        and: 'all result entity properties are null'
+            assertEntityPropertiesAreNull(result)
+    }
+
+    def 'Mapping an event whose content data is empty'() {
+        given: 'an event whose content data is empty'
+            def event = new CpsDataUpdatedEvent().withContent(new Content().withData(new Data()))
+        when: 'the event is mapped to an entity'
+            NetworkData result = objectUnderTest.eventToEntity(event)
+        then: 'the result entity is not null'
+            result != null
+        and: 'the result entity payload is an empty json object'
+            result.getPayload() == "{}"
+    }
+
+    def 'Mapping an event whose content data is invalid'() {
+        given: 'an event whose content data is invalid'
+            def event =
+                    new CpsDataUpdatedEvent().withContent(new Content().withData(
+                            new Data().withAdditionalProperty(null, null)))
+        when: 'the event is mapped to an entity'
+            NetworkData result = objectUnderTest.eventToEntity(event)
+        then: 'a runtime exception is thrown'
+            def e = thrown(RuntimeException)
+            e.getCause() instanceof JsonProcessingException
+    }
+
+    def 'Mapping a valid complete event'() {
+        given: 'a valid complete event'
+            def isoTimestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
+            def aDataName = 'a-data-name'
+            def aDataValue = 'a-data-value'
+            def event =
+                    new CpsDataUpdatedEvent()
+                            .withContent(
+                                    new Content()
+                                            .withObservedTimestamp(isoTimestampFormatter.format(OffsetDateTime.now()))
+                                            .withDataspaceName('a-dataspace')
+                                            .withSchemaSetName('a-schema-set')
+                                            .withAnchorName('an-anchor')
+                                            .withData(new Data().withAdditionalProperty(aDataName, aDataValue)))
+        when: 'the event is mapped to an entity'
+            NetworkData result = objectUnderTest.eventToEntity(event)
+        then: 'the result entity is not null'
+            result != null
+        and: 'all result entity properties are the ones from the event'
+            result.getObservedTimestamp() ==
+                    OffsetDateTime.parse(event.getContent().getObservedTimestamp(), isoTimestampFormatter)
+            result.getDataspace() == event.getContent().getDataspaceName()
+            result.getSchemaSet() == event.getContent().getSchemaSetName()
+            result.getAnchor() == event.getContent().getAnchorName()
+            result.getPayload().contains(aDataName)
+            result.getPayload().contains(aDataValue)
+            result.getCreatedTimestamp() == null
+    }
+
+    private void assertEntityPropertiesAreNull(NetworkData networkData) {
+        assert networkData.getObservedTimestamp() == null
+        assert networkData.getDataspace() == null
+        assert networkData.getSchemaSet() == null
+        assert networkData.getAnchor() == null
+        assert networkData.getPayload() == null
+        assert networkData.getCreatedTimestamp() == null
+    }
+
+}
diff --git a/src/test/groovy/org/onap/cps/temporal/controller/QueryControllerSpec.groovy b/src/test/groovy/org/onap/cps/temporal/controller/web/QueryControllerSpec.groovy
@@ -16,7 +16,7 @@
  * ============LICENSE_END=========================================================
  */
 
-package org.onap.cps.temporal.controller
+package org.onap.cps.temporal.controller.web
 
 import spock.lang.Specification
 
@@ -34,4 +34,4 @@ class QueryControllerSpec extends Specification {
             ! response.empty
     }
 
-}
\ No newline at end of file
+}
diff --git a/src/test/groovy/org/onap/cps/temporal/repository/NetworkDataRepositorySpec.groovy b/src/test/groovy/org/onap/cps/temporal/repository/NetworkDataRepositorySpec.groovy
index ec976ee..f66b35e 100644 (file)
@@ -29,6 +29,9 @@ import spock.lang.Specification
 
 import java.time.OffsetDateTime
 
+/**
+ * Test specification for network data repository.
+ */
 @SpringBootTest
 @Testcontainers
 class NetworkDataRepositorySpec extends Specification {
diff --git a/src/test/groovy/org/onap/cps/temporal/service/NetworkDataServiceImplSpec.groovy b/src/test/groovy/org/onap/cps/temporal/service/NetworkDataServiceImplSpec.groovy
index 70ac2bc..9847f54 100644 (file)
 
 package org.onap.cps.temporal.service
 
+import org.onap.cps.temporal.domain.NetworkDataId
+
 import java.time.OffsetDateTime
 import org.onap.cps.temporal.domain.NetworkData
 import org.onap.cps.temporal.repository.NetworkDataRepository
 import spock.lang.Specification
 
+/**
+ * Test specification for network data service.
+ */
 class NetworkDataServiceImplSpec extends Specification {
 
-    def objectUnderTest = new NetworkDataServiceImpl()
-
     def mockNetworkDataRepository = Mock(NetworkDataRepository)
 
+    def objectUnderTest = new NetworkDataServiceImpl(mockNetworkDataRepository)
+
     def networkData = new NetworkData()
 
-    def setup() {
-        objectUnderTest.networkDataRepository = mockNetworkDataRepository
+    def 'Add network data successfully.'() {
+        given: 'network data repository persists the network data it is asked to save'
+            def persistedNetworkData = new NetworkData()
+            persistedNetworkData.setCreatedTimestamp(OffsetDateTime.now())
+            mockNetworkDataRepository.save(networkData) >> persistedNetworkData
+        when: 'a new network data is added'
+            def result = objectUnderTest.addNetworkData(networkData)
+        then: 'result network data is the one that has been persisted'
+            result == persistedNetworkData
+            result.getCreatedTimestamp() != null
+            networkData.getCreatedTimestamp() == null
     }
 
-    def 'Add network data in timeseries database.'() {
+    def 'Add network data fails because already added'() {
+        given: 'network data repository is not able to create the data it is asked to persist ' +
+                'and reveals it by returning a null created timestamp on the network data entity'
+            def persistedNetworkData = new NetworkData()
+            persistedNetworkData.setCreatedTimestamp(null)
+            mockNetworkDataRepository.save(networkData) >> persistedNetworkData
+        and: 'existing data can be retrieved'
+            def existing = new NetworkData()
+            existing.setCreatedTimestamp(OffsetDateTime.now().minusYears(1))
+            mockNetworkDataRepository.findById(_ as NetworkDataId) >> Optional.of(existing)
         when: 'a new network data is added'
             objectUnderTest.addNetworkData(networkData)
-        then: ' repository service is called with the correct parameters'
-            1 * mockNetworkDataRepository.save(networkData)
+        then: 'network service exception is thrown'
+            thrown(ServiceException)
     }
 
 }
diff --git a/src/test/java/org/onap/cps/temporal/architecture/LayeredArchitectureTest.java b/src/test/java/org/onap/cps/temporal/architecture/LayeredArchitectureTest.java
index d47e8a5..a70e914 100644 (file)
@@ -70,4 +70,4 @@ public class LayeredArchitectureTest {
                     .should().onlyHaveDependentClassesThat()
                     .resideInAnyPackage(SERVICE_PACKAGE, REPOSITORY_PACKAGE);
 
-}
\ No newline at end of file
+}
diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml
index afaff6c..3ac13a9 100644 (file)
@@ -20,17 +20,37 @@ server:
 spring:
     datasource:
         url: ${DB_URL}
-        password: ${DB_PASSWORD}
         username: ${DB_USERNAME}
+        password: ${DB_PASSWORD}
     liquibase:
         change-log: classpath:/db/changelog/changelog-master.xml
     jpa:
-        open-in-view: false
         properties:
-            hibernate:
-                dialect: org.hibernate.dialect.PostgreSQLDialect
+            hibernate.dialect: org.hibernate.dialect.PostgreSQLDialect
+            hibernate.format_sql: true
+            hibernate.generate_statistics: false
+    kafka:
+        bootstrap-servers: localhost:9092
+        security:
+            protocol: PLAINTEXT
+        consumer:
+            group-id: cps-temporal-group
+            # Configures the Spring Kafka ErrorHandlingDeserializer that delegates to the 'real' deserializers
+            # See https://docs.spring.io/spring-kafka/docs/2.5.11.RELEASE/reference/html/#error-handling-deserializer
+            # and https://www.confluent.io/blog/spring-kafka-can-your-kafka-consumers-handle-a-poison-pill/
+            key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
+            value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
+            auto-offset-reset: earliest
+            properties:
+                spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
+                spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
+                spring.json.value.default.type: org.onap.cps.event.model.CpsDataUpdatedEvent
+        # The following is not cps-temporal configuration; it configures the producer used by the integration tests
+        producer:
+            key-serializer: org.apache.kafka.common.serialization.StringSerializer
+            value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
 
-logging:
-    level:
-        org:
-            springframework: INFO
+app:
+    listener:
+        data-updated:
+            topic: cps.cfg-state-events