1a7db424d6bfdcb9aa25822d50171d8f576b84db
[dcaegen2/collectors/datafile.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.dcaegen2.collectors.datafile.tasks;
22
23 import static org.junit.jupiter.api.Assertions.assertEquals;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.ArgumentMatchers.anyLong;
26 import static org.mockito.ArgumentMatchers.anyString;
27 import static org.mockito.ArgumentMatchers.notNull;
28 import static org.mockito.Mockito.doReturn;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.spy;
31 import static org.mockito.Mockito.times;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.verifyNoMoreInteractions;
34
35 import java.nio.file.Paths;
36 import java.time.Duration;
37 import java.util.HashMap;
38 import java.util.LinkedList;
39 import java.util.List;
40 import java.util.Map;
41
42 import org.junit.jupiter.api.BeforeEach;
43 import org.junit.jupiter.api.Test;
44 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
45 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
46 import org.onap.dcaegen2.collectors.datafile.model.FileData;
47 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
48 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
49 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
50 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
51 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
52 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
53 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
54 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
55 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
56
57 import reactor.core.publisher.Flux;
58 import reactor.core.publisher.Mono;
59 import reactor.test.StepVerifier;
60
61 public class ScheduledTasksTest {
62
63     private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
64
65     private AppConfig appConfig = mock(AppConfig.class);
66     private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
67
68     private int uniqueValue = 0;
69     private DMaaPMessageConsumer consumerMock;
70     private PublishedChecker publishedCheckerMock;
71     private FileCollector fileCollectorMock;
72     private DataRouterPublisher dataRouterMock;
73     private Map<String, String> contextMap = new HashMap<String, String>();
74
75     @BeforeEach
76     private void setUp() {
77         DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() //
78                 .dmaapContentType("application/json") //
79                 .dmaapHostName("54.45.33.2") //
80                 .dmaapPortNumber(1234) //
81                 .dmaapProtocol("https") //
82                 .dmaapUserName("DFC") //
83                 .dmaapUserPassword("DFC") //
84                 .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
85                 .trustStorePath("trustStorePath") //
86                 .trustStorePasswordPath("trustStorePasswordPath") //
87                 .keyStorePath("keyStorePath") //
88                 .keyStorePasswordPath("keyStorePasswordPath") //
89                 .enableDmaapCertAuth(true) //
90                 .build(); //
91         doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
92
93         consumerMock = mock(DMaaPMessageConsumer.class);
94         publishedCheckerMock = mock(PublishedChecker.class);
95         fileCollectorMock = mock(FileCollector.class);
96         dataRouterMock = mock(DataRouterPublisher.class);
97
98         doReturn(consumerMock).when(testedObject).createConsumerTask();
99         doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
100         doReturn(fileCollectorMock).when(testedObject).createFileCollector();
101         doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
102     }
103
104     private MessageMetaData messageMetaData() {
105         return ImmutableMessageMetaData.builder() //
106                 .productName("productName") //
107                 .vendorName("") //
108                 .lastEpochMicrosec("") //
109                 .sourceName("") //
110                 .startEpochMicrosec("") //
111                 .timeZoneOffset("") //
112                 .changeIdentifier("") //
113                 .changeType("") //
114                 .build();
115     }
116
117     private FileData fileData(int instanceNumber) {
118         return ImmutableFileData.builder() //
119                 .name("name" + instanceNumber) //
120                 .fileFormatType("") //
121                 .fileFormatVersion("") //
122                 .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
123                 .scheme(Scheme.FTPS) //
124                 .compression("") //
125                 .messageMetaData(messageMetaData()) //
126                 .build();
127     }
128
129     private List<FileData> files(int size, boolean uniqueNames) {
130         List<FileData> list = new LinkedList<FileData>();
131         for (int i = 0; i < size; ++i) {
132             if (uniqueNames) {
133                 ++uniqueValue;
134             }
135             list.add(fileData(uniqueValue));
136         }
137         return list;
138     }
139
140     private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
141         return ImmutableFileReadyMessage.builder().files(files(numberOfFiles, uniqueNames)).build();
142     }
143
144     private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
145         List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
146         for (int i = 0; i < numberOfEvents; ++i) {
147             list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
148         }
149         return Flux.fromIterable(list);
150     }
151
152     private FilePublishInformation filePublishInformation() {
153         return ImmutableFilePublishInformation //
154                 .builder() //
155                 .productName("") //
156                 .vendorName("") //
157                 .lastEpochMicrosec("") //
158                 .sourceName("") //
159                 .startEpochMicrosec("") //
160                 .timeZoneOffset("") //
161                 .name("") //
162                 .location("") //
163                 .internalLocation(Paths.get("internalLocation")) //
164                 .compression("") //
165                 .fileFormatType("") //
166                 .fileFormatVersion("") //
167                 .context(new HashMap<String, String>()).build();
168     }
169
170     @Test
171     public void notingToConsume() {
172         doReturn(consumerMock).when(testedObject).createConsumerTask();
173         doReturn(Flux.empty()).when(consumerMock).getMessageRouterResponse();
174
175         testedObject.executeDatafileMainTask();
176
177         assertEquals(0, testedObject.getCurrentNumberOfTasks());
178         verify(consumerMock, times(1)).getMessageRouterResponse();
179         verifyNoMoreInteractions(consumerMock);
180     }
181
182     @Test
183     public void consume_successfulCase() {
184         final int noOfEvents = 200;
185         final int noOfFilesPerEvent = 200;
186         final int noOfFiles = noOfEvents * noOfFilesPerEvent;
187
188         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
189         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
190
191         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
192
193         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
194         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
195         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
196
197         StepVerifier //
198                 .create(testedObject.createMainTask(contextMap)) //
199                 .expectSubscription() //
200                 .expectNextCount(noOfFiles) //
201                 .expectComplete() //
202                 .verify(); //
203
204         assertEquals(0, testedObject.getCurrentNumberOfTasks());
205         verify(consumerMock, times(1)).getMessageRouterResponse();
206         verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull());
207         verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull());
208         verifyNoMoreInteractions(dataRouterMock);
209         verifyNoMoreInteractions(fileCollectorMock);
210         verifyNoMoreInteractions(consumerMock);
211     }
212
213     @Test
214     public void consume_fetchFailedOnce() {
215         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
216         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
217
218         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
219
220         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
221         Mono<Object> error = Mono.error(new Exception("problem"));
222
223         // First file collect will fail, 3 will succeed
224         doReturn(error, collectedFile, collectedFile, collectedFile) //
225                 .when(fileCollectorMock) //
226                 .collectFile(any(FileData.class), anyLong(), any(Duration.class), notNull());
227
228         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
229         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
230
231         StepVerifier //
232                 .create(testedObject.createMainTask(contextMap)) //
233                 .expectSubscription() //
234                 .expectNextCount(3) //
235                 .expectComplete() //
236                 .verify(); //
237
238         assertEquals(0, testedObject.getCurrentNumberOfTasks());
239         verify(consumerMock, times(1)).getMessageRouterResponse();
240         verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
241         verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull());
242         verifyNoMoreInteractions(dataRouterMock);
243         verifyNoMoreInteractions(fileCollectorMock);
244         verifyNoMoreInteractions(consumerMock);
245     }
246
247     @Test
248     public void consume_publishFailedOnce() {
249
250         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
251         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
252
253         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
254
255         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
256         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
257
258         Mono<Object> error = Mono.error(new Exception("problem"));
259         // One publish will fail, the rest will succeed
260         doReturn(collectedFile, error, collectedFile, collectedFile) //
261                 .when(dataRouterMock) //
262                 .publishFile(notNull(), anyLong(), notNull());
263
264         StepVerifier //
265                 .create(testedObject.createMainTask(contextMap)) //
266                 .expectSubscription() //
267                 .expectNextCount(3) // 3 completed files
268                 .expectComplete() //
269                 .verify(); //
270
271         assertEquals(0, testedObject.getCurrentNumberOfTasks());
272         verify(consumerMock, times(1)).getMessageRouterResponse();
273         verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
274         verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull());
275         verifyNoMoreInteractions(dataRouterMock);
276         verifyNoMoreInteractions(fileCollectorMock);
277         verifyNoMoreInteractions(consumerMock);
278     }
279
280     @Test
281     public void consume_successfulCase_sameFileNames() {
282         final int noOfEvents = 1;
283         final int noOfFilesPerEvent = 100;
284
285         // 100 files with the same name
286         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
287         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
288
289         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
290
291         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
292         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
293         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
294
295         StepVerifier //
296                 .create(testedObject.createMainTask(contextMap)).expectSubscription() //
297                 .expectNextCount(1) // 99 is skipped
298                 .expectComplete() //
299                 .verify(); //
300
301         assertEquals(0, testedObject.getCurrentNumberOfTasks());
302         verify(consumerMock, times(1)).getMessageRouterResponse();
303         verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
304         verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull());
305         verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), notNull());
306         verifyNoMoreInteractions(dataRouterMock);
307         verifyNoMoreInteractions(fileCollectorMock);
308         verifyNoMoreInteractions(consumerMock);
309         verifyNoMoreInteractions(dataRouterMock);
310     }
311 }