public static final String PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX = ".fetchTimeout";
public static final String PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX = ".fetchLimit";
public static final String PROPERTY_MANAGED_SUFFIX = ".managed";
+ public static final String PROPERTY_ADDITIONAL_PROPS_SUFFIX = ".additionalProps";
public static final String PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX = ".partitionKey";
* ONAP
* ================================================================================
* Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2022-2023 Nordix Foundation.
+ * Modifications Copyright (C) 2022-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.policy.common.endpoints.utils;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_ADDITIONAL_PROPS_SUFFIX;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.re2j.Pattern;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class KafkaPropertyUtils {
* Makes a topic builder, configuring it with properties that are common to both
* sources and sinks.
*
- * @param props properties to be used to configure the builder
- * @param topic topic being configured
+ * @param props properties to be used to configure the builder
+ * @param topic topic being configured
* @param servers target servers
* @return a topic builder
*/
public static TopicParamsBuilder makeBuilder(PropertyUtils props, String topic, String servers) {
final List<String> serverList = new ArrayList<>(Arrays.asList(COMMA_SPACE_PAT.split(servers)));
-
return BusTopicParams.builder()
- .servers(serverList)
- .topic(topic)
- .effectiveTopic(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX,
- topic))
- .managed(props.getBoolean(PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, true));
+ .servers(serverList)
+ .topic(topic)
+ .effectiveTopic(props.getString(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic))
+ .managed(props.getBoolean(PROPERTY_MANAGED_SUFFIX, true))
+ .additionalProps(getAdditionalProps(props.getString(PROPERTY_ADDITIONAL_PROPS_SUFFIX, "")));
+ }
+
+ private static Map<String, String> getAdditionalProps(String additionalPropsString) {
+ try {
+ Map<String, String> additionalProps = new HashMap<>();
+ var converted = new ObjectMapper().readValue(additionalPropsString, Map.class);
+ converted.forEach((k, v) -> {
+ if (k instanceof String key && v instanceof String value) {
+ additionalProps.put(key, value);
+ }
+ });
+ return additionalProps;
+ } catch (Exception e) {
+ return Collections.emptyMap();
+ }
+
}
}
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation.
+ * Copyright (C) 2022, 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX;
import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX;
-import java.util.Arrays;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import lombok.Getter;
import org.onap.policy.common.endpoints.parameters.TopicParameters;
+@Getter
public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder {
public static final String SERVER = "localhost:9092";
public static final String TOPIC2 = "my-topic-2";
+ public static final String ADDITIONAL_PROPS = "{\"security.protocol\": \"SASL_PLAINTEXT\","
+ + "\"sasl.mechanism\": \"SCRAM-SHA-512\",\"sasl.jaas.config\": "
+ + "\"org.apache.kafka.common.security.plain.PlainLoginModule "
+ + "required username=abc password=abc serviceName=kafka;\"}";
- @Getter
- private TopicParameters params = new TopicParameters();
+ private final TopicParameters params = new TopicParameters();
/**
* Constructs the object.
setTopicProperty(PROPERTY_HTTP_HTTPS_SUFFIX, "true");
setTopicProperty(PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, MY_PARTITION);
setTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER);
+ setTopicProperty(".additionalProps", ADDITIONAL_PROPS);
params.setTopicCommInfrastructure("kafka");
params.setTopic(topic);
params.setManaged(true);
params.setUseHttps(true);
params.setPartitionId(MY_PARTITION);
- params.setServers(Arrays.asList(SERVER));
+ params.setServers(List.of(SERVER));
+ params.setAdditionalProps(getAdditionalProps());
return this;
}
+
+ private Map<String, String> getAdditionalProps() {
+ try {
+ return new ObjectMapper().readValue(ADDITIONAL_PROPS, Map.class);
+ } catch (JsonProcessingException e) {
+ return Collections.emptyMap();
+ }
+ }
}
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS;
import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
assertEquals(MY_TOPIC, params.getTopic());
assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic());
assertEquals(MY_PARTITION, params.getPartitionId());
+ assertNotNull(params.getAdditionalProps());
List<KafkaTopicSink> topics2 = buildTopics(makePropBuilder().makeTopic(TOPIC3)
.removeTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX).build());
assertEquals(1, buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build()).size());
}
+ @Test
+ void testBuildFromProperties() {
+ Properties props = makePropBuilder().makeTopic(MY_TOPIC).build();
+ var listTopic = factory.build(props);
+ assertNotNull(listTopic);
+ }
+
@Test
@Override
void testDestroyString_testGet_testInventory() {
--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.utils;
+
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_ADDITIONAL_PROPS_SUFFIX;
+
+import java.util.Properties;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class KafkaPropertyUtilsTest {
+
+ @Test
+ void test() {
+ var properties = new Properties();
+ properties.setProperty("mytopic" + PROPERTY_ADDITIONAL_PROPS_SUFFIX, "{444-");
+ PropertyUtils props = new PropertyUtils(properties, "mytopic", null);
+
+ var build = KafkaPropertyUtils.makeBuilder(props, "mytopic", "servers").build();
+ Assertions.assertTrue(build.getAdditionalProps().isEmpty());
+
+ properties.setProperty("mytopic" + PROPERTY_ADDITIONAL_PROPS_SUFFIX,
+ "{\"security.protocol\": \"SASL_PLAINTEXT\"}");
+ build = KafkaPropertyUtils.makeBuilder(props, "mytopic", "servers").build();
+ Assertions.assertTrue(build.getAdditionalProps().containsKey("security.protocol"));
+
+ properties.setProperty("mytopic" + PROPERTY_ADDITIONAL_PROPS_SUFFIX,
+ "{\"security.protocol\": false }");
+ build = KafkaPropertyUtils.makeBuilder(props, "mytopic", "servers").build();
+ Assertions.assertTrue(build.getAdditionalProps().isEmpty());
+
+ properties.setProperty("mytopic" + PROPERTY_ADDITIONAL_PROPS_SUFFIX, "");
+ build = KafkaPropertyUtils.makeBuilder(props, "mytopic", "servers").build();
+ Assertions.assertTrue(build.getAdditionalProps().isEmpty());
+ }
+
+}
\ No newline at end of file