2405e46cb8eedfcadee542591cd755610f982529
[ccsdk/oran.git] /
1 /*-
2  * ========================LICENSE_START=================================
3  * ONAP : ccsdk oran
4  * ======================================================================
5  * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
6  * ======================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
22
23 import static ch.qos.logback.classic.Level.WARN;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.jupiter.api.Assertions.assertTrue;
28 import static org.mockito.ArgumentMatchers.anyString;
29 import static org.mockito.Mockito.doReturn;
30 import static org.mockito.Mockito.mock;
31 import static org.mockito.Mockito.spy;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.verifyNoMoreInteractions;
34
35 import ch.qos.logback.classic.spi.ILoggingEvent;
36 import ch.qos.logback.core.read.ListAppender;
37
38 import com.google.gson.Gson;
39 import com.google.gson.GsonBuilder;
40 import com.google.gson.JsonObject;
41
42 import java.io.IOException;
43 import java.nio.charset.Charset;
44 import java.util.Optional;
45
46 import org.junit.jupiter.api.BeforeEach;
47 import org.junit.jupiter.api.Test;
48 import org.mockito.ArgumentCaptor;
49 import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
50 import org.onap.ccsdk.oran.a1policymanagementservice.dmaap.DmaapRequestMessage.Operation;
51 import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54 import org.springframework.http.HttpHeaders;
55 import org.springframework.http.HttpStatus;
56 import org.springframework.http.ResponseEntity;
57 import org.springframework.web.reactive.function.client.WebClientResponseException;
58
59 import reactor.core.publisher.Mono;
60 import reactor.test.StepVerifier;
61
62 class DmaapMessageHandlerTest {
63     private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandlerTest.class);
64     private static final String URL = "url";
65
66     private final AsyncRestClient dmaapClient = mock(AsyncRestClient.class);
67     private final AsyncRestClient pmsClient = mock(AsyncRestClient.class);
68     private DmaapMessageHandler testedObject;
69     private static Gson gson = new GsonBuilder() //
70         .create(); //
71
72     @BeforeEach
73     private void setUp() throws Exception {
74         testedObject = spy(new DmaapMessageHandler(dmaapClient, pmsClient));
75     }
76
77     static JsonObject payloadAsJson() {
78         return gson.fromJson(payloadAsString(), JsonObject.class);
79     }
80
81     static String payloadAsString() {
82         return "{\"param\":\"value\"}";
83     }
84
85     DmaapRequestMessage dmaapRequestMessage(Operation operation) {
86         Optional<JsonObject> payload =
87             ((operation == Operation.PUT || operation == Operation.POST) ? Optional.of(payloadAsJson())
88                 : Optional.empty());
89         return ImmutableDmaapRequestMessage.builder() //
90             .apiVersion("apiVersion") //
91             .correlationId("correlationId") //
92             .operation(operation) //
93             .originatorId("originatorId") //
94             .payload(payload) //
95             .requestId("requestId") //
96             .target("target") //
97             .timestamp("timestamp") //
98             .url(URL) //
99             .build();
100     }
101
102     private String dmaapInputMessage(Operation operation) {
103         return gson.toJson(dmaapRequestMessage(operation));
104     }
105
106     private Mono<ResponseEntity<String>> okResponse() {
107         ResponseEntity<String> entity = new ResponseEntity<>("OK", HttpStatus.OK);
108         return Mono.just(entity);
109     }
110
111     private Mono<ResponseEntity<String>> notOkResponse() {
112         ResponseEntity<String> entity = new ResponseEntity<>("NOK", HttpStatus.BAD_GATEWAY);
113         return Mono.just(entity);
114     }
115
116     @Test
117     void testMessageParsing() {
118         String message = dmaapInputMessage(Operation.DELETE);
119         logger.info(message);
120         DmaapRequestMessage parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
121         assertNotNull(parsedMessage);
122         assertFalse(parsedMessage.payload().isPresent());
123
124         message = dmaapInputMessage(Operation.PUT);
125         logger.info(message);
126         parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
127         assertNotNull(parsedMessage);
128         assertTrue(parsedMessage.payload().isPresent());
129     }
130
131     @Test
132     void unparseableMessage_thenWarning() {
133         final ListAppender<ILoggingEvent> logAppender =
134             LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
135
136         String msg = "bad message";
137         testedObject.handleDmaapMsg(msg);
138
139         assertThat(logAppender.list.get(0).getFormattedMessage()).startsWith(
140             "handleDmaapMsg failure org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException: Received unparsable "
141                 + "message from DMAAP: \"" + msg + "\", reason: ");
142     }
143
144     @Test
145     void successfulDelete() throws IOException {
146         doReturn(okResponse()).when(pmsClient).deleteForEntity(anyString());
147         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
148
149         String message = dmaapInputMessage(Operation.DELETE);
150
151         StepVerifier //
152             .create(testedObject.createTask(message)) //
153             .expectSubscription() //
154             .expectNext("OK") //
155             .verifyComplete(); //
156
157         verify(pmsClient).deleteForEntity(URL);
158         verifyNoMoreInteractions(pmsClient);
159
160         verify(dmaapClient).post(anyString(), anyString());
161
162         verifyNoMoreInteractions(dmaapClient);
163     }
164
165     @Test
166     void successfulGet() throws IOException {
167         doReturn(okResponse()).when(pmsClient).getForEntity(anyString());
168         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
169
170         StepVerifier //
171             .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
172             .expectSubscription() //
173             .expectNext("OK") //
174             .verifyComplete(); //
175
176         verify(pmsClient).getForEntity(URL);
177         verifyNoMoreInteractions(pmsClient);
178
179         verify(dmaapClient).post(anyString(), anyString());
180         verifyNoMoreInteractions(dmaapClient);
181     }
182
183     @Test
184     void exceptionFromPmsWhenGet_thenPostError() throws IOException {
185         String errorBody = "Unavailable";
186         WebClientResponseException webClientResponseException = new WebClientResponseException(
187             HttpStatus.SERVICE_UNAVAILABLE.value(), "", (HttpHeaders) null, errorBody.getBytes(), (Charset) null);
188         doReturn(Mono.error(webClientResponseException)).when(pmsClient).getForEntity(anyString());
189         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
190
191         StepVerifier //
192             .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
193             .expectSubscription() //
194             .verifyComplete(); //
195
196         ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
197         verify(dmaapClient).post(anyString(), captor.capture());
198         String actualMessage = captor.getValue();
199         assertThat(actualMessage).contains(HttpStatus.SERVICE_UNAVAILABLE.toString()) //
200             .contains(errorBody);
201     }
202
203     @Test
204     void successfulPut() throws IOException {
205         doReturn(okResponse()).when(pmsClient).putForEntity(anyString(), anyString());
206         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
207
208         StepVerifier //
209             .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
210             .expectSubscription() //
211             .expectNext("OK") //
212             .verifyComplete(); //
213
214         verify(pmsClient).putForEntity(URL, payloadAsString());
215         verifyNoMoreInteractions(pmsClient);
216
217         verify(dmaapClient).post(anyString(), anyString());
218         verifyNoMoreInteractions(dmaapClient);
219     }
220
221     @Test
222     void successfulPost() throws IOException {
223         doReturn(okResponse()).when(pmsClient).postForEntity(anyString(), anyString());
224         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
225
226         StepVerifier //
227             .create(testedObject.createTask(dmaapInputMessage(Operation.POST))) //
228             .expectSubscription() //
229             .expectNext("OK") //
230             .verifyComplete(); //
231
232         verify(pmsClient).postForEntity(URL, payloadAsString());
233         verifyNoMoreInteractions(pmsClient);
234
235         verify(dmaapClient).post(anyString(), anyString());
236         verifyNoMoreInteractions(dmaapClient);
237     }
238
239     @Test
240     void exceptionWhenCallingPms_thenNotFoundResponse() throws IOException {
241
242         doReturn(notOkResponse()).when(pmsClient).putForEntity(anyString(), anyString());
243         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
244
245         testedObject.createTask(dmaapInputMessage(Operation.PUT)).block();
246
247         verify(pmsClient).putForEntity(anyString(), anyString());
248         verifyNoMoreInteractions(pmsClient);
249
250         ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
251         verify(dmaapClient).post(anyString(), captor.capture());
252         String actualMessage = captor.getValue();
253         assertThat(actualMessage).as("Message \"%s\" sent to DMaaP contains %s", actualMessage, HttpStatus.BAD_GATEWAY)
254             .contains(HttpStatus.BAD_GATEWAY.toString());
255
256         verifyNoMoreInteractions(dmaapClient);
257     }
258
259     @Test
260     void unsupportedOperationInMessage_thenNotFoundResponseWithNotImplementedOperation() throws Exception {
261         String message = dmaapInputMessage(Operation.PUT).toString();
262         String badOperation = "BAD";
263         message = message.replace(Operation.PUT.toString(), badOperation);
264
265         testedObject.handleDmaapMsg(message);
266
267         ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
268         verify(dmaapClient).post(anyString(), captor.capture());
269         String actualMessage = captor.getValue();
270         assertThat(actualMessage).contains("Not implemented operation") //
271             .contains("BAD_REQUEST");
272     }
273
274     @Test
275     void putWithoutPayload_thenNotFoundResponseWithWarning() throws Exception {
276         String message = dmaapInputMessage(Operation.PUT).toString();
277         message = message.replace("payload", "junk");
278
279         final ListAppender<ILoggingEvent> logAppender =
280             LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
281
282         testedObject.handleDmaapMsg(message);
283
284         assertThat(logAppender.list.get(0).getFormattedMessage())
285             .startsWith("Expected payload in message from DMAAP: ");
286     }
287 }