c6d115f6098e370f57158d78b337117d5919402c
[dcaegen2/collectors/datafile.git] /
1 /*
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.tasks;
18
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.spy;
22 import static org.mockito.Mockito.times;
23 import static org.mockito.Mockito.verify;
24 import static org.mockito.Mockito.verifyNoMoreInteractions;
25 import static org.mockito.Mockito.when;
26
27 import java.util.ArrayList;
28 import java.util.List;
29
30 import org.junit.jupiter.api.BeforeAll;
31 import org.junit.jupiter.api.Test;
32 import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
33 import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration;
34 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
35 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
36 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
37 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
38 import org.onap.dcaegen2.collectors.datafile.model.FileData;
39 import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
40 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
41 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
42 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData;
43 import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
44 import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient;
45 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
46 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
47
48 import reactor.core.publisher.Flux;
49 import reactor.core.publisher.Mono;
50 import reactor.test.StepVerifier;
51
52 /**
53  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
54  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
55  */
56 class DmaapConsumerTaskImplTest {
57     private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady";
58     private static final String PRODUCT_NAME = "NrRadio";
59     private static final String VENDOR_NAME = "Ericsson";
60     private static final String LAST_EPOCH_MICROSEC = "8745745764578";
61     private static final String SOURCE_NAME = "oteNB5309";
62     private static final String START_EPOCH_MICROSEC = "8745745764578";
63     private static final String TIME_ZONE_OFFSET = "UTC+05:00";
64     private static final String PM_MEAS_CHANGE_IDENTIFIER = "PM_MEAS_FILES";
65     private static final String FILE_READY_CHANGE_TYPE = "FileReady";
66     private static final String FTPES_SCHEME = "ftpes://";
67     private static final String SFTP_SCHEME = "sftp://";
68     private static final String SERVER_ADDRESS = "192.168.0.101";
69     private static final String PORT_22 = "22";
70     private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
71     private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
72     private static final String LOCAL_FILE_LOCATION = "target/" + PM_FILE_NAME;
73     private static final String FTPES_LOCATION = FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
74     private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
75     private static final String GZIP_COMPRESSION = "gzip";
76     private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
77     private static final String FILE_FORMAT_VERSION = "V10";
78
79     private static List<ConsumerDmaapModel> listOfConsumerDmaapModel = new ArrayList<ConsumerDmaapModel>();
80
81     private static AppConfig appConfig;
82     private static DmaapConsumerConfiguration dmaapConsumerConfiguration;
83     private DmaapConsumerTaskImpl dmaapConsumerTask;
84     private DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
85
86     private static String ftpesMessage;
87     private static FileData ftpesFileData;
88
89     private static String sftpMessage;
90     private static FileData sftpFileData;
91
92     @BeforeAll
93     public static void setUp() {
94         //@formatter:off
95         dmaapConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder()
96                 .consumerGroup("OpenDCAE-c12")
97                 .consumerId("c12")
98                 .dmaapContentType("application/json")
99                 .dmaapHostName("54.45.33.2")
100                 .dmaapPortNumber(1234).dmaapProtocol("https")
101                 .dmaapUserName("Datafile")
102                 .dmaapUserPassword("Datafile")
103                 .dmaapTopicName("unauthenticated.NOTIFICATION")
104                 .timeoutMS(-1)
105                 .messageLimit(-1)
106                 .build();
107
108         appConfig = mock(AppConfig.class);
109
110         AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder()
111                 .location(FTPES_LOCATION)
112                 .compression(GZIP_COMPRESSION)
113                 .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
114                 .fileFormatVersion(FILE_FORMAT_VERSION)
115                 .build();
116
117         JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder()
118                 .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
119                 .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
120                 .changeType(FILE_READY_CHANGE_TYPE)
121                 .notificationFieldsVersion("1.0")
122                 .addAdditionalField(ftpesAdditionalField)
123                 .build();
124
125         ftpesMessage = ftpesJsonMessage.toString();
126         FileMetaData fileMetaData = ImmutableFileMetaData.builder()
127                 .productName(PRODUCT_NAME)
128                 .vendorName(VENDOR_NAME)
129                 .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
130                 .sourceName(SOURCE_NAME)
131                 .startEpochMicrosec(START_EPOCH_MICROSEC)
132                 .timeZoneOffset(TIME_ZONE_OFFSET)
133                 .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
134                 .changeType(FILE_READY_CHANGE_TYPE)
135                 .build();
136         ftpesFileData = ImmutableFileData.builder()
137                 .fileMetaData(fileMetaData)
138                 .name(PM_FILE_NAME)
139                 .location(FTPES_LOCATION)
140                 .compression(GZIP_COMPRESSION)
141                 .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
142                 .fileFormatVersion(FILE_FORMAT_VERSION)
143                 .build();
144
145         AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder()
146                 .location(SFTP_LOCATION)
147                 .compression(GZIP_COMPRESSION)
148                 .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
149                 .fileFormatVersion(FILE_FORMAT_VERSION)
150                 .build();
151         JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder()
152                 .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
153                 .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
154                 .changeType(FILE_READY_CHANGE_TYPE)
155                 .notificationFieldsVersion("1.0")
156                 .addAdditionalField(sftpAdditionalField)
157                 .build();
158         sftpMessage = sftpJsonMessage.toString();
159         sftpFileData = ImmutableFileData.builder()
160                 .fileMetaData(fileMetaData)
161                 .name(PM_FILE_NAME)
162                 .location(SFTP_LOCATION)
163                 .compression(GZIP_COMPRESSION)
164                 .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
165                 .fileFormatVersion(FILE_FORMAT_VERSION)
166                 .build();
167
168
169         ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
170                 .productName(PRODUCT_NAME)
171                 .vendorName(VENDOR_NAME)
172                 .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
173                 .sourceName(SOURCE_NAME)
174                 .startEpochMicrosec(START_EPOCH_MICROSEC)
175                 .timeZoneOffset(TIME_ZONE_OFFSET)
176                 .name(PM_FILE_NAME)
177                 .location(FTPES_LOCATION)
178                 .internalLocation(LOCAL_FILE_LOCATION)
179                 .compression(GZIP_COMPRESSION)
180                 .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
181                 .fileFormatVersion(FILE_FORMAT_VERSION)
182                 .build();
183         listOfConsumerDmaapModel.add(consumerDmaapModel);
184         //@formatter:on
185     }
186
187     @Test
188     public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
189         prepareMocksForDmaapConsumer("", null);
190
191         StepVerifier.create(dmaapConsumerTask.execute("Sample input")).expectSubscription()
192                 .expectError(DmaapEmptyResponseException.class).verify();
193
194         verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaapConsumerResponse();
195     }
196
197     @Test
198     public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
199         prepareMocksForDmaapConsumer(ftpesMessage, ftpesFileData);
200
201         StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(ftpesFileData).verifyComplete();
202
203         verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaapConsumerResponse();
204         verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
205     }
206
207     @Test
208     public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
209         prepareMocksForDmaapConsumer(sftpMessage, sftpFileData);
210
211         StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(sftpFileData).verifyComplete();
212
213         verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaapConsumerResponse();
214         verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
215     }
216
217     private void prepareMocksForDmaapConsumer(String message, FileData fileDataAfterConsume) {
218         Mono<String> messageAsMono = Mono.just(message);
219         DmaapConsumerJsonParser dmaapConsumerJsonParserMock = mock(DmaapConsumerJsonParser.class);
220         dmaapConsumerReactiveHttpClient = mock(DmaapConsumerReactiveHttpClient.class);
221         when(dmaapConsumerReactiveHttpClient.getDmaapConsumerResponse()).thenReturn(messageAsMono);
222
223         if (!message.isEmpty()) {
224             when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)).thenReturn(Flux.just(fileDataAfterConsume));
225         } else {
226             when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono))
227                     .thenReturn(Flux.error(new DmaapEmptyResponseException()));
228         }
229
230         dmaapConsumerTask =
231                 spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerReactiveHttpClient, dmaapConsumerJsonParserMock));
232         when(dmaapConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
233         doReturn(dmaapConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient();
234     }
235 }