Merge "Sending Data Updated Event to kafka"
[cps.git] / cps-service / src / main / java / org / onap / cps / notification / KafkaProducerListener.java
1 /*
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2021 Bell Canada. All rights reserved.
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  *
16  *  SPDX-License-Identifier: Apache-2.0
17  *  ============LICENSE_END=========================================================
18  */
19
20 package org.onap.cps.notification;
21
22 import lombok.extern.slf4j.Slf4j;
23 import org.apache.kafka.clients.producer.ProducerRecord;
24 import org.apache.kafka.clients.producer.RecordMetadata;
25 import org.springframework.kafka.support.ProducerListener;
26 import org.springframework.stereotype.Component;
27
28 @Slf4j
29 @Component
30 public class KafkaProducerListener<K, V> implements ProducerListener<K, V> {
31
32     private NotificationErrorHandler notificationErrorHandler;
33
34     public KafkaProducerListener(final NotificationErrorHandler notificationErrorHandler) {
35         this.notificationErrorHandler = notificationErrorHandler;
36     }
37
38     @Override
39     public void onSuccess(final ProducerRecord<K, V> producerRecord, final RecordMetadata recordMetadata) {
40         log.debug("Message sent to event-bus topic :'{}' with body : {} ", producerRecord.topic(),
41             producerRecord.value());
42     }
43
44     @Override
45     public void onError(final ProducerRecord<K, V> producerRecord,
46             final RecordMetadata recordMetadata,
47             final Exception exception) {
48         notificationErrorHandler.onException("Failed to send message to message bus",
49             exception, producerRecord, recordMetadata);
50     }
51
52 }