2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2022-2023 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
19 package org.onap.policy.common.endpoints.event.comm.bus.internal;
22 import org.onap.policy.common.endpoints.event.comm.Topic;
23 import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
28 * This implementation publishes events for the associated KAFKA topic, inline with the calling
31 public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTopicSink {
36 private static final Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class);
38 protected Map<String, String> additionalProps;
41 * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains the below
44 * <p>servers list of KAFKA servers available for publishing
45 * topic the topic to publish to
46 * partitionId the partition key (optional, autogenerated if not provided)
47 * useHttps does connection use HTTPS?
48 * @param busTopicParams contains attributes needed
49 * @throws IllegalArgumentException if invalid arguments are detected
51 public InlineKafkaTopicSink(BusTopicParams busTopicParams) {
52 super(busTopicParams);
53 this.additionalProps = busTopicParams.getAdditionalProps();
57 * Instantiation of internal resources.
62 this.publisher = new BusPublisher.KafkaPublisherWrapper(BusTopicParams.builder()
63 .servers(this.servers)
64 .topic(this.effectiveTopic)
65 .useHttps(this.useHttps)
66 .allowTracing(this.allowTracing)
67 .additionalProps(this.additionalProps)
69 logger.info("{}: KAFKA SINK created", this);
73 public String toString() {
74 return "InlineKafkaTopicSink [getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()="
75 + super.toString() + "]";
79 public CommInfrastructure getTopicCommInfrastructure() {
80 return Topic.CommInfrastructure.KAFKA;