2 * ========================LICENSE_START=================================
4 * ======================================================================
5 * Copyright (C) 2020 Nordix Foundation. All rights reserved.
6 * ======================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
23 import static org.junit.jupiter.api.Assertions.assertEquals;
24 import static org.junit.jupiter.api.Assertions.assertNotNull;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.ArgumentMatchers.anyString;
27 import static org.mockito.Mockito.doReturn;
28 import static org.mockito.Mockito.spy;
29 import static org.mockito.Mockito.times;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.when;
33 import com.google.gson.Gson;
34 import com.google.gson.GsonBuilder;
35 import com.google.gson.JsonObject;
37 import java.util.ArrayList;
39 import org.junit.Assert;
40 import org.junit.jupiter.api.AfterEach;
41 import org.junit.jupiter.api.Test;
42 import org.junit.jupiter.api.extension.ExtendWith;
43 import org.mockito.Mock;
44 import org.mockito.junit.jupiter.MockitoExtension;
45 import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
46 import org.onap.ccsdk.oran.a1policymanagementservice.clients.SecurityContext;
47 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
48 import org.onap.ccsdk.oran.a1policymanagementservice.dmaap.DmaapRequestMessage.Operation;
49 import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
50 import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils;
52 import reactor.core.publisher.Flux;
53 import reactor.core.publisher.Mono;
55 @ExtendWith(MockitoExtension.class)
56 class DmaapMessageConsumerTest {
// Collaborators handed to / used around the consumer under test.
// NOTE(review): the Mockito imports suggest these are injected as mocks, but no
// annotation is visible on them in this extract — confirm.
58 private ApplicationConfig applicationConfigMock;
60 private AsyncRestClient messageRouterConsumerMock;
62 private DmaapMessageHandler messageHandlerMock;
// Instance exercised by the tests; every test assigns it itself (wrapped in a
// Mockito spy in the tests that stub its methods).
64 private DmaapMessageConsumer messageConsumerUnderTest;
// Serializes the reference DmaapRequestMessage into the JSON fed to the consumer.
66 private Gson gson = new GsonBuilder().create();
70 LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
/**
 * Stubs {@code infiniteFlux()} on the consumer under test so that it returns a
 * {@link Flux} backed by a locally built list.
 *
 * NOTE(review): the loop body is not visible in this extract; presumably it adds one
 * element per iteration so that the Flux emits {@code number} items — confirm.
 *
 * @param number upper bound of the loop that fills the list backing the Flux
 */
73 private void setTaskNumberOfLoops(int number) {
74 ArrayList<Integer> l = new ArrayList<>();
75 for (int i = 0; i < number; ++i) {
78 Flux<Integer> f = Flux.fromIterable(l);
// messageConsumerUnderTest must already be a Mockito spy when this is called.
79 doReturn(f).when(messageConsumerUnderTest).infiniteFlux();
/**
 * Stubs {@code delay()} on the consumer under test to return an empty {@link Mono}.
 * Presumably this keeps the tests from waiting on the real delay — confirm against
 * DmaapMessageConsumer.
 */
82 private void disableTaskDelay() {
83 doReturn(Mono.empty()).when(messageConsumerUnderTest).delay();
/**
 * Scenario: DMaaP is reported as not configured on the first two checks and as
 * configured on the third. The test expects two calls to {@code delay()}, one call to
 * {@code handleDmaapMsg} and the handler's response as the last value of the task.
 */
87 void successfulCase_dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
88 messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock, new SecurityContext("")));
90 setTaskNumberOfLoops(3);
93 when(this.applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("getDmaapConsumerTopicUrl");
// Successive calls answer false, false, true.
94 doReturn(false, false, true).when(messageConsumerUnderTest).isDmaapConfigured();
// The message router always answers with a JSON array holding one valid request.
95 doReturn(Mono.just(dmaapRequestMessageString())).when(messageConsumerUnderTest)
96 .getFromMessageRouter(anyString());
98 doReturn(Mono.just("responseFromHandler")).when(messageConsumerUnderTest).handleDmaapMsg(any());
// blockLast() subscribes to the task and returns its final emitted element.
100 String s = messageConsumerUnderTest.createTask().blockLast();
101 assertEquals("responseFromHandler", s);
102 verify(messageConsumerUnderTest, times(2)).delay();
103 verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(any());
/**
 * Scenario: the first fetch from the message router yields an error and the second
 * yields a valid request. The test expects two fetches, one call to {@code delay()},
 * no error response being sent, one handled message and "response1" as the last value
 * of the task.
 */
107 void returnErrorFromDmapp_thenSleepAndRetry() throws Exception {
108 messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock, new SecurityContext("")));
110 setTaskNumberOfLoops(2);
115 Mono<String> dmaapError = Mono.error(new ServiceException("dmaapError"));
116 Mono<String> dmaapResponse = Mono.just(dmaapRequestMessageString());
// First call returns the failing Mono, the second the valid message.
117 doReturn(dmaapError, dmaapResponse).when(messageConsumerUnderTest).getFromMessageRouter(anyString());
120 doReturn(Mono.just("response1")).when(messageConsumerUnderTest).handleDmaapMsg(any());
122 String s = messageConsumerUnderTest.createTask().blockLast();
124 verify(messageConsumerUnderTest, times(2)).getFromMessageRouter(anyString());
125 verify(messageConsumerUnderTest, times(0)).sendErrorResponse(anyString());
126 verify(messageConsumerUnderTest, times(1)).delay();
127 verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(any());
128 assertEquals("response1", s);
/**
 * Scenario: the first payload fetched from the message router is not valid JSON and
 * the second is a valid request. The test expects one error response to be sent, no
 * call to {@code delay()}, and the valid request to be handled exactly once.
 */
132 void unParsableMessage_thenSendResponseAndContinue() throws Exception {
133 messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock, new SecurityContext("")));
134 setTaskNumberOfLoops(2);
// Despite its name this Mono completes normally; its content is what cannot be parsed.
138 Mono<String> dmaapError = Mono.just("Non valid JSON \"");
139 Mono<String> dmaapResponse = Mono.just(dmaapRequestMessageString());
140 doReturn(dmaapError, dmaapResponse).when(messageConsumerUnderTest).getFromMessageRouter(anyString());
143 doReturn(Mono.just("response1")).when(messageConsumerUnderTest).handleDmaapMsg(any());
145 String s = messageConsumerUnderTest.createTask().blockLast();
146 assertEquals("response1", s);
148 verify(messageConsumerUnderTest, times(2)).getFromMessageRouter(anyString());
149 verify(messageConsumerUnderTest, times(1)).sendErrorResponse(anyString());
150 verify(messageConsumerUnderTest, times(0)).delay();
// Matches on an argument equal to the reference message, not on any() as in the other tests.
151 verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage());
/**
 * Builds the payload the tests return from the stubbed message router: the reference
 * request serialized with Gson and wrapped in a one-element JSON array.
 *
 * @return the JSON array text
 */
154 private String dmaapRequestMessageString() {
155 String json = gson.toJson(dmaapRequestMessage());
156 return jsonArray(json);
/**
 * Checks {@code parseReceivedMessage} against two encodings of the same request: a JSON
 * array holding the object itself, and a JSON array holding the object as a quoted,
 * escaped string. Both must parse to a message equal to the reference request.
 *
 * NOTE(review): {@code parsedMessage} is declared twice below; the scope delimiters
 * separating the two sections are not visible in this extract.
 */
160 void testMessageParsing() throws ServiceException {
// A plain instance is enough here: nothing is stubbed, so no spy is needed.
161 messageConsumerUnderTest = new DmaapMessageConsumer(applicationConfigMock, new SecurityContext(""));
162 String json = gson.toJson(dmaapRequestMessage());
// Case 1: array element is a JSON object.
164 String jsonArrayOfObject = jsonArray(json);
165 DmaapRequestMessage parsedMessage =
166 messageConsumerUnderTest.parseReceivedMessage(jsonArrayOfObject).blockLast();
167 assertNotNull(parsedMessage);
168 assertNotNull(parsedMessage.getPayload());
170 Assert.assertEquals(dmaapRequestMessage(), parsedMessage);
// Case 2: array element is a JSON string containing the escaped object.
173 String jsonArrayOfString = jsonArray(quote(json));
174 DmaapRequestMessage parsedMessage =
175 messageConsumerUnderTest.parseReceivedMessage(jsonArrayOfString).blockLast();
176 assertNotNull(parsedMessage);
177 assertNotNull(parsedMessage.getPayload());
178 Assert.assertEquals(dmaapRequestMessage(), parsedMessage);
/**
 * Stubs the application configuration so that both the DMaaP consumer and producer
 * topic URLs resolve to the placeholder value "url".
 */
183 private void setUpMrConfig() {
184 when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url");
185 when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url");
/**
 * Wraps the given text in square brackets, producing a one-element JSON array when
 * {@code s} is itself a valid JSON value.
 *
 * @param s text to place between the brackets; it is not validated
 * @return {@code s} surrounded by "[" and "]"
 */
188 private String jsonArray(String s) {
189 return "[" + s + "]";
/**
 * Surrounds the given text with double quotes and backslash-escapes every double quote
 * inside it. Only the double-quote character is escaped; backslashes and control
 * characters are passed through unchanged.
 *
 * @param s text to quote
 * @return the quoted text
 */
192 private String quote(String s) {
193 return "\"" + s.replace("\"", "\\\"") + "\"";
196 private DmaapRequestMessage dmaapRequestMessage() {
197 return DmaapRequestMessage.builder() //
198 .apiVersion("apiVersion") //
199 .correlationId("correlationId") //
200 .operation(Operation.PUT) //
201 .originatorId("originatorId") //
202 .payload(new JsonObject()) //
203 .requestId("requestId") //
205 .timestamp("timestamp") //