2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the 'License');
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an 'AS IS' BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.integration.functional.ncmp.datajobs.subscription
23 import static org.onap.cps.api.parameters.FetchDescendantsOption.DIRECT_CHILDREN_ONLY
25 import ch.qos.logback.classic.Logger
26 import ch.qos.logback.classic.spi.ILoggingEvent
27 import ch.qos.logback.core.read.ListAppender
28 import io.cloudevents.core.builder.CloudEventBuilder
29 import io.cloudevents.kafka.CloudEventSerializer
30 import java.nio.charset.StandardCharsets
31 import java.time.Duration
32 import org.apache.kafka.clients.producer.ProducerRecord
33 import org.apache.kafka.common.serialization.StringSerializer
34 import org.onap.cps.integration.base.CpsIntegrationSpecBase
35 import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpInEventConsumer
36 import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService
37 import org.slf4j.LoggerFactory
38 import org.springframework.beans.factory.annotation.Autowired
39 import org.springframework.beans.factory.annotation.Value
41 class CmSubscriptionSpec extends CpsIntegrationSpecBase {
// Persistence service queried by the features for the inactive data node selectors of a data job.
@Autowired
CmDataJobSubscriptionPersistenceService cmDataJobSubscriptionPersistenceService

// Topic the subscription create/delete requests are published on (see sendSubscriptionRequest callers).
@Value('${app.ncmp.avc.cm-subscription-ncmp-in}')
def subscriptionTopic

// Topic the simulated DMI responses are published on (see sendDmiResponse).
@Value('${app.ncmp.avc.cm-subscription-dmi-out}')
def dmiOutTopic

// Topic the test consumer subscribes to in setup.
@Value('${app.ncmp.avc.cm-subscription-dmi-in}')
def dmiInTopic

// Kafka clients created per feature in setup and released in cleanup.
def dmiInConsumer
def testRequestProducer
def testResponseProducer

// In-memory appender attached to the NcmpInEventConsumer logger so features can inspect its log output.
def listAppender = new ListAppender<ILoggingEvent>()
def logger = (Logger) LoggerFactory.getLogger(NcmpInEventConsumer)
// Per-feature fixture: registers the cm handles, starts Kafka, creates the test consumer and producers,
// and hooks the in-memory appender onto the NcmpInEventConsumer logger.
def setup() {
    registerCmHandlesForSubscriptions()
    kafkaTestContainer.start()
    dmiInConsumer = kafkaTestContainer.getCloudEventConsumer('test-group')
    dmiInConsumer.subscribe([dmiInTopic])
    // initial poll; presumably lets the consumer join its group before any event is produced - TODO confirm
    dmiInConsumer.poll(Duration.ofMillis(500))
    testRequestProducer = kafkaTestContainer.createProducer('test-client-id', StringSerializer.class)
    testResponseProducer = kafkaTestContainer.createProducer('test-client-id', CloudEventSerializer.class)
    // a logback appender ignores every event until it has been started
    listAppender.start()
    logger.addAppender(listAppender)
}
// Per-feature teardown: detaches the log appender, releases all Kafka clients and the container,
// and deregisters every cm handle registered by registerCmHandlesForSubscriptions.
def cleanup() {
    logger.detachAndStopAllAppenders()
    dmiInConsumer.unsubscribe()
    // unsubscribe alone keeps the consumer's network resources open; close it like the producers below
    dmiInConsumer.close()
    testRequestProducer.close()
    testResponseProducer.close()
    kafkaTestContainer.close()
    deregisterCmHandles('dmi-0', ['cmHandle0'])
    deregisterCmHandles('dmi-1', ['cmHandle1', 'cmHandle2', 'cmHandle5'])
    deregisterCmHandles('dmi-2', ['cmHandle3', 'cmHandle4'])
}
// One create request carrying three selectors is expected to be persisted as three inactive
// selectors and to produce one outgoing event per affected DMI (dmi-1 and dmi-2).
def 'Create subscription and send to multiple DMIs'() {
    given: 'data node selector with two paths on DMI-1'
        def selectorsForDmi1 = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n'''
    and: 'data node selector with one path on DMI-2'
        def selectorForDmi2 = '/parent[id=\\\"3\\\"]/child'
    and: 'an event payload'
        def createEvent = createSubscriptionEventPayload('dataJobCreated', 'myDataJobId01', selectorsForDmi1 + selectorForDmi2)
    when: 'a subscription create request is sent'
        sendSubscriptionRequest(subscriptionTopic, 'key', createEvent, 'myDataJobId01')
    then: 'log shows event is consumed by ncmp'
        def loggedMessages = listAppender.list.collect { it.formattedMessage }
        loggedMessages.any { it.contains('myDataJobId01') && it.contains('dataJobCreated') }
    and: 'the 3 different data node selectors for the given data job id is persisted'
        def inactiveSelectors = cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('myDataJobId01')
        inactiveSelectors.size() == 3
    and: 'get correlation ids from event sent to DMIs'
        def dmiCorrelationIds = getAllConsumedCorrelationIds()
    and: 'there is correlation IDs (event) for each affected dmi (DMI-1, DMI-2)'
        dmiCorrelationIds.size() == 2
        dmiCorrelationIds.containsAll(['myDataJobId01#dmi-1', 'myDataJobId01#dmi-2'])
}
// After dmi-0 accepts the create request, the data job must have no inactive selectors left and
// every persisted subscription entry for it must carry the status ACCEPTED.
def 'Create subscription accepted by DMI.'() {
    given: 'a persisted subscription'
        // The selector is substituted into a JSON template (createSubscriptionEvent.json), so the quotes
        // use the JSON-legal \" escape like every other feature here; \' is not a valid JSON escape.
        def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'newDataJob', '/parent[id=\\\"0\\\"]\\n')
        sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'newDataJob')
    when: 'dmi accepts the subscription create request'
        sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-0', 'newDataJob#dmi-0')
    then: 'there are no more inactive data node selector for given datajob id'
        assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('newDataJob').size() == 0
    and: 'status for the data node selector for given data job id is ACCEPTED'
        def affectedDataNodes = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
            '//subscription/dataJobId[text()=\'newDataJob\']', DIRECT_CHILDREN_ONLY)
        // every() is true for an empty collection, so first make sure something was actually persisted
        assert !affectedDataNodes.isEmpty()
        assert affectedDataNodes.leaves.every(entry -> entry.get('status') == 'ACCEPTED')
}
// With subscription A already accepted, a request that repeats two of its selectors and adds one
// new selector should leave only the new selector inactive and only notify dmi-2.
def 'Create new subscription which partially overlaps with an existing active subscription'() {
    given: 'an active subscription in database'
        createAndAcceptSubscriptionA()
    and: 'and a partial overlapping subscription'
        def overlappingSelectors = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"3\\\"]/child\\n'''
        def additionalSelector = '/parent[id=\\\"4\\\"]'
        def overlappingEvent = createSubscriptionEventPayload('dataJobCreated', 'partialOverlappingDataJobId',
            overlappingSelectors + additionalSelector)
    when: 'create request event for overlapping subscription is sent'
        sendSubscriptionRequest(subscriptionTopic, 'some key', overlappingEvent, 'partialOverlappingDataJobId')
    then: 'log shows event is consumed by ncmp'
        def loggedMessages = listAppender.list.collect { it.formattedMessage }
        loggedMessages.any { it.contains('partialOverlappingDataJobId') && it.contains('dataJobCreated') }
    and: 'the 3 data node selectors for the given data job id is persisted'
        def persistedSubscriptions = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
            '//subscription/dataJobId[text()=\'partialOverlappingDataJobId\']', DIRECT_CHILDREN_ONLY)
        persistedSubscriptions.size() == 3
    and: 'only one data node selector is not active'
        def inactiveSelectors = cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('partialOverlappingDataJobId')
        inactiveSelectors.size() == 1
    and: 'get correlation ids from event sent to DMIs'
        def dmiCorrelationIds = getAllConsumedCorrelationIds()
    and: 'there is correlation IDs (event) for only the affected dmi (DMI-2)'
        !dmiCorrelationIds.contains('partialOverlappingDataJobId#dmi-1')
        dmiCorrelationIds.contains('partialOverlappingDataJobId#dmi-2')
}
// With subscriptions A and B already accepted, a request whose selectors are all covered by them
// should be persisted without any inactive selector and without any event being sent to a DMI.
def 'Create new subscription which completely overlaps with an active existing subscriptions'() {
    given: 'a new data node selector'
        def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n'''
    and: 'an event payload'
        def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'fullyOverlappingDataJob', dataNodeSelector)
    and: 'existing active subscriptions in database'
        createAndAcceptSubscriptionA()
        createAndAcceptSubscriptionB()
    when: 'a new subscription create request is sent'
        // the last argument is the data job id of the request; it must match the id used in the payload
        sendSubscriptionRequest(subscriptionTopic, 'fullyOverlappingDataJob', eventPayload, 'fullyOverlappingDataJob')
    then: 'log shows event is consumed by ncmp'
        def messages = listAppender.list*.formattedMessage
        messages.any { msg -> msg.contains('fullyOverlappingDataJob') && msg.contains('dataJobCreated') }
    and: 'the 2 data node selectors for the given data job id is persisted'
        assert cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
            '//subscription/dataJobId[text()=\'fullyOverlappingDataJob\']', DIRECT_CHILDREN_ONLY).size() == 2
    and: 'there are no inactive data node selector'
        assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('fullyOverlappingDataJob').size() == 0
    and: 'get correlation ids from event sent to DMIs'
        def correlationIds = getAllConsumedCorrelationIds()
    and: 'there is no correlation IDs (event) for any dmi'
        assert !correlationIds.any { correlationId -> correlationId.startsWith('fullyOverlappingDataJob') }
}
// Creates a subscription with a single selector, deletes it again, and expects the persisted entry
// to be gone and a delete event to have been sent towards dmi-1.
def 'Delete subscription removes last subscriber.'() {
    given: 'an existing subscription with only one data node selector'
        def dataNodeSelector = '/parent[id=\\\"5\\\"]'
    and: 'a subscription created'
        def creationEvent = createSubscriptionEventPayload('dataJobCreated', 'lastDataJobId', dataNodeSelector)
        sendSubscriptionRequest(subscriptionTopic, 'lastDataJobId', creationEvent, 'lastDataJobId')
    and: 'data nodes is persisted '
        def persistedNodes = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
            "//subscription/dataJobId[text()=\'lastDataJobId\']", DIRECT_CHILDREN_ONLY)
        assert persistedNodes.size() == 1
        def firstPersistedNode = persistedNodes.iterator().next()
        assert firstPersistedNode.leaves.dataNodeSelector == '/parent[id="5"]'
    when: 'a delete event is received for the subscription'
        def deletionEvent = createSubscriptionEventPayload('dataJobDeleted', 'lastDataJobId', dataNodeSelector)
        sendSubscriptionRequest(subscriptionTopic, 'lastDataJobId', deletionEvent, 'lastDataJobId')
    then: 'the subscription is fully removed from persistence'
        def nodesAfterDelete = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
            "//subscription/dataJobId[text()=\'lastDataJobId\']", DIRECT_CHILDREN_ONLY)
        nodesAfterDelete.isEmpty()
    and: 'no other subscriptions exist for the same dataJobId'
        def nodesForSameDataJobId = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
            "//subscription/dataJobId[text()=\'lastDataJobId\']", DIRECT_CHILDREN_ONLY)
        nodesForSameDataJobId.isEmpty()
    and: 'a DMI delete event is published for the affected DMI'
        def dmiCorrelationIds = getAllConsumedCorrelationIds()
        dmiCorrelationIds.contains('lastDataJobId#dmi-1')
}
// Adds and then deletes a second subscriber on a selector that already has subscribers; the
// selector must keep its other subscribers and no delete event may be sent to the DMI.
// NOTE(review): the given block expects a subscription on /parent[id="1"] to exist already,
// presumably left behind by earlier features in this spec - confirm the execution order is fixed.
def 'Delete subscription removes one of multiple subscribers.'() {
    given: 'data node selector that is used by other subscriptions'
        def dataNodeSelector = '/parent[id=\\\"1\\\"]'
        def existingSubscription = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
            "/dataJob/subscription[@dataNodeSelector='/parent[id=\"1\"]']", DIRECT_CHILDREN_ONLY).iterator().next().leaves.dataJobId
        assert !existingSubscription.isEmpty()
    and: 'a new subscription'
        def createEventPayload1 = createSubscriptionEventPayload('dataJobCreated', 'id-to-remove', dataNodeSelector)
        sendSubscriptionRequest(subscriptionTopic, 'id-to-remove', createEventPayload1, 'id-to-remove')
    when: 'a delete event is received'
        def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'id-to-remove', dataNodeSelector)
        sendSubscriptionRequest(subscriptionTopic, 'id-to-remove', deleteEventPayload, 'id-to-remove')
    then: 'the data job id does not exist in database'
        def resultForDeletedSubscription = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
            "//subscription/dataJobId[text()='id-to-remove']", DIRECT_CHILDREN_ONLY)
        assert resultForDeletedSubscription.isEmpty()
    and: 'subscription still exist for the same data node selector'
        def remainingSubscriptions = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
            "/dataJob/subscription[@dataNodeSelector='/parent[id=\"1\"]']", DIRECT_CHILDREN_ONLY).iterator().next().leaves.dataJobId
        assert !remainingSubscriptions.isEmpty()
        assert !remainingSubscriptions.contains('id-to-remove')
    and: 'no DMI delete event is published'
        def correlationIds = getAllConsumedCorrelationIds()
        // compare against the id itself: contains([...]) would look for a nested list and never match
        assert !correlationIds.contains('id-to-remove#dmi-1')
}
// A delete request for an unknown data job must be handled without an exception and must not
// result in any event being sent to a DMI.
def 'Deleting non-existent subscription.'() {
    given: 'an event payload'
        def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'nonExistingDataJobId', '/nonExisting')
    when: 'a delete event is received for the non-existent subscription'
        // the last argument is the data job id of the request; it must match the id used in the payload
        sendSubscriptionRequest(subscriptionTopic, 'nonExistingDataJobId', deleteEventPayload, 'nonExistingDataJobId')
    then: 'no exception is thrown'
        noExceptionThrown()
    and: 'nothing is sent to DMI'
        getAllConsumedCorrelationIds().isEmpty()
}
// Registers the six cm handles used by this spec, in a fixed order, spread over dmi-0, dmi-1 and dmi-2.
def registerCmHandlesForSubscriptions() {
    // each row: dmi, cm handle id, value passed as last registerCmHandle argument
    def cmHandlesToRegister = [
        ['dmi-0', 'cmHandle0', '/parent=0'],
        ['dmi-1', 'cmHandle1', '/parent=1'],
        ['dmi-1', 'cmHandle2', '/parent=2'],
        ['dmi-2', 'cmHandle3', '/parent=3'],
        ['dmi-2', 'cmHandle4', '/parent=4'],
        ['dmi-1', 'cmHandle5', '/parent=5']
    ]
    cmHandlesToRegister.each { dmi, cmHandleId, parentPath ->
        registerCmHandle(dmi, cmHandleId, '', parentPath)
    }
}
// Builds a subscription event payload by loading the JSON template and substituting the
// #eventType, #dataJobId and #dataNodeSelector placeholders with the given values.
def createSubscriptionEventPayload(eventType, dataJobId, dataNodeSelector) {
    def template = readResourceDataFile('datajobs/subscription/createSubscriptionEvent.json')
    return template.replace('#eventType', eventType)
        .replace('#dataJobId', dataJobId)
        .replace('#dataNodeSelector', dataNodeSelector)
}
// Sends the create request for data job 'dataJobA' (three selectors) and then an ACCEPTED
// response on behalf of dmi-1 and of dmi-2.
def createAndAcceptSubscriptionA() {
    def selectorsOfSubscriptionA = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n/parent[id=\\\"3\\\"]/child'''
    def createEventForA = createSubscriptionEventPayload('dataJobCreated', 'dataJobA', selectorsOfSubscriptionA)
    sendSubscriptionRequest(subscriptionTopic, 'dataJobA', createEventForA, 'dataJobA')
    // one acceptance per DMI, each with its own correlation id
    sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-1', 'dataJobA#dmi-1')
    sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-2', 'dataJobA#dmi-2')
}
// Sends the create request for data job 'dataJobB' (three selectors) and then an ACCEPTED
// response on behalf of dmi-2 only.
def createAndAcceptSubscriptionB() {
    def selectorsOfSubscriptionB = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"3\\\"]/child\\n/parent[id=\\\"4\\\"]'''
    def createEventForB = createSubscriptionEventPayload('dataJobCreated', 'dataJobB', selectorsOfSubscriptionB)
    sendSubscriptionRequest(subscriptionTopic, 'dataJobB', createEventForB, 'dataJobB')
    sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-2', 'dataJobB#dmi-2')
}
// Publishes a subscription request (create or delete) with the given key and payload on the given topic.
// dataJobId is not used yet; it is reserved for the log polling described in the TODO below.
def sendSubscriptionRequest(topic, eventKey, eventPayload, dataJobId) {
    def event = new ProducerRecord<>(topic, eventKey, eventPayload)
    testRequestProducer.send(event)
    // send() only hands the record to the producer; flush so it has been transmitted before the caller asserts
    testRequestProducer.flush()
    //TODO Add polling within for log to report dataJobId is finished (separate commit)
}
// Publishes a simulated DMI response as a CloudEvent on the DMI-out topic.
// statusCode and statusMessage are substituted into the JSON template; eventType becomes the
// CloudEvent type, eventSource its source and correlationId the 'correlationid' extension.
def sendDmiResponse(statusCode, statusMessage, eventType, eventSource, correlationId) {
    def eventPayload = readResourceDataFile('datajobs/subscription/dmiSubscriptionResponseEvent.json')
    eventPayload = eventPayload.replace('#statusCode', statusCode)
    eventPayload = eventPayload.replace('#statusMessage', statusMessage)
    def cloudEvent = CloudEventBuilder.v1()
        .withData(eventPayload.getBytes(StandardCharsets.UTF_8))
        .withId('random-uuid')
        // 'type' is a mandatory CloudEvent attribute; without it the event cannot be built
        .withType(eventType)
        .withSource(URI.create(eventSource))
        .withExtension('correlationid', correlationId).build()
    def event = new ProducerRecord<>(dmiOutTopic, 'key', cloudEvent)
    testResponseProducer.send(event)
    // send() only hands the record to the producer; flush so it has been transmitted before the caller asserts
    testResponseProducer.flush()
}
// Polls the DMI-in consumer and returns the values collected under the 'ce_correlationid' header.
def getAllConsumedCorrelationIds() {
    def headerValuesByName = getAllHeaders(getLatestConsumerRecordsWithMaxPollOf1Second(dmiInConsumer, 1))
    return headerValuesByName.get('ce_correlationid')
}
296 def getAllHeaders(consumedEvents) {
297 def headersMap = [:].withDefault { [] }
298 consumedEvents.each { event ->
299 event.headers().each { header ->
300 def key = header.key()
301 def value = new String(header.value())
302 headersMap[key] << value