Merge "Improvement of AsyncRestClient"
[ccsdk/oran.git] / a1-policy-management / src / test / java / org / onap / ccsdk / oran / a1policymanagementservice / dmaap / DmaapMessageHandlerTest.java
1 /*-
2  * ========================LICENSE_START=================================
3  * ONAP : ccsdk oran
4  * ======================================================================
5  * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
6  * ======================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
22
23 import static ch.qos.logback.classic.Level.WARN;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.mockito.ArgumentMatchers.anyString;
26 import static org.mockito.Mockito.doReturn;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.spy;
29 import static org.mockito.Mockito.verify;
30 import static org.mockito.Mockito.verifyNoMoreInteractions;
31
32 import ch.qos.logback.classic.spi.ILoggingEvent;
33 import ch.qos.logback.core.read.ListAppender;
34
35 import com.google.gson.Gson;
36 import com.google.gson.GsonBuilder;
37 import com.google.gson.JsonObject;
38
39 import java.io.IOException;
40 import java.nio.charset.Charset;
41 import java.util.Optional;
42
43 import org.junit.jupiter.api.BeforeEach;
44 import org.junit.jupiter.api.Test;
45 import org.mockito.ArgumentCaptor;
46 import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
47 import org.onap.ccsdk.oran.a1policymanagementservice.dmaap.DmaapRequestMessage.Operation;
48 import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils;
49 import org.springframework.http.HttpHeaders;
50 import org.springframework.http.HttpStatus;
51 import org.springframework.http.ResponseEntity;
52 import org.springframework.web.reactive.function.client.WebClientResponseException;
53
54 import reactor.core.publisher.Mono;
55 import reactor.test.StepVerifier;
56
57 class DmaapMessageHandlerTest {
58     private static final String URL = "url";
59
60     private final AsyncRestClient dmaapClient = mock(AsyncRestClient.class);
61     private final AsyncRestClient pmsClient = mock(AsyncRestClient.class);
62     private DmaapMessageHandler testedObject;
63     private Gson gson = new GsonBuilder().create(); //
64
65     @BeforeEach
66     private void setUp() throws Exception {
67         testedObject = spy(new DmaapMessageHandler(dmaapClient, pmsClient));
68     }
69
70     JsonObject payloadAsJson() {
71         return gson.fromJson(payloadAsString(), JsonObject.class);
72     }
73
74     String payloadAsString() {
75         return "{\"param\":\"value\"}";
76     }
77
78     DmaapRequestMessage dmaapRequestMessage(Operation operation) {
79         Optional<JsonObject> payload =
80                 ((operation == Operation.PUT || operation == Operation.POST) ? Optional.of(payloadAsJson())
81                         : Optional.empty());
82         return ImmutableDmaapRequestMessage.builder() //
83                 .apiVersion("apiVersion") //
84                 .correlationId("correlationId") //
85                 .operation(operation) //
86                 .originatorId("originatorId") //
87                 .payload(payload) //
88                 .requestId("requestId") //
89                 .target("target") //
90                 .timestamp("timestamp") //
91                 .url(URL) //
92                 .build();
93     }
94
95     private Mono<ResponseEntity<String>> okResponse() {
96         ResponseEntity<String> entity = new ResponseEntity<>("OK", HttpStatus.OK);
97         return Mono.just(entity);
98     }
99
100     private Mono<ResponseEntity<String>> notOkResponse() {
101         ResponseEntity<String> entity = new ResponseEntity<>("NOK", HttpStatus.BAD_GATEWAY);
102         return Mono.just(entity);
103     }
104
105     @Test
106     void successfulDelete() throws IOException {
107         doReturn(okResponse()).when(pmsClient).deleteForEntity(anyString());
108         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
109
110         DmaapRequestMessage message = dmaapRequestMessage(Operation.DELETE);
111
112         StepVerifier //
113                 .create(testedObject.createTask(message)) //
114                 .expectSubscription() //
115                 .expectNext("OK") //
116                 .verifyComplete(); //
117
118         verify(pmsClient).deleteForEntity(URL);
119         verifyNoMoreInteractions(pmsClient);
120
121         verify(dmaapClient).post(anyString(), anyString());
122
123         verifyNoMoreInteractions(dmaapClient);
124     }
125
126     @Test
127     void successfulGet() throws IOException {
128         doReturn(okResponse()).when(pmsClient).getForEntity(anyString());
129         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
130
131         DmaapRequestMessage message = dmaapRequestMessage(Operation.GET);
132         StepVerifier //
133                 .create(testedObject.createTask(message)) //
134                 .expectSubscription() //
135                 .expectNext("OK") //
136                 .verifyComplete(); //
137
138         verify(pmsClient).getForEntity(URL);
139         verifyNoMoreInteractions(pmsClient);
140
141         verify(dmaapClient).post(anyString(), anyString());
142         verifyNoMoreInteractions(dmaapClient);
143     }
144
145     @Test
146     void exceptionFromPmsWhenGet_thenPostError() throws IOException {
147         String errorBody = "Unavailable";
148         WebClientResponseException webClientResponseException = new WebClientResponseException(
149                 HttpStatus.SERVICE_UNAVAILABLE.value(), "", (HttpHeaders) null, errorBody.getBytes(), (Charset) null);
150         doReturn(Mono.error(webClientResponseException)).when(pmsClient).getForEntity(anyString());
151         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
152
153         DmaapRequestMessage message = dmaapRequestMessage(Operation.GET);
154         StepVerifier //
155                 .create(testedObject.createTask(message)) //
156                 .expectSubscription() //
157                 .verifyComplete(); //
158
159         ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
160         verify(dmaapClient).post(anyString(), captor.capture());
161         String actualMessage = captor.getValue();
162         assertThat(actualMessage).contains(HttpStatus.SERVICE_UNAVAILABLE.toString()) //
163                 .contains(errorBody);
164     }
165
166     @Test
167     void successfulPut() throws IOException {
168         doReturn(okResponse()).when(pmsClient).putForEntity(anyString(), anyString());
169         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
170
171         DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT);
172         StepVerifier //
173                 .create(testedObject.createTask(message)) //
174                 .expectSubscription() //
175                 .expectNext("OK") //
176                 .verifyComplete(); //
177
178         verify(pmsClient).putForEntity(URL, payloadAsString());
179         verifyNoMoreInteractions(pmsClient);
180
181         verify(dmaapClient).post(anyString(), anyString());
182         verifyNoMoreInteractions(dmaapClient);
183     }
184
185     @Test
186     void successfulPost() throws IOException {
187         doReturn(okResponse()).when(pmsClient).postForEntity(anyString(), anyString());
188         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
189
190         DmaapRequestMessage message = dmaapRequestMessage(Operation.POST);
191         StepVerifier //
192                 .create(testedObject.createTask(message)) //
193                 .expectSubscription() //
194                 .expectNext("OK") //
195                 .verifyComplete(); //
196
197         verify(pmsClient).postForEntity(URL, payloadAsString());
198         verifyNoMoreInteractions(pmsClient);
199
200         verify(dmaapClient).post(anyString(), anyString());
201         verifyNoMoreInteractions(dmaapClient);
202     }
203
204     @Test
205     void exceptionWhenCallingPms_thenErrorResponse() throws IOException {
206
207         doReturn(notOkResponse()).when(pmsClient).putForEntity(anyString(), anyString());
208         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
209
210         DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT);
211         testedObject.createTask(message).block();
212
213         verify(pmsClient).putForEntity(anyString(), anyString());
214         verifyNoMoreInteractions(pmsClient);
215
216         ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
217         verify(dmaapClient).post(anyString(), captor.capture());
218         String actualMessage = captor.getValue();
219         assertThat(actualMessage).as("Message \"%s\" sent to DMaaP contains %s", actualMessage, HttpStatus.BAD_GATEWAY)
220                 .contains(HttpStatus.BAD_GATEWAY.toString());
221
222         verifyNoMoreInteractions(dmaapClient);
223     }
224
225     @Test
226     void putWithoutPayload_thenNotFoundResponseWithWarning() throws Exception {
227         DmaapRequestMessage message = ImmutableDmaapRequestMessage.builder() //
228                 .apiVersion("apiVersion") //
229                 .correlationId("correlationId") //
230                 .operation(DmaapRequestMessage.Operation.PUT) //
231                 .originatorId("originatorId") //
232                 .payload(Optional.empty()) //
233                 .requestId("requestId") //
234                 .target("target") //
235                 .timestamp("timestamp") //
236                 .url(URL) //
237                 .build();
238
239         final ListAppender<ILoggingEvent> logAppender =
240                 LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
241
242         testedObject.handleDmaapMsg(message);
243
244         assertThat(logAppender.list.get(0).getFormattedMessage())
245                 .startsWith("Expected payload in message from DMAAP: ");
246     }
247
248 }