2 * ========================LICENSE_START=================================
4 * ======================================================================
5 * Copyright (C) 2020 Nordix Foundation. All rights reserved.
6 * ======================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
23 import static org.junit.jupiter.api.Assertions.assertEquals;
24 import static org.junit.jupiter.api.Assertions.assertNotNull;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.ArgumentMatchers.anyString;
27 import static org.mockito.Mockito.doReturn;
28 import static org.mockito.Mockito.spy;
29 import static org.mockito.Mockito.times;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.when;
33 import com.google.gson.Gson;
34 import com.google.gson.GsonBuilder;
35 import com.google.gson.JsonObject;
37 import java.util.ArrayList;
39 import org.junit.Assert;
40 import org.junit.jupiter.api.AfterEach;
41 import org.junit.jupiter.api.DisplayName;
42 import org.junit.jupiter.api.Test;
43 import org.junit.jupiter.api.extension.ExtendWith;
44 import org.mockito.Mock;
45 import org.mockito.junit.jupiter.MockitoExtension;
46 import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
47 import org.onap.ccsdk.oran.a1policymanagementservice.clients.SecurityContext;
48 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
49 import org.onap.ccsdk.oran.a1policymanagementservice.dmaap.DmaapRequestMessage.Operation;
50 import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
51 import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils;
53 import reactor.core.publisher.Flux;
54 import reactor.core.publisher.Mono;
56 @ExtendWith(MockitoExtension.class)
57 class DmaapMessageConsumerTest {
59 private ApplicationConfig applicationConfigMock;
61 private AsyncRestClient messageRouterConsumerMock;
63 private DmaapMessageHandler messageHandlerMock;
65 private DmaapMessageConsumer messageConsumerUnderTest;
67 private Gson gson = new GsonBuilder().create();
71 LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
74 private void setTaskNumberOfLoops(int number) {
75 ArrayList<Integer> l = new ArrayList<>();
76 for (int i = 0; i < number; ++i) {
79 Flux<Integer> f = Flux.fromIterable(l);
80 doReturn(f).when(messageConsumerUnderTest).infiniteFlux();
83 private void disableTaskDelay() {
84 doReturn(Mono.empty()).when(messageConsumerUnderTest).delay();
88 @DisplayName("successful Case dmaap Not Configured then Sleep And Retry Until Config")
89 void successfulCase_dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
90 messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock, new SecurityContext("")));
92 setTaskNumberOfLoops(3);
95 when(this.applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("getDmaapConsumerTopicUrl");
96 doReturn(false, false, true).when(messageConsumerUnderTest).isDmaapConfigured();
97 doReturn(Mono.just(dmaapRequestMessageString())).when(messageConsumerUnderTest)
98 .getFromMessageRouter(anyString());
100 doReturn(Mono.just("responseFromHandler")).when(messageConsumerUnderTest).handleDmaapMsg(any());
102 String s = messageConsumerUnderTest.createTask().blockLast();
103 assertEquals("responseFromHandler", s);
104 verify(messageConsumerUnderTest, times(2)).delay();
105 verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(any());
109 @DisplayName("return Error From Dmapp then Sleep And Retry")
110 void returnErrorFromDmapp_thenSleepAndRetry() throws Exception {
111 messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock, new SecurityContext("")));
113 setTaskNumberOfLoops(2);
118 Mono<String> dmaapError = Mono.error(new ServiceException("dmaapError"));
119 Mono<String> dmaapResponse = Mono.just(dmaapRequestMessageString());
120 doReturn(dmaapError, dmaapResponse).when(messageConsumerUnderTest).getFromMessageRouter(anyString());
123 doReturn(Mono.just("response1")).when(messageConsumerUnderTest).handleDmaapMsg(any());
125 String s = messageConsumerUnderTest.createTask().blockLast();
127 verify(messageConsumerUnderTest, times(2)).getFromMessageRouter(anyString());
128 verify(messageConsumerUnderTest, times(0)).sendErrorResponse(anyString());
129 verify(messageConsumerUnderTest, times(1)).delay();
130 verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(any());
131 assertEquals("response1", s);
135 @DisplayName("unParsable Message then Send Response And Continue")
136 void unParsableMessage_thenSendResponseAndContinue() throws Exception {
137 messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock, new SecurityContext("")));
138 setTaskNumberOfLoops(2);
142 Mono<String> dmaapError = Mono.just("Non valid JSON \"");
143 Mono<String> dmaapResponse = Mono.just(dmaapRequestMessageString());
144 doReturn(dmaapError, dmaapResponse).when(messageConsumerUnderTest).getFromMessageRouter(anyString());
147 doReturn(Mono.just("response1")).when(messageConsumerUnderTest).handleDmaapMsg(any());
149 String s = messageConsumerUnderTest.createTask().blockLast();
150 assertEquals("response1", s);
152 verify(messageConsumerUnderTest, times(2)).getFromMessageRouter(anyString());
153 verify(messageConsumerUnderTest, times(1)).sendErrorResponse(anyString());
154 verify(messageConsumerUnderTest, times(0)).delay();
155 verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage());
158 private String dmaapRequestMessageString() {
159 String json = gson.toJson(dmaapRequestMessage());
160 return jsonArray(json);
164 @DisplayName("test Message Parsing")
165 void testMessageParsing() throws ServiceException {
166 messageConsumerUnderTest = new DmaapMessageConsumer(applicationConfigMock, new SecurityContext(""));
167 String json = gson.toJson(dmaapRequestMessage());
169 String jsonArrayOfObject = jsonArray(json);
170 DmaapRequestMessage parsedMessage =
171 messageConsumerUnderTest.parseReceivedMessage(jsonArrayOfObject).blockLast();
172 assertNotNull(parsedMessage);
173 assertNotNull(parsedMessage.getPayload());
175 Assert.assertEquals(dmaapRequestMessage(), parsedMessage);
178 String jsonArrayOfString = jsonArray(quote(json));
179 DmaapRequestMessage parsedMessage =
180 messageConsumerUnderTest.parseReceivedMessage(jsonArrayOfString).blockLast();
181 assertNotNull(parsedMessage);
182 assertNotNull(parsedMessage.getPayload());
183 Assert.assertEquals(dmaapRequestMessage(), parsedMessage);
188 private void setUpMrConfig() {
189 when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url");
190 when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url");
193 private String jsonArray(String s) {
194 return "[" + s + "]";
197 private String quote(String s) {
198 return "\"" + s.replace("\"", "\\\"") + "\"";
201 private DmaapRequestMessage dmaapRequestMessage() {
202 return DmaapRequestMessage.builder() //
203 .apiVersion("apiVersion") //
204 .correlationId("correlationId") //
205 .operation(Operation.PUT) //
206 .originatorId("originatorId") //
207 .payload(new JsonObject()) //
208 .requestId("requestId") //
210 .timestamp("timestamp") //