2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.dcaegen2.collectors.datafile.tasks;
23 import static org.junit.jupiter.api.Assertions.assertEquals;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.ArgumentMatchers.anyLong;
26 import static org.mockito.ArgumentMatchers.anyString;
27 import static org.mockito.ArgumentMatchers.notNull;
28 import static org.mockito.Mockito.doReturn;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.spy;
31 import static org.mockito.Mockito.times;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.verifyNoMoreInteractions;
35 import java.nio.file.Paths;
36 import java.time.Duration;
37 import java.util.HashMap;
38 import java.util.LinkedList;
39 import java.util.List;
42 import org.junit.jupiter.api.BeforeEach;
43 import org.junit.jupiter.api.Test;
44 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
45 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
46 import org.onap.dcaegen2.collectors.datafile.model.FileData;
47 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
48 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
49 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
50 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
51 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
52 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
53 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
54 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
55 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
57 import reactor.core.publisher.Flux;
58 import reactor.core.publisher.Mono;
59 import reactor.test.StepVerifier;
61 public class ScheduledTasksTest {
63 private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
65 private AppConfig appConfig = mock(AppConfig.class);
66 private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
68 private int uniqueValue = 0;
69 private DMaaPMessageConsumer consumerMock;
70 private PublishedChecker publishedCheckerMock;
71 private FileCollector fileCollectorMock;
72 private DataRouterPublisher dataRouterMock;
73 private Map<String, String> contextMap = new HashMap<String, String>();
76 private void setUp() {
77 DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() //
78 .dmaapContentType("application/json") //
79 .dmaapHostName("54.45.33.2") //
80 .dmaapPortNumber(1234) //
81 .dmaapProtocol("https") //
82 .dmaapUserName("DFC") //
83 .dmaapUserPassword("DFC") //
84 .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
85 .trustStorePath("trustStorePath") //
86 .trustStorePasswordPath("trustStorePasswordPath") //
87 .keyStorePath("keyStorePath") //
88 .keyStorePasswordPath("keyStorePasswordPath") //
89 .enableDmaapCertAuth(true) //
91 doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
93 consumerMock = mock(DMaaPMessageConsumer.class);
94 publishedCheckerMock = mock(PublishedChecker.class);
95 fileCollectorMock = mock(FileCollector.class);
96 dataRouterMock = mock(DataRouterPublisher.class);
98 doReturn(consumerMock).when(testedObject).createConsumerTask();
99 doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
100 doReturn(fileCollectorMock).when(testedObject).createFileCollector();
101 doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
104 private MessageMetaData messageMetaData() {
105 return ImmutableMessageMetaData.builder() //
106 .productName("productName") //
108 .lastEpochMicrosec("") //
110 .startEpochMicrosec("") //
111 .timeZoneOffset("") //
112 .changeIdentifier("") //
117 private FileData fileData(int instanceNumber) {
118 return ImmutableFileData.builder() //
119 .name("name" + instanceNumber) //
120 .fileFormatType("") //
121 .fileFormatVersion("") //
122 .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
123 .scheme(Scheme.FTPS) //
125 .messageMetaData(messageMetaData()) //
129 private List<FileData> files(int size, boolean uniqueNames) {
130 List<FileData> list = new LinkedList<FileData>();
131 for (int i = 0; i < size; ++i) {
135 list.add(fileData(uniqueValue));
140 private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
141 return ImmutableFileReadyMessage.builder().files(files(numberOfFiles, uniqueNames)).build();
144 private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
145 List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
146 for (int i = 0; i < numberOfEvents; ++i) {
147 list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
149 return Flux.fromIterable(list);
152 private FilePublishInformation filePublishInformation() {
153 return ImmutableFilePublishInformation //
157 .lastEpochMicrosec("") //
159 .startEpochMicrosec("") //
160 .timeZoneOffset("") //
163 .internalLocation(Paths.get("internalLocation")) //
165 .fileFormatType("") //
166 .fileFormatVersion("") //
167 .context(new HashMap<String, String>()).build();
171 public void notingToConsume() {
172 doReturn(consumerMock).when(testedObject).createConsumerTask();
173 doReturn(Flux.empty()).when(consumerMock).getMessageRouterResponse();
175 testedObject.executeDatafileMainTask();
177 assertEquals(0, testedObject.getCurrentNumberOfTasks());
178 verify(consumerMock, times(1)).getMessageRouterResponse();
179 verifyNoMoreInteractions(consumerMock);
183 public void consume_successfulCase() {
184 final int noOfEvents = 200;
185 final int noOfFilesPerEvent = 200;
186 final int noOfFiles = noOfEvents * noOfFilesPerEvent;
188 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
189 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
191 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
193 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
194 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
195 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
198 .create(testedObject.createMainTask(contextMap)) //
199 .expectSubscription() //
200 .expectNextCount(noOfFiles) //
204 assertEquals(0, testedObject.getCurrentNumberOfTasks());
205 assertEquals(0, testedObject.getThreadPoolQueueSize());
206 verify(consumerMock, times(1)).getMessageRouterResponse();
207 verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull());
208 verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull());
209 verifyNoMoreInteractions(dataRouterMock);
210 verifyNoMoreInteractions(fileCollectorMock);
211 verifyNoMoreInteractions(consumerMock);
215 public void consume_fetchFailedOnce() {
216 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
217 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
219 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
221 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
222 Mono<Object> error = Mono.error(new Exception("problem"));
224 // First file collect will fail, 3 will succeed
225 doReturn(error, collectedFile, collectedFile, collectedFile) //
226 .when(fileCollectorMock) //
227 .collectFile(any(FileData.class), anyLong(), any(Duration.class), notNull());
229 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
230 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
233 .create(testedObject.createMainTask(contextMap)) //
234 .expectSubscription() //
235 .expectNextCount(3) //
239 assertEquals(0, testedObject.getCurrentNumberOfTasks());
240 verify(consumerMock, times(1)).getMessageRouterResponse();
241 verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
242 verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull());
243 verifyNoMoreInteractions(dataRouterMock);
244 verifyNoMoreInteractions(fileCollectorMock);
245 verifyNoMoreInteractions(consumerMock);
249 public void consume_publishFailedOnce() {
251 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
252 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
254 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
256 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
257 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
259 Mono<Object> error = Mono.error(new Exception("problem"));
260 // One publish will fail, the rest will succeed
261 doReturn(collectedFile, error, collectedFile, collectedFile) //
262 .when(dataRouterMock) //
263 .publishFile(notNull(), anyLong(), notNull());
266 .create(testedObject.createMainTask(contextMap)) //
267 .expectSubscription() //
268 .expectNextCount(3) // 3 completed files
272 assertEquals(0, testedObject.getCurrentNumberOfTasks());
273 verify(consumerMock, times(1)).getMessageRouterResponse();
274 verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
275 verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull());
276 verifyNoMoreInteractions(dataRouterMock);
277 verifyNoMoreInteractions(fileCollectorMock);
278 verifyNoMoreInteractions(consumerMock);
282 public void consume_successfulCase_sameFileNames() {
283 final int noOfEvents = 1;
284 final int noOfFilesPerEvent = 100;
286 // 100 files with the same name
287 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
288 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
290 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
292 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
293 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
294 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
297 .create(testedObject.createMainTask(contextMap)).expectSubscription() //
298 .expectNextCount(1) // 99 is skipped
302 assertEquals(0, testedObject.getCurrentNumberOfTasks());
303 verify(consumerMock, times(1)).getMessageRouterResponse();
304 verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
305 verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull());
306 verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), notNull());
307 verifyNoMoreInteractions(dataRouterMock);
308 verifyNoMoreInteractions(fileCollectorMock);
309 verifyNoMoreInteractions(consumerMock);
310 verifyNoMoreInteractions(dataRouterMock);