d4dd89f0b081a0db298fe2f03ebac7dce6a27fe5
[dcaegen2/collectors/datafile.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation.
4  * Copyright (C) 2020 Nokia. All rights reserved.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.dcaegen2.collectors.datafile.tasks;
23
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.Mockito.doReturn;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.spy;
28 import static org.mockito.Mockito.times;
29 import static org.mockito.Mockito.verify;
30 import static org.mockito.Mockito.verifyNoMoreInteractions;
31 import static org.mockito.Mockito.when;
32 import static org.onap.dcaegen2.collectors.datafile.configuration.AppConfigTest.CORRECT_CONSUMER_CONFIG;
33
34 import com.google.gson.JsonElement;
35 import com.google.gson.JsonParser;
36
37 import java.nio.file.Path;
38 import java.nio.file.Paths;
39 import java.util.ArrayList;
40 import java.util.HashMap;
41 import java.util.List;
42 import java.util.Optional;
43
44 import org.junit.jupiter.api.BeforeAll;
45 import org.junit.jupiter.api.Test;
46 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
47 import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
48 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
49 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
50 import org.onap.dcaegen2.collectors.datafile.model.FileData;
51 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
52 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
53 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
54 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
55 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
56 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
57 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
58 import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
59 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
60 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
61 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
62 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
63
64 import reactor.core.publisher.Flux;
65 import reactor.core.publisher.Mono;
66 import reactor.test.StepVerifier;
67
68 public class DMaaPMessageConsumerTest {
69     private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady";
70     private static final String PRODUCT_NAME = "NrRadio";
71     private static final String VENDOR_NAME = "Ericsson";
72     private static final String LAST_EPOCH_MICROSEC = "8745745764578";
73     private static final String SOURCE_NAME = "oteNB5309";
74     private static final String START_EPOCH_MICROSEC = "8745745764578";
75     private static final String TIME_ZONE_OFFSET = "UTC+05:00";
76     private static final String PM_MEAS_CHANGE_IDENTIFIER = "PM_MEAS_FILES";
77     private static final String FILE_READY_CHANGE_TYPE = "FileReady";
78     private static final String FTPES_SCHEME = "ftpes://";
79     private static final String SFTP_SCHEME = "sftp://";
80     private static final String SERVER_ADDRESS = "192.168.0.101";
81     private static final String PORT_22 = "22";
82     private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
83     private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
84     private static final Path LOCAL_FILE_LOCATION = Paths.get("target/" + PM_FILE_NAME);
85     private static final String FTPES_LOCATION = FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
86     private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
87     private static final String GZIP_COMPRESSION = "gzip";
88     private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
89     private static final String FILE_FORMAT_VERSION = "V10";
90     private static List<FilePublishInformation> listOfFilePublishInformation = new ArrayList<>();
91     private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
92
93     private DMaaPConsumerReactiveHttpClient httpClientMock;
94
95     private DMaaPMessageConsumer messageConsumer;
96     private static String ftpesMessageString;
97     private static JsonElement ftpesMessageJson;
98     private static FileData ftpesFileData;
99     private static FileReadyMessage expectedFtpesMessage;
100
101     private static String sftpMessageString;
102     private static JsonElement sftpMessageJson;
103     private static FileData sftpFileData;
104     private static FileReadyMessage expectedSftpMessage;
105
106     private static AppConfig appConfig;
107     private static ConsumerConfiguration dmaapConsumerConfiguration;
108
109     /**
110      * Sets up data for the test.
111      */
112     @BeforeAll
113     public static void setUp() {
114
115         appConfig = mock(AppConfig.class);
116         dmaapConsumerConfiguration = CORRECT_CONSUMER_CONFIG;
117
118         JsonParser jsonParser = new JsonParser();
119
120         AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
121             .location(FTPES_LOCATION) //
122             .compression(GZIP_COMPRESSION) //
123             .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
124             .fileFormatVersion(FILE_FORMAT_VERSION) //
125             .build();
126
127         JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder() //
128             .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
129             .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
130             .changeType(FILE_READY_CHANGE_TYPE) //
131             .notificationFieldsVersion("1.0") //
132             .addAdditionalField(ftpesAdditionalField) //
133             .build();
134
135         ftpesMessageString = ftpesJsonMessage.toString();
136         ftpesMessageJson = jsonParser.parse(ftpesMessageString);
137
138         MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
139             .productName(PRODUCT_NAME) //
140             .vendorName(VENDOR_NAME) //
141             .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
142             .sourceName(SOURCE_NAME) //
143             .startEpochMicrosec(START_EPOCH_MICROSEC) //
144             .timeZoneOffset(TIME_ZONE_OFFSET) //
145             .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
146             .changeType(FILE_READY_CHANGE_TYPE) //
147             .build();
148         ftpesFileData = ImmutableFileData.builder() //
149             .name(PM_FILE_NAME) //
150             .location(FTPES_LOCATION) //
151             .scheme(Scheme.FTPES) //
152             .compression(GZIP_COMPRESSION) //
153             .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
154             .fileFormatVersion(FILE_FORMAT_VERSION) //
155             .messageMetaData(messageMetaData) //
156             .build();
157
158         List<FileData> files = new ArrayList<>();
159         files.add(ftpesFileData);
160         expectedFtpesMessage = ImmutableFileReadyMessage.builder() //
161             .files(files) //
162             .build();
163
164         AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
165             .location(SFTP_LOCATION) //
166             .compression(GZIP_COMPRESSION) //
167             .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
168             .fileFormatVersion(FILE_FORMAT_VERSION) //
169             .build();
170         JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder() //
171             .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
172             .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
173             .changeType(FILE_READY_CHANGE_TYPE) //
174             .notificationFieldsVersion("1.0") //
175             .addAdditionalField(sftpAdditionalField) //
176             .build();
177         sftpMessageString = sftpJsonMessage.toString();
178         sftpMessageJson = jsonParser.parse(sftpMessageString);
179         sftpFileData = ImmutableFileData.builder() //
180             .name(PM_FILE_NAME) //
181             .location(SFTP_LOCATION) //
182             .scheme(Scheme.FTPES) //
183             .compression(GZIP_COMPRESSION) //
184             .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
185             .fileFormatVersion(FILE_FORMAT_VERSION) //
186             .messageMetaData(messageMetaData) //
187             .build();
188
189         ImmutableFilePublishInformation filePublishInformation = ImmutableFilePublishInformation.builder() //
190             .productName(PRODUCT_NAME) //
191             .vendorName(VENDOR_NAME) //
192             .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
193             .sourceName(SOURCE_NAME) //
194             .startEpochMicrosec(START_EPOCH_MICROSEC) //
195             .timeZoneOffset(TIME_ZONE_OFFSET) //
196             .name(PM_FILE_NAME) //
197             .location(FTPES_LOCATION) //
198             .internalLocation(LOCAL_FILE_LOCATION) //
199             .compression(GZIP_COMPRESSION) //
200             .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
201             .fileFormatVersion(FILE_FORMAT_VERSION) //
202             .changeIdentifier(CHANGE_IDENTIFIER) //
203             .context(new HashMap<String, String>()) //
204             .build();
205         listOfFilePublishInformation.add(filePublishInformation);
206
207         files = new ArrayList<>();
208         files.add(sftpFileData);
209         expectedSftpMessage = ImmutableFileReadyMessage.builder() //
210             .files(files) //
211             .build();
212     }
213
214     @Test
215     public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
216         prepareMocksForDmaapConsumer(Optional.empty(), null);
217
218         StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
219             .expectSubscription() //
220             .expectError(DatafileTaskException.class) //
221             .verify();
222
223         verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
224     }
225
226     @Test
227     public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
228         prepareMocksForDmaapConsumer(Optional.of(ftpesMessageJson), expectedFtpesMessage);
229
230         StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
231             .expectNext(expectedFtpesMessage) //
232             .verifyComplete();
233
234         verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
235         verifyNoMoreInteractions(httpClientMock);
236     }
237
238     @Test
239     public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
240         prepareMocksForDmaapConsumer(Optional.of(sftpMessageJson), expectedSftpMessage);
241
242         StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
243             .expectNext(expectedSftpMessage) //
244             .verifyComplete();
245
246         verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
247         verifyNoMoreInteractions(httpClientMock);
248     }
249
250     private void prepareMocksForDmaapConsumer(Optional<JsonElement> message,
251         FileReadyMessage fileReadyMessageAfterConsume) {
252         Mono<JsonElement> messageAsMono = message.isPresent() ? Mono.just(message.get()) : Mono.empty();
253         JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class);
254         httpClientMock = mock(DMaaPConsumerReactiveHttpClient.class);
255         when(httpClientMock.getDMaaPConsumerResponse(Optional.empty())).thenReturn(messageAsMono);
256         when(appConfig.getDmaapConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration);
257         ConsumerReactiveHttpClientFactory httpClientFactory = mock(ConsumerReactiveHttpClientFactory.class);
258         try {
259             doReturn(httpClientMock).when(httpClientFactory).create(dmaapConsumerConfiguration.toDmaap());
260         } catch (DatafileTaskException e) {
261             e.printStackTrace();
262         }
263
264         if (message.isPresent()) {
265             when(jsonMessageParserMock.getMessagesFromJson(any())).thenReturn(Flux.just(fileReadyMessageAfterConsume));
266         } else {
267             when(jsonMessageParserMock.getMessagesFromJson(any()))
268                 .thenReturn(Flux.error(new DatafileTaskException("problemas")));
269         }
270
271         messageConsumer = spy(new DMaaPMessageConsumer(appConfig, jsonMessageParserMock, httpClientFactory));
272     }
273
274 }