2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the 'License');
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an 'AS IS' BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.integration.functional.ncmp.datajobs.subscription
23 import static org.onap.cps.api.parameters.FetchDescendantsOption.DIRECT_CHILDREN_ONLY
25 import ch.qos.logback.classic.spi.ILoggingEvent
26 import ch.qos.logback.core.read.ListAppender
27 import io.cloudevents.core.builder.CloudEventBuilder
28 import io.cloudevents.kafka.CloudEventSerializer
29 import io.cloudevents.kafka.CloudEventDeserializer
30 import java.nio.charset.StandardCharsets
31 import java.time.Duration
32 import org.apache.kafka.clients.producer.ProducerRecord
33 import org.apache.kafka.common.serialization.StringSerializer
34 import org.onap.cps.integration.base.CpsIntegrationSpecBase
35 import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpInEventConsumer
36 import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService
37 import org.slf4j.LoggerFactory
38 import org.springframework.beans.factory.annotation.Autowired
39 import org.springframework.beans.factory.annotation.Value
41 class CmSubscriptionSpec extends CpsIntegrationSpecBase {
44 CmDataJobSubscriptionPersistenceService cmDataJobSubscriptionPersistenceService
46 @Value('${app.ncmp.avc.cm-subscription-ncmp-in}')
49 @Value('${app.ncmp.avc.cm-subscription-dmi-out}')
52 @Value('${app.ncmp.avc.cm-subscription-dmi-in}')
56 def testRequestProducer
57 def testResponseProducer
59 def listAppender = new ListAppender<ILoggingEvent>()
63 registerCmHandlesForSubscriptions()
64 kafkaTestContainer.start()
65 dmiInConsumer = kafkaTestContainer.getConsumer('test-group', CloudEventDeserializer.class)
66 dmiInConsumer.subscribe([dmiInTopic])
67 dmiInConsumer.poll(Duration.ofMillis(500))
68 testRequestProducer = kafkaTestContainer.createProducer('test-client-id', StringSerializer.class)
69 testResponseProducer = kafkaTestContainer.createProducer('test-client-id', CloudEventSerializer.class)
70 logger = LoggerFactory.getLogger(NcmpInEventConsumer)
72 logger.addAppender(listAppender)
76 dmiInConsumer.unsubscribe()
78 testRequestProducer.close()
79 testResponseProducer.close()
80 kafkaTestContainer.close()
81 deregisterCmHandles('dmi-0', ['cmHandle0'])
82 deregisterCmHandles('dmi-1', ['cmHandle1', 'cmHandle2'])
83 deregisterCmHandles('dmi-2', ['cmHandle3', 'cmHandle4'])
86 def 'Create subscription and send to multiple DMIs'() {
87 given: 'a data node selector on DMI-1'
88 def dmi1DataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n'''
89 and: 'a data node selector on DMI-2'
90 def dmi2DataNodeSelector = '/parent[id=\\\"3\\\"]/child'
91 and: 'an event payload'
92 def eventDataNodeSelector = (dmi1DataNodeSelector + dmi2DataNodeSelector)
93 def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'myDataJobId', eventDataNodeSelector)
94 when: 'a subscription create request is sent'
95 sendSubscriptionCreateRequest(subscriptionTopic, 'key', eventPayload)
96 then: 'log shows event is consumed by ncmp'
97 def messages = listAppender.list*.formattedMessage
98 messages.any { msg -> msg.contains('myDataJobId') && msg.contains('dataJobCreated')}
99 and: 'the 3 different data node selectors for the given data job id is persisted'
100 assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('myDataJobId').size() == 3
101 and: 'get correlation ids from event sent to DMIs'
102 def correlationIds = getAllConsumedCorrelationIds()
103 and: 'there is correlation IDs (event) for each affected dmi (DMI-1, DMI-2)'
104 assert correlationIds.size() == 2
105 assert correlationIds.containsAll(['myDataJobId#dmi-1', 'myDataJobId#dmi-2'])
108 def 'Update subscription status'() {
109 given: 'a persisted subscription'
110 def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'newDataJob', '/parent[id=\\\'0\\\']\\n')
111 sendSubscriptionCreateRequest(subscriptionTopic, 'newDataJob', eventPayload)
112 when: 'dmi accepts the subscription create request'
113 sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-0', 'newDataJob#dmi-0')
114 then: 'there are no more inactive data node selector for given datajob id'
115 assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('newDataJob').size() == 0
116 and: 'status for the data node selector for given data job id is ACCEPTED'
117 def affectedDataNodes = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
118 '//subscription/dataJobId[text()=\'newDataJob\']', DIRECT_CHILDREN_ONLY)
119 assert affectedDataNodes.leaves.every( entry -> entry.get('status') == 'ACCEPTED')
122 def 'Create new subscription which partially overlaps with an existing active subscription'() {
123 given: 'an existing data node selector on DMI-1'
124 def existingDmi1DataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"3\\\"]/child\\n'''
125 and: 'a new data node selector on DMI-2'
126 def newDmi2DataNodeSelector = '/parent[id=\\\"4\\\"]'
127 and: 'an event payload'
128 def eventDataNodeSelector = (existingDmi1DataNodeSelector + newDmi2DataNodeSelector)
129 def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'partialOverlappingDataJob', eventDataNodeSelector)
130 and: 'an active subscription in database'
131 createAndAcceptSubscriptionA()
132 when: 'a new subscription create request is sent'
133 sendSubscriptionCreateRequest(subscriptionTopic, 'partialOverlappingDataJob', eventPayload)
134 then: 'log shows event is consumed by ncmp'
135 def messages = listAppender.list*.formattedMessage
136 messages.any { msg -> msg.contains('partialOverlappingDataJob') && msg.contains('dataJobCreated')}
137 and: 'the 3 data node selectors for the given data job id is persisted'
138 assert cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
139 '//subscription/dataJobId[text()=\'partialOverlappingDataJob\']', DIRECT_CHILDREN_ONLY).size() == 3
140 and: 'only one data node selector is not active'
141 assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('partialOverlappingDataJob').size() == 1
142 and: 'get correlation ids from event sent to DMIs'
143 def correlationIds = getAllConsumedCorrelationIds()
144 and: 'there is correlation IDs (event) for only the affected dmi (DMI-2)'
145 assert !correlationIds.contains('partialOverlappingDataJob#dmi-1')
146 assert correlationIds.contains('partialOverlappingDataJob#dmi-2')
149 def 'Create new subscription which completely overlaps with an active existing subscriptions'() {
150 given: 'a new data node selector'
151 def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n'''
152 and: 'an event payload'
153 def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'fullyOverlappingDataJob', dataNodeSelector)
154 and: 'existing active subscriptions in database'
155 createAndAcceptSubscriptionA()
156 createAndAcceptSubscriptionB()
157 when: 'a new subscription create request is sent'
158 sendSubscriptionCreateRequest(subscriptionTopic, 'fullyOverlappingDataJob', eventPayload)
159 then: 'log shows event is consumed by ncmp'
160 def messages = listAppender.list*.formattedMessage
161 messages.any { msg -> msg.contains('fullyOverlappingDataJob') && msg.contains('dataJobCreated')}
162 and: 'the 2 data node selectors for the given data job id is persisted'
163 assert cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
164 '//subscription/dataJobId[text()=\'fullyOverlappingDataJob\']', DIRECT_CHILDREN_ONLY).size() == 2
165 and: 'there are no inactive data node selector'
166 assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('fullyOverlappingDataJob').size() == 0
167 and: 'get correlation ids from event sent to DMIs'
168 def correlationIds = getAllConsumedCorrelationIds()
169 and: 'there is no correlation IDs (event) for any dmi'
170 assert !correlationIds.any { correlationId -> correlationId.startsWith('fullyOverlappingDataJob') }
173 def registerCmHandlesForSubscriptions() {
174 registerCmHandle('dmi-0', 'cmHandle0', '','/parent=0')
175 registerCmHandle('dmi-1', 'cmHandle1', '','/parent=1')
176 registerCmHandle('dmi-1', 'cmHandle2', '','/parent=2')
177 registerCmHandle('dmi-2', 'cmHandle3', '','/parent=3')
178 registerCmHandle('dmi-2', 'cmHandle4', '','/parent=4')
181 def createSubscriptionEventPayload(eventType, dataJobId, dataNodeSelector) {
182 def eventPayload = readResourceDataFile('datajobs/subscription/createSubscriptionEvent.json')
183 eventPayload = eventPayload.replace('#eventType', eventType)
184 eventPayload = eventPayload.replace('#dataJobId', dataJobId)
185 eventPayload = eventPayload.replace('#dataNodeSelector', dataNodeSelector)
189 def createAndAcceptSubscriptionA() {
190 def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n/parent[id=\\\"3\\\"]/child'''
191 def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'dataJobA', dataNodeSelector)
192 sendSubscriptionCreateRequest(subscriptionTopic, 'dataJobA', eventPayload)
193 sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-1', 'dataJobA#dmi-1')
194 sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-2', 'dataJobA#dmi-2')
197 def createAndAcceptSubscriptionB() {
198 def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"3\\\"]/child\\n/parent[id=\\\"4\\\"]'''
199 def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'dataJobB', dataNodeSelector)
200 sendSubscriptionCreateRequest(subscriptionTopic, 'dataJobB', eventPayload)
201 sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-2', 'dataJobB#dmi-2')
204 def sendSubscriptionCreateRequest(topic, eventKey, eventPayload) {
205 def event = new ProducerRecord<>(topic, eventKey, eventPayload);
206 testRequestProducer.send(event)
210 def sendDmiResponse(statusCode, statusMessage, eventType, eventSource, correlationId) {
211 def eventPayload = readResourceDataFile('datajobs/subscription/dmiSubscriptionResponseEvent.json')
212 eventPayload = eventPayload.replace('#statusCode', statusCode)
213 eventPayload = eventPayload.replace('#statusMessage', statusMessage)
214 def cloudEvent = CloudEventBuilder.v1()
215 .withData(eventPayload.getBytes(StandardCharsets.UTF_8))
216 .withId('random-uuid')
218 .withSource(URI.create(eventSource))
219 .withExtension('correlationid', correlationId).build()
220 def event = new ProducerRecord<>(dmiOutTopic, 'key', cloudEvent);
221 testResponseProducer.send(event)
225 def getAllConsumedCorrelationIds() {
226 def consumedEvents = dmiInConsumer.poll(Duration.ofMillis(1000))
227 def headersMap = getAllHeaders(consumedEvents)
228 return headersMap.get('ce_correlationid')
231 def getAllHeaders(consumedEvents) {
232 def headersMap = [:].withDefault { [] }
233 consumedEvents.each { event ->
234 event.headers().each { header ->
235 def key = header.key()
236 def value = new String(header.value())
237 headersMap[key] << value