24bb759f5c72ab15bff47ef5a48cf1b6ff5ff1bd
[dcaegen2/collectors/datafile.git] /
/*-
 * ============LICENSE_START=======================================================
 *  Copyright (C) 2019 Nordix Foundation.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */
20
21 package org.onap.dcaegen2.collectors.datafile.tasks;
22
23 import static org.junit.jupiter.api.Assertions.assertEquals;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.ArgumentMatchers.anyLong;
26 import static org.mockito.ArgumentMatchers.anyString;
27 import static org.mockito.ArgumentMatchers.notNull;
28 import static org.mockito.Mockito.doReturn;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.spy;
31 import static org.mockito.Mockito.times;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.verifyNoMoreInteractions;
34
35 import java.nio.file.Paths;
36 import java.time.Duration;
37 import java.util.HashMap;
38 import java.util.LinkedList;
39 import java.util.List;
40 import java.util.Map;
41 import org.junit.jupiter.api.BeforeEach;
42 import org.junit.jupiter.api.Test;
43 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
44 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
45 import org.onap.dcaegen2.collectors.datafile.model.FileData;
46 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
47 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
48 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
49 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
50 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
51 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
52 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
53 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
54 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
55 import reactor.core.publisher.Flux;
56 import reactor.core.publisher.Mono;
57 import reactor.test.StepVerifier;
58
59 public class ScheduledTasksTest {
60
61     private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
62
63     private AppConfig appConfig = mock(AppConfig.class);
64     private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
65
66     private int uniqueValue = 0;
67     private DMaaPMessageConsumer consumerMock;
68     private PublishedChecker publishedCheckerMock;
69     private FileCollector fileCollectorMock;
70     private DataRouterPublisher dataRouterMock;
71     private Map<String, String> contextMap = new HashMap<String, String>();
72
73     @BeforeEach
74     private void setUp() {
75         DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() //
76                 .dmaapContentType("application/json") //
77                 .dmaapHostName("54.45.33.2") //
78                 .dmaapPortNumber(1234) //
79                 .dmaapProtocol("https") //
80                 .dmaapUserName("DFC") //
81                 .dmaapUserPassword("DFC") //
82                 .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
83                 .trustStorePath("trustStorePath") //
84                 .trustStorePasswordPath("trustStorePasswordPath") //
85                 .keyStorePath("keyStorePath") //
86                 .keyStorePasswordPath("keyStorePasswordPath") //
87                 .enableDmaapCertAuth(true) //
88                 .build(); //
89         doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
90
91         consumerMock = mock(DMaaPMessageConsumer.class);
92         publishedCheckerMock = mock(PublishedChecker.class);
93         fileCollectorMock = mock(FileCollector.class);
94         dataRouterMock = mock(DataRouterPublisher.class);
95
96         doReturn(consumerMock).when(testedObject).createConsumerTask();
97         doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
98         doReturn(fileCollectorMock).when(testedObject).createFileCollector();
99         doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
100     }
101
102     private MessageMetaData messageMetaData() {
103         return ImmutableMessageMetaData.builder() //
104                 .productName("productName") //
105                 .vendorName("") //
106                 .lastEpochMicrosec("") //
107                 .sourceName("") //
108                 .startEpochMicrosec("") //
109                 .timeZoneOffset("") //
110                 .changeIdentifier("") //
111                 .changeType("") //
112                 .build();
113     }
114
115     private FileData fileData(int instanceNumber) {
116         return ImmutableFileData.builder() //
117                 .name("name" + instanceNumber) //
118                 .fileFormatType("") //
119                 .fileFormatVersion("") //
120                 .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
121                 .scheme(Scheme.FTPS) //
122                 .compression("") //
123                 .messageMetaData(messageMetaData()) //
124                 .build();
125     }
126
127     private List<FileData> files(int size, boolean uniqueNames) {
128         List<FileData> list = new LinkedList<FileData>();
129         for (int i = 0; i < size; ++i) {
130             if (uniqueNames) {
131                 ++uniqueValue;
132             }
133             list.add(fileData(uniqueValue));
134         }
135         return list;
136     }
137
138     private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
139         return ImmutableFileReadyMessage.builder().files(files(numberOfFiles, uniqueNames)).build();
140     }
141
142     private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
143         List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
144         for (int i = 0; i < numberOfEvents; ++i) {
145             list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
146         }
147         return Flux.fromIterable(list);
148     }
149
150     private FilePublishInformation filePublishInformation() {
151         return ImmutableFilePublishInformation //
152                 .builder() //
153                 .productName("") //
154                 .vendorName("") //
155                 .lastEpochMicrosec("") //
156                 .sourceName("") //
157                 .startEpochMicrosec("") //
158                 .timeZoneOffset("") //
159                 .name("") //
160                 .location("") //
161                 .internalLocation(Paths.get("internalLocation")) //
162                 .compression("") //
163                 .fileFormatType("") //
164                 .fileFormatVersion("") //
165                 .build();
166     }
167
168     @Test
169     public void notingToConsume() {
170         doReturn(consumerMock).when(testedObject).createConsumerTask();
171         doReturn(Flux.empty()).when(consumerMock).getMessageRouterResponse();
172
173         testedObject.executeDatafileMainTask();
174
175         assertEquals(0, testedObject.getCurrentNumberOfTasks());
176         verify(consumerMock, times(1)).getMessageRouterResponse();
177         verifyNoMoreInteractions(consumerMock);
178     }
179
180     @Test
181     public void consume_successfulCase() {
182         final int noOfEvents = 200;
183         final int noOfFilesPerEvent = 200;
184         final int noOfFiles = noOfEvents * noOfFilesPerEvent;
185
186         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
187         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
188
189         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
190
191         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
192         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
193         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any());
194
195         StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
196                 .expectNextCount(noOfFiles) //
197                 .expectComplete() //
198                 .verify(); //
199
200         assertEquals(0, testedObject.getCurrentNumberOfTasks());
201         verify(consumerMock, times(1)).getMessageRouterResponse();
202         verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull());
203         verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull(), any());
204         verifyNoMoreInteractions(dataRouterMock);
205         verifyNoMoreInteractions(fileCollectorMock);
206         verifyNoMoreInteractions(consumerMock);
207     }
208
209     @Test
210     public void consume_fetchFailedOnce() {
211         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
212         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
213
214         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
215
216         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
217         Mono<Object> error = Mono.error(new Exception("problem"));
218
219         // First file collect will fail, 3 will succeed
220         doReturn(error, collectedFile, collectedFile, collectedFile) //
221                 .when(fileCollectorMock) //
222                 .collectFile(any(FileData.class), anyLong(), any(Duration.class), notNull());
223
224         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any());
225         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any());
226
227         StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
228                 .expectNextCount(3) //
229                 .expectComplete() //
230                 .verify(); //
231
232         assertEquals(0, testedObject.getCurrentNumberOfTasks());
233         verify(consumerMock, times(1)).getMessageRouterResponse();
234         verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
235         verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull(), any());
236         verifyNoMoreInteractions(dataRouterMock);
237         verifyNoMoreInteractions(fileCollectorMock);
238         verifyNoMoreInteractions(consumerMock);
239     }
240
241     @Test
242     public void consume_publishFailedOnce() {
243
244         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
245         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
246
247         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
248
249         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
250         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
251
252         Mono<Object> error = Mono.error(new Exception("problem"));
253         // One publish will fail, the rest will succeed
254         doReturn(collectedFile, error, collectedFile, collectedFile) //
255                 .when(dataRouterMock) //
256                 .publishFile(notNull(), anyLong(), notNull(), any());
257
258         StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
259                 .expectNextCount(3) // 3 completed files
260                 .expectComplete() //
261                 .verify(); //
262
263         assertEquals(0, testedObject.getCurrentNumberOfTasks());
264         verify(consumerMock, times(1)).getMessageRouterResponse();
265         verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
266         verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull(), any());
267         verifyNoMoreInteractions(dataRouterMock);
268         verifyNoMoreInteractions(fileCollectorMock);
269         verifyNoMoreInteractions(consumerMock);
270     }
271
272     @Test
273     public void consume_successfulCase_sameFileNames() {
274         final int noOfEvents = 1;
275         final int noOfFilesPerEvent = 100;
276
277         // 100 files with the same name
278         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
279         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
280
281         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
282
283         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
284         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
285         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any());
286
287         StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
288                 .expectNextCount(1) // 99 is skipped
289                 .expectComplete() //
290                 .verify(); //
291
292         assertEquals(0, testedObject.getCurrentNumberOfTasks());
293         verify(consumerMock, times(1)).getMessageRouterResponse();
294         verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
295         verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull(), any());
296         verifyNoMoreInteractions(dataRouterMock);
297         verifyNoMoreInteractions(fileCollectorMock);
298         verifyNoMoreInteractions(consumerMock);
299     }
300 }