Read additionalProps from .properties files used in drools 70/138870/1
authoradheli.tavares <adheli.tavares@est.tech>
Mon, 26 Aug 2024 13:12:46 +0000 (14:12 +0100)
committerAdheli Tavares <adheli.tavares@est.tech>
Fri, 30 Aug 2024 08:29:41 +0000 (08:29 +0000)
Issue-ID: POLICY-5120
Change-Id: I2cb5cae3e6456f32a72e4b2bda0819b3eb77653e
Signed-off-by: adheli.tavares <adheli.tavares@est.tech>
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtilsTest.java [new file with mode: 0644]

index 4295a0a..5d36a31 100644 (file)
@@ -43,6 +43,7 @@ public final class PolicyEndPointProperties {
     public static final String PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX = ".fetchTimeout";
     public static final String PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX = ".fetchLimit";
     public static final String PROPERTY_MANAGED_SUFFIX = ".managed";
+    public static final String PROPERTY_ADDITIONAL_PROPS_SUFFIX = ".additionalProps";
 
     public static final String PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX = ".partitionKey";
 
index 03e2076..2e137ce 100644 (file)
@@ -3,7 +3,7 @@
  * ONAP
  * ================================================================================
  * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2022-2023 Nordix Foundation.
+ * Modifications Copyright (C) 2022-2024 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.policy.common.endpoints.utils;
 
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_ADDITIONAL_PROPS_SUFFIX;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.re2j.Pattern;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
 
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class KafkaPropertyUtils {
@@ -39,20 +46,35 @@ public class KafkaPropertyUtils {
      * Makes a topic builder, configuring it with properties that are common to both
      * sources and sinks.
      *
-     * @param props properties to be used to configure the builder
-     * @param topic topic being configured
+     * @param props   properties to be used to configure the builder
+     * @param topic   topic being configured
      * @param servers target servers
      * @return a topic builder
      */
     public static TopicParamsBuilder makeBuilder(PropertyUtils props, String topic, String servers) {
 
         final List<String> serverList = new ArrayList<>(Arrays.asList(COMMA_SPACE_PAT.split(servers)));
-
         return BusTopicParams.builder()
-                    .servers(serverList)
-                    .topic(topic)
-                    .effectiveTopic(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX,
-                                    topic))
-                    .managed(props.getBoolean(PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, true));
+            .servers(serverList)
+            .topic(topic)
+            .effectiveTopic(props.getString(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic))
+            .managed(props.getBoolean(PROPERTY_MANAGED_SUFFIX, true))
+            .additionalProps(getAdditionalProps(props.getString(PROPERTY_ADDITIONAL_PROPS_SUFFIX, "")));
+    }
+
+    private static Map<String, String> getAdditionalProps(String additionalPropsString) {
+        try {
+            Map<String, String> additionalProps = new HashMap<>();
+            var converted = new ObjectMapper().readValue(additionalPropsString, Map.class);
+            converted.forEach((k, v) -> {
+                if (k instanceof String key && v instanceof String value) {
+                    additionalProps.put(key, value);
+                }
+            });
+            return additionalProps;
+        } catch (Exception e) {
+            return Collections.emptyMap();
+        }
+
     }
 }
index a00879c..b49f58e 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation.
+ * Copyright (C) 2022, 2024 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -26,17 +26,25 @@ import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperti
 import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX;
 import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX;
 
-import java.util.Arrays;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import lombok.Getter;
 import org.onap.policy.common.endpoints.parameters.TopicParameters;
 
+@Getter
 public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder {
 
     public static final String SERVER = "localhost:9092";
     public static final String TOPIC2 = "my-topic-2";
+    public static final String ADDITIONAL_PROPS = "{\"security.protocol\": \"SASL_PLAINTEXT\","
+        + "\"sasl.mechanism\": \"SCRAM-SHA-512\",\"sasl.jaas.config\": "
+        + "\"org.apache.kafka.common.security.plain.PlainLoginModule "
+        + "required username=abc password=abc serviceName=kafka;\"}";
 
-    @Getter
-    private TopicParameters params = new TopicParameters();
+    private final TopicParameters params = new TopicParameters();
 
     /**
      * Constructs the object.
@@ -61,6 +69,7 @@ public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder {
         setTopicProperty(PROPERTY_HTTP_HTTPS_SUFFIX, "true");
         setTopicProperty(PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, MY_PARTITION);
         setTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER);
+        setTopicProperty(".additionalProps", ADDITIONAL_PROPS);
 
         params.setTopicCommInfrastructure("kafka");
         params.setTopic(topic);
@@ -68,8 +77,17 @@ public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder {
         params.setManaged(true);
         params.setUseHttps(true);
         params.setPartitionId(MY_PARTITION);
-        params.setServers(Arrays.asList(SERVER));
+        params.setServers(List.of(SERVER));
+        params.setAdditionalProps(getAdditionalProps());
 
         return this;
     }
+
+    private Map<String, String> getAdditionalProps() {
+        try {
+            return new ObjectMapper().readValue(ADDITIONAL_PROPS, Map.class);
+        } catch (JsonProcessingException e) {
+            return Collections.emptyMap();
+        }
+    }
 }
index 6aed6cd..5ff6782 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.policy.common.endpoints.event.comm.bus;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS;
 import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
@@ -88,6 +89,7 @@ class KafkaTopicSinkFactoryTest extends KafkaTopicFactoryTestBase<KafkaTopicSink
         assertEquals(MY_TOPIC, params.getTopic());
         assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic());
         assertEquals(MY_PARTITION, params.getPartitionId());
+        assertNotNull(params.getAdditionalProps());
 
         List<KafkaTopicSink> topics2 = buildTopics(makePropBuilder().makeTopic(TOPIC3)
             .removeTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX).build());
@@ -100,6 +102,13 @@ class KafkaTopicSinkFactoryTest extends KafkaTopicFactoryTestBase<KafkaTopicSink
         assertEquals(1, buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build()).size());
     }
 
+    @Test
+    void testBuildFromProperties() {
+        Properties props = makePropBuilder().makeTopic(MY_TOPIC).build();
+        var listTopic = factory.build(props);
+        assertNotNull(listTopic);
+    }
+
     @Test
     @Override
     void testDestroyString_testGet_testInventory() {
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtilsTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtilsTest.java
new file mode 100644 (file)
index 0000000..52caa47
--- /dev/null
@@ -0,0 +1,55 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.utils;
+
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_ADDITIONAL_PROPS_SUFFIX;
+
+import java.util.Properties;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class KafkaPropertyUtilsTest {
+
+    @Test
+    void test() {
+        var properties = new Properties();
+        properties.setProperty("mytopic" + PROPERTY_ADDITIONAL_PROPS_SUFFIX, "{444-");
+        PropertyUtils props = new PropertyUtils(properties, "mytopic", null);
+
+        var build = KafkaPropertyUtils.makeBuilder(props, "mytopic", "servers").build();
+        Assertions.assertTrue(build.getAdditionalProps().isEmpty());
+
+        properties.setProperty("mytopic" + PROPERTY_ADDITIONAL_PROPS_SUFFIX,
+            "{\"security.protocol\": \"SASL_PLAINTEXT\"}");
+        build = KafkaPropertyUtils.makeBuilder(props, "mytopic", "servers").build();
+        Assertions.assertTrue(build.getAdditionalProps().containsKey("security.protocol"));
+
+        properties.setProperty("mytopic" + PROPERTY_ADDITIONAL_PROPS_SUFFIX,
+            "{\"security.protocol\": false }");
+        build = KafkaPropertyUtils.makeBuilder(props, "mytopic", "servers").build();
+        Assertions.assertTrue(build.getAdditionalProps().isEmpty());
+
+        properties.setProperty("mytopic" + PROPERTY_ADDITIONAL_PROPS_SUFFIX, "");
+        build = KafkaPropertyUtils.makeBuilder(props, "mytopic", "servers").build();
+        Assertions.assertTrue(build.getAdditionalProps().isEmpty());
+    }
+  
+}
\ No newline at end of file