dmaap.source.topics.PDPD-CONFIGURATION.consumerInstance=${envd:PDPD_CONFIGURATION_CONSUMER_INSTANCE}
dmaap.source.topics.PDPD-CONFIGURATION.managed=false
dmaap.source.topics.PDPD-CONFIGURATION.https=${envd:DMAAP_HTTPS:true}
+
+#Replace the properties with the following to use kafka message broker
+#kafka.source.topics=pdpd-configuration
+#kafka.source.topics.fetchTimeout=15000
+#kafka.source.topics.pdpd-configuration.servers=${envd:DMAAP_SERVERS}
+#kafka.source.topics.pdpd-configuration.effectiveTopic=${envd:PDPD_CONFIGURATION_TOPIC}
+#kafka.source.topics.pdpd-configuration.apiKey=${envd:PDPD_CONFIGURATION_API_KEY}
+#kafka.source.topics.pdpd-configuration.apiSecret=${envd:PDPD_CONFIGURATION_API_SECRET}
+#kafka.source.topics.pdpd-configuration.consumerGroup=${envd:PDPD_CONFIGURATION_CONSUMER_GROUP}
+#kafka.source.topics.pdpd-configuration.consumerInstance=${envd:PDPD_CONFIGURATION_CONSUMER_INSTANCE}
+#kafka.source.topics.pdpd-configuration.managed=false
+#kafka.source.topics.pdpd-configuration.https=${envd:DMAAP_HTTPS:true}
dmaap.sink.topics.POLICY-PDP-PAP.apiKey=${envd:POLICY_PDP_PAP_API_KEY}
dmaap.sink.topics.POLICY-PDP-PAP.apiSecret=${envd:POLICY_PDP_PAP_API_SECRET}
dmaap.sink.topics.POLICY-PDP-PAP.https=${envd:DMAAP_HTTPS:true}
+
+#Replace the properties with the following to use kafka message broker
+#kafka.source.topics=policy-pdp-pap
+#kafka.source.topics.fetchTimeout=15000
+#kafka.sink.topics=policy-pdp-pap
+#kafka.source.topics.policy-pdp-pap.servers=${envd:DMAAP_SERVERS}
+#kafka.source.topics.policy-pdp-pap.effectiveTopic=${envd:POLICY_PDP_PAP_TOPIC}
+#kafka.source.topics.policy-pdp-pap.apiKey=${envd:POLICY_PDP_PAP_API_KEY}
+#kafka.source.topics.policy-pdp-pap.apiSecret=${envd:POLICY_PDP_PAP_API_SECRET}
+#kafka.source.topics.policy-pdp-pap.https=${envd:DMAAP_HTTPS:true}
+
+#kafka.sink.topics.policy-pdp-pap.servers=${envd:DMAAP_SERVERS}
+#kafka.sink.topics.policy-pdp-pap.effectiveTopic=${envd:POLICY_PDP_PAP_TOPIC}
+#kafka.sink.topics.policy-pdp-pap.apiKey=${envd:POLICY_PDP_PAP_API_KEY}
+#kafka.sink.topics.policy-pdp-pap.apiSecret=${envd:POLICY_PDP_PAP_API_SECRET}
+#kafka.sink.topics.policy-pdp-pap.https=${envd:DMAAP_HTTPS:true}
} else {
return PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + ".";
}
+ } else if (commInfra == CommInfrastructure.KAFKA) {
+ if (isSource) {
+ return PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + ".";
+ } else {
+ return PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + ".";
+ }
} else {
throw new IllegalArgumentException("Invalid Communication Infrastructure: " + commInfra);
}
case NOOP:
sources.addAll(TopicEndpointManager.getManager().getNoopTopicSources());
break;
+ case KAFKA:
+ sources.addAll(TopicEndpointManager.getManager().getKafkaTopicSources());
+ break;
default:
status = Status.BAD_REQUEST;
logger.debug("Invalid communication mechanism");
case NOOP:
sinks.addAll(TopicEndpointManager.getManager().getNoopTopicSinks());
break;
+ case KAFKA:
+ sinks.addAll(TopicEndpointManager.getManager().getKafkaTopicSinks());
+ break;
default:
status = Status.BAD_REQUEST;
logger.debug("Invalid communication mechanism");