b49f58e2eca0dd04cef2bb857340f22a4249aeb0
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2022, 2024 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  */
18
19 package org.onap.policy.common.endpoints.event.comm.bus;
20
21 import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_EFFECTIVE_TOPIC;
22 import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_PARTITION;
23 import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX;
24 import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX;
25 import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
26 import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX;
27 import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX;
28
29 import com.fasterxml.jackson.core.JsonProcessingException;
30 import com.fasterxml.jackson.databind.ObjectMapper;
31 import java.util.Collections;
32 import java.util.List;
33 import java.util.Map;
34 import lombok.Getter;
35 import org.onap.policy.common.endpoints.parameters.TopicParameters;
36
37 @Getter
38 public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder {
39
40     public static final String SERVER = "localhost:9092";
41     public static final String TOPIC2 = "my-topic-2";
42     public static final String ADDITIONAL_PROPS = "{\"security.protocol\": \"SASL_PLAINTEXT\","
43         + "\"sasl.mechanism\": \"SCRAM-SHA-512\",\"sasl.jaas.config\": "
44         + "\"org.apache.kafka.common.security.plain.PlainLoginModule "
45         + "required username=abc password=abc serviceName=kafka;\"}";
46
47     private final TopicParameters params = new TopicParameters();
48
49     /**
50      * Constructs the object.
51      *
52      * @param prefix the prefix for the properties to be built
53      */
54     public KafkaTopicPropertyBuilder(String prefix) {
55         super(prefix);
56     }
57
58     /**
59      * Adds a topic and configures it's properties with default values.
60      *
61      * @param topic the topic to be added
62      * @return this builder
63      */
64     public KafkaTopicPropertyBuilder makeTopic(String topic) {
65         addTopic(topic);
66
67         setTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, MY_EFFECTIVE_TOPIC);
68         setTopicProperty(PROPERTY_MANAGED_SUFFIX, "true");
69         setTopicProperty(PROPERTY_HTTP_HTTPS_SUFFIX, "true");
70         setTopicProperty(PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, MY_PARTITION);
71         setTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER);
72         setTopicProperty(".additionalProps", ADDITIONAL_PROPS);
73
74         params.setTopicCommInfrastructure("kafka");
75         params.setTopic(topic);
76         params.setEffectiveTopic(MY_EFFECTIVE_TOPIC);
77         params.setManaged(true);
78         params.setUseHttps(true);
79         params.setPartitionId(MY_PARTITION);
80         params.setServers(List.of(SERVER));
81         params.setAdditionalProps(getAdditionalProps());
82
83         return this;
84     }
85
86     private Map<String, String> getAdditionalProps() {
87         try {
88             return new ObjectMapper().readValue(ADDITIONAL_PROPS, Map.class);
89         } catch (JsonProcessingException e) {
90             return Collections.emptyMap();
91         }
92     }
93 }