2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.dcaegen2.collectors.datafile.tasks;
23 import static org.junit.jupiter.api.Assertions.assertEquals;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.ArgumentMatchers.anyLong;
26 import static org.mockito.ArgumentMatchers.anyString;
27 import static org.mockito.ArgumentMatchers.notNull;
28 import static org.mockito.Mockito.doReturn;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.spy;
31 import static org.mockito.Mockito.times;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.verifyNoMoreInteractions;
35 import java.nio.file.Paths;
36 import java.time.Duration;
37 import java.util.HashMap;
38 import java.util.LinkedList;
39 import java.util.List;
42 import org.junit.jupiter.api.BeforeEach;
43 import org.junit.jupiter.api.Test;
44 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
45 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
46 import org.onap.dcaegen2.collectors.datafile.model.FileData;
47 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
48 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
49 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
50 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
51 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
52 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
53 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
54 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
55 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
57 import reactor.core.publisher.Flux;
58 import reactor.core.publisher.Mono;
59 import reactor.test.StepVerifier;
61 public class ScheduledTasksTest {
63 private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
65 private AppConfig appConfig = mock(AppConfig.class);
66 private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
68 private int uniqueValue = 0;
69 private DMaaPMessageConsumer consumerMock;
70 private PublishedChecker publishedCheckerMock;
71 private FileCollector fileCollectorMock;
72 private DataRouterPublisher dataRouterMock;
73 private Map<String, String> contextMap = new HashMap<String, String>();
76 private void setUp() {
77 DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() //
78 .dmaapContentType("application/json") //
79 .dmaapHostName("54.45.33.2") //
80 .dmaapPortNumber(1234) //
81 .dmaapProtocol("https") //
82 .dmaapUserName("DFC") //
83 .dmaapUserPassword("DFC") //
84 .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
85 .trustStorePath("trustStorePath") //
86 .trustStorePasswordPath("trustStorePasswordPath") //
87 .keyStorePath("keyStorePath") //
88 .keyStorePasswordPath("keyStorePasswordPath") //
89 .enableDmaapCertAuth(true) //
91 doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
93 consumerMock = mock(DMaaPMessageConsumer.class);
94 publishedCheckerMock = mock(PublishedChecker.class);
95 fileCollectorMock = mock(FileCollector.class);
96 dataRouterMock = mock(DataRouterPublisher.class);
98 doReturn(consumerMock).when(testedObject).createConsumerTask();
99 doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
100 doReturn(fileCollectorMock).when(testedObject).createFileCollector();
101 doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
104 private MessageMetaData messageMetaData() {
105 return ImmutableMessageMetaData.builder() //
106 .productName("productName") //
108 .lastEpochMicrosec("") //
110 .startEpochMicrosec("") //
111 .timeZoneOffset("") //
112 .changeIdentifier("") //
117 private FileData fileData(int instanceNumber) {
118 return ImmutableFileData.builder() //
119 .name("name" + instanceNumber) //
120 .fileFormatType("") //
121 .fileFormatVersion("") //
122 .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
123 .scheme(Scheme.FTPS) //
125 .messageMetaData(messageMetaData()) //
129 private List<FileData> files(int size, boolean uniqueNames) {
130 List<FileData> list = new LinkedList<FileData>();
131 for (int i = 0; i < size; ++i) {
135 list.add(fileData(uniqueValue));
140 private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
141 return ImmutableFileReadyMessage.builder().files(files(numberOfFiles, uniqueNames)).build();
144 private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
145 List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
146 for (int i = 0; i < numberOfEvents; ++i) {
147 list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
149 return Flux.fromIterable(list);
152 private FilePublishInformation filePublishInformation() {
153 return ImmutableFilePublishInformation //
157 .lastEpochMicrosec("") //
159 .startEpochMicrosec("") //
160 .timeZoneOffset("") //
163 .internalLocation(Paths.get("internalLocation")) //
165 .fileFormatType("") //
166 .fileFormatVersion("") //
167 .context(new HashMap<String, String>()).build();
171 public void notingToConsume() {
172 doReturn(consumerMock).when(testedObject).createConsumerTask();
173 doReturn(Flux.empty()).when(consumerMock).getMessageRouterResponse();
175 testedObject.executeDatafileMainTask();
177 assertEquals(0, testedObject.getCurrentNumberOfTasks());
178 verify(consumerMock, times(1)).getMessageRouterResponse();
179 verifyNoMoreInteractions(consumerMock);
183 public void consume_successfulCase() {
184 final int noOfEvents = 200;
185 final int noOfFilesPerEvent = 200;
186 final int noOfFiles = noOfEvents * noOfFilesPerEvent;
188 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
189 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
191 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
193 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
194 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
195 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
198 .create(testedObject.createMainTask(contextMap)) //
199 .expectSubscription() //
200 .expectNextCount(noOfFiles) //
204 assertEquals(0, testedObject.getCurrentNumberOfTasks());
205 verify(consumerMock, times(1)).getMessageRouterResponse();
206 verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull());
207 verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull());
208 verifyNoMoreInteractions(dataRouterMock);
209 verifyNoMoreInteractions(fileCollectorMock);
210 verifyNoMoreInteractions(consumerMock);
214 public void consume_fetchFailedOnce() {
215 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
216 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
218 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
220 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
221 Mono<Object> error = Mono.error(new Exception("problem"));
223 // First file collect will fail, 3 will succeed
224 doReturn(error, collectedFile, collectedFile, collectedFile) //
225 .when(fileCollectorMock) //
226 .collectFile(any(FileData.class), anyLong(), any(Duration.class), notNull());
228 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
229 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
232 .create(testedObject.createMainTask(contextMap)) //
233 .expectSubscription() //
234 .expectNextCount(3) //
238 assertEquals(0, testedObject.getCurrentNumberOfTasks());
239 verify(consumerMock, times(1)).getMessageRouterResponse();
240 verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
241 verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull());
242 verifyNoMoreInteractions(dataRouterMock);
243 verifyNoMoreInteractions(fileCollectorMock);
244 verifyNoMoreInteractions(consumerMock);
248 public void consume_publishFailedOnce() {
250 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
251 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
253 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
255 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
256 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
258 Mono<Object> error = Mono.error(new Exception("problem"));
259 // One publish will fail, the rest will succeed
260 doReturn(collectedFile, error, collectedFile, collectedFile) //
261 .when(dataRouterMock) //
262 .publishFile(notNull(), anyLong(), notNull());
265 .create(testedObject.createMainTask(contextMap)) //
266 .expectSubscription() //
267 .expectNextCount(3) // 3 completed files
271 assertEquals(0, testedObject.getCurrentNumberOfTasks());
272 verify(consumerMock, times(1)).getMessageRouterResponse();
273 verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
274 verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull());
275 verifyNoMoreInteractions(dataRouterMock);
276 verifyNoMoreInteractions(fileCollectorMock);
277 verifyNoMoreInteractions(consumerMock);
281 public void consume_successfulCase_sameFileNames() {
282 final int noOfEvents = 1;
283 final int noOfFilesPerEvent = 100;
285 // 100 files with the same name
286 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
287 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
289 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
291 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
292 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
293 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
296 .create(testedObject.createMainTask(contextMap)).expectSubscription() //
297 .expectNextCount(1) // 99 is skipped
301 assertEquals(0, testedObject.getCurrentNumberOfTasks());
302 verify(consumerMock, times(1)).getMessageRouterResponse();
303 verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
304 verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull());
305 verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), notNull());
306 verifyNoMoreInteractions(dataRouterMock);
307 verifyNoMoreInteractions(fileCollectorMock);
308 verifyNoMoreInteractions(consumerMock);
309 verifyNoMoreInteractions(dataRouterMock);