2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.common.rules.test;
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.assertj.core.api.Assertions.assertThatThrownBy;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertNotNull;
27 import static org.mockito.Mockito.verify;
28 import static org.mockito.Mockito.when;
30 import java.io.IOException;
31 import java.nio.file.Files;
32 import java.nio.file.Paths;
33 import java.util.List;
34 import java.util.concurrent.TimeUnit;
35 import lombok.ToString;
36 import org.junit.AfterClass;
37 import org.junit.Before;
38 import org.junit.BeforeClass;
39 import org.junit.Test;
40 import org.junit.runner.RunWith;
41 import org.mockito.Mock;
42 import org.mockito.junit.MockitoJUnitRunner;
43 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
44 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
45 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
46 import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink;
47 import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource;
48 import org.onap.policy.common.endpoints.parameters.TopicParameters;
49 import org.onap.policy.common.utils.coder.CoderException;
50 import org.onap.policy.common.utils.coder.StandardCoder;
51 import org.onap.policy.drools.controller.DroolsController;
52 import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
53 import org.onap.policy.drools.system.PolicyController;
54 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
56 @RunWith(MockitoJUnitRunner.class)
57 public class TopicsTest {
58 private static final String EXPECTED_EXCEPTION = "expected exception";
59 private static final String MY_SOURCE_TOPIC = "my-source-topic";
60 private static final String MY_SINK_TOPIC = "my-sink-topic";
61 private static final String MY_GROUP = "my-group";
62 private static final String MY_ARTIFACT = "my-artifact";
63 private static final String MESSAGE = "{\"text\": \"hello\"}";
64 private static final String TEXT = "hello";
65 private static final String INJECT_FILE = "src/test/resources/topics.json";
66 private static final String POLICY_NAME = "my-policy";
69 private DroolsController drools;
71 private PolicyController controller;
73 private EventProtocolCoder protocolCoder;
75 private NoopTopicSink sink;
77 private NoopTopicSource source;
79 private TopicEndpoint mgr;
81 private ToscaPolicy policy;
83 private Topics topics;
89 public static void setUpBeforeClass() {
90 TopicEndpointManager.getManager().shutdown();
92 TopicParameters params = new TopicParameters();
93 params.setTopic(MY_SOURCE_TOPIC);
94 params.setManaged(true);
95 params.setTopicCommInfrastructure("NOOP");
96 TopicEndpointManager.getManager().addTopicSources(List.of(params));
100 public static void tearDownAfterClass() {
101 TopicEndpointManager.getManager().shutdown();
108 public void setUp() {
109 policy = new ToscaPolicy();
110 policy.setName(POLICY_NAME);
111 policy.setVersion("1.0.0");
113 when(drools.getGroupId()).thenReturn(MY_GROUP);
114 when(drools.getArtifactId()).thenReturn(MY_ARTIFACT);
116 when(controller.getDrools()).thenReturn(drools);
118 when(protocolCoder.decode(MY_GROUP, MY_ARTIFACT, MY_SINK_TOPIC, MESSAGE)).thenReturn(TEXT);
120 when(mgr.getNoopTopicSink(MY_SINK_TOPIC)).thenReturn(sink);
121 when(mgr.getNoopTopicSource(MY_SOURCE_TOPIC)).thenReturn(source);
123 topics = new Topics() {
125 protected TopicEndpoint getTopicManager() {
130 protected EventProtocolCoder getProtocolCoder() {
131 return protocolCoder;
137 public void testDestroy() {
138 Listener<String> listener1 = topics.createListener(MY_SINK_TOPIC, msg -> msg);
139 Listener<String> listener2 = topics.createListener(MY_SINK_TOPIC, msg -> msg + "a suffix");
143 verify(sink).unregister(listener1);
144 verify(sink).unregister(listener2);
148 public void testInjectStringFile() throws IOException {
149 topics.inject(MY_SOURCE_TOPIC, INJECT_FILE);
151 // nothing should have been replaced
152 String expected = new String(Files.readAllBytes(Paths.get(INJECT_FILE)));
153 verify(source).offer(expected);
157 public void testInjectStringFileString() throws IOException {
158 topics.inject(MY_SOURCE_TOPIC, INJECT_FILE, "hello");
160 // text should have been replaced with "hello"
161 String expected = new String(Files.readAllBytes(Paths.get("src", "test", "resources", "topicsReplaced.json")));
162 verify(source).offer(expected);
164 // exception reading file
165 assertThatThrownBy(() -> topics.inject(MY_SOURCE_TOPIC, "missing-file.json", "some text"))
166 .isInstanceOf(TopicException.class);
170 public void testCreateListenerStringClassOfTPolicyController() {
171 Listener<String> listener = topics.createListener(MY_SINK_TOPIC, String.class, controller);
172 listener.onTopicEvent(CommInfrastructure.NOOP, MY_SINK_TOPIC, MESSAGE);
174 assertEquals(TEXT, listener.await());
178 public void testCreateListenerStringClassOfTCoder() {
179 Listener<Data> listener = topics.createListener(MY_SINK_TOPIC, Data.class, new StandardCoder());
180 listener.onTopicEvent(CommInfrastructure.NOOP, MY_SINK_TOPIC, MESSAGE);
182 Data expected = new Data();
183 expected.text = TEXT;
184 assertEquals(expected.toString(), listener.await().toString());
188 * Tests createListener() when the coder throws an exception.
191 public void testCreateListenerStringClassOfTCoderException() {
192 StandardCoder coder = new StandardCoder() {
194 public <T> T decode(String arg0, Class<T> arg1) throws CoderException {
195 throw new CoderException(EXPECTED_EXCEPTION);
199 Listener<Data> listener = topics.createListener(MY_SINK_TOPIC, Data.class, coder);
201 // onTopicEvent() should not throw an exception
202 assertThatCode(() -> listener.onTopicEvent(CommInfrastructure.NOOP, MY_SINK_TOPIC, MESSAGE))
203 .doesNotThrowAnyException();
205 // should not have queued a message
206 assertThatThrownBy(() -> listener.await(0, TimeUnit.MILLISECONDS)).isInstanceOf(TopicException.class);
210 public void testCreateListenerStringFunctionOfStringT() {
211 Listener<String> listener = topics.createListener(MY_SINK_TOPIC, msg -> msg);
212 listener.onTopicEvent(CommInfrastructure.NOOP, MY_SINK_TOPIC, MESSAGE);
214 assertEquals(MESSAGE, listener.await());
218 public void testGetTopicManager_testGetProtocolCoder() {
219 // use a topic with a real manager
220 topics = new Topics();
222 assertNotNull(topics.getTopicManager());
223 assertNotNull(topics.getProtocolCoder());
227 private static class Data {