fb36917449de9531cfebd3e23bcb8cf35d1bb99d
[dcaegen2/collectors/datafile.git] /
1 /*-
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.tasks;
18
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertTrue;
21 import static org.mockito.ArgumentMatchers.any;
22 import static org.mockito.Mockito.doReturn;
23 import static org.mockito.Mockito.mock;
24 import static org.mockito.Mockito.spy;
25 import static org.mockito.Mockito.times;
26 import static org.mockito.Mockito.verify;
27 import static org.mockito.Mockito.verifyNoMoreInteractions;
28 import static org.mockito.Mockito.when;
29
30 import ch.qos.logback.classic.spi.ILoggingEvent;
31 import ch.qos.logback.core.read.ListAppender;
32
33 import java.io.File;
34 import java.net.URI;
35 import java.nio.file.Path;
36 import java.nio.file.Paths;
37 import java.time.Duration;
38 import java.util.ArrayList;
39 import java.util.HashMap;
40 import java.util.List;
41 import java.util.Map;
42
43 import org.apache.http.Header;
44 import org.apache.http.HttpResponse;
45 import org.apache.http.StatusLine;
46 import org.apache.http.client.methods.HttpPut;
47 import org.apache.http.client.methods.HttpUriRequest;
48 import org.junit.jupiter.api.BeforeAll;
49 import org.junit.jupiter.api.BeforeEach;
50 import org.junit.jupiter.api.Test;
51 import org.mockito.ArgumentCaptor;
52 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
53 import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
54 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
55 import org.onap.dcaegen2.collectors.datafile.model.Counters;
56 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
57 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
58 import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
59 import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
60 import org.springframework.http.HttpStatus;
61 import reactor.test.StepVerifier;
62
63 /**
64  * Tests the DataRouter publisher.
65  *
66  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
67  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
68  */
69 class DataRouterPublisherTest {
70
71     private static final String PRODUCT_NAME = "NrRadio";
72     private static final String VENDOR_NAME = "Ericsson";
73     private static final String LAST_EPOCH_MICROSEC = "8745745764578";
74     private static final String SOURCE_NAME = "oteNB5309";
75     private static final String START_EPOCH_MICROSEC = "8745745764578";
76     private static final String TIME_ZONE_OFFSET = "UTC+05:00";
77     private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
78     private static final String FTPES_ADDRESS = "ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME;
79     private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
80
81     private static final String COMPRESSION = "gzip";
82     private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
83     private static final String FILE_FORMAT_VERSION = "V10";
84     private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
85
86     private static final String HOST = "54.45.33.2";
87     private static final String HTTPS_SCHEME = "https";
88     private static final int PORT = 1234;
89     private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
90     private static final String PUBLISH_TOPIC = "publish";
91     private static final String FEED_ID = "1";
92
93     // "https://54.45.333.2:1234/publish/1";
94     private static final String PUBLISH_URL =
95         HTTPS_SCHEME + "://" + HOST + ":" + PORT + "/" + PUBLISH_TOPIC + "/" + FEED_ID;
96
97     private static FilePublishInformation filePublishInformation;
98     private static DmaapProducerHttpClient httpClientMock;
99     private static AppConfig appConfig;
100     private static PublisherConfiguration publisherConfigurationMock = mock(PublisherConfiguration.class);
101     private static Map<String, String> context = new HashMap<>();
102     private static DataRouterPublisher publisherTaskUnderTestSpy;
103     private static final Counters counters = new Counters();
104
105     @BeforeAll
106     public static void setUp() {
107         when(publisherConfigurationMock.publishUrl()).thenReturn(PUBLISH_URL);
108
109         filePublishInformation = ImmutableFilePublishInformation.builder() //
110             .productName(PRODUCT_NAME) //
111             .vendorName(VENDOR_NAME) //
112             .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
113             .sourceName(SOURCE_NAME) //
114             .startEpochMicrosec(START_EPOCH_MICROSEC) //
115             .timeZoneOffset(TIME_ZONE_OFFSET) //
116             .name(PM_FILE_NAME) //
117             .location(FTPES_ADDRESS) //
118             .internalLocation(Paths.get("target/" + PM_FILE_NAME)) //
119             .compression("gzip") //
120             .fileFormatType(FILE_FORMAT_TYPE) //
121             .fileFormatVersion(FILE_FORMAT_VERSION) //
122             .context(context) //
123             .changeIdentifier(CHANGE_IDENTIFIER) //
124             .build(); //
125         appConfig = mock(AppConfig.class);
126         publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig, counters));
127     }
128
129     @BeforeEach
130     void setUpTest() {
131         counters.clear();
132     }
133
134     @Test
135     public void whenPassedObjectFits_ReturnsCorrectStatus() throws Exception {
136         prepareMocksForTests(null, Integer.valueOf(HttpStatus.OK.value()));
137         StepVerifier //
138             .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0)))
139             .expectNext(filePublishInformation) //
140             .verifyComplete();
141
142         ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
143         verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
144         verify(httpClientMock).getDmaapProducerResponseWithRedirect(requestCaptor.capture(), any());
145         verifyNoMoreInteractions(httpClientMock);
146
147         HttpPut actualPut = (HttpPut) requestCaptor.getValue();
148         URI actualUri = actualPut.getURI();
149         assertEquals(HTTPS_SCHEME, actualUri.getScheme());
150         assertEquals(HOST, actualUri.getHost());
151         assertEquals(PORT, actualUri.getPort());
152
153         Path actualPath = Paths.get(actualUri.getPath());
154         assertTrue(PUBLISH_TOPIC.equals(actualPath.getName(0).toString()));
155         assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
156         assertTrue(PM_FILE_NAME.equals(actualPath.getName(2).toString()));
157
158         Header[] contentHeaders = actualPut.getHeaders("content-type");
159         assertEquals(APPLICATION_OCTET_STREAM_CONTENT_TYPE, contentHeaders[0].getValue());
160
161         Header[] metaHeaders = actualPut.getHeaders(X_DMAAP_DR_META);
162         Map<String, String> metaHash = getMetaDataAsMap(metaHeaders);
163
164         assertEquals(PRODUCT_NAME, metaHash.get("productName"));
165         assertEquals(VENDOR_NAME, metaHash.get("vendorName"));
166         assertEquals(LAST_EPOCH_MICROSEC, metaHash.get("lastEpochMicrosec"));
167         assertEquals(SOURCE_NAME, metaHash.get("sourceName"));
168         assertEquals(START_EPOCH_MICROSEC, metaHash.get("startEpochMicrosec"));
169         assertEquals(TIME_ZONE_OFFSET, metaHash.get("timeZoneOffset"));
170         assertEquals(COMPRESSION, metaHash.get("compression"));
171         assertEquals(FTPES_ADDRESS, metaHash.get("location"));
172         assertEquals(FILE_FORMAT_TYPE, metaHash.get("fileFormatType"));
173         assertEquals(FILE_FORMAT_VERSION, metaHash.get("fileFormatVersion"));
174
175         // Note that the following line checks the number of properties that are sent to the data
176         // router.
177         // This should be 10 unless the API is updated (which is the fields checked above)
178         assertEquals(10, metaHash.size());
179
180         assertEquals("totalPublishedFiles should have been 1", 1, counters.getTotalPublishedFiles());
181         assertEquals("noOfFailedPublishAttempts should have been 0", 0, counters.getNoOfFailedPublishAttempts());
182     }
183
184     @Test
185     void whenPassedObjectFits_firstFailsWithExceptionThenSucceeds() throws Exception {
186         prepareMocksForTests(new DatafileTaskException("Error"), HttpStatus.OK.value());
187
188         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DataRouterPublisher.class);
189         StepVerifier.create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 2, Duration.ofSeconds(0)))
190             .expectNext(filePublishInformation) //
191             .verifyComplete();
192
193         assertTrue("Warning missing in log",
194             logAppender.list.toString().contains("[WARN] Publishing file " + PM_FILE_NAME + " to DR unsuccessful."));
195
196         assertEquals("totalPublishedFiles should have been 1", 1, counters.getTotalPublishedFiles());
197         assertEquals("noOfFailedPublishAttempts should have been 1", 1, counters.getNoOfFailedPublishAttempts());
198     }
199
200     @Test
201     public void whenPassedObjectFits_firstFailsThenSucceeds() throws Exception {
202         prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
203             Integer.valueOf(HttpStatus.OK.value()));
204
205         StepVerifier //
206             .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0)))
207             .expectNext(filePublishInformation) //
208             .verifyComplete();
209
210         verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
211         verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
212         verifyNoMoreInteractions(httpClientMock);
213
214         assertEquals("totalPublishedFiles should have been 1", 1, counters.getTotalPublishedFiles());
215         assertEquals("noOfFailedPublishAttempts should have been 1", 1, counters.getNoOfFailedPublishAttempts());
216     }
217
218     @Test
219     public void whenPassedObjectFits_firstFailsThenFails() throws Exception {
220         prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
221             Integer.valueOf((HttpStatus.BAD_GATEWAY.value())));
222
223         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DataRouterPublisher.class);
224         StepVerifier.create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0)))
225             .expectErrorMessage("Retries exhausted: 1/1") //
226             .verify();
227
228         assertTrue("Warning missing in log", logAppender.list.toString().contains("[WARN] Publishing file "
229             + PM_FILE_NAME + " to DR unsuccessful. Response code: " + HttpStatus.BAD_GATEWAY));
230
231         verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
232         verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
233         verifyNoMoreInteractions(httpClientMock);
234
235         assertEquals("totalPublishedFiles should have been 0", 0, counters.getTotalPublishedFiles());
236         assertEquals("noOfFailedPublishAttempts should have been 2", 2, counters.getNoOfFailedPublishAttempts());
237     }
238
239     @SafeVarargs
240     final void prepareMocksForTests(Exception exception, Integer firstResponse, Integer... nextHttpResponses)
241         throws Exception {
242         httpClientMock = mock(DmaapProducerHttpClient.class);
243         when(appConfig.getPublisherConfiguration(CHANGE_IDENTIFIER)).thenReturn(publisherConfigurationMock);
244         doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration(CHANGE_IDENTIFIER);
245         doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient(CHANGE_IDENTIFIER);
246
247         HttpResponse httpResponseMock = mock(HttpResponse.class);
248         if (exception == null) {
249             when(httpClientMock.getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()))
250                 .thenReturn(httpResponseMock);
251         } else {
252             when(httpClientMock.getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()))
253                 .thenThrow(exception).thenReturn(httpResponseMock);
254         }
255         StatusLine statusLineMock = mock(StatusLine.class);
256         when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
257         when(statusLineMock.getStatusCode()).thenReturn(firstResponse, nextHttpResponses);
258
259         File file = File.createTempFile("DFC", "tmp");
260         doReturn(file).when(publisherTaskUnderTestSpy).createInputFile(Paths.get("target", PM_FILE_NAME));
261     }
262
263     private Map<String, String> getMetaDataAsMap(Header[] metaHeaders) {
264         Map<String, String> metaHash = new HashMap<>();
265         String actualMetaData = metaHeaders[0].getValue();
266         actualMetaData = actualMetaData.substring(1, actualMetaData.length() - 1);
267         actualMetaData = actualMetaData.replace("\"", "");
268         String[] commaSplitedMetaData = actualMetaData.split(",");
269         for (int i = 0; i < commaSplitedMetaData.length; i++) {
270             String[] keyValuePair = commaSplitedMetaData[i].split(":");
271             if (keyValuePair.length > 2) {
272                 List<String> arrayKeyValuePair = new ArrayList<>(keyValuePair.length);
273                 for (int j = 1; j < keyValuePair.length; j++) {
274                     arrayKeyValuePair.add(keyValuePair[j]);
275                 }
276                 keyValuePair[1] = String.join(":", arrayKeyValuePair);
277             }
278             metaHash.put(keyValuePair[0], keyValuePair[1]);
279         }
280         return metaHash;
281     }
282 }