a4319d37e5135872775d093106b1230c4b3bd070
[dcaegen2/collectors/datafile.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.dcaegen2.collectors.datafile.tasks;
22
23 import static org.mockito.ArgumentMatchers.any;
24 import static org.mockito.Mockito.doReturn;
25 import static org.mockito.Mockito.mock;
26 import static org.mockito.Mockito.spy;
27 import static org.mockito.Mockito.times;
28 import static org.mockito.Mockito.verify;
29 import static org.mockito.Mockito.verifyNoMoreInteractions;
30 import static org.mockito.Mockito.when;
31 import static org.onap.dcaegen2.collectors.datafile.configuration.AppConfigTest.CORRECT_CONSUMER_CONFIG;
32
33 import com.google.gson.JsonElement;
34 import com.google.gson.JsonParser;
35
36 import java.nio.file.Path;
37 import java.nio.file.Paths;
38 import java.util.ArrayList;
39 import java.util.HashMap;
40 import java.util.List;
41 import java.util.Optional;
42
43 import org.junit.jupiter.api.BeforeAll;
44 import org.junit.jupiter.api.Test;
45 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
46 import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
47 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
48 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
49 import org.onap.dcaegen2.collectors.datafile.model.FileData;
50 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
51 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
52 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
53 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
54 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
55 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
56 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
57 import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
58 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
59 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
60 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
61 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
62
63 import reactor.core.publisher.Flux;
64 import reactor.core.publisher.Mono;
65 import reactor.test.StepVerifier;
66
67 public class DMaaPMessageConsumerTest {
68     private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady";
69     private static final String PRODUCT_NAME = "NrRadio";
70     private static final String VENDOR_NAME = "Ericsson";
71     private static final String LAST_EPOCH_MICROSEC = "8745745764578";
72     private static final String SOURCE_NAME = "oteNB5309";
73     private static final String START_EPOCH_MICROSEC = "8745745764578";
74     private static final String TIME_ZONE_OFFSET = "UTC+05:00";
75     private static final String PM_MEAS_CHANGE_IDENTIFIER = "PM_MEAS_FILES";
76     private static final String FILE_READY_CHANGE_TYPE = "FileReady";
77     private static final String FTPES_SCHEME = "ftpes://";
78     private static final String SFTP_SCHEME = "sftp://";
79     private static final String SERVER_ADDRESS = "192.168.0.101";
80     private static final String PORT_22 = "22";
81     private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
82     private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
83     private static final Path LOCAL_FILE_LOCATION = Paths.get("target/" + PM_FILE_NAME);
84     private static final String FTPES_LOCATION = FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
85     private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
86     private static final String GZIP_COMPRESSION = "gzip";
87     private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
88     private static final String FILE_FORMAT_VERSION = "V10";
89     private static List<FilePublishInformation> listOfFilePublishInformation = new ArrayList<>();
90     private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
91
92     private DMaaPConsumerReactiveHttpClient httpClientMock;
93
94     private DMaaPMessageConsumer messageConsumer;
95     private static String ftpesMessageString;
96     private static JsonElement ftpesMessageJson;
97     private static FileData ftpesFileData;
98     private static FileReadyMessage expectedFtpesMessage;
99
100     private static String sftpMessageString;
101     private static JsonElement sftpMessageJson;
102     private static FileData sftpFileData;
103     private static FileReadyMessage expectedSftpMessage;
104
105     private static AppConfig appConfig;
106     private static ConsumerConfiguration dmaapConsumerConfiguration;
107
108     /**
109      * Sets up data for the test.
110      */
111     @BeforeAll
112     public static void setUp() {
113
114         appConfig = mock(AppConfig.class);
115         dmaapConsumerConfiguration = CORRECT_CONSUMER_CONFIG;
116
117         JsonParser jsonParser = new JsonParser();
118
119         AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
120             .location(FTPES_LOCATION) //
121             .compression(GZIP_COMPRESSION) //
122             .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
123             .fileFormatVersion(FILE_FORMAT_VERSION) //
124             .build();
125
126         JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder() //
127             .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
128             .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
129             .changeType(FILE_READY_CHANGE_TYPE) //
130             .notificationFieldsVersion("1.0") //
131             .addAdditionalField(ftpesAdditionalField) //
132             .build();
133
134         ftpesMessageString = ftpesJsonMessage.toString();
135         ftpesMessageJson = jsonParser.parse(ftpesMessageString);
136
137         MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
138             .productName(PRODUCT_NAME) //
139             .vendorName(VENDOR_NAME) //
140             .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
141             .sourceName(SOURCE_NAME) //
142             .startEpochMicrosec(START_EPOCH_MICROSEC) //
143             .timeZoneOffset(TIME_ZONE_OFFSET) //
144             .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
145             .changeType(FILE_READY_CHANGE_TYPE) //
146             .build();
147         ftpesFileData = ImmutableFileData.builder() //
148             .name(PM_FILE_NAME) //
149             .location(FTPES_LOCATION) //
150             .scheme(Scheme.FTPS) //
151             .compression(GZIP_COMPRESSION) //
152             .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
153             .fileFormatVersion(FILE_FORMAT_VERSION) //
154             .messageMetaData(messageMetaData) //
155             .build();
156
157         List<FileData> files = new ArrayList<>();
158         files.add(ftpesFileData);
159         expectedFtpesMessage = ImmutableFileReadyMessage.builder() //
160             .files(files) //
161             .build();
162
163         AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
164             .location(SFTP_LOCATION) //
165             .compression(GZIP_COMPRESSION) //
166             .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
167             .fileFormatVersion(FILE_FORMAT_VERSION) //
168             .build();
169         JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder() //
170             .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
171             .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
172             .changeType(FILE_READY_CHANGE_TYPE) //
173             .notificationFieldsVersion("1.0") //
174             .addAdditionalField(sftpAdditionalField) //
175             .build();
176         sftpMessageString = sftpJsonMessage.toString();
177         sftpMessageJson = jsonParser.parse(sftpMessageString);
178         sftpFileData = ImmutableFileData.builder() //
179             .name(PM_FILE_NAME) //
180             .location(SFTP_LOCATION) //
181             .scheme(Scheme.FTPS) //
182             .compression(GZIP_COMPRESSION) //
183             .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
184             .fileFormatVersion(FILE_FORMAT_VERSION) //
185             .messageMetaData(messageMetaData) //
186             .build();
187
188         ImmutableFilePublishInformation filePublishInformation = ImmutableFilePublishInformation.builder() //
189             .productName(PRODUCT_NAME) //
190             .vendorName(VENDOR_NAME) //
191             .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
192             .sourceName(SOURCE_NAME) //
193             .startEpochMicrosec(START_EPOCH_MICROSEC) //
194             .timeZoneOffset(TIME_ZONE_OFFSET) //
195             .name(PM_FILE_NAME) //
196             .location(FTPES_LOCATION) //
197             .internalLocation(LOCAL_FILE_LOCATION) //
198             .compression(GZIP_COMPRESSION) //
199             .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
200             .fileFormatVersion(FILE_FORMAT_VERSION) //
201             .changeIdentifier(CHANGE_IDENTIFIER) //
202             .context(new HashMap<String, String>()) //
203             .build();
204         listOfFilePublishInformation.add(filePublishInformation);
205
206         files = new ArrayList<>();
207         files.add(sftpFileData);
208         expectedSftpMessage = ImmutableFileReadyMessage.builder() //
209             .files(files) //
210             .build();
211     }
212
213     @Test
214     public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
215         prepareMocksForDmaapConsumer(Optional.empty(), null);
216
217         StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
218             .expectSubscription() //
219             .expectError(DatafileTaskException.class) //
220             .verify();
221
222         verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
223     }
224
225     @Test
226     public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
227         prepareMocksForDmaapConsumer(Optional.of(ftpesMessageJson), expectedFtpesMessage);
228
229         StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
230             .expectNext(expectedFtpesMessage) //
231             .verifyComplete();
232
233         verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
234         verifyNoMoreInteractions(httpClientMock);
235     }
236
237     @Test
238     public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
239         prepareMocksForDmaapConsumer(Optional.of(sftpMessageJson), expectedSftpMessage);
240
241         StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
242             .expectNext(expectedSftpMessage) //
243             .verifyComplete();
244
245         verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
246         verifyNoMoreInteractions(httpClientMock);
247     }
248
249     private void prepareMocksForDmaapConsumer(Optional<JsonElement> message,
250         FileReadyMessage fileReadyMessageAfterConsume) {
251         Mono<JsonElement> messageAsMono = message.isPresent() ? Mono.just(message.get()) : Mono.empty();
252         JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class);
253         httpClientMock = mock(DMaaPConsumerReactiveHttpClient.class);
254         when(httpClientMock.getDMaaPConsumerResponse(Optional.empty())).thenReturn(messageAsMono);
255         when(appConfig.getDmaapConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration);
256         ConsumerReactiveHttpClientFactory httpClientFactory = mock(ConsumerReactiveHttpClientFactory.class);
257         try {
258             doReturn(httpClientMock).when(httpClientFactory).create(dmaapConsumerConfiguration.toDmaap());
259         } catch (DatafileTaskException e) {
260             e.printStackTrace();
261         }
262
263         if (message.isPresent()) {
264             when(jsonMessageParserMock.getMessagesFromJson(any())).thenReturn(Flux.just(fileReadyMessageAfterConsume));
265         } else {
266             when(jsonMessageParserMock.getMessagesFromJson(any()))
267                 .thenReturn(Flux.error(new DatafileTaskException("problemas")));
268         }
269
270         messageConsumer = spy(new DMaaPMessageConsumer(appConfig, jsonMessageParserMock, httpClientFactory));
271     }
272
273 }