2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.policy.controlloop.actorserviceprovider.impl;
 
  23 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
 
  24 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
  25 import static org.junit.Assert.assertEquals;
 
  26 import static org.junit.Assert.assertNotNull;
 
  27 import static org.junit.Assert.assertNotSame;
 
  28 import static org.junit.Assert.assertNull;
 
  29 import static org.junit.Assert.assertSame;
 
  30 import static org.mockito.Mockito.never;
 
  31 import static org.mockito.Mockito.verify;
 
  34 import java.util.Properties;
 
  35 import java.util.TreeMap;
 
  36 import java.util.function.Function;
 
  37 import org.junit.AfterClass;
 
  38 import org.junit.Before;
 
  39 import org.junit.BeforeClass;
 
  40 import org.junit.Test;
 
  41 import org.mockito.Mock;
 
  42 import org.mockito.MockitoAnnotations;
 
  43 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
 
  44 import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException;
 
  45 import org.onap.policy.controlloop.actorserviceprovider.Util;
 
  46 import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicActorParams;
 
  47 import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
 
  48 import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
 
  50 public class BidirectionalTopicActorTest {
 
         // Tests for BidirectionalTopicActor: start/stop/shutdown propagation to its topic
         // handlers, per-operation parameter construction, and topic-handler lookup.
         // NOTE(review): this listing has gaps in its embedded line numbering, so some
         // source lines (annotations, closing braces, a few statements) are not visible.
         // NOTE(review): java.util.Map is used further down, but no import for it appears
         // among the visible import lines - confirm it exists in the full file.

         // Actor name: passed to the BidirectionalTopicActor constructor and expected back
         // from getName()/getFullName().
  52     private static final String ACTOR = "my-actor";
 
         // Name that is deliberately not configured; used both as an operation name and as
         // a sink topic name in the negative checks.
  53     private static final String UNKNOWN = "unknown";
 
         // Sink topic registered with the TopicEndpointManager in setUpBeforeClass().
  54     private static final String MY_SINK = "my-sink";
 
         // The two source topics registered in setUpBeforeClass(); MY_SOURCE1 is also the
         // actor-level default source topic set by makeParams().
  55     private static final String MY_SOURCE1 = "my-source-A";
 
  56     private static final String MY_SOURCE2 = "my-source-B";
 
         // Value given to setTimeoutSec() in makeParams().
  57     private static final int TIMEOUT = 10;
 
         // Handler the tests expect for (MY_SINK, MY_SOURCE1). It is passed to Mockito's
         // verify(), so presumably it is @Mock-annotated (annotation lines not visible) - confirm.
  60     private BidirectionalTopicHandler handler1;
 
         // Handler the tests expect for (MY_SINK, MY_SOURCE2); same assumption as handler1.
  62     private BidirectionalTopicHandler handler2;
 
         // Actor under test; assigned a MyActor in the fixture code and replaced with a plain
         // BidirectionalTopicActor in testMakeTopicHandler().
  64     private BidirectionalTopicActor actor;
 
  68      * Configures the endpoints.
 
  71     public static void setUpBeforeClass() {
 
             // Describe the endpoints as properties: MY_SINK under "noop.sink.topics" and
             // MY_SOURCE1,MY_SOURCE2 (comma-separated) under "noop.source.topics".
  72         Properties props = new Properties();
 
  73         props.setProperty("noop.sink.topics", MY_SINK);
 
  74         props.setProperty("noop.source.topics", MY_SOURCE1 + "," + MY_SOURCE2);
 
  76         // clear all topics and then configure one sink and two sources
 
             // shutdown() runs first so that topics left over from elsewhere do not interfere;
             // the same property set is then used to add both the sinks and the sources.
  77         TopicEndpointManager.getManager().shutdown();
 
  78         TopicEndpointManager.getManager().addTopicSinks(props);
 
  79         TopicEndpointManager.getManager().addTopicSources(props);
 
  83     public static void tearDownAfterClass() {
 
             // Mirrors the shutdown() issued at the start of setUpBeforeClass(), so the sink
             // and sources registered for this class are removed once its tests are done.
  84         // clear all topics after the tests
 
  85         TopicEndpointManager.getManager().shutdown();
 
  93         MockitoAnnotations.initMocks(this);
 
             // NOTE(review): the signature of the enclosing method is not part of this listing;
             // presumably this is the per-test fixture method - confirm against the full file.
             // The initMocks(this) call above processes the Mockito annotations on this test
             // instance.

             // Start every test from a new MyActor, i.e. the subclass with the overridden
             // makeTopicHandler().
  95         actor = new MyActor();
 
             // Configure it with the default parameters from makeParams(), translated to a map
             // under the ACTOR name.
  96         actor.configure(Util.translateToMap(ACTOR, makeParams()));
 
 100     public void testDoStart() throws BidirectionalTopicClientException {
 
             // Expectation: start() reaches every allocated handler exactly once, while
             // stop() and shutdown() are never called on them.
 101         // allocate some handlers
 
 102         actor.getTopicHandler(MY_SINK, MY_SOURCE1);
 
 103         actor.getTopicHandler(MY_SINK, MY_SOURCE2);
 
             // NOTE(review): no call that would trigger the handlers' start() is visible here
             // (the embedded numbering jumps from 103 to 108); presumably the actor is started
             // at this point - confirm against the full file.
 108         verify(handler1).start();
 
 109         verify(handler2).start();
 
             // starting must not stop or shut down the handlers
 111         verify(handler1, never()).stop();
 
 112         verify(handler2, never()).stop();
 
 114         verify(handler1, never()).shutdown();
 
 115         verify(handler2, never()).shutdown();
 
 119     public void testDoStop() throws BidirectionalTopicClientException {
 
             // Expectation: stop() reaches every allocated handler exactly once, and
             // shutdown() is never called on them.
 120         // allocate some handlers
 
 121         actor.getTopicHandler(MY_SINK, MY_SOURCE1);
 
 122         actor.getTopicHandler(MY_SINK, MY_SOURCE2);
 
             // NOTE(review): the calls that would trigger the handlers' stop() are not visible
             // (the embedded numbering jumps from 122 to 130); presumably the actor is started
             // and then stopped at this point - confirm against the full file.
 130         verify(handler1).stop();
 
 131         verify(handler2).stop();
 
             // a plain stop must not escalate to shutdown()
 133         verify(handler1, never()).shutdown();
 
 134         verify(handler2, never()).shutdown();
 
 138     public void testDoShutdown() {
 
             // Expectation: shutdown() reaches every allocated handler exactly once, and
             // stop() is never called on them.
 139         // allocate some handlers
 
 140         actor.getTopicHandler(MY_SINK, MY_SOURCE1);
 
 141         actor.getTopicHandler(MY_SINK, MY_SOURCE2);
 
             // NOTE(review): the calls that would trigger the handlers' shutdown() are not
             // visible (the embedded numbering jumps from 141 to 149); presumably the actor is
             // started and then shut down at this point - confirm against the full file.
 149         verify(handler1).shutdown();
 
 150         verify(handler2).shutdown();
 
             // shutdown is expected to bypass stop() on the handlers
 152         verify(handler1, never()).stop();
 
 153         verify(handler2, never()).stop();
 
 157     public void testMakeOperatorParameters() {
 
             // Checks the function returned by makeOperatorParameters(): it maps an operation
             // name to that operation's parameter map.
 158         BidirectionalTopicActorParams params = makeParams();
 
             // Uses a plain BidirectionalTopicActor rather than the MyActor held in "actor".
 160         final BidirectionalTopicActor prov = new BidirectionalTopicActor(ACTOR);
 
 161         Function<String, Map<String, Object>> maker =
 
 162                         prov.makeOperatorParameters(Util.translateToMap(prov.getName(), params));
 
             // an operation name that was never configured yields null
 164         assertNull(maker.apply(UNKNOWN));
 
 166         // use a TreeMap to ensure the properties are sorted
 
             // "operA" is expected to carry the actor-level values set in makeParams():
             // sink my-sink, source my-source-A, timeout 10
 167         assertEquals("{sinkTopic=my-sink, sourceTopic=my-source-A, timeoutSec=10}",
 
 168                         new TreeMap<>(maker.apply("operA")).toString());
 
             // "operB" keeps the actor-level sink and timeout, but its own sourceTopic
             // ("topicB", set in makeParams()) replaces the actor-level source
 170         assertEquals("{sinkTopic=my-sink, sourceTopic=topicB, timeoutSec=10}",
 
 171                         new TreeMap<>(maker.apply("operB")).toString());
 
 173         // with invalid actor parameters
 
             // a null operation map is expected to be rejected when the parameters are
             // translated, surfacing as ParameterValidationRuntimeException
 174         params.setOperation(null);
 
 175         assertThatThrownBy(() -> prov.makeOperatorParameters(Util.translateToMap(prov.getName(), params)))
 
 176                         .isInstanceOf(ParameterValidationRuntimeException.class);
 
 180     public void testBidirectionalTopicActor() {
 
             // The name given at construction is expected back from both getName() and
             // getFullName().
 181         assertEquals(ACTOR, actor.getName());
 
 182         assertEquals(ACTOR, actor.getFullName());
 
 186     public void testGetTopicHandler() {
 
             // For the known sink, the handler returned depends on the source topic:
             // handler1 for MY_SOURCE1 and handler2 for MY_SOURCE2.
 187         assertSame(handler1, actor.getTopicHandler(MY_SINK, MY_SOURCE1));
 
 188         assertSame(handler2, actor.getTopicHandler(MY_SINK, MY_SOURCE2));
 
             // An unrecognised sink must surface as IllegalArgumentException.
             // NOTE(review): MyActor.makeTopicHandler() throws BidirectionalTopicClientException
             // for this case, so getTopicHandler() presumably converts it - confirm.
 190         assertThatIllegalArgumentException().isThrownBy(() -> actor.getTopicHandler(UNKNOWN, MY_SOURCE1));
 
 194     public void testMakeTopicHandler() {
 
             // Replace the fixture's MyActor with a plain BidirectionalTopicActor, so the
             // handlers obtained below do not come from MyActor's overridden makeTopicHandler().
 196         actor = new BidirectionalTopicActor(ACTOR);
 
             // The handler fields are overwritten with whatever the actor returns for the two
             // topic pairs. NOTE(review): presumably this relies on the sink and sources
             // registered in setUpBeforeClass() - confirm.
 198         handler1 = actor.getTopicHandler(MY_SINK, MY_SOURCE1);
 
 199         handler2 = actor.getTopicHandler(MY_SINK, MY_SOURCE2);
 
             // each pair must yield a handler, and the two pairs must not share one
 201         assertNotNull(handler1);
 
 202         assertNotNull(handler2);
 
 203         assertNotSame(handler1, handler2);
 
 207     private BidirectionalTopicActorParams makeParams() {
 
             // Builds the actor parameters shared by the tests: actor-level sink, source and
             // timeout, plus a map of per-operation settings.
             // NOTE(review): the statement returning the populated object is not visible in this
             // listing - confirm against the full file.
 208         BidirectionalTopicActorParams params = new BidirectionalTopicActorParams();
 
 209         params.setSinkTopic(MY_SINK);
 
 210         params.setSourceTopic(MY_SOURCE1);
 
 211         params.setTimeoutSec(TIMEOUT);
 
 214         params.setOperation(Map.of(
 
                         // NOTE(review): the embedded numbering skips a line here;
                         // testMakeOperatorParameters() expects an "operA" operation with no
                         // overrides, so presumably its entry belongs at this point - confirm.
                         // "operB" replaces only the source topic.
 216                         "operB", Map.of("sourceTopic", "topicB")));
 
 221     private class MyActor extends BidirectionalTopicActor {
 
             // Actor variant used by the fixture: its makeTopicHandler() recognises only
             // MY_SINK paired with MY_SOURCE1 or MY_SOURCE2.
             // NOTE(review): the embedded numbering skips several lines after the class header
             // (presumably a constructor) - confirm against the full file.
 228         protected BidirectionalTopicHandler makeTopicHandler(String sinkTopic, String sourceTopic)
 
 229                         throws BidirectionalTopicClientException {
 
                 // only the sink registered for this test class is recognised
 231             if (MY_SINK.equals(sinkTopic)) {
 
                     // NOTE(review): the bodies of the two branches below are not visible;
                     // testGetTopicHandler() expects handler1 for MY_SOURCE1 and handler2 for
                     // MY_SOURCE2, so presumably each branch returns the matching field - confirm.
 232                 if (MY_SOURCE1.equals(sourceTopic)) {
 
 234                 } else if (MY_SOURCE2.equals(sourceTopic)) {
 
                 // reached for any sink/source combination not handled above; the message
                 // names both topics
 239             throw new BidirectionalTopicClientException("no topic " + sinkTopic + "/" + sourceTopic);