4c362add3107ac5b31b67da1f71becd23d05f7f4
[cps/cps-temporal.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * Copyright (c) 2021 Bell Canada.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17 */
18
19 package org.onap.cps.temporal.controller.event.listener.kafka
20
21 import groovy.util.logging.Slf4j
22 import org.onap.cps.event.model.CpsDataUpdatedEvent
23 import org.onap.cps.temporal.repository.containers.TimescaleContainer
24 import org.springframework.beans.factory.annotation.Autowired
25 import org.springframework.beans.factory.annotation.Value
26 import org.springframework.boot.test.context.SpringBootTest
27 import org.springframework.jdbc.core.JdbcTemplate
28 import org.springframework.kafka.core.KafkaTemplate
29 import org.springframework.test.context.DynamicPropertyRegistry
30 import org.springframework.test.context.DynamicPropertySource
31 import org.testcontainers.containers.KafkaContainer
32 import spock.lang.Shared
33 import spock.lang.Specification
34 import spock.util.concurrent.PollingConditions
35
36 import java.util.concurrent.TimeUnit
37
38 /**
39  * Integration test specification for data updated event listener.
40  * This integration test is running database and kafka dependencies as docker containers.
41  */
42 @SpringBootTest
43 @Slf4j
44 class DataUpdatedEventListenerIntegrationSpec extends Specification {
45
46     @Shared
47     def databaseTestContainer = TimescaleContainer.getInstance()
48
49     static kafkaTestContainer = new KafkaContainer()
50     static {
51         Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop))
52     }
53
54     def setupSpec() {
55         databaseTestContainer.start()
56         kafkaTestContainer.start()
57     }
58
59     @Autowired
60     KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate
61
62     @Autowired
63     JdbcTemplate jdbcTemplate
64
65     @Value('${app.listener.data-updated.topic}')
66     String topic
67
68     // Define event data
69     def aTimestamp = EventFixtures.currentIsoTimestamp()
70     def aDataspace = 'my-dataspace'
71     def aSchemaSet = 'my-schema-set'
72     def anAnchor = 'my-anchor'
73
74     // Define sql queries for data validation
75     def sqlCount = "select count(*) from network_data"
76     def sqlSelect =  "select * from network_data"
77     def sqlWhereClause =
78             ' where observed_timestamp = to_timestamp(?, \'YYYY-MM-DD"T"HH24:MI:SS.USTZHTZM\') ' +
79                     'and dataspace = ? ' +
80                     'and schema_set = ? ' +
81                     'and anchor = ?'
82     def sqlCountWithConditions = sqlCount + sqlWhereClause
83     def sqlSelectWithConditions = sqlSelect + sqlWhereClause
84
85     def 'Processing a valid event'() {
86         given: "no event has been proceeded"
87             def initialRecordsCount =
88                     jdbcTemplate.queryForObject(sqlCountWithConditions, Integer.class,
89                             aTimestamp, aDataspace, aSchemaSet, anAnchor)
90             assert (initialRecordsCount == 0)
91         when: 'an event is produced'
92             def event =
93                     EventFixtures.buildEvent(
94                             timestamp: aTimestamp, dataspace: aDataspace, schemaSet: aSchemaSet, anchor: anAnchor)
95             this.kafkaTemplate.send(topic, event)
96         then: 'the event is proceeded'
97             def pollingCondition = new PollingConditions(timeout: 10, initialDelay: 1, factor: 2)
98             pollingCondition.eventually {
99                 def finalRecordsCount =
100                         jdbcTemplate.queryForObject(
101                                 sqlCountWithConditions, Integer.class, aTimestamp, aDataspace, aSchemaSet, anAnchor)
102                 assert (finalRecordsCount == 1)
103             }
104             Map<String, Object> result =
105                     jdbcTemplate.queryForMap(sqlSelectWithConditions, aTimestamp, aDataspace, aSchemaSet, anAnchor)
106             log.debug("Data retrieved from db: {}", result)
107     }
108
109     def 'Processing an invalid event'() {
110         given: 'the number of network data records if known'
111             def initialRecordsCount = jdbcTemplate.queryForObject(sqlCount, Integer.class)
112         when: 'an invalid event is produced'
113             this.kafkaTemplate.send(topic, (CpsDataUpdatedEvent) null)
114         then: 'the event is not proceeded and no more network data record is created'
115             TimeUnit.SECONDS.sleep(3)
116             assert (jdbcTemplate.queryForObject(sqlCount, Integer.class) == initialRecordsCount)
117     }
118
119     @DynamicPropertySource
120     static void registerKafkaProperties(DynamicPropertyRegistry registry) {
121         registry.add("spring.kafka.bootstrap-servers", kafkaTestContainer::getBootstrapServers)
122     }
123
124 }