import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.http.client.HttpClient;
import org.onap.policy.common.endpoints.http.client.HttpClientFactory;
import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
import org.onap.policy.common.endpoints.http.server.YamlJacksonHandler;
import org.onap.policy.common.gson.JacksonHandler;
+import org.onap.policy.common.parameters.topic.BusTopicParams;
import org.onap.policy.common.utils.logging.LoggerUtils;
import org.onap.policy.common.utils.network.NetworkUtil;
import org.onap.policy.drools.system.PolicyController;
import java.util.Properties;
import lombok.Getter;
import org.onap.policy.common.capabilities.Startable;
-import org.onap.policy.common.endpoints.event.comm.Topic;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicListener;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.message.bus.event.Topic;
+import org.onap.policy.common.message.bus.event.TopicEndpointManager;
+import org.onap.policy.common.message.bus.event.TopicListener;
+import org.onap.policy.common.message.bus.event.TopicSource;
import org.onap.policy.drools.persistence.SystemPersistenceConstants;
import org.onap.policy.drools.system.PolicyEngineConstants;
import org.slf4j.Logger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories;
+import org.onap.policy.common.message.bus.event.noop.NoopTopicFactories;
import org.onap.policy.drools.persistence.SystemPersistenceConstants;
import org.onap.policy.drools.system.PolicyEngineConstants;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.onap.policy.common.endpoints.event.comm.Topic;
-import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories;
+import org.onap.policy.common.message.bus.event.Topic;
+import org.onap.policy.common.message.bus.event.noop.NoopTopicFactories;
import org.onap.policy.drools.persistence.SystemPersistenceConstants;
class LegacyConfigTest {
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.http.client.HttpClient;
import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
import org.onap.policy.common.endpoints.http.server.HttpServletServer;
import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
import org.onap.policy.common.endpoints.http.server.YamlJacksonHandler;
import org.onap.policy.common.gson.JacksonHandler;
+import org.onap.policy.common.parameters.topic.BusTopicParams;
import org.onap.policy.common.utils.network.NetworkUtil;
import org.onap.policy.drools.legacy.config.LegacyConfigFeature;
import org.onap.policy.drools.persistence.SystemPersistenceConstants;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.onap.policy.common.capabilities.Startable;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
-import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
import org.onap.policy.common.endpoints.listeners.ScoListener;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
+import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
+import org.onap.policy.common.message.bus.event.TopicEndpointManager;
+import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSource;
+import org.onap.policy.common.message.bus.event.client.TopicSinkClient;
import org.onap.policy.common.utils.coder.StandardCoderObject;
import org.onap.policy.common.utils.resources.PrometheusUtils;
import org.onap.policy.drools.persistence.SystemPersistenceConstants;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Getter;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
+import org.onap.policy.common.message.bus.event.TopicEndpointManager;
+import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSource;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.drools.domain.models.controller.ControllerCustomSerialization;
import org.onap.policy.drools.domain.models.controller.ControllerEvent;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_NOOP_SINK_TOPICS;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_NOOP_SOURCE_TOPICS;
import com.google.common.base.Strings;
import java.io.IOException;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.message.bus.event.TopicEndpointManager;
+import org.onap.policy.common.message.bus.event.noop.NoopTopicFactories;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.common.utils.logging.LoggerUtils;
// @formatter:on
Properties noopTopicProperties = new Properties();
- noopTopicProperties.put(PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS, TestConstants.DCAE_TOPIC);
- noopTopicProperties.put(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS, TestConstants.APPC_CL_TOPIC);
+ noopTopicProperties.put(PROPERTY_NOOP_SOURCE_TOPICS, TestConstants.DCAE_TOPIC);
+ noopTopicProperties.put(PROPERTY_NOOP_SINK_TOPICS, TestConstants.APPC_CL_TOPIC);
TopicEndpointManager.getManager().addTopics(noopTopicProperties);
savedFsm = LifecycleFeature.getFsm();
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_NOOP_SINK_TOPICS;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_NOOP_SOURCE_TOPICS;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.message.bus.event.TopicEndpointManager;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.drools.server.restful.TestConstants;
// add topics
Properties noopTopicProperties = new Properties();
- noopTopicProperties.put(PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS, TestConstants.DCAE_TOPIC);
- noopTopicProperties.put(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS, TestConstants.APPC_CL_TOPIC);
+ noopTopicProperties.put(PROPERTY_NOOP_SOURCE_TOPICS, TestConstants.DCAE_TOPIC);
+ noopTopicProperties.put(PROPERTY_NOOP_SINK_TOPICS, TestConstants.APPC_CL_TOPIC);
TopicEndpointManager.getManager().addTopics(noopTopicProperties);
assertTrue(fsm.update(update));
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
-import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories;
+import org.onap.policy.common.message.bus.event.noop.NoopTopicFactories;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.common.utils.logging.LoggerUtils;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_NOOP_SINK_TOPICS;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_NOOP_SOURCE_TOPICS;
import java.io.IOException;
import java.util.Properties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.message.bus.event.TopicEndpointManager;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.drools.domain.models.controller.ControllerPolicy;
import org.onap.policy.drools.server.restful.TestConstants;
() -> PolicyControllerConstants.getFactory().get(controllerPolicy.getName()));
Properties noopTopicProperties = new Properties();
- noopTopicProperties.put(PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS, TestConstants.DCAE_TOPIC);
- noopTopicProperties.put(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS, TestConstants.APPC_CL_TOPIC);
+ noopTopicProperties.put(PROPERTY_NOOP_SOURCE_TOPICS, TestConstants.DCAE_TOPIC);
+ noopTopicProperties.put(PROPERTY_NOOP_SINK_TOPICS, TestConstants.APPC_CL_TOPIC);
TopicEndpointManager.getManager().addTopics(noopTopicProperties);
assertTrue(controller.deploy(policy));
Properties noopTopicProperties = new Properties();
String noopSources = String.join(",", TestConstants.DCAE_TOPIC, TestConstants.APPC_CL_TOPIC,
TestConstants.APPC_LCM_WRITE_TOPIC, TestConstants.SDNR_CL_RSP_TOPIC);
- noopTopicProperties.put(PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS, noopSources);
+ noopTopicProperties.put(PROPERTY_NOOP_SOURCE_TOPICS, noopSources);
String noopSinks = String.join(",", TestConstants.APPC_CL_TOPIC, TestConstants.APPC_LCM_READ_TOPIC,
TestConstants.POLICY_CL_MGT_TOPIC, TestConstants.DCAE_CL_RSP_TOPIC);
- noopTopicProperties.put(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS, noopSinks);
+ noopTopicProperties.put(PROPERTY_NOOP_SINK_TOPICS, noopSinks);
TopicEndpointManager.getManager().addTopics(noopTopicProperties);
ToscaPolicy nativeControllerPolicy =
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_NOOP_SINK_TOPICS;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_NOOP_SOURCE_TOPICS;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.core.Response;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.http.client.HttpClient;
import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
import org.onap.policy.common.endpoints.http.server.HttpServletServer;
import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
import org.onap.policy.common.endpoints.http.server.YamlJacksonHandler;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.common.gson.JacksonHandler;
+import org.onap.policy.common.message.bus.event.TopicEndpointManager;
+import org.onap.policy.common.message.bus.event.noop.NoopTopicFactories;
+import org.onap.policy.common.parameters.topic.BusTopicParams;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.common.utils.logging.LoggerUtils;
controllerSupport.installArtifact();
Properties noopTopicProperties = new Properties();
- noopTopicProperties.put(PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS,
+ noopTopicProperties.put(PROPERTY_NOOP_SOURCE_TOPICS,
String.join(",", TestConstants.DCAE_TOPIC, TestConstants.APPC_CL_TOPIC,
TestConstants.APPC_LCM_WRITE_TOPIC, TestConstants.SDNR_CL_RSP_TOPIC));
- noopTopicProperties.put(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS,
+ noopTopicProperties.put(PROPERTY_NOOP_SINK_TOPICS,
String.join(",", TestConstants.APPC_CL_TOPIC, TestConstants.APPC_LCM_READ_TOPIC,
TestConstants.POLICY_CL_MGT_TOPIC, TestConstants.SDNR_CL_TOPIC, TestConstants.DCAE_CL_RSP_TOPIC));
TopicEndpointManager.getManager().addTopics(noopTopicProperties);
import java.util.concurrent.CountDownLatch;
import lombok.AccessLevel;
import lombok.Getter;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
+import org.onap.policy.common.message.bus.event.TopicEndpointManager;
+import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSource;
import org.onap.policy.common.utils.properties.SpecProperties;
import org.onap.policy.common.utils.properties.exception.PropertyException;
import org.onap.policy.drools.controller.DroolsController;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
+import org.onap.policy.common.message.bus.event.TopicListener;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Leader;
* @param topic2 topic
* @param event event, as an object
* @param eventHashCode event's hash code
- * @return {@code true} if the event was handled, {@code false} if the invoker should
- * handle it
+ * @return {@code true} if the event was handled, {@code false} if the invoker should handle it
*/
private boolean handleExternal(String topic2, Object event, int eventHashCode) {
if (assignments == null) {
}
/**
- * Handles a {@link Forward} event, possibly forwarding it again.
+ * Handles a forward event, possibly forwarding it again.
*
* @param topic2 topic
* @param event event, as an object
import java.util.List;
import lombok.Getter;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicListener;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.message.bus.event.TopicEndpoint;
+import org.onap.policy.common.message.bus.event.TopicEndpointManager;
+import org.onap.policy.common.message.bus.event.TopicListener;
+import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_KAFKA_SINK_TOPICS;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_KAFKA_SOURCE_TOPICS;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_MANAGED_SUFFIX;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_TOPIC_SERVERS_SUFFIX;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX;
import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
import com.google.gson.Gson;
import org.junit.jupiter.api.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicListener;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
+import org.onap.policy.common.message.bus.event.TopicEndpointManager;
+import org.onap.policy.common.message.bus.event.TopicListener;
+import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSource;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.system.PolicyController;
import org.onap.policy.drools.system.PolicyEngine;
private static Properties makeSinkProperties(String topic) {
Properties props = new Properties();
- props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS, topic);
+ props.setProperty(PROPERTY_KAFKA_SINK_TOPICS, topic);
- props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic
- + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER);
- props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic
- + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0");
- props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic
- + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
+ props.setProperty(PROPERTY_KAFKA_SINK_TOPICS + "." + topic
+ + PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER);
+ props.setProperty(PROPERTY_KAFKA_SINK_TOPICS + "." + topic
+ + PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0");
+ props.setProperty(PROPERTY_KAFKA_SINK_TOPICS + "." + topic
+ + PROPERTY_MANAGED_SUFFIX, "false");
return props;
}
private static Properties makeSourceProperties(String topic) {
Properties props = new Properties();
- props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS, topic);
+ props.setProperty(PROPERTY_KAFKA_SOURCE_TOPICS, topic);
- props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic
- + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER);
- props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic
- + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT);
- props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic
- + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
+ props.setProperty(PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic
+ + PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER);
+ props.setProperty(PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic
+ + PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT);
+ props.setProperty(PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic
+ + PROPERTY_MANAGED_SUFFIX, "false");
if (EXTERNAL_TOPIC.equals(topic)) {
// consumer group is a constant
- props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic
- + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, EXTERNAL_GROUP);
+ props.setProperty(PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic
+ + PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, EXTERNAL_GROUP);
// consumer instance is generated by the BusConsumer code
}
import org.junit.jupiter.api.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.onap.policy.common.endpoints.event.comm.Topic;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.TopicListener;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.message.bus.event.Topic;
+import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
+import org.onap.policy.common.message.bus.event.TopicListener;
+import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSource;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.system.PolicyController;
import org.onap.policy.drools.system.PolicyEngine;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
+import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSource;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.system.PolicyController;
import org.onap.policy.drools.system.PolicyEngine;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
+import org.onap.policy.common.message.bus.event.TopicListener;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Heartbeat;
import java.util.concurrent.CountDownLatch;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.onap.policy.common.endpoints.event.comm.TopicListener;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.message.bus.event.TopicListener;
+import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSource;
class TopicMessageManagerTest {
Properties=java.util.Properties,
ProtocolCoderToolset=org.onap.policy.drools.protocol.coders.ProtocolCoderToolset,
Response=jakarta.ws.rs.core.Response,
- TopicEndpoint=org.onap.policy.common.endpoints.event.comm.TopicEndpoint,
- TopicSink=org.onap.policy.common.endpoints.event.comm.TopicSink,
- TopicSource=org.onap.policy.common.endpoints.event.comm.TopicSource,
+ TopicEndpoint=org.onap.policy.common.message.bus.event.TopicEndpoint,
+ TopicSink=org.onap.policy.common.message.bus.event.TopicSink,
+ TopicSource=org.onap.policy.common.message.bus.event.TopicSource,
ToscaPolicy=org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy
</importMappings>
<typeMappings>
<artifactId>policy-domains</artifactId>
<version>${project.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.onap.policy.common</groupId>
+ <artifactId>message-bus</artifactId>
+ <version>${policy.common.version}</version>
+ </dependency>
<dependency>
<groupId>org.onap.policy.common</groupId>
<artifactId>policy-endpoints</artifactId>
* ONAP
* ================================================================================
* Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2023 Nordix Foundation.
+ * Modifications Copyright (C) 2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import lombok.NonNull;
import org.onap.policy.common.capabilities.Lockable;
import org.onap.policy.common.capabilities.Startable;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSink;
import org.onap.policy.drools.core.PolicyContainer;
import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration;
* @return true if successful, false if a failure has occurred.
* @throws IllegalArgumentException when invalid or insufficient properties are provided
* @throws IllegalStateException when the engine is in a state where this operation is not
- * permitted (ie. locked or stopped).
+ * permitted (i.e. locked or stopped).
* @throws UnsupportedOperationException when the engine cannot deliver due to the functionality
- * missing (ie. communication infrastructure not supported.
+ * missing (i.e. communication infrastructure not supported).
*/
boolean deliver(TopicSink sink, Object event);
* ONAP
* ================================================================================
* Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import java.util.List;
import java.util.Properties;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSource;
import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration;
/**
package org.onap.policy.drools.controller;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_KAFKA_SINK_TOPICS;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_KAFKA_SOURCE_TOPICS;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_NOOP_SINK_TOPICS;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_NOOP_SOURCE_TOPICS;
+import static org.onap.policy.drools.system.PolicyEngineConstants.PROPERTY_TOPIC_EVENTS_CUSTOM_MODEL_CODER_GSON_SUFFIX;
+import static org.onap.policy.drools.system.PolicyEngineConstants.PROPERTY_TOPIC_EVENTS_FILTER_SUFFIX;
+import static org.onap.policy.drools.system.PolicyEngineConstants.PROPERTY_TOPIC_EVENTS_SUFFIX;
+
import com.google.re2j.Pattern;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Properties;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
-import org.onap.policy.common.endpoints.event.comm.Topic;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.message.bus.event.Topic;
+import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
+import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSource;
import org.onap.policy.common.utils.services.FeatureApiUtils;
import org.onap.policy.drools.controller.internal.MavenDroolsController;
import org.onap.policy.drools.controller.internal.NullDroolsController;
@Override
public DroolsController build(Properties properties, List<? extends TopicSource> eventSources,
- List<? extends TopicSink> eventSinks) throws LinkageError {
+ List<? extends TopicSink> eventSinks) throws LinkageError {
String groupId = properties.getProperty(DroolsPropertyConstants.RULES_GROUPID);
if (StringUtils.isBlank(groupId)) {
List<TopicCoderFilterConfiguration> topics2EncodedClasses2Filters = codersAndFilters(properties, eventSinks);
return this.build(properties, groupId, artifactId, version,
- topics2DecodedClasses2Filters, topics2EncodedClasses2Filters);
+ topics2DecodedClasses2Filters, topics2EncodedClasses2Filters);
}
@Override
public DroolsController build(Properties properties, String newGroupId, String newArtifactId, String newVersion,
- List<TopicCoderFilterConfiguration> decoderConfigurations,
- List<TopicCoderFilterConfiguration> encoderConfigurations) throws LinkageError {
+ List<TopicCoderFilterConfiguration> decoderConfigurations,
+ List<TopicCoderFilterConfiguration> encoderConfigurations) throws LinkageError {
if (StringUtils.isBlank(newGroupId)) {
throw new IllegalArgumentException("Missing maven group-id coordinate");
*/
controllerCopy.updateToVersion(newGroupId, newArtifactId, newVersion, decoderConfigurations,
- encoderConfigurations);
+ encoderConfigurations);
return controllerCopy;
}
/* new drools controller */
DroolsController controller = applyBeforeInstance(properties, newGroupId, newArtifactId, newVersion,
- decoderConfigurations, encoderConfigurations);
+ decoderConfigurations, encoderConfigurations);
if (controller == null) {
controller = new MavenDroolsController(newGroupId, newArtifactId, newVersion, decoderConfigurations,
- encoderConfigurations);
+ encoderConfigurations);
}
synchronized (this) {
FeatureApiUtils.apply(getProviders(),
feature -> feature.afterInstance(controllerFinal, properties),
(feature, ex) -> logger.error("feature {} ({}) afterInstance() of drools controller {}:{}:{} failed",
- feature.getName(), feature.getSequenceNumber(),
- newGroupId, newArtifactId, newVersion, ex));
+ feature.getName(), feature.getSequenceNumber(),
+ newGroupId, newArtifactId, newVersion, ex));
return controller;
}
private DroolsController applyBeforeInstance(Properties properties, String newGroupId, String newArtifactId,
- String newVersion, List<TopicCoderFilterConfiguration> decoderConfigurations,
- List<TopicCoderFilterConfiguration> encoderConfigurations) {
+ String newVersion,
+ List<TopicCoderFilterConfiguration> decoderConfigurations,
+ List<TopicCoderFilterConfiguration> encoderConfigurations) {
DroolsController controller = null;
- for (DroolsControllerFeatureApi feature: getProviders()) {
+ for (DroolsControllerFeatureApi feature : getProviders()) {
try {
controller = feature.beforeInstance(properties,
- newGroupId, newArtifactId, newVersion,
- decoderConfigurations, encoderConfigurations);
+ newGroupId, newArtifactId, newVersion,
+ decoderConfigurations, encoderConfigurations);
if (controller != null) {
logger.info("feature {} ({}) beforeInstance() has intercepted drools controller {}:{}:{}",
- feature.getName(), feature.getSequenceNumber(),
- newGroupId, newArtifactId, newVersion);
+ feature.getName(), feature.getSequenceNumber(),
+ newGroupId, newArtifactId, newVersion);
break;
}
} catch (RuntimeException r) {
logger.error("feature {} ({}) beforeInstance() of drools controller {}:{}:{} failed",
- feature.getName(), feature.getSequenceNumber(),
- newGroupId, newArtifactId, newVersion, r);
+ feature.getName(), feature.getSequenceNumber(),
+ newGroupId, newArtifactId, newVersion, r);
}
}
return controller;
/**
* find out decoder classes and filters.
*
- * @param properties properties with information about decoders
+ * @param properties properties with information about decoders
* @param topicEntities topic sources
- * @return list of topics, each with associated decoder classes, each with a list of associated
- * filters
+ * @return list of topics, each with associated decoder classes, each with a list of associated filters
* @throws IllegalArgumentException invalid input data
*/
protected List<TopicCoderFilterConfiguration> codersAndFilters(Properties properties,
- List<? extends Topic> topicEntities) {
+ List<? extends Topic> topicEntities) {
List<TopicCoderFilterConfiguration> topics2DecodedClasses2Filters = new ArrayList<>();
// 3. second the list of classes associated with each topic
String eventClasses = properties
- .getProperty(propertyTopicEntityPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX);
+ .getProperty(propertyTopicEntityPrefix + PROPERTY_TOPIC_EVENTS_SUFFIX);
if (StringUtils.isBlank(eventClasses)) {
logger.warn("There are no event classes for topic {}", firstTopic);
}
List<PotentialCoderFilter> classes2Filters =
- getFilterExpressions(properties, propertyTopicEntityPrefix, eventClasses);
+ getFilterExpressions(properties, propertyTopicEntityPrefix, eventClasses);
topics2DecodedClasses2Filters
- .add(new TopicCoderFilterConfiguration(firstTopic, classes2Filters, customGsonCoder));
+ .add(new TopicCoderFilterConfiguration(firstTopic, classes2Filters, customGsonCoder));
}
return topics2DecodedClasses2Filters;
var commInfra = topic.getTopicCommInfrastructure();
if (commInfra == CommInfrastructure.NOOP) {
if (isSource) {
- return PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS + ".";
+ return PROPERTY_NOOP_SOURCE_TOPICS + ".";
} else {
- return PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + ".";
+ return PROPERTY_NOOP_SINK_TOPICS + ".";
}
} else if (commInfra == CommInfrastructure.KAFKA) {
if (isSource) {
- return PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + ".";
+ return PROPERTY_KAFKA_SOURCE_TOPICS + ".";
} else {
- return PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + ".";
+ return PROPERTY_KAFKA_SINK_TOPICS + ".";
}
} else {
throw new IllegalArgumentException("Invalid Communication Infrastructure: " + commInfra);
private CustomGsonCoder getCustomCoder(Properties properties, String propertyPrefix) {
String customGson = properties.getProperty(propertyPrefix
- + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_CUSTOM_MODEL_CODER_GSON_SUFFIX);
+ + PROPERTY_TOPIC_EVENTS_CUSTOM_MODEL_CODER_GSON_SUFFIX);
CustomGsonCoder customGsonCoder = null;
if (StringUtils.isNotBlank(customGson)) {
customGsonCoder = new CustomGsonCoder(customGson);
} catch (IllegalArgumentException e) {
logger.warn("{}: cannot create custom-gson-coder {} because of {}", this, customGson,
- e.getMessage(), e);
+ e.getMessage(), e);
}
}
return customGsonCoder;
}
private List<PotentialCoderFilter> getFilterExpressions(Properties properties, String propertyPrefix,
- @NonNull String eventClasses) {
+ @NonNull String eventClasses) {
List<PotentialCoderFilter> classes2Filters = new ArrayList<>();
for (String theClass : COMMA_SPACE_PAT.split(eventClasses)) {
// 4. for each coder class, get the filter expression
String filter = properties
- .getProperty(propertyPrefix
- + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX
- + "." + theClass + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_FILTER_SUFFIX);
+ .getProperty(propertyPrefix
+ + PROPERTY_TOPIC_EVENTS_SUFFIX
+ + "." + theClass + PROPERTY_TOPIC_EVENTS_FILTER_SUFFIX);
var class2Filters = new PotentialCoderFilter(theClass, new JsonProtocolFilter(filter));
classes2Filters.add(class2Filters);
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.rule.FactHandle;
import org.kie.api.runtime.rule.QueryResultsRow;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
import org.onap.policy.common.gson.annotation.GsonJsonProperty;
+import org.onap.policy.common.message.bus.event.TopicSink;
import org.onap.policy.common.utils.services.FeatureApiUtils;
import org.onap.policy.common.utils.services.OrderedServiceImpl;
import org.onap.policy.drools.controller.DroolsController;
* ONAP
* ================================================================================
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import java.util.Map;
import lombok.NonNull;
import lombok.ToString;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSink;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.controller.DroolsControllerConstants;
import org.onap.policy.drools.core.PolicyContainer;
* ONAP
* ================================================================================
* Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import java.util.List;
import java.util.Properties;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSink;
import org.onap.policy.common.utils.services.OrderedService;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration;
package org.onap.policy.drools.features;
import java.util.Properties;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
import org.onap.policy.common.utils.services.OrderedService;
import org.onap.policy.drools.protocol.configuration.DroolsConfiguration;
import org.onap.policy.drools.system.PolicyController;
*
* @return true if this feature intercepts and takes ownership
* of the operation preventing the invocation of
- * lower priority features. False, otherwise..
+ * lower priority features. False, otherwise.
*/
default boolean afterLock(PolicyController controller) {
return false;
*
* @return true if this feature intercepts and takes ownership
* of the operation preventing the invocation of
- * lower priority features. False, otherwise..
+ * lower priority features. False, otherwise.
*/
default boolean beforeShutdown(PolicyController controller) {
return false;
*
* @return true if this feature intercepts and takes ownership
* of the operation preventing the invocation of
- * lower priority features. False, otherwise..
+ * lower priority features. False, otherwise.
*/
default boolean beforeHalt(PolicyController controller) {
return false;
package org.onap.policy.drools.features;
import java.util.Properties;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
import org.onap.policy.common.utils.services.OrderedService;
import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
import org.onap.policy.drools.protocol.configuration.PdpdConfiguration;
import lombok.Setter;
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;
-import org.onap.policy.common.endpoints.event.comm.Topic;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.endpoints.http.server.YamlMessageBodyHandler;
+import org.onap.policy.common.message.bus.event.Topic;
+import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
+import org.onap.policy.common.message.bus.event.TopicEndpointManager;
+import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSource;
import org.onap.policy.common.utils.logging.LoggerUtils;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.protocol.coders.EventProtocolCoderConstants;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.http.client.HttpClientConfigException;
import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
+import org.onap.policy.common.message.bus.event.TopicEndpointManager;
import org.onap.policy.common.utils.logging.LoggerUtils;
import org.onap.policy.common.utils.resources.MessageConstants;
import org.onap.policy.common.utils.security.CryptoUtils;
* policy-management
* ================================================================================
* Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2021 Nordix Foundation.
+ * Modifications Copyright (C) 2021, 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import java.util.Properties;
import org.onap.policy.common.capabilities.Lockable;
import org.onap.policy.common.capabilities.Startable;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
+import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSource;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.protocol.configuration.DroolsConfiguration;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
* @return true if successful, false if a failure has occurred.
* @throws IllegalArgumentException when invalid or insufficient properties are provided
* @throws IllegalStateException when the engine is in a state where this operation is not
- * permitted (ie. locked or stopped).
+ * permitted (i.e. locked or stopped).
* @throws UnsupportedOperationException when the engine cannot deliver due to the functionality
- * missing (ie. communication infrastructure not supported.
+ * missing (i.e. communication infrastructure not supported).
*/
boolean deliver(CommInfrastructure busType, String topic, Object event);
import java.util.concurrent.ScheduledExecutorService;
import org.onap.policy.common.capabilities.Lockable;
import org.onap.policy.common.capabilities.Startable;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.TopicListener;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.endpoints.http.server.HttpServletServer;
+import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
+import org.onap.policy.common.message.bus.event.TopicListener;
+import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSource;
import org.onap.policy.drools.core.lock.Lock;
import org.onap.policy.drools.core.lock.LockCallback;
import org.onap.policy.drools.features.PolicyEngineFeatureApi;
* @return true if successful, false if a failure has occurred.
* @throws IllegalArgumentException when invalid or insufficient properties are provided
* @throws IllegalStateException when the engine is in a state where this operation is not
- * permitted (ie. locked or stopped).
+ * permitted (i.e. locked or stopped).
* @throws UnsupportedOperationException when the engine cannot deliver due to the functionality
- * missing (ie. communication infrastructure not supported.
+ * missing (i.e. communication infrastructure not supported).
*/
boolean deliver(CommInfrastructure busType, String topic, String event);
* policy-management
* ================================================================================
* Copyright (C) 2019, 2021-2022 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2023 Nordix Foundation.
+ * Modifications Copyright (C) 2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*/
public static final String TELEMETRY_SERVER_DEFAULT_NAME = "TELEMETRY";
+ /**
+ * Topics constants.
+ */
+ public static final String PROPERTY_TOPIC_EVENTS_SUFFIX = ".events";
+ public static final String PROPERTY_TOPIC_EVENTS_CUSTOM_MODEL_CODER_GSON_SUFFIX = ".events.custom.gson";
+ public static final String PROPERTY_TOPIC_EVENTS_FILTER_SUFFIX = ".filter";
+
/**
* Policy Engine Manager.
*/
import lombok.Synchronized;
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;
-import org.onap.policy.common.endpoints.event.comm.Topic;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.endpoints.http.client.HttpClient;
import org.onap.policy.common.endpoints.http.client.HttpClientFactory;
import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
import org.onap.policy.common.gson.annotation.GsonJsonProperty;
+import org.onap.policy.common.message.bus.event.Topic;
+import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
+import org.onap.policy.common.message.bus.event.TopicEndpoint;
+import org.onap.policy.common.message.bus.event.TopicEndpointManager;
+import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSource;
import org.onap.policy.common.utils.logging.LoggerUtils;
import org.onap.policy.common.utils.network.NetworkUtil;
import org.onap.policy.common.utils.resources.PrometheusUtils;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.ToString;
-import org.onap.policy.common.endpoints.event.comm.Topic;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicListener;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
+import org.onap.policy.common.message.bus.event.Topic;
+import org.onap.policy.common.message.bus.event.TopicEndpoint;
+import org.onap.policy.common.message.bus.event.TopicEndpointManager;
+import org.onap.policy.common.message.bus.event.TopicListener;
+import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSource;
import org.onap.policy.common.utils.services.FeatureApiUtils;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.controller.DroolsControllerConstants;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSink;
import org.onap.policy.common.utils.services.OrderedServiceImpl;
import org.onap.policy.drools.core.PolicyContainer;
import org.onap.policy.drools.core.PolicySession;
import java.util.Collections;
import java.util.Properties;
import org.junit.jupiter.api.Test;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSink;
import org.onap.policy.drools.controller.DroolsController;
/**
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure.NOOP;
+import static org.onap.policy.common.message.bus.event.Topic.CommInfrastructure.NOOP;
import java.util.Properties;
import org.junit.jupiter.api.Test;
import java.util.Properties;
import org.junit.jupiter.api.Test;
-import org.onap.policy.common.endpoints.event.comm.Topic;
+import org.onap.policy.common.message.bus.event.Topic;
import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
import org.onap.policy.drools.protocol.configuration.PdpdConfiguration;
import org.onap.policy.drools.system.PolicyEngine;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_NOOP_SINK_TOPICS;
import java.util.Properties;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.junit.jupiter.api.Test;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.message.bus.event.TopicEndpointManager;
import org.onap.policy.drools.protocol.configuration.DroolsConfiguration;
/**
void test() {
final Properties noopSinkProperties = new Properties();
- noopSinkProperties.put(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS, NOOP_TOPIC);
+ noopSinkProperties.put(PROPERTY_NOOP_SINK_TOPICS, NOOP_TOPIC);
TopicEndpointManager.getManager().addTopicSinks(noopSinkProperties);
@Test
void test_extra() {
final Properties noopSinkProperties = new Properties();
- noopSinkProperties.put(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS, NOOP_TOPIC);
+ noopSinkProperties.put(PROPERTY_NOOP_SINK_TOPICS, NOOP_TOPIC);
TopicEndpointManager.getManager().addTopicSinks(noopSinkProperties);
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_NOOP_SINK_TOPICS;
+import static org.onap.policy.drools.system.PolicyEngineConstants.PROPERTY_TOPIC_EVENTS_SUFFIX;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.api.builder.ReleaseId;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.message.bus.event.TopicEndpointManager;
+import org.onap.policy.common.message.bus.event.TopicSink;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.controller.DroolsControllerConstants;
import org.onap.policy.drools.controller.internal.MavenDroolsControllerTest;
}
Properties sinkConfig = new Properties();
- sinkConfig.put(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS, JUNIT_PROTOCOL_CODER_TOPIC);
+ sinkConfig.put(PROPERTY_NOOP_SINK_TOPICS, JUNIT_PROTOCOL_CODER_TOPIC);
final List<TopicSink> noopTopics = TopicEndpointManager.getManager().addTopicSinks(sinkConfig);
Properties droolsControllerConfig = getDroolsControllerConfig();
droolsControllerConfig.put(DroolsPropertyConstants.RULES_ARTIFACTID, releaseId.getArtifactId());
droolsControllerConfig.put(DroolsPropertyConstants.RULES_VERSION, releaseId.getVersion());
droolsControllerConfig.put(
- PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + "." + JUNIT_PROTOCOL_CODER_TOPIC
- + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX,
+ PROPERTY_NOOP_SINK_TOPICS + "." + JUNIT_PROTOCOL_CODER_TOPIC
+ + PROPERTY_TOPIC_EVENTS_SUFFIX,
ThreeStrings.class.getName());
return droolsControllerConfig;
}
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.MockitoAnnotations;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.message.bus.event.TopicEndpoint;
+import org.onap.policy.common.message.bus.event.TopicEndpointManager;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.controller.internal.NullDroolsController;
import org.onap.policy.drools.features.PolicyControllerFeatureApi;
package org.onap.policy.drools.server.restful.test;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_KAFKA_SINK_TOPICS;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_KAFKA_SOURCE_TOPICS;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_NOOP_SINK_TOPICS;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_NOOP_SOURCE_TOPICS;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_TOPIC_SERVERS_SUFFIX;
import java.io.IOException;
import java.nio.file.Files;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.http.server.YamlJacksonHandler;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.common.gson.JacksonHandler;
+import org.onap.policy.common.message.bus.event.TopicEndpointManager;
import org.onap.policy.common.utils.network.NetworkUtil;
import org.onap.policy.drools.persistence.SystemPersistenceConstants;
import org.onap.policy.drools.system.PolicyControllerConstants;
private static final String FOO_CONTROLLER = "foo";
private static final String KAFKA_TOPIC = "kafka-topic-test";
private static final String NOOP_TOPIC = "noop_topic";
- private static final String KAFKA_SOURCE_SERVER_PROPERTY = PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS
- + "." + KAFKA_TOPIC + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX;
- private static final String KAFKA_SINK_SERVER_PROPERTY = PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "."
- + KAFKA_TOPIC + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX;
+ private static final String KAFKA_SOURCE_SERVER_PROPERTY = PROPERTY_KAFKA_SOURCE_TOPICS
+ + "." + KAFKA_TOPIC + PROPERTY_TOPIC_SERVERS_SUFFIX;
+ private static final String KAFKA_SINK_SERVER_PROPERTY = PROPERTY_KAFKA_SINK_TOPICS + "."
+ + KAFKA_TOPIC + PROPERTY_TOPIC_SERVERS_SUFFIX;
private static final String KAFKA_SERVER = "localhost:9092";
private static final String FOO_CONTROLLER_FILE = FOO_CONTROLLER + "-controller.properties";
/* override default port */
final Properties engineProps = PolicyEngineConstants.getManager().defaultTelemetryConfig();
- engineProps.put(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
+ engineProps.put(PROPERTY_HTTP_SERVER_SERVICES + "."
+ PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME
- + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX, "" + DEFAULT_TELEMETRY_PORT);
- engineProps.put(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
+ + PROPERTY_HTTP_PORT_SUFFIX, "" + DEFAULT_TELEMETRY_PORT);
+ engineProps.put(PROPERTY_HTTP_SERVER_SERVICES + "."
+ PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME
- + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX,
+ + PROPERTY_HTTP_AUTH_USERNAME_SUFFIX,
TELEMETRY_USER);
- engineProps.put(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
+ engineProps.put(PROPERTY_HTTP_SERVER_SERVICES + "."
+ PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME
- + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX,
+ + PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX,
TELEMETRY_PASSWORD);
- engineProps.put(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
+ engineProps.put(PROPERTY_HTTP_SERVER_SERVICES + "."
+ PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME
- + PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER,
+ + PROPERTY_HTTP_SERIALIZATION_PROVIDER,
String.join(",", JacksonHandler.class.getName(), YamlJacksonHandler.class.getName()));
/* other properties */
- engineProps.put(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS, KAFKA_TOPIC);
- engineProps.put(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS, KAFKA_TOPIC);
+ engineProps.put(PROPERTY_KAFKA_SOURCE_TOPICS, KAFKA_TOPIC);
+ engineProps.put(PROPERTY_KAFKA_SINK_TOPICS, KAFKA_TOPIC);
engineProps.put(KAFKA_SOURCE_SERVER_PROPERTY, KAFKA_SERVER);
engineProps.put(KAFKA_SINK_SERVER_PROPERTY, KAFKA_SERVER);
}
Properties noopProperties = new Properties();
- noopProperties.put(PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS, NOOP_TOPIC);
- noopProperties.put(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS, NOOP_TOPIC);
+ noopProperties.put(PROPERTY_NOOP_SOURCE_TOPICS, NOOP_TOPIC);
+ noopProperties.put(PROPERTY_NOOP_SINK_TOPICS, NOOP_TOPIC);
TopicEndpointManager.getManager().addTopics(noopProperties);
}
putTest(HOST_URL + "/engine/topics/sources/noop/" + NOOP_TOPIC + "/events", 200,
"{x:y}", ContentType.TEXT_PLAIN);
putTest(HOST_URL + "/engine/topics/sources/kafka/" + KAFKA_TOPIC + "/events", 200,
- "FOOOO", ContentType.TEXT_PLAIN);
+ "FOOOO", ContentType.TEXT_PLAIN);
putTest(HOST_URL + "/engine/topics/sources/kafka/fiznits/events", 406,
- "FOOOO", ContentType.TEXT_PLAIN);
+ "FOOOO", ContentType.TEXT_PLAIN);
putTest(HOST_URL + "/engine/topics/switches/lock", 200);
putTest(HOST_URL + "/engine/topics/sources/kafka/" + KAFKA_TOPIC + "/events",
- 406, "FOOOO", ContentType.TEXT_PLAIN);
+ 406, "FOOOO", ContentType.TEXT_PLAIN);
deleteTest(HOST_URL + "/engine/topics/switches/lock", 200);
}
* policy-management
* ================================================================================
* Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2023 Nordix Foundation.
+ * Modifications Copyright (C) 2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import com.google.gson.reflect.TypeToken;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.endpoints.http.server.HttpServletServer;
+import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSource;
import org.onap.policy.common.utils.gson.GsonSerializer;
import org.onap.policy.common.utils.gson.GsonTestUtilsBuilder;
import org.onap.policy.drools.controller.DroolsController;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.endpoints.http.client.HttpClient;
import org.onap.policy.common.endpoints.http.client.HttpClientFactory;
import org.onap.policy.common.endpoints.http.server.HttpServletServer;
import org.onap.policy.common.endpoints.http.server.HttpServletServerFactory;
+import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
+import org.onap.policy.common.message.bus.event.TopicEndpoint;
+import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSource;
import org.onap.policy.common.utils.gson.GsonTestUtils;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.core.lock.Lock;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_NOOP_SINK_TOPICS;
import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME;
import java.io.File;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.message.bus.event.TopicEndpointManager;
+import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.message.bus.event.noop.NoopTopicFactories;
import org.onap.policy.common.utils.gson.GsonTestUtils;
import org.onap.policy.drools.persistence.SystemPersistenceConstants;
import org.onap.policy.drools.properties.DroolsPropertyConstants;
@Test
void test350TopicDeliver() {
final Properties noopSinkProperties = new Properties();
- noopSinkProperties.put(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS, NOOP_TOPIC);
+ noopSinkProperties.put(PROPERTY_NOOP_SINK_TOPICS, NOOP_TOPIC);
TopicEndpointManager.getManager().addTopicSinks(noopSinkProperties).get(0).start();
import java.util.function.Consumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
+import org.onap.policy.common.message.bus.event.TopicEndpoint;
+import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSource;
import org.onap.policy.common.utils.gson.GsonTestUtils;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.controller.DroolsControllerFactory;