6cd9ae1b20b97b8bafed8d4487646c7c8874dcde
[cps.git] / cps-service / src / test / groovy / org / onap / cps / notification / NotificationPublisherSpec.groovy
1 /*
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2021 Bell Canada. All rights reserved.
4  *  Modifications Copyright (C) 2021-2022 Nordix Foundation
5  * ================================================================================
6  *  Licensed under the Apache License, Version 2.0 (the "License");
7  *  you may not use this file except in compliance with the License.
8  *  You may obtain a copy of the License at
9  *
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *  SPDX-License-Identifier: Apache-2.0
18  *  ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.notification
22
23 import org.apache.kafka.clients.producer.ProducerRecord
24 import org.onap.cps.event.model.Content
25 import org.onap.cps.event.model.CpsDataUpdatedEvent
26 import org.spockframework.spring.SpringBean
27 import org.springframework.kafka.KafkaException
28 import org.springframework.kafka.core.KafkaTemplate
29 import spock.util.concurrent.PollingConditions
30
31 class NotificationPublisherSpec extends KafkaPublisherSpecBase {
32
33     @SpringBean
34     NotificationErrorHandler spyNotificationErrorHandler = Spy(new NotificationErrorHandler())
35
36     @SpringBean
37     KafkaProducerListener spyKafkaProducerListener = Spy(new KafkaProducerListener<>(spyNotificationErrorHandler))
38
39     KafkaTemplate spyKafkaTemplate
40     NotificationPublisher objectUnderTest
41
42     def myAnchorName = 'my-anchor'
43     def myDataspaceName = 'my-dataspace'
44
45     def cpsDataUpdatedEvent = new CpsDataUpdatedEvent()
46             .withContent(new Content()
47                     .withDataspaceName(myDataspaceName)
48                     .withAnchorName(myAnchorName))
49
50     def setup() {
51         spyKafkaTemplate = Spy(kafkaTemplate)
52         objectUnderTest = new NotificationPublisher(spyKafkaTemplate, cpsEventTopic);
53     }
54
55     def 'Sending event to message bus with correct message Key.'() {
56
57         when: 'event is sent to publisher'
58             objectUnderTest.sendNotification(cpsDataUpdatedEvent)
59             kafkaTemplate.flush()
60
61         then: 'event is sent to correct topic with the expected messageKey'
62             interaction {
63                 def messageKey = myDataspaceName + "," + myAnchorName
64                 1 * spyKafkaTemplate.send(cpsEventTopic, messageKey, cpsDataUpdatedEvent)
65             }
66         and: 'received a successful response'
67             1 * spyKafkaProducerListener.onSuccess(_ as ProducerRecord, _)
68         and: 'kafka consumer returns expected message'
69             def conditions = new PollingConditions(timeout: 60, initialDelay: 0, factor: 1)
70             conditions.eventually {
71                 assert cpsDataUpdatedEvent == consumedMessages.get(0)
72             }
73     }
74
75     def 'Handling of async errors from message bus.'() {
76         given: 'topic does not exist'
77             objectUnderTest.topicName = 'non-existing-topic'
78
79         when: 'message to sent to a non-existing topic'
80             objectUnderTest.sendNotification(cpsDataUpdatedEvent)
81             kafkaTemplate.flush()
82
83         then: 'error is thrown'
84             thrown KafkaException
85         and: 'error handler is called with exception details'
86             1 * spyKafkaProducerListener.onError(_ as ProducerRecord, _, _ as Exception)
87             1 * spyNotificationErrorHandler.onException(_ as String, _ as Exception,
88                     _ as ProducerRecord, _)
89     }
90
91 }