7a9a17abec3f07665049a61f7b8c1d5a45cdeff4
[dcaegen2/collectors/datafile.git] /
1 /*-
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.tasks;
18
19 import static org.junit.jupiter.api.Assertions.assertEquals;
20 import static org.junit.jupiter.api.Assertions.assertTrue;
21 import static org.mockito.ArgumentMatchers.any;
22 import static org.mockito.Mockito.doReturn;
23 import static org.mockito.Mockito.mock;
24 import static org.mockito.Mockito.spy;
25 import static org.mockito.Mockito.times;
26 import static org.mockito.Mockito.verify;
27 import static org.mockito.Mockito.verifyNoMoreInteractions;
28 import static org.mockito.Mockito.when;
29
30 import java.io.ByteArrayInputStream;
31 import java.io.InputStream;
32 import java.net.URI;
33 import java.nio.file.Path;
34 import java.nio.file.Paths;
35 import java.time.Duration;
36 import java.util.ArrayList;
37 import java.util.HashMap;
38 import java.util.List;
39 import java.util.Map;
40 import org.apache.http.Header;
41 import org.apache.http.HttpResponse;
42 import org.apache.http.StatusLine;
43 import org.apache.http.client.methods.HttpPut;
44 import org.apache.http.client.methods.HttpUriRequest;
45 import org.junit.jupiter.api.BeforeAll;
46 import org.junit.jupiter.api.Test;
47 import org.mockito.ArgumentCaptor;
48 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
49 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
50 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
51 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
52 import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
53 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
54 import org.springframework.http.HttpStatus;
55 import org.springframework.web.util.DefaultUriBuilderFactory;
56 import org.springframework.web.util.UriBuilder;
57 import reactor.test.StepVerifier;
58
59 /**
60  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
61  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
62  */
63 class DataRouterPublisherTest {
64     private static final String PRODUCT_NAME = "NrRadio";
65     private static final String VENDOR_NAME = "Ericsson";
66     private static final String LAST_EPOCH_MICROSEC = "8745745764578";
67     private static final String SOURCE_NAME = "oteNB5309";
68     private static final String START_EPOCH_MICROSEC = "8745745764578";
69     private static final String TIME_ZONE_OFFSET = "UTC+05:00";
70     private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
71     private static final String FTPES_ADDRESS = "ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME;
72
73     private static final String COMPRESSION = "gzip";
74     private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
75     private static final String FILE_FORMAT_VERSION = "V10";
76     private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
77
78     private static final String HOST = "54.45.33.2";
79     private static final String HTTPS_SCHEME = "https";
80     private static final int PORT = 1234;
81     private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
82     private static final String PUBLISH_TOPIC = "publish";
83     private static final String FEED_ID = "1";
84     private static final String FILE_CONTENT = "Just a string.";
85
86     private static ConsumerDmaapModel consumerDmaapModel;
87     private static DmaapProducerReactiveHttpClient httpClientMock;
88     private static AppConfig appConfig;
89     private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
90     private final Map<String, String> contextMap = new HashMap<>();
91     private static DataRouterPublisher publisherTaskUnderTestSpy;
92
93     /**
94      * Sets up data for tests.
95      */
96     @BeforeAll
97     public static void setUp() {
98         when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
99         when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
100         when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
101
102         consumerDmaapModel = ImmutableConsumerDmaapModel.builder() //
103                 .productName(PRODUCT_NAME) //
104                 .vendorName(VENDOR_NAME) //
105                 .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
106                 .sourceName(SOURCE_NAME) //
107                 .startEpochMicrosec(START_EPOCH_MICROSEC) //
108                 .timeZoneOffset(TIME_ZONE_OFFSET) //
109                 .name(PM_FILE_NAME) //
110                 .location(FTPES_ADDRESS) //
111                 .internalLocation(Paths.get("target/" + PM_FILE_NAME)) //
112                 .compression("gzip") //
113                 .fileFormatType(FILE_FORMAT_TYPE) //
114                 .fileFormatVersion(FILE_FORMAT_VERSION) //
115                 .build(); //
116         appConfig = mock(AppConfig.class);
117         publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig));
118     }
119
120     @Test
121     public void whenPassedObjectFits_ReturnsCorrectStatus() throws Exception {
122         prepareMocksForTests(null, Integer.valueOf(HttpStatus.OK.value()));
123         StepVerifier
124                 .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
125                 .expectNext(consumerDmaapModel) //
126                 .verifyComplete();
127
128         ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
129         verify(httpClientMock).getBaseUri();
130         verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
131         verify(httpClientMock).getDmaapProducerResponseWithRedirect(requestCaptor.capture(), any());
132         verifyNoMoreInteractions(httpClientMock);
133
134         HttpPut actualPut = (HttpPut) requestCaptor.getValue();
135         URI actualUri = actualPut.getURI();
136         assertEquals(HTTPS_SCHEME, actualUri.getScheme());
137         assertEquals(HOST, actualUri.getHost());
138         assertEquals(PORT, actualUri.getPort());
139         Path actualPath = Paths.get(actualUri.getPath());
140         assertTrue(PUBLISH_TOPIC.equals(actualPath.getName(0).toString()));
141         assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
142         assertTrue(PM_FILE_NAME.equals(actualPath.getName(2).toString()));
143
144         Header[] contentHeaders = actualPut.getHeaders("content-type");
145         assertEquals(APPLICATION_OCTET_STREAM_CONTENT_TYPE, contentHeaders[0].getValue());
146
147         Header[] metaHeaders = actualPut.getHeaders(X_DMAAP_DR_META);
148         Map<String, String> metaHash = getMetaDataAsMap(metaHeaders);
149         assertTrue(10 == metaHash.size());
150         assertEquals(PRODUCT_NAME, metaHash.get("productName"));
151         assertEquals(VENDOR_NAME, metaHash.get("vendorName"));
152         assertEquals(LAST_EPOCH_MICROSEC, metaHash.get("lastEpochMicrosec"));
153         assertEquals(SOURCE_NAME, metaHash.get("sourceName"));
154         assertEquals(START_EPOCH_MICROSEC, metaHash.get("startEpochMicrosec"));
155         assertEquals(TIME_ZONE_OFFSET, metaHash.get("timeZoneOffset"));
156         assertEquals(COMPRESSION, metaHash.get("compression"));
157         assertEquals(FTPES_ADDRESS, metaHash.get("location"));
158         assertEquals(FILE_FORMAT_TYPE, metaHash.get("fileFormatType"));
159         assertEquals(FILE_FORMAT_VERSION, metaHash.get("fileFormatVersion"));
160     }
161
162     @Test
163     void whenPassedObjectFits_firstFailsWithExceptionThenSucceeds() throws Exception {
164         prepareMocksForTests(new DatafileTaskException("Error"), HttpStatus.OK.value());
165
166         StepVerifier
167                 .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 2, Duration.ofSeconds(0), contextMap))
168                 .expectNext(consumerDmaapModel) //
169                 .verifyComplete();
170     }
171
172     @Test
173     public void whenPassedObjectFits_firstFailsThenSucceeds() throws Exception {
174         prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
175                 Integer.valueOf(HttpStatus.OK.value()));
176
177         StepVerifier
178                 .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
179                 .expectNext(consumerDmaapModel) //
180                 .verifyComplete();
181
182         verify(httpClientMock, times(2)).getBaseUri();
183         verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
184         verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
185         verifyNoMoreInteractions(httpClientMock);
186     }
187
188     @Test
189     public void whenPassedObjectFits_firstFailsThenFails() throws Exception {
190         prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
191                 Integer.valueOf((HttpStatus.BAD_GATEWAY.value())));
192
193         StepVerifier
194                 .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
195                 .expectErrorMessage("Retries exhausted: 1/1") //
196                 .verify();
197
198         verify(httpClientMock, times(2)).getBaseUri();
199         verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
200         verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
201         verifyNoMoreInteractions(httpClientMock);
202     }
203
204     @SafeVarargs
205     final void prepareMocksForTests(Exception exception, Integer firstResponse, Integer... nextHttpResponses)
206             throws Exception {
207         httpClientMock = mock(DmaapProducerReactiveHttpClient.class);
208         when(appConfig.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock);
209         doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration();
210         doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient();
211
212         UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT);
213         when(httpClientMock.getBaseUri()).thenReturn(uriBuilder);
214
215         HttpResponse httpResponseMock = mock(HttpResponse.class);
216         if (exception == null) {
217             when(httpClientMock.getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()))
218                     .thenReturn(httpResponseMock);
219         } else {
220             when(httpClientMock.getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()))
221                     .thenThrow(exception).thenReturn(httpResponseMock);
222         }
223         StatusLine statusLineMock = mock(StatusLine.class);
224         when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
225         when(statusLineMock.getStatusCode()).thenReturn(firstResponse, nextHttpResponses);
226
227         InputStream fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes());
228         doReturn(fileStream).when(publisherTaskUnderTestSpy).createInputStream(Paths.get("target", PM_FILE_NAME));
229     }
230
231     private Map<String, String> getMetaDataAsMap(Header[] metaHeaders) {
232         Map<String, String> metaHash = new HashMap<>();
233         String actualMetaData = metaHeaders[0].getValue();
234         actualMetaData = actualMetaData.substring(1, actualMetaData.length() - 1);
235         actualMetaData = actualMetaData.replace("\"", "");
236         String[] commaSplitedMetaData = actualMetaData.split(",");
237         for (int i = 0; i < commaSplitedMetaData.length; i++) {
238             String[] keyValuePair = commaSplitedMetaData[i].split(":");
239             if (keyValuePair.length > 2) {
240                 List<String> arrayKeyValuePair = new ArrayList<>(keyValuePair.length);
241                 for (int j = 1; j < keyValuePair.length; j++) {
242                     arrayKeyValuePair.add(keyValuePair[j]);
243                 }
244                 keyValuePair[1] = String.join(":", arrayKeyValuePair);
245             }
246             metaHash.put(keyValuePair[0], keyValuePair[1]);
247         }
248         return metaHash;
249     }
250 }