d122c0e66a46c00dd58fc0c4479fc960cb6b5f22
[cps.git] /
1 /*
2  *  ============LICENSE_START=======================================================
3  *  Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the 'License');
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an 'AS IS' BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *  SPDX-License-Identifier: Apache-2.0
18  *  ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.integration.functional.ncmp.datajobs.subscription
22
23 import static org.onap.cps.api.parameters.FetchDescendantsOption.DIRECT_CHILDREN_ONLY
24
25 import ch.qos.logback.classic.Logger
26 import ch.qos.logback.classic.spi.ILoggingEvent
27 import ch.qos.logback.core.read.ListAppender
28 import io.cloudevents.core.builder.CloudEventBuilder
29 import io.cloudevents.kafka.CloudEventSerializer
30 import java.nio.charset.StandardCharsets
31 import java.time.Duration
32 import org.apache.kafka.clients.producer.ProducerRecord
33 import org.apache.kafka.common.serialization.StringSerializer
34 import org.onap.cps.integration.base.CpsIntegrationSpecBase
35 import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpInEventConsumer
36 import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService
37 import org.slf4j.LoggerFactory
38 import org.springframework.beans.factory.annotation.Autowired
39 import org.springframework.beans.factory.annotation.Value
40
41 class CmSubscriptionSpec extends CpsIntegrationSpecBase {
42
43     @Autowired
44     CmDataJobSubscriptionPersistenceService cmDataJobSubscriptionPersistenceService
45
46     @Value('${app.ncmp.avc.cm-subscription-ncmp-in}')
47     def subscriptionTopic
48
49     @Value('${app.ncmp.avc.cm-subscription-dmi-out}')
50     def dmiOutTopic
51
52     @Value('${app.ncmp.avc.cm-subscription-dmi-in}')
53     def dmiInTopic
54
55     def dmiInConsumer
56     def testRequestProducer
57     def testResponseProducer
58
59     def listAppender = new ListAppender<ILoggingEvent>()
60     def logger = (Logger) LoggerFactory.getLogger(NcmpInEventConsumer)
61
62     def setup() {
63         registerCmHandlesForSubscriptions()
64         kafkaTestContainer.start()
65         dmiInConsumer = kafkaTestContainer.getCloudEventConsumer('test-group')
66         dmiInConsumer.subscribe([dmiInTopic])
67         dmiInConsumer.poll(Duration.ofMillis(500))
68         testRequestProducer = kafkaTestContainer.createProducer('test-client-id', StringSerializer.class)
69         testResponseProducer = kafkaTestContainer.createProducer('test-client-id', CloudEventSerializer.class)
70         listAppender.start()
71         logger.addAppender(listAppender)
72     }
73
74     def cleanup() {
75         logger.detachAndStopAllAppenders()
76         dmiInConsumer.unsubscribe()
77         dmiInConsumer.close()
78         testRequestProducer.close()
79         testResponseProducer.close()
80         kafkaTestContainer.close()
81         deregisterCmHandles('dmi-0', ['cmHandle0'])
82         deregisterCmHandles('dmi-1', ['cmHandle1', 'cmHandle2', 'cmHandle5'])
83         deregisterCmHandles('dmi-2', ['cmHandle3', 'cmHandle4'])
84     }
85
86     def 'Create subscription and send to multiple DMIs'() {
87         given: 'data node selector with two paths on DMI-1'
88             def dmi1DataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n'''
89         and: 'data node selector with one path on DMI-2'
90             def dmi2DataNodeSelector = '/parent[id=\\\"3\\\"]/child'
91         and: 'an event payload'
92             def eventDataNodeSelector = (dmi1DataNodeSelector + dmi2DataNodeSelector)
93             def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'myDataJobId01', eventDataNodeSelector)
94         when: 'a subscription create request is sent'
95             sendSubscriptionRequest(subscriptionTopic, 'key', eventPayload, 'myDataJobId01')
96         then: 'log shows event is consumed by ncmp'
97             def messages = listAppender.list*.formattedMessage
98             messages.any { msg -> msg.contains('myDataJobId01') && msg.contains('dataJobCreated') }
99         and: 'the 3 different data node selectors for the given data job id is persisted'
100             assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('myDataJobId01').size() == 3
101         and: 'get correlation ids from event sent to DMIs'
102             def correlationIds = getAllConsumedCorrelationIds()
103         and: 'there is correlation IDs (event) for each affected dmi (DMI-1, DMI-2)'
104             assert correlationIds.size() == 2
105             assert correlationIds.containsAll(['myDataJobId01#dmi-1', 'myDataJobId01#dmi-2'])
106     }
107
108
109     def 'Create subscription accepted by DMI.'() {
110         given: 'a persisted subscription'
111             def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'newDataJob', '/parent[id=\\\'0\\\']\\n')
112             sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'newDataJob')
113         when: 'dmi accepts the subscription create request'
114             sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-0', 'newDataJob#dmi-0')
115         then: 'there are no more inactive data node selector for given datajob id'
116             assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('newDataJob').size() == 0
117         and: 'status for the data node selector for given data job id is ACCEPTED'
118             def affectedDataNodes = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
119                     '//subscription/dataJobId[text()=\'newDataJob\']', DIRECT_CHILDREN_ONLY)
120             assert affectedDataNodes.leaves.every(entry -> entry.get('status') == 'ACCEPTED')
121     }
122
123     def 'Create new subscription which partially overlaps with an existing active subscription'() {
124         given: 'an active subscription in database'
125             createAndAcceptSubscriptionA()
126         and: 'and a partial overlapping subscription'
127             def overlappingDmi1DataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"3\\\"]/child\\n'''
128             def newDmi2DataNodeSelector = '/parent[id=\\\"4\\\"]'
129             def eventDataNodeSelector = (overlappingDmi1DataNodeSelector + newDmi2DataNodeSelector)
130             def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'partialOverlappingDataJobId', eventDataNodeSelector)
131         when: 'create request event for overlapping subscription is sent'
132             sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'partialOverlappingDataJobId')
133         then: 'log shows event is consumed by ncmp'
134             def messages = listAppender.list*.formattedMessage
135             messages.any { msg -> msg.contains('partialOverlappingDataJobId') && msg.contains('dataJobCreated') }
136         and: 'the 3 data node selectors for the given data job id is persisted'
137             assert cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
138                 '//subscription/dataJobId[text()=\'partialOverlappingDataJobId\']', DIRECT_CHILDREN_ONLY).size() == 3
139         and: 'only one data node selector is not active'
140             assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('partialOverlappingDataJobId').size() == 1
141         and: 'get correlation ids from event sent to DMIs'
142             def correlationIds = getAllConsumedCorrelationIds()
143         and: 'there is correlation IDs (event) for only the affected dmi (DMI-2)'
144             assert !correlationIds.contains('partialOverlappingDataJobId#dmi-1')
145             assert correlationIds.contains('partialOverlappingDataJobId#dmi-2')
146     }
147
148     def 'Create new subscription which completely overlaps with an active existing subscriptions'() {
149         given: 'a new data node selector'
150             def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n'''
151         and: 'an event payload'
152             def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'fullyOverlappingDataJob', dataNodeSelector)
153         and: 'existing active subscriptions in database'
154             createAndAcceptSubscriptionA()
155             createAndAcceptSubscriptionB()
156         when: 'a new subscription create request is sent'
157             sendSubscriptionRequest(subscriptionTopic, 'fullyOverlappingDataJob', eventPayload, 'myDataJobId')
158         then: 'log shows event is consumed by ncmp'
159             def messages = listAppender.list*.formattedMessage
160             messages.any { msg -> msg.contains('fullyOverlappingDataJob') && msg.contains('dataJobCreated') }
161         and: 'the 2 data node selectors for the given data job id is persisted'
162             assert cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
163                     '//subscription/dataJobId[text()=\'fullyOverlappingDataJob\']', DIRECT_CHILDREN_ONLY).size() == 2
164         and: 'there are no inactive data node selector'
165             assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('fullyOverlappingDataJob').size() == 0
166         and: 'get correlation ids from event sent to DMIs'
167             def correlationIds = getAllConsumedCorrelationIds()
168         and: 'there is no correlation IDs (event) for any dmi'
169             assert !correlationIds.any { correlationId -> correlationId.startsWith('fullyOverlappingDataJob') }
170     }
171
172     def 'Delete subscription removes last subscriber.'() {
173         given: 'an existing subscription with only one data node selector'
174             def dataNodeSelector = '/parent[id=\\\"5\\\"]'
175         and: 'a subscription created'
176             def createEventPayload = createSubscriptionEventPayload('dataJobCreated', 'lastDataJobId', dataNodeSelector)
177             sendSubscriptionRequest(subscriptionTopic, 'lastDataJobId', createEventPayload, 'lastDataJobId')
178         and: 'data nodes is persisted '
179             def dataNodes = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
180                     "//subscription/dataJobId[text()=\'lastDataJobId\']", DIRECT_CHILDREN_ONLY)
181             assert  dataNodes.size() == 1
182             assert dataNodes.iterator().next().leaves.dataNodeSelector == '/parent[id="5"]'
183         when: 'a delete event is received for the subscription'
184             def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'lastDataJobId', dataNodeSelector)
185             sendSubscriptionRequest(subscriptionTopic, 'lastDataJobId', deleteEventPayload, 'lastDataJobId')
186         then: 'the subscription is fully removed from persistence'
187             def remainingDataNodeSelector = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
188                     "//subscription/dataJobId[text()=\'lastDataJobId\']", DIRECT_CHILDREN_ONLY)
189             assert remainingDataNodeSelector.isEmpty()
190         and: 'no other subscriptions exist for the same dataJobId'
191             def remainingDataJobId = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
192                     "//subscription/dataJobId[text()=\'lastDataJobId\']", DIRECT_CHILDREN_ONLY)
193             assert remainingDataJobId.isEmpty()
194         and: 'a DMI delete event is published for the affected DMI'
195             def correlationIds = getAllConsumedCorrelationIds()
196             assert correlationIds.contains('lastDataJobId#dmi-1')
197     }
198
199     def 'Delete subscription removes one of multiple subscribers.'() {
200         given: 'data node selector that is used by other subscriptions'
201             def dataNodeSelector = '/parent[id=\\\"1\\\"]'
202             def existingSubscription = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
203                     "/dataJob/subscription[@dataNodeSelector='/parent[id=\"1\"]']", DIRECT_CHILDREN_ONLY).iterator().next().leaves.dataJobId
204             assert !existingSubscription.isEmpty()
205         and: 'a new subscription'
206             def createEventPayload1 = createSubscriptionEventPayload('dataJobCreated', 'id-to-remove', dataNodeSelector)
207             sendSubscriptionRequest(subscriptionTopic, 'id-to-remove', createEventPayload1, 'id-to-remove')
208         when: 'a delete event is received'
209             def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'id-to-remove', dataNodeSelector)
210             sendSubscriptionRequest(subscriptionTopic, 'id-to-remove', deleteEventPayload, 'id-to-remove')
211         then: 'the data job id does not exist in database'
212             def resultForDeletedSubscription = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
213                     "//subscription/dataJobId[text()='id-to-remove']", DIRECT_CHILDREN_ONLY)
214             assert resultForDeletedSubscription.isEmpty()
215         and: 'subscription still exist for the same data node selector'
216             def remainingSubscriptions = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
217                     "/dataJob/subscription[@dataNodeSelector='/parent[id=\"1\"]']", DIRECT_CHILDREN_ONLY).iterator().next().leaves.dataJobId
218             assert !remainingSubscriptions.isEmpty()
219             assert !remainingSubscriptions.contains('id-to-remove')
220         and: 'no DMI delete event is published'
221             def correlationIds = getAllConsumedCorrelationIds()
222             assert !correlationIds.contains(['id-to-remove#dmi-1'])
223     }
224
225     def 'Deleting non-existent subscription.'() {
226         given: 'an event payload'
227             def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'nonExistingDataJobId', '/nonExisting')
228         when: 'a delete event is received for the non-existent subscription'
229             sendSubscriptionRequest(subscriptionTopic, 'nonExistingDataJobId', deleteEventPayload, 'myDataJobId')
230         then: 'no exception is thrown'
231             noExceptionThrown()
232         and: 'nothing is sent to DMI'
233             getAllConsumedCorrelationIds().isEmpty()
234     }
235
236     def registerCmHandlesForSubscriptions() {
237         registerCmHandle('dmi-0', 'cmHandle0', '', '/parent=0')
238         registerCmHandle('dmi-1', 'cmHandle1', '', '/parent=1')
239         registerCmHandle('dmi-1', 'cmHandle2', '', '/parent=2')
240         registerCmHandle('dmi-2', 'cmHandle3', '', '/parent=3')
241         registerCmHandle('dmi-2', 'cmHandle4', '', '/parent=4')
242         registerCmHandle('dmi-1', 'cmHandle5', '', '/parent=5')
243     }
244
245     def createSubscriptionEventPayload(eventType, dataJobId, dataNodeSelector) {
246         def eventPayload = readResourceDataFile('datajobs/subscription/createSubscriptionEvent.json')
247         eventPayload = eventPayload.replace('#eventType', eventType)
248         eventPayload = eventPayload.replace('#dataJobId', dataJobId)
249         eventPayload = eventPayload.replace('#dataNodeSelector', dataNodeSelector)
250         return eventPayload
251     }
252
253     def createAndAcceptSubscriptionA() {
254         def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n/parent[id=\\\"3\\\"]/child'''
255         def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'dataJobA', dataNodeSelector)
256         sendSubscriptionRequest(subscriptionTopic, 'dataJobA', eventPayload, 'dataJobA')
257         sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-1', 'dataJobA#dmi-1')
258         sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-2', 'dataJobA#dmi-2')
259     }
260
261     def createAndAcceptSubscriptionB() {
262         def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"3\\\"]/child\\n/parent[id=\\\"4\\\"]'''
263         def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'dataJobB', dataNodeSelector)
264         sendSubscriptionRequest(subscriptionTopic, 'dataJobB', eventPayload, 'dataJobB')
265         sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-2', 'dataJobB#dmi-2')
266     }
267
268     def sendSubscriptionRequest(topic, eventKey, eventPayload, dataJobId) {
269         def event = new ProducerRecord<>(topic, eventKey, eventPayload)
270         testRequestProducer.send(event)
271         //TODO Add polling within for log to report dataJobId is finished (separate commit)
272         sleep(2000)
273     }
274
275     def sendDmiResponse(statusCode, statusMessage, eventType, eventSource, correlationId) {
276         def eventPayload = readResourceDataFile('datajobs/subscription/dmiSubscriptionResponseEvent.json')
277         eventPayload = eventPayload.replace('#statusCode', statusCode)
278         eventPayload = eventPayload.replace('#statusMessage', statusMessage)
279         def cloudEvent = CloudEventBuilder.v1()
280                 .withData(eventPayload.getBytes(StandardCharsets.UTF_8))
281                 .withId('random-uuid')
282                 .withType(eventType)
283                 .withSource(URI.create(eventSource))
284                 .withExtension('correlationid', correlationId).build()
285         def event = new ProducerRecord<>(dmiOutTopic, 'key', cloudEvent)
286         testResponseProducer.send(event)
287         sleep(2000)
288     }
289
290     def getAllConsumedCorrelationIds() {
291         def consumedEvents = getLatestConsumerRecordsWithMaxPollOf1Second(dmiInConsumer, 1)
292         def headersMap = getAllHeaders(consumedEvents)
293         return headersMap.get('ce_correlationid')
294     }
295
296     def getAllHeaders(consumedEvents) {
297         def headersMap = [:].withDefault { [] }
298         consumedEvents.each { event ->
299             event.headers().each { header ->
300                 def key = header.key()
301                 def value = new String(header.value())
302                 headersMap[key] << value
303             }
304         }
305         return headersMap
306     }
307 }