8810c921d4c46c4ad238ba4573b2da4232cbf73a
[dcaegen2/collectors/datafile.git] /
1 /*
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.tasks;
18
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.spy;
22 import static org.mockito.Mockito.times;
23 import static org.mockito.Mockito.verify;
24 import static org.mockito.Mockito.verifyNoMoreInteractions;
25 import static org.mockito.Mockito.when;
26
27 import java.util.ArrayList;
28 import java.util.List;
29
30 import org.junit.jupiter.api.BeforeAll;
31 import org.junit.jupiter.api.Test;
32 import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
33 import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration;
34 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
35 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
36 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
37 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
38 import org.onap.dcaegen2.collectors.datafile.model.FileData;
39 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
40 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
41 import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
42 import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient;
43 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
44 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
45
46 import reactor.core.publisher.Flux;
47 import reactor.core.publisher.Mono;
48 import reactor.test.StepVerifier;
49
50 /**
51  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
52  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
53  */
54 class DmaapConsumerTaskImplTest {
55     private static final String PRODUCT_NAME = "NrRadio";
56     private static final String VENDOR_NAME = "Ericsson";
57     private static final String LAST_EPOCH_MICROSEC = "8745745764578";
58     private static final String SOURCE_NAME = "oteNB5309";
59     private static final String START_EPOCH_MICROSEC = "8745745764578";
60     private static final String TIME_ZONE_OFFSET = "UTC+05:00";
61     private static final String PM_MEAS_CHANGE_IDENTIFIER = "PM_MEAS_FILES";
62     private static final String FILE_READY_CHANGE_TYPE = "FileReady";
63     private static final String FTPES_SCHEME = "ftpes://";
64     private static final String SFTP_SCHEME = "sftp://";
65     private static final String SERVER_ADDRESS = "192.168.0.101";
66     private static final String PORT_22 = "22";
67     private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
68     private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
69     private static final String LOCAL_FILE_LOCATION = "target/" + PM_FILE_NAME;
70     private static final String FTPES_LOCATION = FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
71     private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
72     private static final String GZIP_COMPRESSION = "gzip";
73     private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
74     private static final String FILE_FORMAT_VERSION = "V10";
75
76     private static List<ConsumerDmaapModel> listOfConsumerDmaapModel = new ArrayList<ConsumerDmaapModel>();
77
78     private static AppConfig appConfig;
79     private static DmaapConsumerConfiguration dmaapConsumerConfiguration;
80     private DmaapConsumerTaskImpl dmaapConsumerTask;
81     private DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
82
83     private static String ftpesMessage;
84     private static FileData ftpesFileData;
85
86     private static String sftpMessage;
87     private static FileData sftpFileData;
88
89     @BeforeAll
90     public static void setUp() {
91         //@formatter:off
92         dmaapConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder()
93                 .consumerGroup("OpenDCAE-c12")
94                 .consumerId("c12")
95                 .dmaapContentType("application/json")
96                 .dmaapHostName("54.45.33.2")
97                 .dmaapPortNumber(1234).dmaapProtocol("https")
98                 .dmaapUserName("Datafile")
99                 .dmaapUserPassword("Datafile")
100                 .dmaapTopicName("unauthenticated.NOTIFICATION")
101                 .timeoutMS(-1)
102                 .messageLimit(-1)
103                 .build();
104
105         appConfig = mock(AppConfig.class);
106
107         AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder()
108                 .location(FTPES_LOCATION)
109                 .compression(GZIP_COMPRESSION)
110                 .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
111                 .fileFormatVersion(FILE_FORMAT_VERSION)
112                 .build();
113
114         JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder()
115                 .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
116                 .changeType(FILE_READY_CHANGE_TYPE)
117                 .notificationFieldsVersion("1.0")
118                 .addAdditionalField(ftpesAdditionalField)
119                 .build();
120
121         ftpesMessage = ftpesJsonMessage.toString();
122         ftpesFileData = ImmutableFileData.builder()
123                 .productName(PRODUCT_NAME)
124                 .vendorName(VENDOR_NAME)
125                 .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
126                 .sourceName(SOURCE_NAME)
127                 .startEpochMicrosec(START_EPOCH_MICROSEC)
128                 .timeZoneOffset(TIME_ZONE_OFFSET)
129                 .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
130                 .changeType(FILE_READY_CHANGE_TYPE)
131                 .name(PM_FILE_NAME)
132                 .location(FTPES_LOCATION)
133                 .compression(GZIP_COMPRESSION)
134                 .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
135                 .fileFormatVersion(FILE_FORMAT_VERSION)
136                 .build();
137
138         AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder()
139                 .location(SFTP_LOCATION)
140                 .compression(GZIP_COMPRESSION)
141                 .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
142                 .fileFormatVersion(FILE_FORMAT_VERSION)
143                 .build();
144         JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder()
145                 .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
146                 .changeType(FILE_READY_CHANGE_TYPE)
147                 .notificationFieldsVersion("1.0")
148                 .addAdditionalField(sftpAdditionalField)
149                 .build();
150         sftpMessage = sftpJsonMessage.toString();
151         sftpFileData = ImmutableFileData.builder()
152                 .productName(PRODUCT_NAME)
153                 .vendorName(VENDOR_NAME)
154                 .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
155                 .sourceName(SOURCE_NAME)
156                 .startEpochMicrosec(START_EPOCH_MICROSEC)
157                 .timeZoneOffset(TIME_ZONE_OFFSET)
158                 .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
159                 .changeType(FILE_READY_CHANGE_TYPE)
160                 .name(PM_FILE_NAME)
161                 .location(SFTP_LOCATION)
162                 .compression(GZIP_COMPRESSION)
163                 .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
164                 .fileFormatVersion(FILE_FORMAT_VERSION)
165                 .build();
166
167
168         ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
169                 .productName(PRODUCT_NAME)
170                 .vendorName(VENDOR_NAME)
171                 .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
172                 .sourceName(SOURCE_NAME)
173                 .startEpochMicrosec(START_EPOCH_MICROSEC)
174                 .timeZoneOffset(TIME_ZONE_OFFSET)
175                 .name(PM_FILE_NAME)
176                 .location(FTPES_LOCATION)
177                 .internalLocation(LOCAL_FILE_LOCATION)
178                 .compression(GZIP_COMPRESSION)
179                 .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
180                 .fileFormatVersion(FILE_FORMAT_VERSION)
181                 .build();
182         listOfConsumerDmaapModel.add(consumerDmaapModel);
183         //@formatter:on
184     }
185
186     @Test
187     public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
188         prepareMocksForDmaapConsumer("", null);
189
190         StepVerifier.create(dmaapConsumerTask.execute("Sample input")).expectSubscription()
191                 .expectError(DmaapEmptyResponseException.class).verify();
192
193         verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaapConsumerResponse();
194     }
195
196     @Test
197     public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
198         prepareMocksForDmaapConsumer(ftpesMessage, ftpesFileData);
199
200         StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(ftpesFileData).verifyComplete();
201
202         verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaapConsumerResponse();
203         verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
204     }
205
206     @Test
207     public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
208         prepareMocksForDmaapConsumer(sftpMessage, sftpFileData);
209
210         StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(sftpFileData).verifyComplete();
211
212         verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaapConsumerResponse();
213         verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
214     }
215
216     private void prepareMocksForDmaapConsumer(String message, FileData fileDataAfterConsume) {
217         Mono<String> messageAsMono = Mono.just(message);
218         DmaapConsumerJsonParser dmaapConsumerJsonParserMock = mock(DmaapConsumerJsonParser.class);
219         dmaapConsumerReactiveHttpClient = mock(DmaapConsumerReactiveHttpClient.class);
220         when(dmaapConsumerReactiveHttpClient.getDmaapConsumerResponse()).thenReturn(messageAsMono);
221
222         if (!message.isEmpty()) {
223             when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)).thenReturn(Flux.just(fileDataAfterConsume));
224         } else {
225             when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono))
226                     .thenReturn(Flux.error(new DmaapEmptyResponseException()));
227         }
228
229         dmaapConsumerTask =
230                 spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerReactiveHttpClient, dmaapConsumerJsonParserMock));
231         when(dmaapConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
232         doReturn(dmaapConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient();
233     }
234 }