* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.policy.controlloop.actor.test;
-import static org.mockito.Mockito.when;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.lenient;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
+import java.util.List;
import java.util.function.BiConsumer;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException;
+import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
+import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
+import org.onap.policy.common.endpoints.parameters.TopicParameters;
import org.onap.policy.common.utils.coder.CoderException;
-import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.common.utils.coder.StandardCoderObject;
-import org.onap.policy.common.utils.time.PseudoExecutor;
-import org.onap.policy.controlloop.VirtualControlLoopEvent;
-import org.onap.policy.controlloop.actorserviceprovider.ActorService;
-import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
-import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
-import org.onap.policy.controlloop.actorserviceprovider.impl.BidirectionalTopicOperator;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicConfig;
import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicManager;
import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
+import org.onap.policy.simulators.TopicServer;
+import org.onap.policy.simulators.Util;
/**
* Superclass for various BidirectionalTopicOperation tests.
+ *
+ * @param <Q> request type
*/
-public class BasicBidirectionalTopicOperation {
- protected static final UUID REQ_ID = UUID.randomUUID();
- protected static final String DEFAULT_ACTOR = "default-actor";
- protected static final String DEFAULT_OPERATION = "default-operation";
+@NoArgsConstructor(access = AccessLevel.PROTECTED)
+public abstract class BasicBidirectionalTopicOperation<Q> extends BasicOperation {
protected static final String MY_SINK = "my-sink";
protected static final String MY_SOURCE = "my-source";
- protected static final String TARGET_ENTITY = "my-target";
- protected static final Coder coder = new StandardCoder();
- protected static final int TIMEOUT = 10;
+ protected static final int TIMEOUT_SEC = 10;
+ protected static final long TIMEOUT_MS = 1000L * TIMEOUT_SEC;
+
+ // sink and source used by the TopicServer
+ private static TopicSink serverSink;
+ private static TopicSource serverSource;
+ private static BidirectionalTopicHandler realTopicHandler;
- protected final String actorName;
- protected final String operationName;
+ protected static BidirectionalTopicManager topicMgr = (sink, source) -> {
+ // note: the sink and source names are swapped for the simulator
+ assertEquals(serverSource.getTopic(), sink);
+ assertEquals(serverSink.getTopic(), source);
+ return realTopicHandler;
+ };
@Captor
protected ArgumentCaptor<BiConsumer<String, StandardCoderObject>> listenerCaptor;
- @Mock
- protected ActorService service;
@Mock
protected BidirectionalTopicHandler topicHandler;
@Mock
protected Forwarder forwarder;
@Mock
- protected BidirectionalTopicOperator operator;
+ protected BidirectionalTopicConfig config;
- protected BidirectionalTopicParams topicParams;
- protected ControlLoopOperationParams params;
- protected Map<String, String> enrichment;
- protected VirtualControlLoopEvent event;
- protected ControlLoopEventContext context;
- protected OperationOutcome outcome;
- protected PseudoExecutor executor;
+ private TopicServer<Q> topicServer;
- /**
- * Constructs the object using a default actor and operation name.
- */
- public BasicBidirectionalTopicOperation() {
- this.actorName = DEFAULT_ACTOR;
- this.operationName = DEFAULT_OPERATION;
- }
/**
* Constructs the object.
* @param actor actor name
* @param operation operation name
*/
- public BasicBidirectionalTopicOperation(String actor, String operation) {
- this.actorName = actor;
- this.operationName = operation;
+ protected BasicBidirectionalTopicOperation(String actor, String operation) {
+ super(actor, operation);
}
/**
- * Initializes mocks and sets up.
+ * Starts the topic.
+ *
+ * @throws BidirectionalTopicClientException if the client cannot be built
*/
- public void setUp() throws Exception {
- MockitoAnnotations.initMocks(this);
+ protected static void initBeforeClass(String sinkTopic, String sourceTopic)
+ throws BidirectionalTopicClientException {
+
+ // note: the sink and source names are swapped for the simulator
+ var ptopic = new TopicParameters();
+ ptopic.setTopic(sourceTopic);
+ ptopic.setManaged(true);
+ ptopic.setServers(List.of("localhost"));
+ ptopic.setTopicCommInfrastructure("NOOP");
+ ptopic.setFetchTimeout(500);
+ serverSink = TopicEndpointManager.getManager().addTopicSinks(List.of(ptopic)).get(0);
+
+ ptopic.setTopic(sinkTopic);
+ serverSource = TopicEndpointManager.getManager().addTopicSources(List.of(ptopic)).get(0);
+
+ serverSink.start();
+ serverSource.start();
+
+ if (!sinkTopic.equals(sourceTopic)) {
+ // sink and source are different - create other ends for the actor
+ initActorTopics(sinkTopic, sourceTopic, ptopic);
+ }
- executor = new PseudoExecutor();
+ realTopicHandler = new BidirectionalTopicHandler(sinkTopic, sourceTopic);
+ realTopicHandler.start();
+ }
- makeContext();
+ private static void initActorTopics(String sinkTopic, String sourceTopic, TopicParameters ptopic) {
+ // create sink and source for the actor, too
+ ptopic.setTopic(sinkTopic);
+ TopicEndpointManager.getManager().addTopicSinks(List.of(ptopic)).get(0).start();
- outcome = params.makeOutcome();
- topicParams = BidirectionalTopicParams.builder().sinkTopic(MY_SINK).sourceTopic(MY_SOURCE).timeoutSec(TIMEOUT)
- .build();
+ ptopic.setTopic(sourceTopic);
+ TopicEndpointManager.getManager().addTopicSources(List.of(ptopic)).get(0).start();
+ }
- initOperator();
+ protected static void destroyAfterClass() {
+ TopicEndpointManager.getManager().shutdown();
+ HttpServletServerFactoryInstance.getServerFactory().destroy();
+ HttpClientFactoryInstance.getClientFactory().destroy();
}
/**
- * Reinitializes {@link #enrichment}, {@link #event}, {@link #context}, and
- * {@link #params}.
- * <p/>
- * Note: {@link #params} is configured to use {@link #executor}.
+ * Initializes mocks and sets up.
*/
- protected void makeContext() {
- enrichment = new TreeMap<>(makeEnrichment());
-
- event = new VirtualControlLoopEvent();
- event.setRequestId(REQ_ID);
- event.setAai(enrichment);
-
- context = new ControlLoopEventContext(event);
-
- params = ControlLoopOperationParams.builder().executor(executor).context(context).actorService(service)
- .actor(actorName).operation(operationName).targetEntity(TARGET_ENTITY).payload(makePayload())
- .build();
- }
-
- protected Map<String, String> makePayload() {
- return null;
+ @Override
+ public void setUpBasic() {
+ super.setUpBasic();
+ topicServer = makeServer(serverSink, serverSource);
+ initConfig();
}
/**
- * Initializes an operator so that it is "alive" and has the given names.
+ * Finish all topic servers and mocks.
*/
- protected void initOperator() {
- when(operator.isAlive()).thenReturn(true);
- when(operator.getFullName()).thenReturn(actorName + "." + operationName);
- when(operator.getActorName()).thenReturn(actorName);
- when(operator.getName()).thenReturn(operationName);
- when(operator.getTopicHandler()).thenReturn(topicHandler);
- when(operator.getForwarder()).thenReturn(forwarder);
- when(operator.getParams()).thenReturn(topicParams);
+ public void tearDownBasic() {
+ topicServer.shutdown();
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
}
/**
- * Makes enrichment data.
+ * Makes a simulator for the given sink and source.
*
- * @return enrichment data
+ * @param sink topic to which the simulator should publish responses
+ * @param source topic from which the simulator should receive messages
+ * @return a new topic server/simulator
+ */
+ protected abstract TopicServer<Q> makeServer(TopicSink sink, TopicSource source);
+
+ /**
+ * Initializes a configuration.
*/
- protected Map<String, String> makeEnrichment() {
- return new TreeMap<>();
+ protected void initConfig() {
+ lenient().when(config.getTopicHandler()).thenReturn(topicHandler);
+ lenient().when(config.getForwarder()).thenReturn(forwarder);
+ lenient().when(config.getTimeoutMs()).thenReturn(TIMEOUT_MS);
}
/**