2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2023 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.controlloop.common.rules.test;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.jupiter.api.Assertions.assertEquals;
27 import static org.junit.jupiter.api.Assertions.assertNotNull;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.verify;
30 import static org.mockito.Mockito.when;
32 import java.io.IOException;
33 import java.nio.file.Files;
34 import java.nio.file.Paths;
35 import java.util.List;
36 import java.util.concurrent.TimeUnit;
37 import lombok.ToString;
38 import org.junit.jupiter.api.AfterAll;
39 import org.junit.jupiter.api.BeforeAll;
40 import org.junit.jupiter.api.BeforeEach;
41 import org.junit.jupiter.api.Test;
42 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
43 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
44 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
45 import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink;
46 import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource;
47 import org.onap.policy.common.endpoints.parameters.TopicParameters;
48 import org.onap.policy.common.utils.coder.CoderException;
49 import org.onap.policy.common.utils.coder.StandardCoder;
50 import org.onap.policy.drools.controller.DroolsController;
51 import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
52 import org.onap.policy.drools.system.PolicyController;
53 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
// ---- Fixture constants shared by the tests below ----
56 private static final String EXPECTED_EXCEPTION = "expected exception";
57 private static final String MY_SOURCE_TOPIC = "my-source-topic";
58 private static final String MY_SINK_TOPIC = "my-sink-topic";
59 private static final String MY_GROUP = "my-group";
60 private static final String MY_ARTIFACT = "my-artifact";
// MESSAGE is the JSON payload handed to listeners; TEXT is the value of its "text" field.
61 private static final String MESSAGE = "{\"text\": \"hello\"}";
62 private static final String TEXT = "hello";
// File passed to topics.inject() by the injection tests.
63 private static final String INJECT_FILE = "src/test/resources/topics.json";
64 private static final String POLICY_NAME = "my-policy";
// ---- Mockito stand-ins; they are wired together in setUp() ----
66 private final DroolsController drools = mock(DroolsController.class);
67 private final PolicyController controller = mock(PolicyController.class);
68 private final EventProtocolCoder protocolCoder = mock(EventProtocolCoder.class);
69 private final NoopTopicSink sink = mock(NoopTopicSink.class);
70 private final NoopTopicSource source = mock(NoopTopicSource.class);
71 private final TopicEndpoint mgr = mock(TopicEndpoint.class);
// Assigned in setUp(); no read of this field is visible in this chunk.
73 private ToscaPolicy policy;
// Object under test; rebuilt in setUp(), and replaced by a plain Topics instance in one test.
75 private Topics topics;
81 static void setUpBeforeClass() {
82 TopicEndpointManager.getManager().shutdown();
84 var params = new TopicParameters();
85 params.setTopic(MY_SOURCE_TOPIC);
86 params.setManaged(true);
87 params.setTopicCommInfrastructure("NOOP");
88 TopicEndpointManager.getManager().addTopicSources(List.of(params));
92 public static void tearDownAfterClass() {
93 TopicEndpointManager.getManager().shutdown();
100 public void setUp() {
101 policy = new ToscaPolicy();
102 policy.setName(POLICY_NAME);
103 policy.setVersion("1.0.0");
105 when(drools.getGroupId()).thenReturn(MY_GROUP);
106 when(drools.getArtifactId()).thenReturn(MY_ARTIFACT);
108 when(controller.getDrools()).thenReturn(drools);
110 when(protocolCoder.decode(MY_GROUP, MY_ARTIFACT, MY_SINK_TOPIC, MESSAGE)).thenReturn(TEXT);
112 when(mgr.getNoopTopicSink(MY_SINK_TOPIC)).thenReturn(sink);
113 when(mgr.getNoopTopicSource(MY_SOURCE_TOPIC)).thenReturn(source);
115 topics = new Topics() {
117 protected TopicEndpoint getTopicManager() {
122 protected EventProtocolCoder getProtocolCoder() {
123 return protocolCoder;
// NOTE(review): the enclosing method header and the call that is expected to trigger the
// unregistration are not visible in this chunk - confirm against the full file.
// Two listeners are created on the same sink topic, each with its own message converter.
130 var listener1 = topics.createListener(MY_SINK_TOPIC, msg -> msg);
131 var listener2 = topics.createListener(MY_SINK_TOPIC, msg -> msg + "a suffix");
// Both listeners must have been unregistered from the mocked sink.
135 verify(sink).unregister(listener1);
136 verify(sink).unregister(listener2);
140 void testInjectStringFile() throws IOException {
141 topics.inject(MY_SOURCE_TOPIC, INJECT_FILE);
143 // nothing should have been replaced
144 var expected = new String(Files.readAllBytes(Paths.get(INJECT_FILE)));
145 verify(source).offer(expected);
149 void testInjectStringFileString() throws IOException {
150 topics.inject(MY_SOURCE_TOPIC, INJECT_FILE, "hello");
152 // text should have been replaced with "hello"
153 var expected = new String(Files.readAllBytes(Paths.get("src", "test", "resources", "topicsReplaced.json")));
154 verify(source).offer(expected);
156 // exception reading file
157 assertThatThrownBy(() -> topics.inject(MY_SOURCE_TOPIC, "missing-file.json", "some text"))
158 .isInstanceOf(TopicException.class);
162 void testCreateListenerStringClassOfTPolicyController() {
163 var listener = topics.createListener(MY_SINK_TOPIC, String.class, controller);
164 listener.onTopicEvent(CommInfrastructure.NOOP, MY_SINK_TOPIC, MESSAGE);
166 assertEquals(TEXT, listener.await());
170 void testCreateListenerStringClassOfTCoder() {
171 var listener = topics.createListener(MY_SINK_TOPIC, Data.class, new StandardCoder());
172 listener.onTopicEvent(CommInfrastructure.NOOP, MY_SINK_TOPIC, MESSAGE);
174 var expected = new Data();
175 expected.text = TEXT;
176 assertEquals(expected.toString(), listener.await().toString());
180 * Tests createListener() when the coder throws an exception.
183 void testCreateListenerStringClassOfTCoderException() {
184 var coder = new StandardCoder() {
186 public <T> T decode(String arg0, Class<T> arg1) throws CoderException {
187 throw new CoderException(EXPECTED_EXCEPTION);
191 var listener = topics.createListener(MY_SINK_TOPIC, Data.class, coder);
193 // onTopicEvent() should not throw an exception
194 assertThatCode(() -> listener.onTopicEvent(CommInfrastructure.NOOP, MY_SINK_TOPIC, MESSAGE))
195 .doesNotThrowAnyException();
197 // should not have queued a message
198 assertThatThrownBy(() -> listener.await(0, TimeUnit.MILLISECONDS)).isInstanceOf(TopicException.class);
202 void testCreateListenerStringFunctionOfStringT() {
203 var listener = topics.createListener(MY_SINK_TOPIC, msg -> msg);
204 listener.onTopicEvent(CommInfrastructure.NOOP, MY_SINK_TOPIC, MESSAGE);
206 assertEquals(MESSAGE, listener.await());
210 void testGetTopicManager_testGetProtocolCoder() {
211 // use a topic with a real manager
212 topics = new Topics();
214 assertNotNull(topics.getTopicManager());
215 assertNotNull(topics.getProtocolCoder());
219 private static class Data {