f215c6dc09cbdf9f53574f0c14c4a8f7843fef81
[cps.git] / cps-service / src / test / groovy / org / onap / cps / notification / NotificationPublisherSpec.groovy
1 /*
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2021 Bell Canada. All rights reserved.
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  *
16  *  SPDX-License-Identifier: Apache-2.0
17  *  ============LICENSE_END=========================================================
18  */
19
20 package org.onap.cps.notification
21
22 import org.apache.kafka.clients.producer.ProducerRecord
23 import org.apache.kafka.clients.producer.RecordMetadata
24 import org.onap.cps.event.model.Content
25 import org.onap.cps.event.model.CpsDataUpdatedEvent
26 import org.spockframework.spring.SpringBean
27 import org.springframework.kafka.KafkaException
28 import org.springframework.kafka.core.KafkaTemplate
29 import spock.util.concurrent.PollingConditions
30
31 class NotificationPublisherSpec extends KafkaPublisherSpecBase {
32
33     @SpringBean
34     NotificationErrorHandler spyNotificationErrorHandler = Spy(new NotificationErrorHandler())
35
36     @SpringBean
37     KafkaProducerListener spyKafkaProducerListener = Spy(new KafkaProducerListener<>(spyNotificationErrorHandler))
38
39     KafkaTemplate spyKafkaTemplate
40     NotificationPublisher objectUnderTest
41
42     def myAnchorName = 'my-anchor'
43     def myDataspaceName = 'my-dataspace'
44
45     def cpsDataUpdatedEvent = new CpsDataUpdatedEvent()
46             .withContent(new Content()
47                     .withDataspaceName(myDataspaceName)
48                     .withAnchorName(myAnchorName))
49
50     def setup() {
51         spyKafkaTemplate = Spy(kafkaTemplate)
52         objectUnderTest = new NotificationPublisher(spyKafkaTemplate, cpsEventTopic);
53     }
54
55     def 'Sending event to message bus with correct message Key.'() {
56
57         when: 'event is sent to publisher'
58             objectUnderTest.sendNotification(cpsDataUpdatedEvent)
59             kafkaTemplate.flush()
60
61         then: 'event is sent to correct topic with the expected messageKey'
62             interaction {
63                 def messageKey = myDataspaceName + "," + myAnchorName
64                 1 * spyKafkaTemplate.send(cpsEventTopic, messageKey, cpsDataUpdatedEvent)
65             }
66         and: 'received a successful response'
67             1 * spyKafkaProducerListener.onSuccess(_ as ProducerRecord, _)
68         and: 'kafka consumer returns expected message'
69             def conditions = new PollingConditions(timeout: 60, initialDelay: 0, factor: 1)
70             conditions.eventually {
71                 assert cpsDataUpdatedEvent == consumedMessages.get(0)
72             }
73     }
74
75     def 'Handling of async errors from message bus.'() {
76         given: 'topic does not exist'
77             objectUnderTest.topicName = 'non-existing-topic'
78
79         when: 'message to sent to a non-existing topic'
80             objectUnderTest.sendNotification(cpsDataUpdatedEvent)
81             kafkaTemplate.flush()
82
83         then: 'error is thrown'
84             thrown KafkaException
85         and: 'error handler is called with exception details'
86             1 * spyKafkaProducerListener.onError(_ as ProducerRecord, _, _ as Exception)
87             1 * spyNotificationErrorHandler.onException(_ as String, _ as Exception,
88                     _ as ProducerRecord, _)
89     }
90
91 }