7abc5db83b9dfc37a421ef4bd8f64862c82c92c8
[ccsdk/oran.git] /
1 /*-
2  * ========================LICENSE_START=================================
3  * ONAP : ccsdk oran
4  * ======================================================================
5  * Copyright (C) 2020 Nordix Foundation. All rights reserved.
6  * ======================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
22
23 import static org.junit.jupiter.api.Assertions.assertEquals;
24 import static org.junit.jupiter.api.Assertions.assertNotNull;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.ArgumentMatchers.anyString;
27 import static org.mockito.Mockito.doReturn;
28 import static org.mockito.Mockito.spy;
29 import static org.mockito.Mockito.times;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.when;
32
33 import com.google.gson.Gson;
34 import com.google.gson.GsonBuilder;
35 import com.google.gson.JsonObject;
36
37 import java.util.ArrayList;
38
39 import org.junit.Assert;
40 import org.junit.jupiter.api.AfterEach;
41 import org.junit.jupiter.api.DisplayName;
42 import org.junit.jupiter.api.Test;
43 import org.junit.jupiter.api.extension.ExtendWith;
44 import org.mockito.Mock;
45 import org.mockito.junit.jupiter.MockitoExtension;
46 import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
47 import org.onap.ccsdk.oran.a1policymanagementservice.clients.SecurityContext;
48 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
49 import org.onap.ccsdk.oran.a1policymanagementservice.dmaap.DmaapRequestMessage.Operation;
50 import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
51 import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils;
52
53 import reactor.core.publisher.Flux;
54 import reactor.core.publisher.Mono;
55
56 @ExtendWith(MockitoExtension.class)
57 class DmaapMessageConsumerTest {
58     @Mock
59     private ApplicationConfig applicationConfigMock;
60     @Mock
61     private AsyncRestClient messageRouterConsumerMock;
62     @Mock
63     private DmaapMessageHandler messageHandlerMock;
64
65     private DmaapMessageConsumer messageConsumerUnderTest;
66
67     private Gson gson = new GsonBuilder().create();
68
69     @AfterEach
70     void resetLogging() {
71         LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
72     }
73
74     private void setTaskNumberOfLoops(int number) {
75         ArrayList<Integer> l = new ArrayList<>();
76         for (int i = 0; i < number; ++i) {
77             l.add(i);
78         }
79         Flux<Integer> f = Flux.fromIterable(l);
80         doReturn(f).when(messageConsumerUnderTest).infiniteFlux();
81     }
82
83     private void disableTaskDelay() {
84         doReturn(Mono.empty()).when(messageConsumerUnderTest).delay();
85     }
86
87     @Test
88     @DisplayName("successful Case dmaap Not Configured then Sleep And Retry Until Config")
89     void successfulCase_dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
90         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock, new SecurityContext("")));
91
92         setTaskNumberOfLoops(3);
93         disableTaskDelay();
94
95         when(this.applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("getDmaapConsumerTopicUrl");
96         doReturn(false, false, true).when(messageConsumerUnderTest).isDmaapConfigured();
97         doReturn(Mono.just(dmaapRequestMessageString())).when(messageConsumerUnderTest)
98                 .getFromMessageRouter(anyString());
99
100         doReturn(Mono.just("responseFromHandler")).when(messageConsumerUnderTest).handleDmaapMsg(any());
101
102         String s = messageConsumerUnderTest.createTask().blockLast();
103         assertEquals("responseFromHandler", s);
104         verify(messageConsumerUnderTest, times(2)).delay();
105         verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(any());
106     }
107
108     @Test
109     @DisplayName("return Error From Dmapp then Sleep And Retry")
110     void returnErrorFromDmapp_thenSleepAndRetry() throws Exception {
111         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock, new SecurityContext("")));
112
113         setTaskNumberOfLoops(2);
114         disableTaskDelay();
115         setUpMrConfig();
116
117         {
118             Mono<String> dmaapError = Mono.error(new ServiceException("dmaapError"));
119             Mono<String> dmaapResponse = Mono.just(dmaapRequestMessageString());
120             doReturn(dmaapError, dmaapResponse).when(messageConsumerUnderTest).getFromMessageRouter(anyString());
121         }
122
123         doReturn(Mono.just("response1")).when(messageConsumerUnderTest).handleDmaapMsg(any());
124
125         String s = messageConsumerUnderTest.createTask().blockLast();
126
127         verify(messageConsumerUnderTest, times(2)).getFromMessageRouter(anyString());
128         verify(messageConsumerUnderTest, times(0)).sendErrorResponse(anyString());
129         verify(messageConsumerUnderTest, times(1)).delay();
130         verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(any());
131         assertEquals("response1", s);
132     }
133
134     @Test
135     @DisplayName("unParsable Message then Send Response And Continue")
136     void unParsableMessage_thenSendResponseAndContinue() throws Exception {
137         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock, new SecurityContext("")));
138         setTaskNumberOfLoops(2);
139         setUpMrConfig();
140
141         {
142             Mono<String> dmaapError = Mono.just("Non valid JSON \"");
143             Mono<String> dmaapResponse = Mono.just(dmaapRequestMessageString());
144             doReturn(dmaapError, dmaapResponse).when(messageConsumerUnderTest).getFromMessageRouter(anyString());
145         }
146
147         doReturn(Mono.just("response1")).when(messageConsumerUnderTest).handleDmaapMsg(any());
148
149         String s = messageConsumerUnderTest.createTask().blockLast();
150         assertEquals("response1", s);
151
152         verify(messageConsumerUnderTest, times(2)).getFromMessageRouter(anyString());
153         verify(messageConsumerUnderTest, times(1)).sendErrorResponse(anyString());
154         verify(messageConsumerUnderTest, times(0)).delay();
155         verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage());
156     }
157
158     private String dmaapRequestMessageString() {
159         String json = gson.toJson(dmaapRequestMessage());
160         return jsonArray(json);
161     }
162
163     @Test
164     @DisplayName("test Message Parsing")
165     void testMessageParsing() throws ServiceException {
166         messageConsumerUnderTest = new DmaapMessageConsumer(applicationConfigMock, new SecurityContext(""));
167         String json = gson.toJson(dmaapRequestMessage());
168         {
169             String jsonArrayOfObject = jsonArray(json);
170             DmaapRequestMessage parsedMessage =
171                     messageConsumerUnderTest.parseReceivedMessage(jsonArrayOfObject).blockLast();
172             assertNotNull(parsedMessage);
173             assertNotNull(parsedMessage.getPayload());
174
175             Assert.assertEquals(dmaapRequestMessage(), parsedMessage);
176         }
177         {
178             String jsonArrayOfString = jsonArray(quote(json));
179             DmaapRequestMessage parsedMessage =
180                     messageConsumerUnderTest.parseReceivedMessage(jsonArrayOfString).blockLast();
181             assertNotNull(parsedMessage);
182             assertNotNull(parsedMessage.getPayload());
183             Assert.assertEquals(dmaapRequestMessage(), parsedMessage);
184         }
185
186     }
187
188     private void setUpMrConfig() {
189         when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url");
190         when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url");
191     }
192
193     private String jsonArray(String s) {
194         return "[" + s + "]";
195     }
196
197     private String quote(String s) {
198         return "\"" + s.replace("\"", "\\\"") + "\"";
199     }
200
201     private DmaapRequestMessage dmaapRequestMessage() {
202         return DmaapRequestMessage.builder() //
203                 .apiVersion("apiVersion") //
204                 .correlationId("correlationId") //
205                 .operation(Operation.PUT) //
206                 .originatorId("originatorId") //
207                 .payload(new JsonObject()) //
208                 .requestId("requestId") //
209                 .target("target") //
210                 .timestamp("timestamp") //
211                 .url("URL") //
212                 .build();
213     }
214
215 }