2 * ============LICENSE_START======================================================================
3 * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
4 * ===============================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6 * in compliance with the License. You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software distributed under the License
11 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12 * or implied. See the License for the specific language governing permissions and limitations under
14 * ============LICENSE_END========================================================================
17 package org.onap.dcaegen2.collectors.datafile.tasks;
19 import static org.junit.jupiter.api.Assertions.assertEquals;
20 import static org.junit.jupiter.api.Assertions.assertTrue;
21 import static org.mockito.ArgumentMatchers.any;
22 import static org.mockito.Mockito.doReturn;
23 import static org.mockito.Mockito.mock;
24 import static org.mockito.Mockito.spy;
25 import static org.mockito.Mockito.times;
26 import static org.mockito.Mockito.verify;
27 import static org.mockito.Mockito.verifyNoMoreInteractions;
28 import static org.mockito.Mockito.when;
30 import java.io.ByteArrayInputStream;
31 import java.io.InputStream;
33 import java.nio.file.Path;
34 import java.nio.file.Paths;
35 import java.time.Duration;
36 import java.util.ArrayList;
37 import java.util.HashMap;
38 import java.util.List;
41 import org.apache.http.Header;
42 import org.apache.http.HttpResponse;
43 import org.apache.http.StatusLine;
44 import org.apache.http.client.methods.HttpPut;
45 import org.apache.http.client.methods.HttpUriRequest;
46 import org.junit.jupiter.api.BeforeAll;
47 import org.junit.jupiter.api.Test;
48 import org.mockito.ArgumentCaptor;
49 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
50 import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
51 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
52 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
53 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
54 import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
55 import org.springframework.http.HttpStatus;
57 import reactor.test.StepVerifier;
60 * Tests the DataRouter publisher.
62 * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
63 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
65 class DataRouterPublisherTest {
67 private static final String PRODUCT_NAME = "NrRadio";
68 private static final String VENDOR_NAME = "Ericsson";
69 private static final String LAST_EPOCH_MICROSEC = "8745745764578";
70 private static final String SOURCE_NAME = "oteNB5309";
71 private static final String START_EPOCH_MICROSEC = "8745745764578";
72 private static final String TIME_ZONE_OFFSET = "UTC+05:00";
73 private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
74 private static final String FTPES_ADDRESS = "ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME;
75 private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
77 private static final String COMPRESSION = "gzip";
78 private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
79 private static final String FILE_FORMAT_VERSION = "V10";
80 private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
82 private static final String HOST = "54.45.33.2";
83 private static final String HTTPS_SCHEME = "https";
84 private static final int PORT = 1234;
85 private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
86 private static final String PUBLISH_TOPIC = "publish";
87 private static final String FEED_ID = "1";
88 private static final String FILE_CONTENT = "Just a string.";
90 private static FilePublishInformation filePublishInformation;
91 private static DmaapProducerHttpClient httpClientMock;
92 private static AppConfig appConfig;
93 private static PublisherConfiguration publisherConfigurationMock = mock(PublisherConfiguration.class);
94 private static Map<String, String> context = new HashMap<>();
95 private static DataRouterPublisher publisherTaskUnderTestSpy;
97 // "https://54.45.333.2:1234/publish/1";
98 private static final String PUBLISH_URL =
99 HTTPS_SCHEME + "://" + HOST + ":" + PORT + "/" + PUBLISH_TOPIC + "/" + FEED_ID;
102 public static void setUp() {
103 when(publisherConfigurationMock.publishUrl()).thenReturn(PUBLISH_URL);
105 filePublishInformation = ImmutableFilePublishInformation.builder() //
106 .productName(PRODUCT_NAME) //
107 .vendorName(VENDOR_NAME) //
108 .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
109 .sourceName(SOURCE_NAME) //
110 .startEpochMicrosec(START_EPOCH_MICROSEC) //
111 .timeZoneOffset(TIME_ZONE_OFFSET) //
112 .name(PM_FILE_NAME) //
113 .location(FTPES_ADDRESS) //
114 .internalLocation(Paths.get("target/" + PM_FILE_NAME)) //
115 .compression("gzip") //
116 .fileFormatType(FILE_FORMAT_TYPE) //
117 .fileFormatVersion(FILE_FORMAT_VERSION) //
119 .changeIdentifier(CHANGE_IDENTIFIER) //
121 appConfig = mock(AppConfig.class);
122 publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig));
126 public void whenPassedObjectFits_ReturnsCorrectStatus() throws Exception {
127 prepareMocksForTests(null, Integer.valueOf(HttpStatus.OK.value()));
129 .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0)))
130 .expectNext(filePublishInformation) //
133 ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
134 verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
135 verify(httpClientMock).getDmaapProducerResponseWithRedirect(requestCaptor.capture(), any());
136 verifyNoMoreInteractions(httpClientMock);
138 HttpPut actualPut = (HttpPut) requestCaptor.getValue();
139 URI actualUri = actualPut.getURI();
140 assertEquals(HTTPS_SCHEME, actualUri.getScheme());
141 assertEquals(HOST, actualUri.getHost());
142 assertEquals(PORT, actualUri.getPort());
144 Path actualPath = Paths.get(actualUri.getPath());
145 assertTrue(PUBLISH_TOPIC.equals(actualPath.getName(0).toString()));
146 assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
147 assertTrue(PM_FILE_NAME.equals(actualPath.getName(2).toString()));
149 Header[] contentHeaders = actualPut.getHeaders("content-type");
150 assertEquals(APPLICATION_OCTET_STREAM_CONTENT_TYPE, contentHeaders[0].getValue());
152 Header[] metaHeaders = actualPut.getHeaders(X_DMAAP_DR_META);
153 Map<String, String> metaHash = getMetaDataAsMap(metaHeaders);
155 assertEquals(PRODUCT_NAME, metaHash.get("productName"));
156 assertEquals(VENDOR_NAME, metaHash.get("vendorName"));
157 assertEquals(LAST_EPOCH_MICROSEC, metaHash.get("lastEpochMicrosec"));
158 assertEquals(SOURCE_NAME, metaHash.get("sourceName"));
159 assertEquals(START_EPOCH_MICROSEC, metaHash.get("startEpochMicrosec"));
160 assertEquals(TIME_ZONE_OFFSET, metaHash.get("timeZoneOffset"));
161 assertEquals(COMPRESSION, metaHash.get("compression"));
162 assertEquals(FTPES_ADDRESS, metaHash.get("location"));
163 assertEquals(FILE_FORMAT_TYPE, metaHash.get("fileFormatType"));
164 assertEquals(FILE_FORMAT_VERSION, metaHash.get("fileFormatVersion"));
166 // Note that the following line checks the number of properties that are sent to the data
168 // This should be 10 unless the API is updated (which is the fields checked above)
169 assertEquals(10, metaHash.size());
173 void whenPassedObjectFits_firstFailsWithExceptionThenSucceeds() throws Exception {
174 prepareMocksForTests(new DatafileTaskException("Error"), HttpStatus.OK.value());
177 .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 2, Duration.ofSeconds(0)))
178 .expectNext(filePublishInformation) //
183 public void whenPassedObjectFits_firstFailsThenSucceeds() throws Exception {
184 prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
185 Integer.valueOf(HttpStatus.OK.value()));
188 .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0)))
189 .expectNext(filePublishInformation) //
192 verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
193 verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
194 verifyNoMoreInteractions(httpClientMock);
198 public void whenPassedObjectFits_firstFailsThenFails() throws Exception {
199 prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
200 Integer.valueOf((HttpStatus.BAD_GATEWAY.value())));
203 .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0)))
204 .expectErrorMessage("Retries exhausted: 1/1") //
207 verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
208 verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
209 verifyNoMoreInteractions(httpClientMock);
213 final void prepareMocksForTests(Exception exception, Integer firstResponse, Integer... nextHttpResponses)
215 httpClientMock = mock(DmaapProducerHttpClient.class);
216 when(appConfig.getPublisherConfiguration(CHANGE_IDENTIFIER)).thenReturn(publisherConfigurationMock);
217 doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration(CHANGE_IDENTIFIER);
218 doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient(CHANGE_IDENTIFIER);
220 HttpResponse httpResponseMock = mock(HttpResponse.class);
221 if (exception == null) {
222 when(httpClientMock.getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()))
223 .thenReturn(httpResponseMock);
225 when(httpClientMock.getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()))
226 .thenThrow(exception).thenReturn(httpResponseMock);
228 StatusLine statusLineMock = mock(StatusLine.class);
229 when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
230 when(statusLineMock.getStatusCode()).thenReturn(firstResponse, nextHttpResponses);
232 InputStream fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes());
233 doReturn(fileStream).when(publisherTaskUnderTestSpy).createInputStream(Paths.get("target", PM_FILE_NAME));
236 private Map<String, String> getMetaDataAsMap(Header[] metaHeaders) {
237 Map<String, String> metaHash = new HashMap<>();
238 String actualMetaData = metaHeaders[0].getValue();
239 actualMetaData = actualMetaData.substring(1, actualMetaData.length() - 1);
240 actualMetaData = actualMetaData.replace("\"", "");
241 String[] commaSplitedMetaData = actualMetaData.split(",");
242 for (int i = 0; i < commaSplitedMetaData.length; i++) {
243 String[] keyValuePair = commaSplitedMetaData[i].split(":");
244 if (keyValuePair.length > 2) {
245 List<String> arrayKeyValuePair = new ArrayList<>(keyValuePair.length);
246 for (int j = 1; j < keyValuePair.length; j++) {
247 arrayKeyValuePair.add(keyValuePair[j]);
249 keyValuePair[1] = String.join(":", arrayKeyValuePair);
251 metaHash.put(keyValuePair[0], keyValuePair[1]);