2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2022, 2024 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
19 package org.onap.policy.common.endpoints.event.comm.bus;
21 import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_EFFECTIVE_TOPIC;
22 import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_PARTITION;
23 import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX;
24 import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX;
25 import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
26 import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX;
27 import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX;
29 import com.fasterxml.jackson.core.JsonProcessingException;
30 import com.fasterxml.jackson.databind.ObjectMapper;
31 import java.util.Collections;
32 import java.util.List;
35 import org.onap.policy.common.endpoints.parameters.TopicParameters;
38 public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder {
40 public static final String SERVER = "localhost:9092";
41 public static final String TOPIC2 = "my-topic-2";
42 public static final String ADDITIONAL_PROPS = "{\"security.protocol\": \"SASL_PLAINTEXT\","
43 + "\"sasl.mechanism\": \"SCRAM-SHA-512\",\"sasl.jaas.config\": "
44 + "\"org.apache.kafka.common.security.plain.PlainLoginModule "
45 + "required username=abc password=abc serviceName=kafka;\"}";
47 private final TopicParameters params = new TopicParameters();
50 * Constructs the object.
52 * @param prefix the prefix for the properties to be built
54 public KafkaTopicPropertyBuilder(String prefix) {
59 * Adds a topic and configures it's properties with default values.
61 * @param topic the topic to be added
62 * @return this builder
64 public KafkaTopicPropertyBuilder makeTopic(String topic) {
67 setTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, MY_EFFECTIVE_TOPIC);
68 setTopicProperty(PROPERTY_MANAGED_SUFFIX, "true");
69 setTopicProperty(PROPERTY_HTTP_HTTPS_SUFFIX, "true");
70 setTopicProperty(PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, MY_PARTITION);
71 setTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER);
72 setTopicProperty(".additionalProps", ADDITIONAL_PROPS);
74 params.setTopicCommInfrastructure("kafka");
75 params.setTopic(topic);
76 params.setEffectiveTopic(MY_EFFECTIVE_TOPIC);
77 params.setManaged(true);
78 params.setUseHttps(true);
79 params.setPartitionId(MY_PARTITION);
80 params.setServers(List.of(SERVER));
81 params.setAdditionalProps(getAdditionalProps());
86 private Map<String, String> getAdditionalProps() {
88 return new ObjectMapper().readValue(ADDITIONAL_PROPS, Map.class);
89 } catch (JsonProcessingException e) {
90 return Collections.emptyMap();