2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.dcaegen2.collectors.datafile.tasks;
23 import static org.junit.jupiter.api.Assertions.assertEquals;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.ArgumentMatchers.anyLong;
26 import static org.mockito.ArgumentMatchers.anyString;
27 import static org.mockito.ArgumentMatchers.notNull;
28 import static org.mockito.Mockito.doReturn;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.spy;
31 import static org.mockito.Mockito.times;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.verifyNoMoreInteractions;
35 import java.nio.file.Paths;
36 import java.time.Duration;
37 import java.util.HashMap;
38 import java.util.LinkedList;
39 import java.util.List;
41 import org.junit.jupiter.api.BeforeEach;
42 import org.junit.jupiter.api.Test;
43 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
44 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
45 import org.onap.dcaegen2.collectors.datafile.model.FileData;
46 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
47 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
48 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
49 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
50 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
51 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
52 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
53 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
54 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
55 import reactor.core.publisher.Flux;
56 import reactor.core.publisher.Mono;
57 import reactor.test.StepVerifier;
// Tests for ScheduledTasks. The object under test is a Mockito spy, so that its create* factory methods can be
// stubbed (see setUp) to hand out mocked collaborators.
59 public class ScheduledTasksTest {
// File name part of the location given to every generated FileData (see fileData).
61 private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
// Mocked application configuration passed to the ScheduledTasks constructor.
63 private AppConfig appConfig = mock(AppConfig.class);
// Spy wrapping a real ScheduledTasks instance.
64 private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
// Instance number passed to fileData(...) when lists of files are generated (see files).
66 private int uniqueValue = 0;
// Collaborator mocks; assigned in setUp.
67 private DMaaPMessageConsumer consumerMock;
68 private PublishedChecker publishedCheckerMock;
69 private FileCollector fileCollectorMock;
70 private DataRouterPublisher dataRouterMock;
// Context map handed to createMainTask(...) by the tests; it is created empty here.
// NOTE(review): no "import java.util.Map;" is visible among the imports shown - confirm it is present.
71 private Map<String, String> contextMap = new HashMap<String, String>();
// Prepares the fixture: stubs the publisher configuration on the mocked AppConfig, creates new mocks for the
// four collaborators and makes the spied ScheduledTasks return them from its create* methods.
// NOTE(review): BeforeEach is imported but the annotation is not visible in this excerpt. If this method carries
// @BeforeEach, note that the JUnit Jupiter user guide states lifecycle methods must not be private - confirm.
74 private void setUp() {
// Publisher configuration that the mocked AppConfig returns below.
75 DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() //
76 .dmaapContentType("application/json") //
77 .dmaapHostName("54.45.33.2") //
78 .dmaapPortNumber(1234) //
79 .dmaapProtocol("https") //
80 .dmaapUserName("DFC") //
81 .dmaapUserPassword("DFC") //
82 .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
83 .trustStorePath("trustStorePath") //
84 .trustStorePasswordPath("trustStorePasswordPath") //
85 .keyStorePath("keyStorePath") //
86 .keyStorePasswordPath("keyStorePasswordPath") //
87 .enableDmaapCertAuth(true) //
89 doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
// New collaborator mocks on every invocation of setUp.
91 consumerMock = mock(DMaaPMessageConsumer.class);
92 publishedCheckerMock = mock(PublishedChecker.class);
93 fileCollectorMock = mock(FileCollector.class);
94 dataRouterMock = mock(DataRouterPublisher.class);
// Make the factory methods of the spy return the mocks instead of building real collaborators.
96 doReturn(consumerMock).when(testedObject).createConsumerTask();
97 doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
98 doReturn(fileCollectorMock).when(testedObject).createFileCollector();
99 doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
/**
 * Starts building the MessageMetaData attached to every generated FileData: the product name is
 * "productName" and the other fields set in the lines visible here are empty strings.
 *
 * @return the meta data used by {@code fileData}
 */
102 private MessageMetaData messageMetaData() {
103 return ImmutableMessageMetaData.builder() //
104 .productName("productName") //
106 .lastEpochMicrosec("") //
108 .startEpochMicrosec("") //
109 .timeZoneOffset("") //
110 .changeIdentifier("") //
/**
 * Starts building a FileData whose name and location both end with the given instance number, so that
 * different instance numbers give different names and locations.
 *
 * @param instanceNumber suffix appended to the name and to the location
 * @return the file data built from the values below
 */
115 private FileData fileData(int instanceNumber) {
116 return ImmutableFileData.builder() //
117 .name("name" + instanceNumber) //
118 .fileFormatType("") //
119 .fileFormatVersion("") //
// The location is the fixed server path, followed by PM_FILE_NAME and the instance number.
120 .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
121 .scheme(Scheme.FTPS) //
123 .messageMetaData(messageMetaData()) //
/**
 * Fills a list with FileData instances, adding one per loop iteration, each built from the current value
 * of {@code uniqueValue}.
 *
 * @param size number of loop iterations, i.e. number of FileData added
 * @param uniqueNames NOTE(review): its use is not visible in this excerpt; presumably it decides whether
 *        uniqueValue changes between iterations so that the names differ - confirm
 * @return NOTE(review): the return statement is not visible here; presumably the list that was filled - confirm
 */
127 private List<FileData> files(int size, boolean uniqueNames) {
128 List<FileData> list = new LinkedList<FileData>();
129 for (int i = 0; i < size; ++i) {
133 list.add(fileData(uniqueValue));
/**
 * Builds a FileReadyMessage whose files come from {@code files(numberOfFiles, uniqueNames)}.
 *
 * @param numberOfFiles passed on to {@code files} as its size
 * @param uniqueNames passed on to {@code files} unchanged
 * @return the built message
 */
138 private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
139 return ImmutableFileReadyMessage.builder().files(files(numberOfFiles, uniqueNames)).build();
/**
 * Creates a Flux that emits {@code numberOfEvents} FileReadyMessages, each created by
 * {@code createFileReadyMessage(filesPerEvent, uniqueNames)}.
 *
 * @param numberOfEvents number of messages placed in the Flux
 * @param filesPerEvent number of files requested for each message
 * @param uniqueNames passed on to {@code createFileReadyMessage} unchanged
 * @return a Flux over the generated messages, in creation order
 */
142 private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
143 List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
144 for (int i = 0; i < numberOfEvents; ++i) {
145 list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
147 return Flux.fromIterable(list);
/**
 * Assembles the FilePublishInformation that the tests wrap in a Mono and return from the stubbed
 * collectFile and publishFile calls. The string fields set in the lines visible here are empty, and the
 * internal location is the relative path "internalLocation".
 *
 * @return the publish information used as the stubbed result
 */
150 private FilePublishInformation filePublishInformation() {
151 return ImmutableFilePublishInformation //
155 .lastEpochMicrosec("") //
157 .startEpochMicrosec("") //
158 .timeZoneOffset("") //
161 .internalLocation(Paths.get("internalLocation")) //
163 .fileFormatType("") //
164 .fileFormatVersion("") //
// Case: the consumer returns an empty Flux. After executeDatafileMainTask() the number of current tasks must
// be 0 and the consumer must have been asked for messages exactly once, with no other interaction.
// NOTE(review): the method name is presumably a typo for "nothingToConsume" - confirm before renaming.
169 public void notingToConsume() {
// Same stubbing as the one performed in setUp.
170 doReturn(consumerMock).when(testedObject).createConsumerTask();
171 doReturn(Flux.empty()).when(consumerMock).getMessageRouterResponse();
173 testedObject.executeDatafileMainTask();
175 assertEquals(0, testedObject.getCurrentNumberOfTasks());
176 verify(consumerMock, times(1)).getMessageRouterResponse();
177 verifyNoMoreInteractions(consumerMock);
// Case: 200 events with 200 files each (40000 files), nothing published before, every collect and every
// publish succeeds. The main task must emit one item per file, and collectFile and publishFile must each
// be called once per file.
181 public void consume_successfulCase() {
182 final int noOfEvents = 200;
183 final int noOfFilesPerEvent = 200;
184 final int noOfFiles = noOfEvents * noOfFilesPerEvent;
// The third argument (true) requests unique names from the generator helpers.
186 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
187 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
// The published-check always answers false.
189 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
// Both collecting and publishing return the same successful Mono.
191 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
192 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
193 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any());
195 StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
196 .expectNextCount(noOfFiles) //
// No task may remain registered, and each mock must have seen exactly the verified calls.
200 assertEquals(0, testedObject.getCurrentNumberOfTasks());
201 verify(consumerMock, times(1)).getMessageRouterResponse();
202 verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull());
203 verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull(), any());
204 verifyNoMoreInteractions(dataRouterMock);
205 verifyNoMoreInteractions(fileCollectorMock);
206 verifyNoMoreInteractions(consumerMock);
// Case: 4 files are announced and the first collectFile call returns an error Mono. The main task must emit
// 3 items; collectFile must be called 4 times and publishFile 3 times.
210 public void consume_fetchFailedOnce() {
211 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
212 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
// The published-check always answers false.
214 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
216 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
217 Mono<Object> error = Mono.error(new Exception("problem"));
219 // First file collect will fail, 3 will succeed
// Consecutive stubbing: the values are returned in the listed order on successive calls.
220 doReturn(error, collectedFile, collectedFile, collectedFile) //
221 .when(fileCollectorMock) //
222 .collectFile(any(FileData.class), anyLong(), any(Duration.class), notNull());
// NOTE(review): the next two lines are identical; the second stubbing looks redundant - confirm and remove.
224 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any());
225 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any());
227 StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
228 .expectNextCount(3) //
// No task may remain registered; all 4 collects were attempted but only 3 publishes happened.
232 assertEquals(0, testedObject.getCurrentNumberOfTasks());
233 verify(consumerMock, times(1)).getMessageRouterResponse();
234 verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
235 verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull(), any());
236 verifyNoMoreInteractions(dataRouterMock);
237 verifyNoMoreInteractions(fileCollectorMock);
238 verifyNoMoreInteractions(consumerMock);
// Case: 4 files are announced, every collect succeeds, and the second publishFile call returns an error Mono.
// The main task must emit 3 items; collectFile and publishFile must each be called 4 times.
242 public void consume_publishFailedOnce() {
244 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
245 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
// The published-check always answers false.
247 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
249 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
250 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
252 Mono<Object> error = Mono.error(new Exception("problem"));
253 // One publish will fail, the rest will succeed
// Consecutive stubbing: the error is the second value, so it is returned by the second publishFile call.
254 doReturn(collectedFile, error, collectedFile, collectedFile) //
255 .when(dataRouterMock) //
256 .publishFile(notNull(), anyLong(), notNull(), any());
258 StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
259 .expectNextCount(3) // 3 completed files
// No task may remain registered; the publish count of 4 includes the failed call.
263 assertEquals(0, testedObject.getCurrentNumberOfTasks());
264 verify(consumerMock, times(1)).getMessageRouterResponse();
265 verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
266 verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull(), any());
267 verifyNoMoreInteractions(dataRouterMock);
268 verifyNoMoreInteractions(fileCollectorMock);
269 verifyNoMoreInteractions(consumerMock);
// Case: a single event announces 100 files generated with uniqueNames = false. The main task must emit exactly
// 1 item, and collectFile and publishFile must each be called exactly once.
273 public void consume_successfulCase_sameFileNames() {
274 final int noOfEvents = 1;
275 final int noOfFilesPerEvent = 100;
277 // 100 files with the same name
278 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
279 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
// The published-check always answers false.
281 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
283 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
284 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
285 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any());
287 StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
288 .expectNextCount(1) // 99 is skipped
// No task may remain registered; only one collect and one publish may have happened.
292 assertEquals(0, testedObject.getCurrentNumberOfTasks());
293 verify(consumerMock, times(1)).getMessageRouterResponse();
294 verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
295 verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull(), any());
296 verifyNoMoreInteractions(dataRouterMock);
297 verifyNoMoreInteractions(fileCollectorMock);
298 verifyNoMoreInteractions(consumerMock);