5a13cb2b3dce6d41a39d2d4d0b3d86ea077d4936
[cps.git] /
1 /*
2  *  ============LICENSE_START=======================================================
3  *  Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the 'License');
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an 'AS IS' BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *  SPDX-License-Identifier: Apache-2.0
18  *  ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.integration.functional.ncmp.datajobs.subscription
22
23 import static org.onap.cps.api.parameters.FetchDescendantsOption.DIRECT_CHILDREN_ONLY
24
25 import ch.qos.logback.classic.spi.ILoggingEvent
26 import ch.qos.logback.core.read.ListAppender
27 import io.cloudevents.core.builder.CloudEventBuilder
28 import io.cloudevents.kafka.CloudEventSerializer
29 import io.cloudevents.kafka.CloudEventDeserializer
30 import java.nio.charset.StandardCharsets
31 import java.time.Duration
32 import org.apache.kafka.clients.producer.ProducerRecord
33 import org.apache.kafka.common.serialization.StringSerializer
34 import org.onap.cps.integration.base.CpsIntegrationSpecBase
35 import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpInEventConsumer
36 import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService
37 import org.slf4j.LoggerFactory
38 import org.springframework.beans.factory.annotation.Autowired
39 import org.springframework.beans.factory.annotation.Value
40
41 class CmSubscriptionSpec extends CpsIntegrationSpecBase {
42
43     @Autowired
44     CmDataJobSubscriptionPersistenceService cmDataJobSubscriptionPersistenceService
45
46     @Value('${app.ncmp.avc.cm-subscription-ncmp-in}')
47     def subscriptionTopic
48
49     @Value('${app.ncmp.avc.cm-subscription-dmi-out}')
50     def dmiOutTopic
51
52     @Value('${app.ncmp.avc.cm-subscription-dmi-in}')
53     def dmiInTopic
54
55     def dmiInConsumer
56     def testRequestProducer
57     def testResponseProducer
58
59     def listAppender = new ListAppender<ILoggingEvent>()
60     def logger
61
62     def setup() {
63         registerCmHandlesForSubscriptions()
64         kafkaTestContainer.start()
65         dmiInConsumer = kafkaTestContainer.getConsumer('test-group', CloudEventDeserializer.class)
66         dmiInConsumer.subscribe([dmiInTopic])
67         dmiInConsumer.poll(Duration.ofMillis(500))
68         testRequestProducer = kafkaTestContainer.createProducer('test-client-id', StringSerializer.class)
69         testResponseProducer = kafkaTestContainer.createProducer('test-client-id', CloudEventSerializer.class)
70         logger = LoggerFactory.getLogger(NcmpInEventConsumer)
71         listAppender.start()
72         logger.addAppender(listAppender)
73     }
74
75     def cleanup() {
76         dmiInConsumer.unsubscribe()
77         dmiInConsumer.close()
78         testRequestProducer.close()
79         testResponseProducer.close()
80         kafkaTestContainer.close()
81         deregisterCmHandles('dmi-0', ['cmHandle0'])
82         deregisterCmHandles('dmi-1', ['cmHandle1', 'cmHandle2'])
83         deregisterCmHandles('dmi-2', ['cmHandle3', 'cmHandle4'])
84     }
85
86     def 'Create subscription and send to multiple DMIs'() {
87         given: 'a data node selector on DMI-1'
88             def dmi1DataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n'''
89         and: 'a data node selector on DMI-2'
90             def dmi2DataNodeSelector = '/parent[id=\\\"3\\\"]/child'
91         and: 'an event payload'
92             def eventDataNodeSelector = (dmi1DataNodeSelector + dmi2DataNodeSelector)
93             def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'myDataJobId', eventDataNodeSelector)
94         when: 'a subscription create request is sent'
95             sendSubscriptionCreateRequest(subscriptionTopic, 'key', eventPayload)
96         then: 'log shows event is consumed by ncmp'
97             def messages = listAppender.list*.formattedMessage
98             messages.any { msg -> msg.contains('myDataJobId') && msg.contains('dataJobCreated')}
99         and: 'the 3 different data node selectors for the given data job id is persisted'
100             assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('myDataJobId').size() == 3
101         and: 'get correlation ids from event sent to DMIs'
102             def correlationIds = getAllConsumedCorrelationIds()
103         and: 'there is correlation IDs (event) for each affected dmi (DMI-1, DMI-2)'
104             assert correlationIds.size() == 2
105             assert correlationIds.containsAll(['myDataJobId#dmi-1', 'myDataJobId#dmi-2'])
106     }
107
108     def 'Update subscription status'() {
109         given: 'a persisted subscription'
110             def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'newDataJob', '/parent[id=\\\'0\\\']\\n')
111             sendSubscriptionCreateRequest(subscriptionTopic, 'newDataJob', eventPayload)
112         when: 'dmi accepts the subscription create request'
113             sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-0', 'newDataJob#dmi-0')
114         then: 'there are no more inactive data node selector for given datajob id'
115             assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('newDataJob').size() == 0
116         and: 'status for the data node selector for given data job id is ACCEPTED'
117             def affectedDataNodes =  cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
118                 '//subscription/dataJobId[text()=\'newDataJob\']', DIRECT_CHILDREN_ONLY)
119             assert affectedDataNodes.leaves.every( entry -> entry.get('status') == 'ACCEPTED')
120     }
121
122     def 'Create new subscription which partially overlaps with an existing active subscription'() {
123         given: 'an existing data node selector on DMI-1'
124             def existingDmi1DataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"3\\\"]/child\\n'''
125         and: 'a new data node selector on DMI-2'
126             def newDmi2DataNodeSelector = '/parent[id=\\\"4\\\"]'
127         and: 'an event payload'
128             def eventDataNodeSelector = (existingDmi1DataNodeSelector + newDmi2DataNodeSelector)
129             def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'partialOverlappingDataJob', eventDataNodeSelector)
130         and: 'an active subscription in database'
131             createAndAcceptSubscriptionA()
132         when: 'a new subscription create request is sent'
133             sendSubscriptionCreateRequest(subscriptionTopic, 'partialOverlappingDataJob', eventPayload)
134         then: 'log shows event is consumed by ncmp'
135             def messages = listAppender.list*.formattedMessage
136             messages.any { msg -> msg.contains('partialOverlappingDataJob') && msg.contains('dataJobCreated')}
137         and: 'the 3 data node selectors for the given data job id is persisted'
138             assert cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
139                 '//subscription/dataJobId[text()=\'partialOverlappingDataJob\']', DIRECT_CHILDREN_ONLY).size() == 3
140         and: 'only one data node selector is not active'
141             assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('partialOverlappingDataJob').size() == 1
142         and: 'get correlation ids from event sent to DMIs'
143             def correlationIds = getAllConsumedCorrelationIds()
144         and: 'there is correlation IDs (event) for only the affected dmi (DMI-2)'
145             assert !correlationIds.contains('partialOverlappingDataJob#dmi-1')
146             assert correlationIds.contains('partialOverlappingDataJob#dmi-2')
147     }
148
149     def 'Create new subscription which completely overlaps with an active existing subscriptions'() {
150         given: 'a new data node selector'
151             def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n'''
152         and: 'an event payload'
153             def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'fullyOverlappingDataJob', dataNodeSelector)
154         and: 'existing active subscriptions in database'
155             createAndAcceptSubscriptionA()
156             createAndAcceptSubscriptionB()
157         when: 'a new subscription create request is sent'
158             sendSubscriptionCreateRequest(subscriptionTopic, 'fullyOverlappingDataJob', eventPayload)
159         then: 'log shows event is consumed by ncmp'
160             def messages = listAppender.list*.formattedMessage
161             messages.any { msg -> msg.contains('fullyOverlappingDataJob') && msg.contains('dataJobCreated')}
162         and: 'the 2 data node selectors for the given data job id is persisted'
163             assert cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
164                 '//subscription/dataJobId[text()=\'fullyOverlappingDataJob\']', DIRECT_CHILDREN_ONLY).size() == 2
165         and: 'there are no inactive data node selector'
166             assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('fullyOverlappingDataJob').size() == 0
167         and: 'get correlation ids from event sent to DMIs'
168             def correlationIds = getAllConsumedCorrelationIds()
169         and: 'there is no correlation IDs (event) for any dmi'
170             assert !correlationIds.any { correlationId -> correlationId.startsWith('fullyOverlappingDataJob') }
171     }
172
173     def registerCmHandlesForSubscriptions() {
174         registerCmHandle('dmi-0', 'cmHandle0', '','/parent=0')
175         registerCmHandle('dmi-1', 'cmHandle1', '','/parent=1')
176         registerCmHandle('dmi-1', 'cmHandle2', '','/parent=2')
177         registerCmHandle('dmi-2', 'cmHandle3', '','/parent=3')
178         registerCmHandle('dmi-2', 'cmHandle4', '','/parent=4')
179     }
180
181     def createSubscriptionEventPayload(eventType, dataJobId, dataNodeSelector) {
182         def eventPayload = readResourceDataFile('datajobs/subscription/createSubscriptionEvent.json')
183         eventPayload = eventPayload.replace('#eventType', eventType)
184         eventPayload = eventPayload.replace('#dataJobId', dataJobId)
185         eventPayload = eventPayload.replace('#dataNodeSelector', dataNodeSelector)
186         return eventPayload
187     }
188
189     def createAndAcceptSubscriptionA() {
190         def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n/parent[id=\\\"3\\\"]/child'''
191         def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'dataJobA', dataNodeSelector)
192         sendSubscriptionCreateRequest(subscriptionTopic, 'dataJobA', eventPayload)
193         sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-1', 'dataJobA#dmi-1')
194         sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-2', 'dataJobA#dmi-2')
195     }
196
197     def createAndAcceptSubscriptionB() {
198         def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"3\\\"]/child\\n/parent[id=\\\"4\\\"]'''
199         def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'dataJobB', dataNodeSelector)
200         sendSubscriptionCreateRequest(subscriptionTopic, 'dataJobB', eventPayload)
201         sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-2', 'dataJobB#dmi-2')
202     }
203
204     def sendSubscriptionCreateRequest(topic, eventKey, eventPayload) {
205         def event = new ProducerRecord<>(topic, eventKey, eventPayload);
206         testRequestProducer.send(event)
207         sleep(1000)
208     }
209
210     def sendDmiResponse(statusCode, statusMessage, eventType, eventSource, correlationId) {
211         def eventPayload =  readResourceDataFile('datajobs/subscription/dmiSubscriptionResponseEvent.json')
212         eventPayload = eventPayload.replace('#statusCode', statusCode)
213         eventPayload = eventPayload.replace('#statusMessage', statusMessage)
214         def cloudEvent = CloudEventBuilder.v1()
215             .withData(eventPayload.getBytes(StandardCharsets.UTF_8))
216             .withId('random-uuid')
217             .withType(eventType)
218             .withSource(URI.create(eventSource))
219             .withExtension('correlationid', correlationId).build()
220         def event = new ProducerRecord<>(dmiOutTopic, 'key', cloudEvent);
221         testResponseProducer.send(event)
222         sleep(2000)
223     }
224
225     def getAllConsumedCorrelationIds() {
226         def consumedEvents = dmiInConsumer.poll(Duration.ofMillis(1000))
227         def headersMap = getAllHeaders(consumedEvents)
228         return headersMap.get('ce_correlationid')
229     }
230
231     def getAllHeaders(consumedEvents) {
232         def headersMap = [:].withDefault { [] }
233         consumedEvents.each { event ->
234             event.headers().each { header ->
235                 def key = header.key()
236                 def value = new String(header.value())
237                 headersMap[key] << value
238             }
239
240         }
241         return headersMap
242     }
243 }