ONAP
================================================================================
Copyright (C) 2017-2022 AT&T Intellectual Property. All rights reserved.
- Modifications Copyright (C) 2023 Nordix Foundation.
+ Modifications Copyright (C) 2023-2024 Nordix Foundation.
================================================================================
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
# limitations under the License.
# ============LICENSE_END=========================================================
-kafka.source.topics=pdpd-configuration
+kafka.source.topics=${envd:PDPD_CONFIGURATION_TOPIC}
kafka.source.topics.fetchTimeout=15000
kafka.source.topics.pdpd-configuration.servers=${envd:KAFKA_SERVERS}
kafka.source.topics.pdpd-configuration.effectiveTopic=${envd:PDPD_CONFIGURATION_TOPIC}
kafka.source.topics.pdpd-configuration.consumerGroup=${envd:PDPD_CONFIGURATION_CONSUMER_GROUP}
kafka.source.topics.pdpd-configuration.consumerInstance=${envd:PDPD_CONFIGURATION_CONSUMER_INSTANCE}
kafka.source.topics.pdpd-configuration.managed=false
-kafka.source.topics.pdpd-configuration.https=${envd:KAFKA_HTTPS:true}
+kafka.source.topics.pdpd-configuration.https=${envd:KAFKA_HTTPS:false}
# Mandatory policy types that this PDP-D must support at a minimum
lifecycle.pdp.policytypes=${envd:POLICY_PDP_PAP_POLICYTYPES}
-kafka.source.topics=POLICY-PDP-PAP
-kafka.sink.topics=POLICY-PDP-PAP
+kafka.source.topics=${envd:POLICY_PDP_PAP_TOPIC}
+kafka.sink.topics=${envd:POLICY_PDP_PAP_TOPIC}
-kafka.source.topics.POLICY-PDP-PAP.servers=${envd:KAFKA_SERVERS}
-kafka.source.topics.POLICY-PDP-PAP.effectiveTopic=${envd:POLICY_PDP_PAP_TOPIC}
-kafka.source.topics.POLICY-PDP-PAP.apiKey=${envd:POLICY_PDP_PAP_API_KEY}
-kafka.source.topics.POLICY-PDP-PAP.apiSecret=${envd:POLICY_PDP_PAP_API_SECRET}
-kafka.source.topics.POLICY-PDP-PAP.https=${envd:KAFKA_HTTPS:true}
+kafka.source.topics.policy-pdp-pap.servers=${envd:KAFKA_SERVERS}
+kafka.source.topics.policy-pdp-pap.effectiveTopic=${envd:POLICY_PDP_PAP_TOPIC}
+kafka.source.topics.policy-pdp-pap.apiKey=${envd:POLICY_PDP_PAP_API_KEY}
+kafka.source.topics.policy-pdp-pap.apiSecret=${envd:POLICY_PDP_PAP_API_SECRET}
+kafka.source.topics.policy-pdp-pap.https=${envd:KAFKA_HTTPS:false}
-kafka.sink.topics.POLICY-PDP-PAP.servers=${envd:KAFKA_SERVERS}
-kafka.sink.topics.POLICY-PDP-PAP.effectiveTopic=${envd:POLICY_PDP_PAP_TOPIC}
-kafka.sink.topics.POLICY-PDP-PAP.apiKey=${envd:POLICY_PDP_PAP_API_KEY}
-kafka.sink.topics.POLICY-PDP-PAP.apiSecret=${envd:POLICY_PDP_PAP_API_SECRET}
-kafka.sink.topics.POLICY-PDP-PAP.https=${envd:KAFKA_HTTPS:true}
+kafka.sink.topics.policy-pdp-pap.servers=${envd:KAFKA_SERVERS}
+kafka.sink.topics.policy-pdp-pap.effectiveTopic=${envd:POLICY_PDP_PAP_TOPIC}
+kafka.sink.topics.policy-pdp-pap.apiKey=${envd:POLICY_PDP_PAP_API_KEY}
+kafka.sink.topics.policy-pdp-pap.apiSecret=${envd:POLICY_PDP_PAP_API_SECRET}
+kafka.sink.topics.policy-pdp-pap.https=${envd:KAFKA_HTTPS:false}
# be enabled at the same time.
pooling.usecases.enabled=true
-pooling.usecases.topic=${env:POOLING_TOPIC}
+pooling.usecases.topic=${envd:POOLING_TOPIC}
# the list of sources and sinks should be identical
-kafka.source.topics=POOLING_TOPIC
-kafka.sink.topics=POOLING_TOPIC
-
-kafka.source.topics.POOLING_TOPIC.servers=${env:KAFKA_SERVERS}
-kafka.source.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
-kafka.source.topics.POOLING_TOPIC.apiKey=
-kafka.source.topics.POOLING_TOPIC.apiSecret=
-
-kafka.sink.topics.POOLING_TOPIC.servers=${env:kafka_SERVERS}
-kafka.sink.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
-kafka.sink.topics.POOLING_TOPIC.apiKey=
-kafka.sink.topics.POOLING_TOPIC.apiSecret=
+kafka.source.topics=${envd:POOLING_TOPIC}
+kafka.sink.topics=${envd:POOLING_TOPIC}
+
+kafka.source.topics.policy-pdp-pooling.servers=${envd:KAFKA_SERVERS}
+kafka.source.topics.policy-pdp-pooling.effectiveTopic=${envd:POOLING_TOPIC}
+kafka.source.topics.policy-pdp-pooling.apiKey=
+kafka.source.topics.policy-pdp-pooling.apiSecret=
+
+kafka.sink.topics.policy-pdp-pooling.servers=${envd:KAFKA_SERVERS}
+kafka.sink.topics.policy-pdp-pooling.effectiveTopic=${envd:POOLING_TOPIC}
+kafka.sink.topics.policy-pdp-pooling.apiKey=
+kafka.sink.topics.policy-pdp-pooling.apiSecret=
* hosts, instead of all running on a single, active host.
*
* <p>With each controller, there is an
- * associated DMaaP topic that is used for internal communication between the different hosts
+ * associated topic that is used for internal communication between the different hosts
* serving the controller.
*/
public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerFeatureApi, DroolsControllerFeatureApi {
String getHost();
/**
- * Gets the name of the internal DMaaP topic used by this manager to communicate with
+ * Gets the name of the internal topic used by this manager to communicate with
* its other hosts.
*
- * @return the name of the internal DMaaP topic
+ * @return the name of the internal topic
*/
String getTopic();
private final Serializer serializer;
/**
- * Internal DMaaP topic used by this controller.
+ * Internal topic used by this controller.
*/
@Getter
private final String topic;
/**
- * Manager for the internal DMaaP topic.
+ * Manager for the internal topic.
*/
private final TopicMessageManager topicMessageManager;
throw new PoolingFeatureRtException(e);
} catch (PoolingFeatureException e) {
- logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName());
+ logger.error("failed to attach internal topic to controller {}", controller.getName());
throw new PoolingFeatureRtException(e);
}
}
}
/**
- * Creates a DMaaP manager.
+ * Creates a topic manager.
*
- * @param topic name of the internal DMaaP topic
+ * @param topic name of the internal topic
* @return a new topic messages manager
* @throws PoolingFeatureException if an error occurs
*/
/**
* End-to-end tests of the pooling feature. Launches one or more "hosts", each one having its own
- * feature object. Uses real feature objects, as well as real DMaaP sources and sinks. However, the
+ * feature object. Uses real feature objects, as well as real sources and sinks. However, the
* following are not: <dl> <dt>PolicyEngine, PolicyController, DroolsController</dt> <dd>mocked</dd>
* </dl>
*
- * <p>The following fields must be set before executing this: <ul> <li>UEB_SERVERS</li>
+ * <p>The following fields must be set before executing this: <ul> <li>SERVER</li>
* <li>INTERNAL_TOPIC</li> <li>EXTERNAL_TOPIC</li> </ul>
*/
public class EndToEndFeatureTest {
private static Properties makeSourceProperties(String topic) {
Properties props = new Properties();
- props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS, topic);
+ props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS, topic);
- props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic
+ PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER);
- props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic
+ PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT);
- props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic
+ PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
if (EXTERNAL_TOPIC.equals(topic)) {
// consumer group is a constant
- props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic
+ PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, EXTERNAL_GROUP);
// consumer instance is generated by the BusConsumer code
}
/**
- * Starts threads for the host so that it begins consuming from both the external "DMaaP"
- * topic and its own internal "DMaaP" topic.
+ * Starts threads for the host so that it begins consuming from both the external
+ * topic and its own internal topic.
*/
public void start() {
feature.beforeStart(engine);
}
/**
- * DMaaP Manager with overrides.
+ * TopicMessageManager with overrides.
*/
private static class TopicMessageManagerImpl extends TopicMessageManager {
* End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
* its own feature object. Uses real feature objects. However, the following are not:
* <dl>
- * <dt>DMaaP sources and sinks</dt>
+ * <dt>sources and sinks</dt>
* <dd>simulated using queues. There is one queue for the external topic, and one queue
* for each host's internal topic. Messages published to the "admin" channel are simply
* sent to all of the hosts' internal topic queues</dd>
private final AtomicBoolean sawMsg = new AtomicBoolean(false);
/**
- * This host's internal "DMaaP" topic.
+ * This host's internal topic.
*/
private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
/**
- * Queue for the external "DMaaP" topic.
+ * Queue for the external topic.
*/
@Getter
private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
/**
* Starts threads for the host so that it begins consuming from both the external
- * "DMaaP" topic and its own internal "DMaaP" topic.
+ * topic and its own internal topic.
*/
public void start() {
}
/**
- * DMaaP Manager with overrides.
+ * TopicMessageManager with overrides.
*/
private static class TopicMessageManagerImpl extends TopicMessageManager {
DCAE_SERVERS=
DCAE_CONSUMER_GROUP=
-# Open DMaaP
+# Kafka
-# DMAAP_SERVERS=
-# DMAAP_HTTPS=true
+KAFKA_SERVERS=
+KAFKA_HTTPS=false
# AAI
# limitations under the License.
# ============LICENSE_END=========================================================
###
-POOLING_TOPIC=POLICY-PDP-POOLING
+POOLING_TOPIC=policy-pdp-pooling
CONTROLLER_NAME=policy-management-controller
CONTROLLER_PORT=9696
RULES_ARTIFACT=org.onap.policy:dummy-artifact:1.0.0-SNAPSHOT
-UEB_TOPIC=policyengine-develop
* This class is a wrapper around 'KieSession', which adds the following:
*
* <p>1) A thread running 'KieSession.fireUntilHalt()'
- * 2) Access to UEB
+ * 2) Access to topics
* 3) Logging of events
*/
public class PolicySession implements AgendaEventListener, RuleRuntimeEventListener {
* ONAP
* ================================================================================
* Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
}
/**
- * Intercept an event from UEB/DMaaP before the PolicyEngine processes it.
+ * Intercept an event from the message bus before the PolicyEngine processes it.
*
* @return True if this feature intercepts and takes ownership of the operation
* preventing the invocation of lower priority features. False, otherwise.
* ONAP
* ================================================================================
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
protected Properties getProperties(Path propertiesPath) {
if (!Files.exists(propertiesPath)) {
- throw new IllegalArgumentException("properties for " + propertiesPath.toString() + " are not persisted.");
+ throw new IllegalArgumentException("properties for " + propertiesPath + " are not persisted.");
}
try {
return PropertyUtil.getProperties(propertiesPath.toFile());
} catch (final Exception e) {
- throw new IllegalArgumentException("can't read properties for " + propertiesPath.toString(), e);
+ throw new IllegalArgumentException("can't read properties for " + propertiesPath, e);
}
}
} else {
throw new IllegalArgumentException(
"property \"requestID\" is of type \"java.lang.String\", but got "
- + value.getClass().toString());
+ + value.getClass());
}
}
} else {
throw new IllegalArgumentException(
"property \"entity\" is of type \"java.lang.String\", but got "
- + value.getClass().toString());
+ + value.getClass());
}
}
* ONAP
* ================================================================================
* Copyright (C) 2017-2022 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
/**
* Policy Engine, the top abstraction for the Drools PDP Policy Engine. It abstracts away a Drools
- * PDP Engine from management purposes. This is the best place to looking at the code from a top
- * down approach. Other managed entities can be obtained from the PolicyEngine, hierarchically. <br>
+ * PDP Engine for management purposes. This is the best place to start looking at the code from a top-down
+ * approach. Other managed entities can be obtained from the PolicyEngine, hierarchically. <br>
* PolicyEngine 1 --- * PolicyController 1 --- 1 DroolsController 1 --- 1 PolicyContainer 1 --- *
* PolicySession <br> PolicyEngine 1 --- 1 TopicEndpointManager 1 -- * TopicReader 1 --- 1
- * UebTopicReader <br> PolicyEngine 1 --- 1 TopicEndpointManager 1 -- * TopicReader 1 --- 1
- * DmaapTopicReader <br> PolicyEngine 1 --- 1 TopicEndpointManager 1 -- * TopicWriter 1 --- 1
- * DmaapTopicWriter <br> PolicyEngine 1 --- 1 TopicEndpointManager 1 -- * TopicReader 1 --- 1
* RestTopicReader <br> PolicyEngine 1 --- 1 TopicEndpointManager 1 -- * TopicWriter 1 --- 1
* RestTopicWriter <br> PolicyEngine 1 --- 1 ManagementServer
*/
* ONAP
* ================================================================================
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2021 Nordix Foundation.
+ * Modifications Copyright (C) 2021, 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
this.name = name;
/*
- * 1. Register read topics with network infrastructure (ueb, dmaap, rest) 2. Register write
- * topics with network infrastructure (ueb, dmaap, rest) 3. Register with drools
- * infrastructure
+ * 1. Register read topics with network infrastructure
+ * 2. Register write topics with network infrastructure
+ * 3. Register with drools infrastructure
*/
// Create/Reuse Readers/Writers for all event sources endpoints
get:
tags:
- pdp-d-telemetry
- summary: Retrieves the latest events received by an UEB topic
- description: This is a Network Communicaton Endpoint source of messages for
+ summary: Retrieves the latest events received by a topic
+ description: This is a Network Communication Endpoint source of messages for
the Engine
operationId: sourceEvents
parameters:
get:
tags:
- pdp-d-telemetry
- summary: Retrieves the latest events received by an UEB topic
+ summary: Retrieves the latest events received by a topic
description: This is a Network Communication Endpoint source of messages for
the Engine
operationId: sinkEvents
topicCommInfrastructure:
type: string
enum:
- - UEB
- - DMAAP
- KAFKA
- NOOP
- REST
topicCommInfrastructure:
type: string
enum:
- - UEB
- - DMAAP
- KAFKA
- NOOP
- REST
topicCommInfrastructure:
type: string
enum:
- - UEB
- - DMAAP
- KAFKA
- NOOP
- REST
"/topics/sources/{comm}/{topic}/events" : {
"get" : {
"tags" : [ "pdp-d-telemetry" ],
- "summary" : "Retrieves the latest events received by an UEB topic",
- "description" : "This is a Network Communicaton Endpoint source of messages for the Engine",
+ "summary" : "Retrieves the latest events received by a topic",
+ "description" : "This is a Network Communication Endpoint source of messages for the Engine",
"operationId" : "sourceEvents",
"parameters" : [ {
"name" : "comm",
"/topics/sinks/{comm}/{topic}/events" : {
"get" : {
"tags" : [ "pdp-d-telemetry" ],
- "summary" : "Retrieves the latest events received by an UEB topic",
+ "summary" : "Retrieves the latest events received by a topic",
"description" : "This is a Network Communication Endpoint source of messages for the Engine",
"operationId" : "sinkEvents",
"parameters" : [ {
},
"topicCommInfrastructure" : {
"type" : "string",
- "enum" : [ "UEB", "DMAAP", "KAFKA", "NOOP", "REST" ]
+ "enum" : [ "KAFKA", "NOOP", "REST" ]
},
"alive" : {
"type" : "boolean"
},
"topicCommInfrastructure" : {
"type" : "string",
- "enum" : [ "UEB", "DMAAP", "KAFKA", "NOOP", "REST" ]
+ "enum" : [ "KAFKA", "NOOP", "REST" ]
},
"alive" : {
"type" : "boolean"
},
"topicCommInfrastructure" : {
"type" : "string",
- "enum" : [ "UEB", "DMAAP", "KAFKA", "NOOP", "REST" ]
+ "enum" : [ "KAFKA", "NOOP", "REST" ]
},
"alive" : {
"type" : "boolean"
done
if [ -z "${BUS_HOST}" ]; then
- echo "An UEB/KAFKA server must be provided."
+ echo "A KAFKA server must be provided."
echo
usage
exit 1
done
if [ -z "${BUS_HOST}" ]; then
- echo "An UEB/KAFKA server must be provided."
+ echo "A KAFKA server must be provided."
echo
usage
exit 1
done
if [ -z "${BUS_HOST}" ]; then
- echo "An UEB/KAFKA server must be provided."
+ echo "A KAFKA server must be provided."
echo
usage
exit 1
rm -f "${LIB}"/"${depJarName}"
# case there were multiple features using this library
- # re-enable link fron an enabled feature
+ # re-enable link from an enabled feature
for aDepsEnabledMap in ${xDepsEnabledMap}; do
if [ $(basename "${aDepsEnabledMap}") = ${depJarName} ]; then
ln -s -f "${aDepsEnabledMap}" "${LIB}/"
echo ""
}
-BUS_PORT=9092
+BUS_PORT=29092
+BUS_HOST=kafka
REQUEST_ID="7f5474ca-16a9-42ac-abc0-d86f62296fbc"
-TOPIC="PDPD-CONFIGURATION"
+TOPIC="pdpd-configuration"
# command line options parsing
until [ -z "$1" ]; do
done
if [ -z "${BUS_HOST}" ]; then
- echo "An UEB/KAFKA server must be provided."
+ echo "A KAFKA server must be provided."
echo
usage
exit 1
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME;
import java.io.File;
import java.io.IOException;
import org.onap.policy.drools.protocol.coders.EventProtocolParams;
import org.onap.policy.drools.protocol.coders.JsonProtocolFilter;
import org.onap.policy.drools.protocol.configuration.DroolsConfiguration;
+import org.onap.policy.drools.server.restful.RestManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
void test100Configure() {
var manager = (PolicyEngineManager) PolicyEngineConstants.getManager();
var engineProps = manager.defaultTelemetryConfig();
+ engineProps.put(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + TELEMETRY_SERVER_DEFAULT_NAME
+ + PolicyEndPointProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX, RestManager.class.getName());
/* override default port */
engineProps.put(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."