2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2023 Nordix Foundation
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.ncmp.api.impl.config.kafka;
23 import io.cloudevents.CloudEvent;
25 import lombok.RequiredArgsConstructor;
26 import org.apache.kafka.clients.producer.ProducerConfig;
27 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
28 import org.springframework.context.annotation.Bean;
29 import org.springframework.context.annotation.Configuration;
30 import org.springframework.context.annotation.Primary;
31 import org.springframework.kafka.annotation.EnableKafka;
32 import org.springframework.kafka.core.ConsumerFactory;
33 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
34 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
35 import org.springframework.kafka.core.KafkaTemplate;
36 import org.springframework.kafka.core.ProducerFactory;
37 import org.springframework.kafka.support.serializer.JsonDeserializer;
38 import org.springframework.kafka.support.serializer.JsonSerializer;
41 * kafka Configuration for legacy and cloud events.
43 * @param <T> valid legacy event to be published over the wire.
47 @RequiredArgsConstructor
48 public class KafkaTemplateConfig<T> {
50 private final KafkaProperties kafkaProperties;
53 * This sets the strategy for creating legacy Kafka producer instance from kafka properties defined into
54 * application.yml and replaces value-serializer by JsonSerializer.
56 * @return legacy event producer instance.
59 public ProducerFactory<String, T> legacyEventProducerFactory() {
60 final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties();
61 producerConfigProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
62 return new DefaultKafkaProducerFactory<>(producerConfigProperties);
66 * The ConsumerFactory implementation is to produce new legacy instance for provided kafka properties defined
67 * into application.yml and replaces deserializer-value by JsonDeserializer.
69 * @return an instance of legacy consumer factory.
72 public ConsumerFactory<String, T> legacyEventConsumerFactory() {
73 final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties();
74 consumerConfigProperties.put("spring.deserializer.value.delegate.class", JsonDeserializer.class);
75 return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
79 * This sets the strategy for creating cloud Kafka producer instance from kafka properties defined into
80 * application.yml with CloudEventSerializer.
82 * @return cloud event producer instance.
85 public ProducerFactory<String, CloudEvent> cloudEventProducerFactory() {
86 final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties();
87 return new DefaultKafkaProducerFactory<>(producerConfigProperties);
91 * The ConsumerFactory implementation to produce new legacy instance for provided kafka properties defined
92 * into application.yml having CloudEventDeserializer as deserializer-value.
94 * @return an instance of cloud consumer factory.
97 public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactory() {
98 final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties();
99 return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
103 * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this.
105 * @return an instance of legacy Kafka template.
109 public KafkaTemplate<String, T> legacyEventKafkaTemplate() {
110 final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
111 kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
112 return kafkaTemplate;
116 * A cloud Kafka event template for executing high-level operations. The cloud producer factory ensure this.
118 * @return an instance of cloud Kafka template.
121 public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() {
122 final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory());
123 kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory());
124 return kafkaTemplate;