Add support for delete operation 75/126175/11
authorRenu Kumari <renu.kumari@bell.ca>
Wed, 8 Dec 2021 16:16:58 +0000 (11:16 -0500)
committerRenu Kumari <renu.kumari@bell.ca>
Wed, 15 Dec 2021 00:02:06 +0000 (00:02 +0000)
- Added new column operation in DB
- Updated existing rows to have 'UPDATE' value for operation field
- Added ability to process both V1 and V2 event schema
- Changed code and testcase to support operation field

Issue-ID: CPS-790
Signed-off-by: Renu Kumari <renu.kumari@bell.ca>
Change-Id: Ife24daa4b442e1499094b162727cc8704c25011e

15 files changed:
pom.xml
src/main/java/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListener.java
src/main/java/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapper.java
src/main/java/org/onap/cps/temporal/domain/NetworkData.java
src/main/java/org/onap/cps/temporal/domain/Operation.java [new file with mode: 0644]
src/main/java/org/onap/cps/temporal/service/NetworkDataServiceImpl.java
src/main/resources/db/changelog/changelog-master.xml
src/main/resources/db/changelog/schema/04-added-operation-field-in-network-data.xml [new file with mode: 0644]
src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerIntegrationSpec.groovy
src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/DataUpdatedEventListenerSpec.groovy
src/test/groovy/org/onap/cps/temporal/controller/event/listener/kafka/EventFixtures.groovy
src/test/groovy/org/onap/cps/temporal/controller/event/model/CpsDataUpdatedEventMapperSpec.groovy
src/test/groovy/org/onap/cps/temporal/repository/NetworkDataRepositorySpec.groovy
src/test/groovy/org/onap/cps/temporal/service/NetworkDataServiceImplSpec.groovy
src/test/resources/data/network-data-changes.sql

diff --git a/pom.xml b/pom.xml
index f3f101b..957be6c 100755 (executable)
--- a/pom.xml
+++ b/pom.xml
@@ -46,7 +46,7 @@
         <maven.build.timestamp.format>yyyyMMdd'T'HHmmss'Z'</maven.build.timestamp.format>
         <minimum-coverage>0.8</minimum-coverage>
         <!-- Application dependencies versions -->
-        <cps.version>2.0.0</cps.version>
+        <cps.version>2.1.0-SNAPSHOT</cps.version>
         <mapstruct.version>1.4.2.Final</mapstruct.version>
     </properties>
 
index 5fce94e..2ae675e 100644 (file)
@@ -6,13 +6,15 @@
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
- *        http://www.apache.org/licenses/LICENSE-2.0
+ *         http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
  * ============LICENSE_END=========================================================
  */
 
index d180509..cb06d5f 100644 (file)
@@ -25,10 +25,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import java.time.OffsetDateTime;
 import org.mapstruct.Mapper;
 import org.mapstruct.Mapping;
+import org.onap.cps.event.model.Content;
 import org.onap.cps.event.model.CpsDataUpdatedEvent;
 import org.onap.cps.event.model.Data;
 import org.onap.cps.temporal.controller.utils.DateTimeUtility;
 import org.onap.cps.temporal.domain.NetworkData;
+import org.onap.cps.temporal.domain.Operation;
 
 /**
  * Mapper for data updated event schema.
@@ -43,6 +45,7 @@ public abstract class CpsDataUpdatedEventMapper {
     @Mapping(source = "content.schemaSetName", target = "schemaSet")
     @Mapping(source = "content.anchorName", target = "anchor")
     @Mapping(source = "content.data", target = "payload")
+    @Mapping(source = "content.operation", target = "operation")
     @Mapping(expression = "java(null)", target = "createdTimestamp")
     public abstract NetworkData eventToEntity(CpsDataUpdatedEvent cpsDataUpdatedEvent);
 
@@ -50,6 +53,10 @@ public abstract class CpsDataUpdatedEventMapper {
         return data != null ? objectMapper.writeValueAsString(data) : null;
     }
 
+    Operation map(final Content.Operation inputOperation) {
+        return inputOperation == null ? Operation.UPDATE : Operation.valueOf(inputOperation.toString());
+    }
+
     OffsetDateTime map(final String timestamp) {
         return DateTimeUtility.toOffsetDateTime(timestamp);
     }
index 1537e4a..e147871 100644 (file)
@@ -6,13 +6,15 @@
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
- *        http://www.apache.org/licenses/LICENSE-2.0
+ *         http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
  * ============LICENSE_END=========================================================
  */
 
@@ -23,6 +25,8 @@ import java.io.Serializable;
 import java.time.OffsetDateTime;
 import javax.persistence.Column;
 import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
 import javax.persistence.Id;
 import javax.persistence.IdClass;
 import javax.persistence.Table;
@@ -70,6 +74,10 @@ public class NetworkData implements Serializable {
     private String schemaSet;
 
     @NotNull
+    @Column(updatable = false)
+    @Enumerated(EnumType.STRING)
+    private Operation operation;
+
     @Type(type = "jsonb")
     @Column(columnDefinition = "jsonb", updatable = false)
     private String payload;
diff --git a/src/main/java/org/onap/cps/temporal/domain/Operation.java b/src/main/java/org/onap/cps/temporal/domain/Operation.java
new file mode 100644 (file)
index 0000000..06b5099
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.temporal.domain;
+
+public enum Operation {
+    CREATE,
+    UPDATE,
+    DELETE
+}
index 3eba6fb..8d282b4 100644 (file)
@@ -25,6 +25,7 @@ import javax.validation.ValidationException;
 import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.temporal.domain.NetworkData;
 import org.onap.cps.temporal.domain.NetworkDataId;
+import org.onap.cps.temporal.domain.Operation;
 import org.onap.cps.temporal.domain.SearchCriteria;
 import org.onap.cps.temporal.repository.NetworkDataRepository;
 import org.springframework.beans.factory.annotation.Value;
@@ -42,26 +43,35 @@ public class NetworkDataServiceImpl implements NetworkDataService {
     private final int maxPageSize;
 
     public NetworkDataServiceImpl(final NetworkDataRepository networkDataRepository,
-        final @Value("${app.query.response.max-page-size}") int maxPageSize) {
+                                  final @Value("${app.query.response.max-page-size}") int maxPageSize) {
         this.networkDataRepository = networkDataRepository;
         this.maxPageSize = maxPageSize;
     }
 
     @Override
     public NetworkData addNetworkData(final NetworkData networkData) {
+        validateNetworkData(networkData);
         final var savedNetworkData = networkDataRepository.save(networkData);
         if (savedNetworkData.getCreatedTimestamp() == null) {
             // Data already exists and can not be inserted
             final var id =
-                new NetworkDataId(
-                    networkData.getObservedTimestamp(), networkData.getDataspace(), networkData.getAnchor());
+                    new NetworkDataId(
+                            networkData.getObservedTimestamp(), networkData.getDataspace(), networkData.getAnchor());
             final Optional<NetworkData> existingNetworkData = networkDataRepository.findById(id);
             throw new ServiceException(
-                "Failed to create network data. It already exists: " + (existingNetworkData.orElse(null)));
+                    "Failed to create network data. It already exists: " + (existingNetworkData.orElse(null)));
         }
         return savedNetworkData;
     }
 
+    private void validateNetworkData(final NetworkData networkData) {
+        if (networkData.getOperation() != Operation.DELETE
+                && networkData.getPayload() == null) {
+            throw new ValidationException(
+                    String.format("The operation %s must not have null payload", networkData.getOperation()));
+        }
+    }
+
     @Override
     public Slice<NetworkData> searchNetworkData(final SearchCriteria searchCriteria) {
         if (searchCriteria.getPageable().getPageSize() > maxPageSize) {
index 6ec36fb..7986d5e 100644 (file)
@@ -26,5 +26,5 @@
     <include file="db/changelog/schema/01-init-schema.xml"/>
     <include file="db/changelog/data/02-init-data.xml"/>
     <include file="db/changelog/schema/03-rename-network-data-timestamp-fields.xml"/>
-
+    <include file="db/changelog/schema/04-added-operation-field-in-network-data.xml"/>
 </databaseChangeLog>
diff --git a/src/main/resources/db/changelog/schema/04-added-operation-field-in-network-data.xml b/src/main/resources/db/changelog/schema/04-added-operation-field-in-network-data.xml
new file mode 100644 (file)
index 0000000..62b93b9
--- /dev/null
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ============LICENSE_START=======================================================
+  Copyright (c) 2021 Bell Canada.
+  ================================================================================
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+  SPDX-License-Identifier: Apache-2.0
+  ============LICENSE_END=========================================================
+-->
+
+<databaseChangeLog
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
+        xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
+            http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.3.xsd">
+
+    <changeSet id="4.1" author="cps">
+        <comment>Add operation field in network data timescale table
+            and set default 'UPDATE' value for existing data
+        </comment>
+        <addColumn tableName="network_data">
+            <column name="operation" type="VARCHAR(20)"
+                    remarks="Field to store the operation type">
+            </column>
+        </addColumn>
+        <update tableName="network_data">
+            <column name="operation" value="UPDATE"/>
+            <where>operation is NULL</where>
+        </update>
+        <rollback>
+            <dropColumn tableName="network_data">
+                <column name="operation"/>
+            </dropColumn>
+        </rollback>
+    </changeSet>
+    <changeSet id="4.2" author="cps">
+        <comment>Remove not null constraint from payload to support delete operation</comment>
+        <dropNotNullConstraint tableName="network_data" columnName="payload"/>
+    </changeSet>
+</databaseChangeLog>
index 2ba011f..01bb92d 100644 (file)
@@ -6,19 +6,20 @@
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
- *        http://www.apache.org/licenses/LICENSE-2.0
+ *         http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
  * ============LICENSE_END=========================================================
-*/
+ */
 
 package org.onap.cps.temporal.controller.event.listener.kafka
 
-import groovy.util.logging.Slf4j
 import org.onap.cps.event.model.CpsDataUpdatedEvent
 import org.onap.cps.temporal.repository.containers.TimescaleContainer
 import org.springframework.beans.factory.annotation.Autowired
@@ -42,16 +43,16 @@ import java.util.concurrent.TimeUnit
  */
 @SpringBootTest
 @Testcontainers
-@Slf4j
 class DataUpdatedEventListenerIntegrationSpec extends Specification {
 
     @Shared
-    TimescaleContainer databaseTestContainer = TimescaleContainer.getInstance()
+    TimescaleContainer timescaleTestContainer = TimescaleContainer.getInstance()
 
     static kafkaTestContainer = new KafkaContainer()
     static {
         Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop))
     }
+
     def setupSpec() {
         kafkaTestContainer.start()
     }
@@ -65,60 +66,49 @@ class DataUpdatedEventListenerIntegrationSpec extends Specification {
     @Value('${app.listener.data-updated.topic}')
     String topic
 
-    // Define event data
-    def aTimestamp = EventFixtures.currentIsoTimestamp()
-    def aDataspace = 'my-dataspace'
-    def aSchemaSet = 'my-schema-set'
-    def anAnchor = 'my-anchor'
-
-    // Define sql queries for data validation
-    def sqlCount = "select count(*) from network_data"
-    def sqlSelect =  "select * from network_data"
-    def sqlWhereClause =
-            ' where observed_timestamp = to_timestamp(?, \'YYYY-MM-DD"T"HH24:MI:SS.USTZHTZM\') ' +
-                    'and dataspace = ? ' +
-                    'and schema_set = ? ' +
-                    'and anchor = ?'
-    def sqlCountWithConditions = sqlCount + sqlWhereClause
-    def sqlSelectWithConditions = sqlSelect + sqlWhereClause
-
     def 'Processing a valid event'() {
-        given: "no event has been proceeded"
-            def initialRecordsCount =
-                    jdbcTemplate.queryForObject(sqlCountWithConditions, Integer.class,
-                            aTimestamp, aDataspace, aSchemaSet, anAnchor)
-            assert (initialRecordsCount == 0)
+        def aTimestamp = EventFixtures.currentIsoTimestamp()
+        given: 'no data exist for the anchor'
+            assert networkDataConditionalCount(aTimestamp, 'my-dataspace', 'my-schema-set', 'my-anchor') == 0
         when: 'an event is produced'
             def event =
                     EventFixtures.buildEvent(
-                            timestamp: aTimestamp, dataspace: aDataspace, schemaSet: aSchemaSet, anchor: anAnchor)
+                            observedTimestamp: aTimestamp, dataspace: 'my-dataspace', schemaSet: 'my-schema-set',
+                            anchor: 'my-anchor', data: ['my-data-name': 'my-data-value'])
             this.kafkaTemplate.send(topic, event)
-        then: 'the event is proceeded'
+        then: 'the event is processed and data exists in database'
             def pollingCondition = new PollingConditions(timeout: 10, initialDelay: 1, factor: 2)
             pollingCondition.eventually {
-                def finalRecordsCount =
-                        jdbcTemplate.queryForObject(
-                                sqlCountWithConditions, Integer.class, aTimestamp, aDataspace, aSchemaSet, anAnchor)
-                assert (finalRecordsCount == 1)
+                assert networkDataConditionalCount(aTimestamp, 'my-dataspace', 'my-schema-set', 'my-anchor') == 1
             }
-            Map<String, Object> result =
-                    jdbcTemplate.queryForMap(sqlSelectWithConditions, aTimestamp, aDataspace, aSchemaSet, anAnchor)
-            log.debug("Data retrieved from db: {}", result)
     }
 
     def 'Processing an invalid event'() {
         given: 'the number of network data records if known'
-            def initialRecordsCount = jdbcTemplate.queryForObject(sqlCount, Integer.class)
+            def initialRecordsCount = networkDataAllRecordCount()
         when: 'an invalid event is produced'
             this.kafkaTemplate.send(topic, (CpsDataUpdatedEvent) null)
         then: 'the event is not proceeded and no more network data record is created'
             TimeUnit.SECONDS.sleep(3)
-            assert (jdbcTemplate.queryForObject(sqlCount, Integer.class) == initialRecordsCount)
+            networkDataAllRecordCount() == initialRecordsCount
+    }
+
+    def networkDataAllRecordCount() {
+        return jdbcTemplate.queryForObject('SELECT COUNT(1) FROM network_data', Integer.class)
+    }
+
+    def networkDataConditionalCount(observedTimestamp, dataspaceName, schemaSetName, anchorName) {
+        return jdbcTemplate.queryForObject('SELECT COUNT(1) FROM network_data ' +
+                'WHERE observed_timestamp = to_timestamp(?, \'YYYY-MM-DD"T"HH24:MI:SS.USTZHTZM\') ' +
+                'AND dataspace = ? ' +
+                'AND schema_set = ? ' +
+                'AND anchor = ?',
+                Integer.class, observedTimestamp, dataspaceName, schemaSetName, anchorName)
     }
 
     @DynamicPropertySource
     static void registerKafkaProperties(DynamicPropertyRegistry registry) {
-        registry.add("spring.kafka.bootstrap-servers", kafkaTestContainer::getBootstrapServers)
+        registry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
     }
 
 }
index 055147f..e7e2570 100644 (file)
@@ -6,22 +6,26 @@
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
- *        http://www.apache.org/licenses/LICENSE-2.0
+ *         http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
  * ============LICENSE_END=========================================================
  */
 
 package org.onap.cps.temporal.controller.event.listener.kafka
 
 import org.mapstruct.factory.Mappers
+import org.onap.cps.event.model.Content
 import org.onap.cps.event.model.CpsDataUpdatedEvent
 import org.onap.cps.temporal.controller.event.listener.exception.InvalidEventEnvelopException
 import org.onap.cps.temporal.controller.event.model.CpsDataUpdatedEventMapper
+import org.onap.cps.temporal.domain.Operation
 import org.onap.cps.temporal.service.NetworkDataService
 import spock.lang.Specification
 
@@ -36,15 +40,9 @@ class DataUpdatedEventListenerSpec extends Specification {
     public static final String EXPECTED_SCHEMA_EXCEPTION_MESSAGE = 'urn:cps:org.onap.cps:data-updated-event-schema:v99'
 
     // Define event data
-    def anEventType = 'my-event-type'
     def anEventSchema = new URI('my-event-schema')
     def anEventSource = new URI('my-event-source')
     def aTimestamp = EventFixtures.currentIsoTimestamp()
-    def aDataspace = 'my-dataspace'
-    def aSchemaSet = 'my-schema-set'
-    def anAnchor = 'my-anchor'
-    def aDataName = 'my-data-name'
-    def aDataValue = 'my-data-value'
 
     // Define service mock
     def mockService = Mock(NetworkDataService)
@@ -56,21 +54,54 @@ class DataUpdatedEventListenerSpec extends Specification {
     def objectUnderTest = new DataUpdatedEventListener(mockService, mapper)
 
     def 'Event message consumption'() {
-        when: 'an event is received'
+        when: 'an event is received #scenario'
+            def defaultEventProperties = [observedTimestamp: aTimestamp,
+                                          dataspace        : 'my-dataspace',
+                                          schemaSet        : 'my-schema-set',
+                                          anchor           : 'my-anchor',
+                                          data             : ['my-data-name': 'my-data-value']]
+            def addOperationField = specifiedOperation != null ? [operation: Content.Operation.valueOf(specifiedOperation)] : []
             def event =
-                    EventFixtures.buildEvent(
-                            timestamp: aTimestamp, dataspace: aDataspace, schemaSet: aSchemaSet, anchor: anAnchor,
-                            dataName: aDataName, dataValue: aDataValue)
+                    EventFixtures.buildEvent(defaultEventProperties + addOperationField)
             objectUnderTest.consume(event)
         then: 'network data service is requested to persisted the data change'
             1 * mockService.addNetworkData(
                     {
                         it.getObservedTimestamp() == EventFixtures.toOffsetDateTime(aTimestamp)
-                        && it.getDataspace() == aDataspace
-                        && it.getSchemaSet() == aSchemaSet
-                        && it.getAnchor() == anAnchor
-                        && it.getPayload() == String.format('{"%s":"%s"}', aDataName, aDataValue)
-                        && it.getCreatedTimestamp() == null
+                                && it.getDataspace() == 'my-dataspace'
+                                && it.getSchemaSet() == 'my-schema-set'
+                                && it.getAnchor() == 'my-anchor'
+                                && it.getCreatedTimestamp() == null
+                                && it.getOperation() == expectedOperation
+                                && it.getPayload() == '{"my-data-name":"my-data-value"}'
+
+                    }
+            )
+        where:
+            scenario                  | specifiedOperation || expectedOperation
+            'without operation field' | null               || Operation.UPDATE
+            'create operation'        | 'CREATE'           || Operation.CREATE
+    }
+
+    def 'Delete Event message consumption'() {
+        when: 'an delete event is received'
+            def deleteEvent =
+                    EventFixtures.buildEvent([observedTimestamp: aTimestamp,
+                                              dataspace        : 'my-dataspace',
+                                              schemaSet        : 'my-schema-set',
+                                              anchor           : 'my-anchor',
+                                              operation        : Content.Operation.DELETE])
+            objectUnderTest.consume(deleteEvent)
+        then: 'network data service is requested to persisted the data change'
+            1 * mockService.addNetworkData(
+                    {
+                        it.getObservedTimestamp() == EventFixtures.toOffsetDateTime(aTimestamp)
+                                && it.getDataspace() == 'my-dataspace'
+                                && it.getSchemaSet() == 'my-schema-set'
+                                && it.getAnchor() == 'my-anchor'
+                                && it.getCreatedTimestamp() == null
+                                && it.getOperation() == Operation.DELETE
+                                && it.getPayload() == null
                     }
             )
     }
@@ -85,16 +116,16 @@ class DataUpdatedEventListenerSpec extends Specification {
             e.getInvalidFields().size() == 4
             e.getInvalidFields().contains(
                     new InvalidEventEnvelopException.InvalidField(
-                            UNEXPECTED,"schema", null, EXPECTED_SCHEMA_EXCEPTION_MESSAGE))
+                            UNEXPECTED, 'schema', null, EXPECTED_SCHEMA_EXCEPTION_MESSAGE))
             e.getInvalidFields().contains(
                     new InvalidEventEnvelopException.InvalidField(
-                            MISSING, "id", null, null))
+                            MISSING, 'id', null, null))
             e.getInvalidFields().contains(
                     new InvalidEventEnvelopException.InvalidField(
-                            UNEXPECTED, "source", null, EventFixtures.defaultEventSource.toString()))
+                            UNEXPECTED, 'source', null, EventFixtures.defaultEventSource.toString()))
             e.getInvalidFields().contains(
                     new InvalidEventEnvelopException.InvalidField(
-                            UNEXPECTED, "type", null, EventFixtures.defaultEventType))
+                            UNEXPECTED, 'type', null, EventFixtures.defaultEventType))
             e.getMessage().contains(e.getInvalidFields().toString())
     }
 
@@ -105,7 +136,7 @@ class DataUpdatedEventListenerSpec extends Specification {
                             .withId('my-id')
                             .withSchema(anEventSchema)
                             .withSource(anEventSource)
-                            .withType(anEventType)
+                            .withType('my-event-type')
             objectUnderTest.consume(invalidEvent)
         then: 'an exception is thrown with 2 invalid fields'
             def e = thrown(InvalidEventEnvelopException)
@@ -113,14 +144,14 @@ class DataUpdatedEventListenerSpec extends Specification {
             e.getInvalidFields().size() == 3
             e.getInvalidFields().contains(
                     new InvalidEventEnvelopException.InvalidField(
-                            UNEXPECTED, "schema", anEventSchema.toString(),
+                            UNEXPECTED, 'schema', anEventSchema.toString(),
                             EXPECTED_SCHEMA_EXCEPTION_MESSAGE))
             e.getInvalidFields().contains(
                     new InvalidEventEnvelopException.InvalidField(
-                            UNEXPECTED, "type", anEventType, EventFixtures.defaultEventType))
+                            UNEXPECTED, 'type', 'my-event-type', EventFixtures.defaultEventType))
             e.getInvalidFields().contains(
                     new InvalidEventEnvelopException.InvalidField(
-                            UNEXPECTED, "source", anEventSource.toString(),
+                            UNEXPECTED, 'source', anEventSource.toString(),
                             EventFixtures.defaultEventSource.toString()))
     }
 
index 7c4dee6..95c47ce 100644 (file)
@@ -6,13 +6,15 @@
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
- *        http://www.apache.org/licenses/LICENSE-2.0
+ *         http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
  * ============LICENSE_END=========================================================
  */
 
@@ -32,36 +34,40 @@ class EventFixtures {
 
     static DateTimeFormatter isoTimestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
     static String defaultEventType = 'org.onap.cps.data-updated-event'
-    static URI defaultEventSchema = new URI('urn:cps:org.onap.cps:data-updated-event-schema:v1')
+    static URI defaultEventSchema = new URI('urn:cps:org.onap.cps:data-updated-event-schema:v2')
     static URI defaultEventSource = new URI('urn:cps:org.onap.cps')
 
-    static CpsDataUpdatedEvent buildEvent(final Map map) {
-        CpsDataUpdatedEvent event =
-                new CpsDataUpdatedEvent()
-                        .withSchema(
-                                map.eventSchema != null ? new URI(map.eventSchema.toString()) : defaultEventSchema)
-                        .withId(
-                                map.id != null ? map.id.toString() : UUID.randomUUID().toString())
-                        .withType(
-                                map.eventType != null ? map.eventType.toString() : defaultEventType)
-                        .withSource(
-                                map.eventSource != null ? new URI(map.eventSource.toString()) : defaultEventSource)
-                        .withContent(
-                                new Content()
-                                        .withObservedTimestamp(
-                                                map.timestamp != null ? map.timestamp.toString() : currentTimestamp())
-                                        .withDataspaceName(
-                                                map.dataspace != null ? map.dataspace.toString() : 'a-dataspace')
-                                        .withSchemaSetName(
-                                                map.schemaSet != null ? map.schemaSet.toString() : 'a-schema-set')
-                                        .withAnchorName(
-                                                map.anchor != null ? map.anchor.toString() : 'an-anchor')
-                                        .withData(
-                                                new Data().withAdditionalProperty(
-                                                        map.dataName != null ? map.dataName.toString() : 'a-data-name',
-                                                        map.dataValue != null ? map.dataValue : 'a-data-value')))
+    static def defaultEventValue = [
+            eventSchema      : defaultEventSchema,
+            id               : UUID.randomUUID().toString(),
+            eventType        : defaultEventType,
+            eventSource      : defaultEventSource
+    ]
+
+    static CpsDataUpdatedEvent buildEvent(final Map inputMap) {
+        def mergedMap = defaultEventValue + inputMap
+
+        def dataExist = mergedMap.containsKey('data')
+        Data data = null
+        if (dataExist) {
+            data = new Data()
+            mergedMap.data.each { k, v -> data.withAdditionalProperty(k, v) }
+        }
+
+        def content = new Content()
+                .withObservedTimestamp(mergedMap.observedTimestamp)
+                .withDataspaceName(mergedMap.dataspace)
+                .withSchemaSetName(mergedMap.schemaSet)
+                .withAnchorName(mergedMap.anchor)
+                .withOperation(mergedMap.operation)
+                .withData(data)
 
-        return event
+        return new CpsDataUpdatedEvent()
+                .withSchema(mergedMap.eventSchema)
+                .withId(mergedMap.id)
+                .withType(mergedMap.eventType)
+                .withSource(mergedMap.eventSource)
+                .withContent(content)
     }
 
     static String currentIsoTimestamp() {
index a51c4fe..0c82782 100644 (file)
@@ -6,13 +6,15 @@
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
- *        http://www.apache.org/licenses/LICENSE-2.0
+ *         http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
  * ============LICENSE_END=========================================================
  */
 
@@ -24,6 +26,7 @@ import org.onap.cps.event.model.Content
 import org.onap.cps.event.model.CpsDataUpdatedEvent
 import org.onap.cps.event.model.Data
 import org.onap.cps.temporal.domain.NetworkData
+import org.onap.cps.temporal.domain.Operation
 import spock.lang.Specification
 
 import java.time.OffsetDateTime
@@ -75,7 +78,7 @@ class CpsDataUpdatedEventMapperSpec extends Specification {
         then: 'the result entity is not null'
             result != null
         and: 'the result entity payload is an empty json '
-            result.getPayload() == "{}"
+            result.getPayload() == '{}'
     }
 
     def 'Mapping an event whose content data is invalid'() {
@@ -103,20 +106,33 @@ class CpsDataUpdatedEventMapperSpec extends Specification {
                                             .withDataspaceName('a-dataspace')
                                             .withSchemaSetName('a-schema-set')
                                             .withAnchorName('an-anchor')
+                                            .withOperation(Content.Operation.CREATE)
                                             .withData(new Data().withAdditionalProperty(aDataName, aDataValue)))
         when: 'the event is mapped to an entity'
             NetworkData result = objectUnderTest.eventToEntity(event)
         then: 'the result entity is not null'
             result != null
         and: 'all result entity properties are the ones from the event'
-            result.getObservedTimestamp() ==
-                OffsetDateTime.parse(event.getContent().getObservedTimestamp(), isoTimestampFormatter)
-            result.getDataspace() == event.getContent().getDataspaceName()
-            result.getSchemaSet() == event.getContent().getSchemaSetName()
-            result.getAnchor() == event.getContent().getAnchorName()
+            with(result) {
+                observedTimestamp ==
+                    OffsetDateTime.parse(event.getContent().getObservedTimestamp(), isoTimestampFormatter)
+                dataspace == event.getContent().getDataspaceName()
+                schemaSet == event.getContent().getSchemaSetName()
+                operation == Operation.CREATE
+                anchor == event.getContent().getAnchorName()
+                createdTimestamp == null
+            }
             result.getPayload().contains(aDataValue)
             result.getPayload().contains(aDataValue)
-            result.getCreatedTimestamp() == null
+    }
+
+    def 'Mapping event without operation field' () {
+        given: 'event without operation field in content'
+            def cpsDataUpdatedEvent = new CpsDataUpdatedEvent().withContent(new Content())
+        when: 'event is mapped to network data'
+            def networkData = objectUnderTest.eventToEntity(cpsDataUpdatedEvent)
+        then: 'the operation field has default UPDATE value'
+            networkData.operation == Operation.UPDATE
     }
 
     private void assertEntityPropertiesAreNull(NetworkData networkData) {
index 2c7fc5e..d076f4d 100644 (file)
@@ -21,6 +21,7 @@
 package org.onap.cps.temporal.repository
 
 import org.onap.cps.temporal.domain.NetworkData
+import org.onap.cps.temporal.domain.Operation
 import org.onap.cps.temporal.repository.containers.TimescaleContainer
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase
@@ -51,8 +52,13 @@ class NetworkDataRepositorySpec extends Specification {
     @Autowired
     NetworkDataRepository networkDataRepository
 
-    def networkData = NetworkData.builder().observedTimestamp(observedTimestamp).dataspace(myDataspaceName)
-        .schemaSet(mySchemaSetName).anchor(myAnchorName).payload(payload).build()
+    def networkData = NetworkData.builder()
+        .observedTimestamp(observedTimestamp)
+        .dataspace(myDataspaceName)
+        .schemaSet(mySchemaSetName)
+        .anchor(myAnchorName)
+        .operation(Operation.CREATE)
+        .payload(payload).build()
 
     @Shared
     TimescaleContainer databaseTestContainer = TimescaleContainer.getInstance()
@@ -62,11 +68,14 @@ class NetworkDataRepositorySpec extends Specification {
             NetworkData savedData = networkDataRepository.save(networkData)
             TestTransaction.end()
         then: ' the saved Network Data is returned'
-            savedData.getDataspace() == networkData.getDataspace()
-            savedData.getSchemaSet() == networkData.getSchemaSet()
-            savedData.getAnchor() == networkData.getAnchor()
-            savedData.getPayload() == networkData.getPayload()
-            savedData.getObservedTimestamp() == networkData.getObservedTimestamp()
+            with(savedData) {
+                dataspace == networkData.getDataspace()
+                schemaSet == networkData.getSchemaSet()
+                anchor == networkData.getAnchor()
+                payload == networkData.getPayload()
+                observedTimestamp == networkData.getObservedTimestamp()
+                operation == networkData.operation
+            }
         and: ' createdTimestamp is auto populated by db '
             networkData.getCreatedTimestamp() == null
             savedData.getCreatedTimestamp() != null
index 2e04ca8..952faf7 100644 (file)
@@ -21,6 +21,7 @@
 package org.onap.cps.temporal.service
 
 import org.onap.cps.temporal.domain.NetworkDataId
+import org.onap.cps.temporal.domain.Operation
 import org.onap.cps.temporal.domain.SearchCriteria
 import org.spockframework.spring.SpringBean
 import org.springframework.beans.factory.annotation.Autowired
@@ -51,10 +52,12 @@ class NetworkDataServiceImplSpec extends Specification {
     @Value('${app.query.response.max-page-size}')
     int maxPageSize
 
-    def networkData = new NetworkData()
+    def networkData = NetworkData.builder().operation(Operation.UPDATE).payload("{}").build()
 
     def 'Add network data successfully.'() {
-        given: 'network data repository is persisting network data it is asked to save'
+        given: 'a network data'
+            def networkData = NetworkData.builder().operation(operation).payload(payload).build()
+        and: 'network data repository is persisting network data'
             def persistedNetworkData = new NetworkData()
             persistedNetworkData.setCreatedTimestamp(OffsetDateTime.now())
             mockNetworkDataRepository.save(networkData) >> persistedNetworkData
@@ -64,17 +67,36 @@ class NetworkDataServiceImplSpec extends Specification {
             result == persistedNetworkData
             result.getCreatedTimestamp() != null
             networkData.getCreatedTimestamp() == null
+        where: 'the following data is used'
+            operation        | payload
+            Operation.CREATE | '{ "key" : "value" }'
+            Operation.UPDATE | '{ "key" : "value" }'
+            Operation.DELETE | null
+    }
+
+    def 'Error Handling: Payload missing for #operation'() {
+        given: 'a network data'
+            def networkData = NetworkData.builder().operation(operation).build()
+        when: 'a new network data is added'
+            objectUnderTest.addNetworkData(networkData)
+        then: 'Validation exception is thrown'
+            def exception = thrown(ValidationException)
+            exception.getMessage().contains('null payload')
+        where: 'following operations are used'
+            operation  << [ Operation.CREATE, Operation.UPDATE]
     }
 
     def 'Add network data fails because already added'() {
         given:
             'network data repository is not able to create data it is asked to persist ' +
-                'and reveals it with null created timestamp on network data entity'
+                    'and reveals it with null created timestamp on network data entity'
             def persistedNetworkData = new NetworkData()
             persistedNetworkData.setCreatedTimestamp(null)
             mockNetworkDataRepository.save(networkData) >> persistedNetworkData
         and: 'existing data can be retrieved'
             def existing = new NetworkData()
+            existing.setOperation(Operation.UPDATE)
+            existing.setPayload('{}')
             existing.setCreatedTimestamp(OffsetDateTime.now().minusYears(1))
             mockNetworkDataRepository.findById(_ as NetworkDataId) >> Optional.of(existing)
         when: 'a new network data is added'
@@ -86,35 +108,30 @@ class NetworkDataServiceImplSpec extends Specification {
     def 'Query network data by search criteria.'() {
         given: 'search criteria'
             def searchCriteria = SearchCriteria.builder()
-                .dataspaceName('my-dataspaceName')
-                .schemaSetName('my-schemaset')
-                .pagination(0, 10)
-                .build()
+                    .dataspaceName('my-dataspaceName')
+                    .schemaSetName('my-schemaset')
+                    .pagination(0, 10)
+                    .build()
         and: 'response from repository'
             def pageFromRepository = new PageImpl<>(Collections.emptyList(), searchCriteria.getPageable(), 10)
             mockNetworkDataRepository.findBySearchCriteria(searchCriteria) >> pageFromRepository
-
         when: 'search is executed'
             def resultPage = objectUnderTest.searchNetworkData(searchCriteria)
-
         then: 'data is fetched from repository and returned'
             resultPage == pageFromRepository
-
     }
 
     def 'Query network data with more than max page-size'() {
         given: 'search criteria with more than max page size'
             def searchCriteria = SearchCriteria.builder()
-                .dataspaceName('my-dataspaceName')
-                .schemaSetName('my-schemaset')
-                .pagination(0, maxPageSize + 1)
-                .build()
+                    .dataspaceName('my-dataspaceName')
+                    .schemaSetName('my-schemaset')
+                    .pagination(0, maxPageSize + 1)
+                    .build()
         when: 'search is executed'
             objectUnderTest.searchNetworkData(searchCriteria)
-
-        then: 'throws error'
+        then: 'a validation exception is thrown'
             thrown(ValidationException)
-
     }
 
 }
index ce15f19..6ed52d6 100644 (file)
@@ -5,25 +5,25 @@ COMMIT;
 -- Test pagination data
 -- Test created Before filter
 -- Test observed After Filter
-INSERT INTO NETWORK_DATA (OBSERVED_TIMESTAMP, DATASPACE, ANCHOR, SCHEMA_SET, PAYLOAD, CREATED_TIMESTAMP)
+INSERT INTO NETWORK_DATA (OBSERVED_TIMESTAMP, DATASPACE, ANCHOR, SCHEMA_SET, OPERATION, PAYLOAD, CREATED_TIMESTAMP)
 VALUES
-('2021-07-22 00:00:01.000', 'DATASPACE-01', 'ANCHOR-01', 'SCHEMA-SET-01', '{ "status" : "up" }'::jsonb, '2021-07-22 23:00:01.000'),
-('2021-07-22 01:00:01.000', 'DATASPACE-01', 'ANCHOR-01', 'SCHEMA-SET-01', '{ "status" : "down" }'::jsonb, '2021-07-22 23:00:01.000'),
-('2021-07-23 00:00:01.000', 'DATASPACE-01', 'ANCHOR-01', 'SCHEMA-SET-01', '{ "status" : "up" }'::jsonb, '2021-07-23 23:00:01.000');
+('2021-07-22 00:00:01.000', 'DATASPACE-01', 'ANCHOR-01', 'SCHEMA-SET-01', 'CREATE', '{ "status" : "up" }'::jsonb, '2021-07-22 23:00:01.000'),
+('2021-07-22 01:00:01.000', 'DATASPACE-01', 'ANCHOR-01', 'SCHEMA-SET-01', 'UPDATE', '{ "status" : "down" }'::jsonb, '2021-07-22 23:00:01.000'),
+('2021-07-23 00:00:01.000', 'DATASPACE-01', 'ANCHOR-01', 'SCHEMA-SET-01', 'DELETE', NULL, '2021-07-23 23:00:01.000');
 
 -- Test sorting on multiple fields
-INSERT INTO NETWORK_DATA (OBSERVED_TIMESTAMP, DATASPACE, ANCHOR, SCHEMA_SET, PAYLOAD, CREATED_TIMESTAMP)
+INSERT INTO NETWORK_DATA (OBSERVED_TIMESTAMP, DATASPACE, ANCHOR, SCHEMA_SET, OPERATION, PAYLOAD, CREATED_TIMESTAMP)
 VALUES
-('2021-07-24 00:00:01.000', 'DATASPACE-01', 'ANCHOR-02', 'SCHEMA-SET-01', '{ "status" : "up" }'::jsonb, '2021-07-24 23:00:01.000');
+('2021-07-24 00:00:01.000', 'DATASPACE-01', 'ANCHOR-02', 'SCHEMA-SET-01', 'UPDATE', '{ "status" : "up" }'::jsonb, '2021-07-24 23:00:01.000');
 
 
 -- Test simple payload filter on multiple field
-INSERT INTO NETWORK_DATA (OBSERVED_TIMESTAMP, DATASPACE, ANCHOR, SCHEMA_SET, PAYLOAD, CREATED_TIMESTAMP)
+INSERT INTO NETWORK_DATA (OBSERVED_TIMESTAMP, DATASPACE, ANCHOR, SCHEMA_SET, OPERATION, PAYLOAD, CREATED_TIMESTAMP)
 VALUES
-('2021-07-24 00:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', '{ "interfaces": [ { "id" : "01", "status" : "up" } ]}'::jsonb, '2021-07-24 01:00:01.000'),
-('2021-07-24 01:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', '{ "interfaces": [ { "id" : "01", "status" : "down" } ]}'::jsonb, '2021-07-24 02:00:01.000'),
-('2021-07-24 02:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', '{ "interfaces": [ { "id" : "02", "status" : "up" } ]}'::jsonb, '2021-07-24 03:00:01.000'),
-('2021-07-24 03:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', '{ "interfaces": [ { "id" : "03", "status" : "up" } ]}'::jsonb, '2021-07-24 04:00:01.000');
+('2021-07-24 00:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', 'CREATE', '{ "interfaces": [ { "id" : "01", "status" : "up" } ]}'::jsonb, '2021-07-24 01:00:01.000'),
+('2021-07-24 01:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', 'UPDATE', '{ "interfaces": [ { "id" : "01", "status" : "down" } ]}'::jsonb, '2021-07-24 02:00:01.000'),
+('2021-07-24 02:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', 'UPDATE', '{ "interfaces": [ { "id" : "02", "status" : "up" } ]}'::jsonb, '2021-07-24 03:00:01.000'),
+('2021-07-24 03:00:01.000', 'DATASPACE-02', 'ANCHOR-01', 'SCHEMA-SET-01', 'DELETE', NULL, '2021-07-24 04:00:01.000');
 
 COMMIT;