084912071f6ac8b9428135aca40535d96185cd54
[ccsdk/oran.git] /
1 /*-
2  * ========================LICENSE_START=================================
3  * ONAP : ccsdk oran
4  * ======================================================================
5  * Copyright (C) 2020 Nordix Foundation. All rights reserved.
6  * ======================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
22
23 import static org.junit.jupiter.api.Assertions.assertEquals;
24 import static org.junit.jupiter.api.Assertions.assertNotNull;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.ArgumentMatchers.anyString;
27 import static org.mockito.Mockito.doReturn;
28 import static org.mockito.Mockito.spy;
29 import static org.mockito.Mockito.times;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.when;
32
33 import com.google.gson.Gson;
34 import com.google.gson.GsonBuilder;
35 import com.google.gson.JsonObject;
36
37 import java.util.ArrayList;
38
39 import org.junit.Assert;
40 import org.junit.jupiter.api.AfterEach;
41 import org.junit.jupiter.api.Test;
42 import org.junit.jupiter.api.extension.ExtendWith;
43 import org.mockito.Mock;
44 import org.mockito.junit.jupiter.MockitoExtension;
45 import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
46 import org.onap.ccsdk.oran.a1policymanagementservice.clients.SecurityContext;
47 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
48 import org.onap.ccsdk.oran.a1policymanagementservice.dmaap.DmaapRequestMessage.Operation;
49 import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
50 import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils;
51
52 import reactor.core.publisher.Flux;
53 import reactor.core.publisher.Mono;
54
55 @ExtendWith(MockitoExtension.class)
56 class DmaapMessageConsumerTest {
57     @Mock
58     private ApplicationConfig applicationConfigMock;
59     @Mock
60     private AsyncRestClient messageRouterConsumerMock;
61     @Mock
62     private DmaapMessageHandler messageHandlerMock;
63
64     private DmaapMessageConsumer messageConsumerUnderTest;
65
66     private Gson gson = new GsonBuilder().create();
67
68     @AfterEach
69     void resetLogging() {
70         LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
71     }
72
73     private void setTaskNumberOfLoops(int number) {
74         ArrayList<Integer> l = new ArrayList<>();
75         for (int i = 0; i < number; ++i) {
76             l.add(i);
77         }
78         Flux<Integer> f = Flux.fromIterable(l);
79         doReturn(f).when(messageConsumerUnderTest).infiniteFlux();
80     }
81
82     private void disableTaskDelay() {
83         doReturn(Mono.empty()).when(messageConsumerUnderTest).delay();
84     }
85
86     @Test
87     void successfulCase_dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
88         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock, new SecurityContext("")));
89
90         setTaskNumberOfLoops(3);
91         disableTaskDelay();
92
93         when(this.applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("getDmaapConsumerTopicUrl");
94         doReturn(false, false, true).when(messageConsumerUnderTest).isDmaapConfigured();
95         doReturn(Mono.just(dmaapRequestMessageString())).when(messageConsumerUnderTest)
96                 .getFromMessageRouter(anyString());
97
98         doReturn(Mono.just("responseFromHandler")).when(messageConsumerUnderTest).handleDmaapMsg(any());
99
100         String s = messageConsumerUnderTest.createTask().blockLast();
101         assertEquals("responseFromHandler", s);
102         verify(messageConsumerUnderTest, times(2)).delay();
103         verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(any());
104     }
105
106     @Test
107     void returnErrorFromDmapp_thenSleepAndRetry() throws Exception {
108         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock, new SecurityContext("")));
109
110         setTaskNumberOfLoops(2);
111         disableTaskDelay();
112         setUpMrConfig();
113
114         {
115             Mono<String> dmaapError = Mono.error(new ServiceException("dmaapError"));
116             Mono<String> dmaapResponse = Mono.just(dmaapRequestMessageString());
117             doReturn(dmaapError, dmaapResponse).when(messageConsumerUnderTest).getFromMessageRouter(anyString());
118         }
119
120         doReturn(Mono.just("response1")).when(messageConsumerUnderTest).handleDmaapMsg(any());
121
122         String s = messageConsumerUnderTest.createTask().blockLast();
123
124         verify(messageConsumerUnderTest, times(2)).getFromMessageRouter(anyString());
125         verify(messageConsumerUnderTest, times(0)).sendErrorResponse(anyString());
126         verify(messageConsumerUnderTest, times(1)).delay();
127         verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(any());
128         assertEquals("response1", s);
129     }
130
131     @Test
132     void unParsableMessage_thenSendResponseAndContinue() throws Exception {
133         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock, new SecurityContext("")));
134         setTaskNumberOfLoops(2);
135         setUpMrConfig();
136
137         {
138             Mono<String> dmaapError = Mono.just("Non valid JSON \"");
139             Mono<String> dmaapResponse = Mono.just(dmaapRequestMessageString());
140             doReturn(dmaapError, dmaapResponse).when(messageConsumerUnderTest).getFromMessageRouter(anyString());
141         }
142
143         doReturn(Mono.just("response1")).when(messageConsumerUnderTest).handleDmaapMsg(any());
144
145         String s = messageConsumerUnderTest.createTask().blockLast();
146         assertEquals("response1", s);
147
148         verify(messageConsumerUnderTest, times(2)).getFromMessageRouter(anyString());
149         verify(messageConsumerUnderTest, times(1)).sendErrorResponse(anyString());
150         verify(messageConsumerUnderTest, times(0)).delay();
151         verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage());
152     }
153
154     private String dmaapRequestMessageString() {
155         String json = gson.toJson(dmaapRequestMessage());
156         return jsonArray(json);
157     }
158
159     @Test
160     void testMessageParsing() throws ServiceException {
161         messageConsumerUnderTest = new DmaapMessageConsumer(applicationConfigMock, new SecurityContext(""));
162         String json = gson.toJson(dmaapRequestMessage());
163         {
164             String jsonArrayOfObject = jsonArray(json);
165             DmaapRequestMessage parsedMessage =
166                     messageConsumerUnderTest.parseReceivedMessage(jsonArrayOfObject).blockLast();
167             assertNotNull(parsedMessage);
168             assertNotNull(parsedMessage.getPayload());
169
170             Assert.assertEquals(dmaapRequestMessage(), parsedMessage);
171         }
172         {
173             String jsonArrayOfString = jsonArray(quote(json));
174             DmaapRequestMessage parsedMessage =
175                     messageConsumerUnderTest.parseReceivedMessage(jsonArrayOfString).blockLast();
176             assertNotNull(parsedMessage);
177             assertNotNull(parsedMessage.getPayload());
178             Assert.assertEquals(dmaapRequestMessage(), parsedMessage);
179         }
180
181     }
182
183     private void setUpMrConfig() {
184         when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url");
185         when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url");
186     }
187
188     private String jsonArray(String s) {
189         return "[" + s + "]";
190     }
191
192     private String quote(String s) {
193         return "\"" + s.replace("\"", "\\\"") + "\"";
194     }
195
196     private DmaapRequestMessage dmaapRequestMessage() {
197         return DmaapRequestMessage.builder() //
198                 .apiVersion("apiVersion") //
199                 .correlationId("correlationId") //
200                 .operation(Operation.PUT) //
201                 .originatorId("originatorId") //
202                 .payload(new JsonObject()) //
203                 .requestId("requestId") //
204                 .target("target") //
205                 .timestamp("timestamp") //
206                 .url("URL") //
207                 .build();
208     }
209
210 }