Add kafka support on drools-pdp 43/136843/1
authorrameshiyer27 <ramesh.murugan.iyer@est.tech>
Mon, 11 Dec 2023 18:50:14 +0000 (18:50 +0000)
committerrameshiyer27 <ramesh.murugan.iyer@est.tech>
Thu, 14 Dec 2023 11:19:18 +0000 (11:19 +0000)
Issue-ID: POLICY-4201
Signed-off-by: zrrmmua <ramesh.murugan.iyer@est.tech>
Change-Id: I7f9ebec13cc41c214a400087f99e62bbc895abdd

feature-legacy-config/src/main/feature/config/feature-legacy-config.properties
feature-lifecycle/src/main/feature/config/feature-lifecycle.properties
policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java
policy-management/src/main/java/org/onap/policy/drools/server/restful/RestManager.java

index af5bd15..15a35b8 100644 (file)
@@ -26,3 +26,15 @@ dmaap.source.topics.PDPD-CONFIGURATION.consumerGroup=${envd:PDPD_CONFIGURATION_C
 dmaap.source.topics.PDPD-CONFIGURATION.consumerInstance=${envd:PDPD_CONFIGURATION_CONSUMER_INSTANCE}
 dmaap.source.topics.PDPD-CONFIGURATION.managed=false
 dmaap.source.topics.PDPD-CONFIGURATION.https=${envd:DMAAP_HTTPS:true}
+
+#Replace the properties with the following to use kafka message broker
+#kafka.source.topics=pdpd-configuration
+#kafka.source.topics.fetchTimeout=15000
+#kafka.source.topics.pdpd-configuration.servers=${envd:DMAAP_SERVERS}
+#kafka.source.topics.pdpd-configuration.effectiveTopic=${envd:PDPD_CONFIGURATION_TOPIC}
+#kafka.source.topics.pdpd-configuration.apiKey=${envd:PDPD_CONFIGURATION_API_KEY}
+#kafka.source.topics.pdpd-configuration.apiSecret=${envd:PDPD_CONFIGURATION_API_SECRET}
+#kafka.source.topics.pdpd-configuration.consumerGroup=${envd:PDPD_CONFIGURATION_CONSUMER_GROUP}
+#kafka.source.topics.pdpd-configuration.consumerInstance=${envd:PDPD_CONFIGURATION_CONSUMER_INSTANCE}
+#kafka.source.topics.pdpd-configuration.managed=false
+#kafka.source.topics.pdpd-configuration.https=${envd:DMAAP_HTTPS:true}
index d79c9e5..2779954 100644 (file)
@@ -36,3 +36,19 @@ dmaap.sink.topics.POLICY-PDP-PAP.effectiveTopic=${envd:POLICY_PDP_PAP_TOPIC}
 dmaap.sink.topics.POLICY-PDP-PAP.apiKey=${envd:POLICY_PDP_PAP_API_KEY}
 dmaap.sink.topics.POLICY-PDP-PAP.apiSecret=${envd:POLICY_PDP_PAP_API_SECRET}
 dmaap.sink.topics.POLICY-PDP-PAP.https=${envd:DMAAP_HTTPS:true}
+
+#Replace the properties with the following to use kafka message broker
+#kafka.source.topics=policy-pdp-pap
+#kafka.source.topics.fetchTimeout=15000
+#kafka.sink.topics=policy-pdp-pap
+#kafka.source.topics.policy-pdp-pap.servers=${envd:DMAAP_SERVERS}
+#kafka.source.topics.policy-pdp-pap.effectiveTopic=${envd:POLICY_PDP_PAP_TOPIC}
+#kafka.source.topics.policy-pdp-pap.apiKey=${envd:POLICY_PDP_PAP_API_KEY}
+#kafka.source.topics.policy-pdp-pap.apiSecret=${envd:POLICY_PDP_PAP_API_SECRET}
+#kafka.source.topics.policy-pdp-pap.https=${envd:DMAAP_HTTPS:true}
+
+#kafka.sink.topics.policy-pdp-pap.servers=${envd:DMAAP_SERVERS}
+#kafka.sink.topics.policy-pdp-pap.effectiveTopic=${envd:POLICY_PDP_PAP_TOPIC}
+#kafka.sink.topics.policy-pdp-pap.apiKey=${envd:POLICY_PDP_PAP_API_KEY}
+#kafka.sink.topics.policy-pdp-pap.apiSecret=${envd:POLICY_PDP_PAP_API_SECRET}
+#kafka.sink.topics.policy-pdp-pap.https=${envd:DMAAP_HTTPS:true}
index 5491eac..0c73224 100644 (file)
@@ -271,6 +271,12 @@ class IndexedDroolsControllerFactory implements DroolsControllerFactory {
             } else {
                 return PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + ".";
             }
+        } else if (commInfra == CommInfrastructure.KAFKA) {
+            if (isSource) {
+                return PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + ".";
+            } else {
+                return PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + ".";
+            }
         } else {
             throw new IllegalArgumentException("Invalid Communication Infrastructure: " + commInfra);
         }
index b5cf319..ad23ff3 100644 (file)
@@ -1414,6 +1414,9 @@ public class RestManager implements SwaggerApi, DefaultApi, FeaturesApi, InputsA
             case NOOP:
                 sources.addAll(TopicEndpointManager.getManager().getNoopTopicSources());
                 break;
+            case KAFKA:
+                sources.addAll(TopicEndpointManager.getManager().getKafkaTopicSources());
+                break;
             default:
                 status = Status.BAD_REQUEST;
                 logger.debug("Invalid communication mechanism");
@@ -1449,6 +1452,9 @@ public class RestManager implements SwaggerApi, DefaultApi, FeaturesApi, InputsA
             case NOOP:
                 sinks.addAll(TopicEndpointManager.getManager().getNoopTopicSinks());
                 break;
+            case KAFKA:
+                sinks.addAll(TopicEndpointManager.getManager().getKafkaTopicSinks());
+                break;
             default:
                 status = Status.BAD_REQUEST;
                 logger.debug("Invalid communication mechanism");