72ca84a54befd82da1593dbe2e8a7ec9c60c5b40
[ccsdk/oran.git] /
1 /*-
2  * ========================LICENSE_START=================================
3  * ONAP : ccsdk oran
4  * ======================================================================
5  * Copyright (C) 2020 Nordix Foundation. All rights reserved.
6  * ======================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
22
23 import static org.junit.jupiter.api.Assertions.assertEquals;
24 import static org.junit.jupiter.api.Assertions.assertNotNull;
25 import static org.junit.jupiter.api.Assertions.assertTrue;
26 import static org.mockito.ArgumentMatchers.any;
27 import static org.mockito.ArgumentMatchers.anyString;
28 import static org.mockito.Mockito.doReturn;
29 import static org.mockito.Mockito.spy;
30 import static org.mockito.Mockito.times;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.when;
33
34 import com.google.gson.Gson;
35 import com.google.gson.GsonBuilder;
36 import com.google.gson.JsonObject;
37
38 import java.util.ArrayList;
39
40 import org.junit.jupiter.api.AfterEach;
41 import org.junit.jupiter.api.Test;
42 import org.junit.jupiter.api.extension.ExtendWith;
43 import org.mockito.Mock;
44 import org.mockito.junit.jupiter.MockitoExtension;
45 import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
46 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
47 import org.onap.ccsdk.oran.a1policymanagementservice.dmaap.DmaapRequestMessage.Operation;
48 import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
49 import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils;
50
51 import reactor.core.publisher.Flux;
52 import reactor.core.publisher.Mono;
53
54 @ExtendWith(MockitoExtension.class)
55 class DmaapMessageConsumerTest {
56     @Mock
57     private ApplicationConfig applicationConfigMock;
58     @Mock
59     private AsyncRestClient messageRouterConsumerMock;
60     @Mock
61     private DmaapMessageHandler messageHandlerMock;
62
63     private DmaapMessageConsumer messageConsumerUnderTest;
64
65     private Gson gson = new GsonBuilder().create();
66
67     @AfterEach
68     void resetLogging() {
69         LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
70     }
71
72     private void setTaskNumberOfLoops(int number) {
73         ArrayList<Integer> l = new ArrayList<>();
74         for (int i = 0; i < number; ++i) {
75             l.add(i);
76         }
77         Flux<Integer> f = Flux.fromIterable(l);
78         doReturn(f).when(messageConsumerUnderTest).infiniteFlux();
79     }
80
81     private void disableTaskDelay() {
82         doReturn(Mono.empty()).when(messageConsumerUnderTest).delay();
83     }
84
85     @Test
86     void successfulCase_dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
87         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
88
89         setTaskNumberOfLoops(3);
90         disableTaskDelay();
91
92         when(this.applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("getDmaapConsumerTopicUrl");
93         doReturn(false, false, true).when(messageConsumerUnderTest).isDmaapConfigured();
94         doReturn(Mono.just(dmaapRequestMessageString())).when(messageConsumerUnderTest)
95                 .getFromMessageRouter(anyString());
96
97         doReturn(Mono.just("responseFromHandler")).when(messageConsumerUnderTest).handleDmaapMsg(any());
98
99         String s = messageConsumerUnderTest.createTask().blockLast();
100         assertEquals("responseFromHandler", s);
101         verify(messageConsumerUnderTest, times(2)).delay();
102         verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage());
103     }
104
105     @Test
106     void returnErrorFromDmapp_thenSleepAndRetry() throws Exception {
107         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
108
109         setTaskNumberOfLoops(2);
110         disableTaskDelay();
111         setUpMrConfig();
112
113         {
114             Mono<String> dmaapError = Mono.error(new ServiceException("dmaapError"));
115             Mono<String> dmaapResponse = Mono.just(dmaapRequestMessageString());
116             doReturn(dmaapError, dmaapResponse).when(messageConsumerUnderTest).getFromMessageRouter(anyString());
117         }
118
119         doReturn(Mono.just("response1")).when(messageConsumerUnderTest).handleDmaapMsg(any());
120
121         String s = messageConsumerUnderTest.createTask().blockLast();
122
123         verify(messageConsumerUnderTest, times(2)).getFromMessageRouter(anyString());
124         verify(messageConsumerUnderTest, times(0)).sendErrorResponse(anyString());
125         verify(messageConsumerUnderTest, times(1)).delay();
126         verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage());
127         assertEquals("response1", s);
128     }
129
130     @Test
131     void unParsableMessage_thenSendResponseAndContinue() throws Exception {
132         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
133         setTaskNumberOfLoops(2);
134         setUpMrConfig();
135
136         {
137             Mono<String> dmaapError = Mono.just("Non valid JSON \"");
138             Mono<String> dmaapResponse = Mono.just(dmaapRequestMessageString());
139             doReturn(dmaapError, dmaapResponse).when(messageConsumerUnderTest).getFromMessageRouter(anyString());
140         }
141
142         doReturn(Mono.just("response1")).when(messageConsumerUnderTest).handleDmaapMsg(any());
143
144         String s = messageConsumerUnderTest.createTask().blockLast();
145         assertEquals("response1", s);
146
147         verify(messageConsumerUnderTest, times(2)).getFromMessageRouter(anyString());
148         verify(messageConsumerUnderTest, times(1)).sendErrorResponse(anyString());
149         verify(messageConsumerUnderTest, times(0)).delay();
150         verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage());
151     }
152
153     private String dmaapRequestMessageString() {
154         String json = gson.toJson(dmaapRequestMessage());
155         return jsonArray(json);
156     }
157
158     @Test
159     void testMessageParsing() throws ServiceException {
160         messageConsumerUnderTest = new DmaapMessageConsumer(applicationConfigMock);
161         String json = gson.toJson(dmaapRequestMessage());
162         {
163             String jsonArrayOfObject = jsonArray(json);
164             DmaapRequestMessage parsedMessage =
165                     messageConsumerUnderTest.parseReceivedMessage(jsonArrayOfObject).blockLast();
166             assertNotNull(parsedMessage);
167             assertTrue(parsedMessage.payload().isPresent());
168         }
169         {
170             String jsonArrayOfString = jsonArray(quote(json));
171             DmaapRequestMessage parsedMessage =
172                     messageConsumerUnderTest.parseReceivedMessage(jsonArrayOfString).blockLast();
173             assertNotNull(parsedMessage);
174             assertTrue(parsedMessage.payload().isPresent());
175         }
176
177     }
178
179     private void setUpMrConfig() {
180         when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url");
181         when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url");
182     }
183
184     private String jsonArray(String s) {
185         return "[" + s + "]";
186     }
187
188     private String quote(String s) {
189         return "\"" + s.replace("\"", "\\\"") + "\"";
190     }
191
192     private DmaapRequestMessage dmaapRequestMessage() {
193         return ImmutableDmaapRequestMessage.builder() //
194                 .apiVersion("apiVersion") //
195                 .correlationId("correlationId") //
196                 .operation(Operation.PUT) //
197                 .originatorId("originatorId") //
198                 .payload(new JsonObject()) //
199                 .requestId("requestId") //
200                 .target("target") //
201                 .timestamp("timestamp") //
202                 .url("URL") //
203                 .build();
204     }
205
206 }