2 * ========================LICENSE_START=================================
4 * ======================================================================
5 * Copyright (C) 2020 Nordix Foundation. All rights reserved.
6 * ======================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
23 import static org.junit.jupiter.api.Assertions.assertEquals;
24 import static org.junit.jupiter.api.Assertions.assertNotNull;
25 import static org.junit.jupiter.api.Assertions.assertTrue;
26 import static org.mockito.ArgumentMatchers.any;
27 import static org.mockito.ArgumentMatchers.anyString;
28 import static org.mockito.Mockito.doReturn;
29 import static org.mockito.Mockito.spy;
30 import static org.mockito.Mockito.times;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.when;
34 import com.google.gson.Gson;
35 import com.google.gson.GsonBuilder;
36 import com.google.gson.JsonObject;
38 import java.util.ArrayList;
40 import org.junit.jupiter.api.AfterEach;
41 import org.junit.jupiter.api.Test;
42 import org.junit.jupiter.api.extension.ExtendWith;
43 import org.mockito.Mock;
44 import org.mockito.junit.jupiter.MockitoExtension;
45 import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
46 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
47 import org.onap.ccsdk.oran.a1policymanagementservice.dmaap.DmaapRequestMessage.Operation;
48 import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
49 import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils;
51 import reactor.core.publisher.Flux;
52 import reactor.core.publisher.Mono;
54 @ExtendWith(MockitoExtension.class)
55 class DmaapMessageConsumerTest {
// Shared state for the tests in this class.
// NOTE(review): no initialisation of the first three fields is visible in this view;
// their names suggest Mockito-injected mocks - confirm the annotations in the full file.

// Passed to the DmaapMessageConsumer constructor and stubbed for the topic URLs.
57 private ApplicationConfig applicationConfigMock;
// Not referenced by any line visible in this view.
59 private AsyncRestClient messageRouterConsumerMock;
// Not referenced by any line visible in this view.
61 private DmaapMessageHandler messageHandlerMock;
// Assigned at the start of each test, either as a Mockito spy or as a plain instance.
63 private DmaapMessageConsumer messageConsumerUnderTest;
// Used by the helpers below to serialise the sample request message to JSON.
65 private Gson gson = new GsonBuilder().create();
69 LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
/**
 * Stubs {@code infiniteFlux()} on the consumer under test so that it returns a
 * {@code Flux} created from a local list instead of running the real method.
 * The consumer must already be a Mockito spy (or mock) when this is called.
 *
 * NOTE(review): the body of the for-loop and the closing braces are missing from
 * this view. Presumably the loop adds {@code number} elements to the list so that
 * the task performs that many iterations - confirm against the complete file.
 *
 * @param number upper bound (exclusive) of the loop counter
 */
72 private void setTaskNumberOfLoops(int number) {
73 ArrayList<Integer> l = new ArrayList<>();
74 for (int i = 0; i < number; ++i) {
// The Flux is backed by the list built above.
77 Flux<Integer> f = Flux.fromIterable(l);
// doReturn(..).when(spy) form: installs the stub without calling the real infiniteFlux().
78 doReturn(f).when(messageConsumerUnderTest).infiniteFlux();
/**
 * Stubs {@code delay()} on the consumer under test to return an empty {@code Mono}.
 * The consumer must already be a Mockito spy (or mock) when this is called.
 *
 * NOTE(review): no call to this helper is visible in this view; the tests below
 * verify invocation counts of {@code delay()}, so it is presumably called from
 * lines that are missing here - confirm against the complete file.
 */
81 private void disableTaskDelay() {
82 doReturn(Mono.empty()).when(messageConsumerUnderTest).delay();
/**
 * Scenario: {@code isDmaapConfigured()} answers false, false, true on consecutive
 * calls while the task loop is limited to three iterations.
 *
 * Expected: the last value emitted by the task is the stubbed handler response,
 * {@code delay()} was invoked twice and {@code handleDmaapMsg} once with a message
 * equal to the sample request.
 *
 * NOTE(review): some lines of this method (including its closing brace) are missing
 * from this view.
 */
86 void successfulCase_dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
// Spy on a real consumer so selected methods can be stubbed and invocations verified.
87 messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
89 setTaskNumberOfLoops(3);
// The URL value is an arbitrary placeholder string.
92 when(this.applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("getDmaapConsumerTopicUrl");
// Consecutive answers: not configured, not configured, configured.
93 doReturn(false, false, true).when(messageConsumerUnderTest).isDmaapConfigured();
// The message router call yields the sample request serialised as a JSON array.
94 doReturn(Mono.just(dmaapRequestMessageString())).when(messageConsumerUnderTest)
95 .getFromMessageRouter(anyString());
// The handler is stubbed, so no real message handling takes place.
97 doReturn(Mono.just("responseFromHandler")).when(messageConsumerUnderTest).handleDmaapMsg(any());
// blockLast() subscribes, waits for completion and returns the final element.
99 String s = messageConsumerUnderTest.createTask().blockLast();
100 assertEquals("responseFromHandler", s);
101 verify(messageConsumerUnderTest, times(2)).delay();
// The argument is a freshly built message, so the match relies on equals().
102 verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage());
/**
 * Scenario: the first {@code getFromMessageRouter} call yields an error
 * ({@code ServiceException}), the second yields the sample request; the task loop
 * is limited to two iterations.
 *
 * Expected: two router calls, no error response sent, one {@code delay()}
 * invocation, one {@code handleDmaapMsg} invocation with a message equal to the
 * sample request, and "response1" as the last emitted value.
 *
 * NOTE(review): some lines of this method (including its closing brace) are missing
 * from this view.
 */
106 void returnErrorFromDmapp_thenSleepAndRetry() throws Exception {
// Spy on a real consumer so selected methods can be stubbed and invocations verified.
107 messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
109 setTaskNumberOfLoops(2);
// First answer: a failing Mono; second answer: the valid serialised request.
114 Mono<String> dmaapError = Mono.error(new ServiceException("dmaapError"));
115 Mono<String> dmaapResponse = Mono.just(dmaapRequestMessageString());
116 doReturn(dmaapError, dmaapResponse).when(messageConsumerUnderTest).getFromMessageRouter(anyString());
// The handler is stubbed, so no real message handling takes place.
119 doReturn(Mono.just("response1")).when(messageConsumerUnderTest).handleDmaapMsg(any());
// blockLast() subscribes, waits for completion and returns the final element.
121 String s = messageConsumerUnderTest.createTask().blockLast();
123 verify(messageConsumerUnderTest, times(2)).getFromMessageRouter(anyString());
124 verify(messageConsumerUnderTest, times(0)).sendErrorResponse(anyString());
125 verify(messageConsumerUnderTest, times(1)).delay();
// The argument is a freshly built message, so the match relies on equals().
126 verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage());
127 assertEquals("response1", s);
/**
 * Scenario: the first {@code getFromMessageRouter} call succeeds but delivers text
 * that is not valid JSON, the second delivers the sample request; the task loop is
 * limited to two iterations.
 *
 * Expected: "response1" as the last emitted value, two router calls, exactly one
 * error response sent, no {@code delay()} invocation and one {@code handleDmaapMsg}
 * invocation with a message equal to the sample request.
 *
 * NOTE(review): some lines of this method (including its closing brace) are missing
 * from this view.
 */
131 void unParsableMessage_thenSendResponseAndContinue() throws Exception {
// Spy on a real consumer so selected methods can be stubbed and invocations verified.
132 messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
133 setTaskNumberOfLoops(2);
// Despite its name this Mono completes normally; the "error" is the unparsable content.
137 Mono<String> dmaapError = Mono.just("Non valid JSON \"");
138 Mono<String> dmaapResponse = Mono.just(dmaapRequestMessageString());
139 doReturn(dmaapError, dmaapResponse).when(messageConsumerUnderTest).getFromMessageRouter(anyString());
// The handler is stubbed, so no real message handling takes place.
142 doReturn(Mono.just("response1")).when(messageConsumerUnderTest).handleDmaapMsg(any());
// blockLast() subscribes, waits for completion and returns the final element.
144 String s = messageConsumerUnderTest.createTask().blockLast();
145 assertEquals("response1", s);
147 verify(messageConsumerUnderTest, times(2)).getFromMessageRouter(anyString());
148 verify(messageConsumerUnderTest, times(1)).sendErrorResponse(anyString());
149 verify(messageConsumerUnderTest, times(0)).delay();
// The argument is a freshly built message, so the match relies on equals().
150 verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage());
/**
 * Serialises the sample request message with Gson and wraps the result in square
 * brackets, giving a one-element JSON array as text.
 *
 * @return the sample request as a JSON array string
 */
153 private String dmaapRequestMessageString() {
154 String json = gson.toJson(dmaapRequestMessage());
155 return jsonArray(json);
/**
 * Checks {@code parseReceivedMessage} against two input shapes: a JSON array that
 * contains the request as an object, and a JSON array that contains the request as
 * a quoted (string-encoded) JSON document. In both cases the last parsed message
 * must be non-null and must carry a payload.
 *
 * NOTE(review): {@code parsedMessage} is declared twice below; presumably each half
 * sits in its own brace-delimited scope whose braces are missing from this view -
 * confirm against the complete file.
 */
159 void testMessageParsing() throws ServiceException {
// A plain instance is enough here; nothing is stubbed or verified.
160 messageConsumerUnderTest = new DmaapMessageConsumer(applicationConfigMock);
161 String json = gson.toJson(dmaapRequestMessage());
// Case 1: array element is a JSON object.
163 String jsonArrayOfObject = jsonArray(json);
164 DmaapRequestMessage parsedMessage =
165 messageConsumerUnderTest.parseReceivedMessage(jsonArrayOfObject).blockLast();
166 assertNotNull(parsedMessage);
167 assertTrue(parsedMessage.payload().isPresent());
// Case 2: array element is a JSON string that itself contains the request JSON.
170 String jsonArrayOfString = jsonArray(quote(json));
171 DmaapRequestMessage parsedMessage =
172 messageConsumerUnderTest.parseReceivedMessage(jsonArrayOfString).blockLast();
173 assertNotNull(parsedMessage);
174 assertTrue(parsedMessage.payload().isPresent());
/**
 * Stubs both the consumer and the producer topic URL on the application config
 * to the placeholder value "url".
 *
 * NOTE(review): no call to this helper is visible in this view - confirm where it
 * is used in the complete file.
 */
179 private void setUpMrConfig() {
180 when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url");
181 when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url");
/**
 * Wraps the given text in square brackets. The text is inserted as-is, without
 * validation or escaping.
 *
 * @param s text to use as the single array element
 * @return {@code s} surrounded by "[" and "]"
 */
184 private String jsonArray(String s) {
185 return "[" + s + "]";
/**
 * Surrounds the given text with double quotes and prefixes every embedded double
 * quote with a backslash. Only double quotes are escaped; backslashes and other
 * characters are passed through unchanged.
 *
 * @param s text to quote
 * @return the quoted text
 */
188 private String quote(String s) {
189 return "\"" + s.replace("\"", "\\\"") + "\"";
192 private DmaapRequestMessage dmaapRequestMessage() {
193 return ImmutableDmaapRequestMessage.builder() //
194 .apiVersion("apiVersion") //
195 .correlationId("correlationId") //
196 .operation(Operation.PUT) //
197 .originatorId("originatorId") //
198 .payload(new JsonObject()) //
199 .requestId("requestId") //
201 .timestamp("timestamp") //