2 * ============LICENSE_START=======================================================
3 * Copyright (c) 2021 Bell Canada.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
19 package org.onap.cps.temporal.controller.event.listener.kafka
21 import groovy.util.logging.Slf4j
22 import org.onap.cps.event.model.CpsDataUpdatedEvent
23 import org.onap.cps.temporal.repository.containers.TimescaleContainer
24 import org.springframework.beans.factory.annotation.Autowired
25 import org.springframework.beans.factory.annotation.Value
26 import org.springframework.boot.test.context.SpringBootTest
27 import org.springframework.jdbc.core.JdbcTemplate
28 import org.springframework.kafka.core.KafkaTemplate
29 import org.springframework.test.context.DynamicPropertyRegistry
30 import org.springframework.test.context.DynamicPropertySource
31 import org.testcontainers.containers.KafkaContainer
32 import org.testcontainers.spock.Testcontainers
33 import spock.lang.Shared
34 import spock.lang.Specification
35 import spock.util.concurrent.PollingConditions
37 import java.util.concurrent.TimeUnit
40 * Integration test specification for data updated event listener.
41 * This integration test is running database and kafka dependencies as docker containers.
46 class DataUpdatedEventListenerIntegrationSpec extends Specification {
49 TimescaleContainer databaseTestContainer = TimescaleContainer.getInstance()
51 static kafkaTestContainer = new KafkaContainer()
53 Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop))
56 kafkaTestContainer.start()
60 KafkaTemplate<String, CpsDataUpdatedEvent> kafkaTemplate
63 JdbcTemplate jdbcTemplate
65 @Value('${app.listener.data-updated.topic}')
69 def aTimestamp = EventFixtures.currentIsoTimestamp()
70 def aDataspace = 'my-dataspace'
71 def aSchemaSet = 'my-schema-set'
72 def anAnchor = 'my-anchor'
74 // Define sql queries for data validation
75 def sqlCount = "select count(*) from network_data"
76 def sqlSelect = "select * from network_data"
78 ' where observed_timestamp = to_timestamp(?, \'YYYY-MM-DD"T"HH24:MI:SS.USTZHTZM\') ' +
79 'and dataspace = ? ' +
80 'and schema_set = ? ' +
82 def sqlCountWithConditions = sqlCount + sqlWhereClause
83 def sqlSelectWithConditions = sqlSelect + sqlWhereClause
85 def 'Processing a valid event'() {
86 given: "no event has been proceeded"
87 def initialRecordsCount =
88 jdbcTemplate.queryForObject(sqlCountWithConditions, Integer.class,
89 aTimestamp, aDataspace, aSchemaSet, anAnchor)
90 assert (initialRecordsCount == 0)
91 when: 'an event is produced'
93 EventFixtures.buildEvent(
94 timestamp: aTimestamp, dataspace: aDataspace, schemaSet: aSchemaSet, anchor: anAnchor)
95 this.kafkaTemplate.send(topic, event)
96 then: 'the event is proceeded'
97 def pollingCondition = new PollingConditions(timeout: 10, initialDelay: 1, factor: 2)
98 pollingCondition.eventually {
99 def finalRecordsCount =
100 jdbcTemplate.queryForObject(
101 sqlCountWithConditions, Integer.class, aTimestamp, aDataspace, aSchemaSet, anAnchor)
102 assert (finalRecordsCount == 1)
104 Map<String, Object> result =
105 jdbcTemplate.queryForMap(sqlSelectWithConditions, aTimestamp, aDataspace, aSchemaSet, anAnchor)
106 log.debug("Data retrieved from db: {}", result)
109 def 'Processing an invalid event'() {
110 given: 'the number of network data records if known'
111 def initialRecordsCount = jdbcTemplate.queryForObject(sqlCount, Integer.class)
112 when: 'an invalid event is produced'
113 this.kafkaTemplate.send(topic, (CpsDataUpdatedEvent) null)
114 then: 'the event is not proceeded and no more network data record is created'
115 TimeUnit.SECONDS.sleep(3)
116 assert (jdbcTemplate.queryForObject(sqlCount, Integer.class) == initialRecordsCount)
119 @DynamicPropertySource
120 static void registerKafkaProperties(DynamicPropertyRegistry registry) {
121 registry.add("spring.kafka.bootstrap-servers", kafkaTestContainer::getBootstrapServers)