2 * ============LICENSE_START======================================================================
3 * Copyright (C) 2019 Nordix Foundation. All rights reserved.
4 * ===============================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6 * in compliance with the License. You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software distributed under the License
11 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12 * or implied. See the License for the specific language governing permissions and limitations under
14 * ============LICENSE_END========================================================================
17 package org.onap.dcaegen2.collectors.datafile.tasks;
19 import static org.junit.jupiter.api.Assertions.assertEquals;
20 import static org.mockito.ArgumentMatchers.any;
21 import static org.mockito.ArgumentMatchers.anyLong;
22 import static org.mockito.ArgumentMatchers.anyString;
23 import static org.mockito.ArgumentMatchers.notNull;
24 import static org.mockito.Mockito.doReturn;
25 import static org.mockito.Mockito.mock;
26 import static org.mockito.Mockito.spy;
27 import static org.mockito.Mockito.times;
28 import static org.mockito.Mockito.verify;
29 import static org.mockito.Mockito.verifyNoMoreInteractions;
31 import java.nio.file.Paths;
32 import java.time.Duration;
33 import java.util.HashMap;
34 import java.util.LinkedList;
35 import java.util.List;
38 import org.junit.jupiter.api.BeforeEach;
39 import org.junit.jupiter.api.Test;
40 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
41 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
42 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
43 import org.onap.dcaegen2.collectors.datafile.model.FileData;
44 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
45 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
46 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
47 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
48 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
49 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
50 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
51 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
53 import reactor.core.publisher.Flux;
54 import reactor.core.publisher.Mono;
55 import reactor.test.StepVerifier;
57 public class ScheduledTasksTest {
59 private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
61 private AppConfig appConfig = mock(AppConfig.class);
62 private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
64 private int uniqueValue = 0;
65 private DMaaPMessageConsumerTask consumerMock;
66 private PublishedChecker publishedCheckerMock;
67 private FileCollector fileCollectorMock;
68 private DataRouterPublisher dataRouterMock;
69 private Map<String, String> contextMap = new HashMap<String, String>();
72 private void setUp() {
73 DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() //
74 .dmaapContentType("application/json") //
75 .dmaapHostName("54.45.33.2") //
76 .dmaapPortNumber(1234) //
77 .dmaapProtocol("https") //
78 .dmaapUserName("DFC") //
79 .dmaapUserPassword("DFC") //
80 .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
81 .trustStorePath("trustStorePath") //
82 .trustStorePasswordPath("trustStorePasswordPath") //
83 .keyStorePath("keyStorePath") //
84 .keyStorePasswordPath("keyStorePasswordPath") //
85 .enableDmaapCertAuth(true) //
87 doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
89 consumerMock = mock(DMaaPMessageConsumerTask.class);
90 publishedCheckerMock = mock(PublishedChecker.class);
91 fileCollectorMock = mock(FileCollector.class);
92 dataRouterMock = mock(DataRouterPublisher.class);
94 doReturn(consumerMock).when(testedObject).createConsumerTask();
95 doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
96 doReturn(fileCollectorMock).when(testedObject).createFileCollector();
97 doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
100 private MessageMetaData messageMetaData() {
101 return ImmutableMessageMetaData.builder() //
102 .productName("productName") //
104 .lastEpochMicrosec("") //
106 .startEpochMicrosec("") //
107 .timeZoneOffset("") //
108 .changeIdentifier("") //
113 private FileData fileData(int instanceNumber) {
114 return ImmutableFileData.builder() //
115 .name("name" + instanceNumber) //
116 .fileFormatType("") //
117 .fileFormatVersion("") //
118 .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
119 .scheme(Scheme.FTPS) //
121 .messageMetaData(messageMetaData())
125 private List<FileData> files(int size, boolean uniqueNames) {
126 List<FileData> list = new LinkedList<FileData>();
127 for (int i = 0; i < size; ++i) {
131 list.add(fileData(uniqueValue));
136 private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
137 return ImmutableFileReadyMessage.builder().files(files(numberOfFiles, uniqueNames)).build();
140 private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
141 List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
142 for (int i = 0; i < numberOfEvents; ++i) {
143 list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
145 return Flux.fromIterable(list);
148 private ConsumerDmaapModel consumerData() {
149 return ImmutableConsumerDmaapModel //
153 .lastEpochMicrosec("") //
155 .startEpochMicrosec("") //
156 .timeZoneOffset("") //
159 .internalLocation(Paths.get("internalLocation")) //
161 .fileFormatType("") //
162 .fileFormatVersion("") //
167 public void notingToConsume() {
168 doReturn(consumerMock).when(testedObject).createConsumerTask();
169 doReturn(Flux.empty()).when(consumerMock).execute();
171 testedObject.scheduleMainDatafileEventTask(any());
173 assertEquals(0, testedObject.getCurrentNumberOfTasks());
174 verify(consumerMock, times(1)).execute();
175 verifyNoMoreInteractions(consumerMock);
179 public void consume_successfulCase() {
180 final int noOfEvents = 200;
181 final int noOfFilesPerEvent = 200;
182 final int noOfFiles = noOfEvents * noOfFilesPerEvent;
184 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
185 doReturn(fileReadyMessages).when(consumerMock).execute();
187 doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
189 Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
190 doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
191 doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
193 StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
194 .expectNextCount(noOfFiles) //
198 assertEquals(0, testedObject.getCurrentNumberOfTasks());
199 verify(consumerMock, times(1)).execute();
200 verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), notNull());
201 verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), any());
202 verifyNoMoreInteractions(dataRouterMock);
203 verifyNoMoreInteractions(fileCollectorMock);
204 verifyNoMoreInteractions(consumerMock);
208 public void consume_fetchFailedOnce() {
209 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
210 doReturn(fileReadyMessages).when(consumerMock).execute();
212 doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
214 Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
215 Mono<Object> error = Mono.error(new Exception("problem"));
217 // First file collect will fail, 3 will succeed
218 doReturn(error, collectedFile, collectedFile, collectedFile) //
219 .when(fileCollectorMock) //
220 .execute(any(FileData.class), anyLong(), any(Duration.class), notNull());
222 doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
223 doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
225 StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
226 .expectNextCount(3) //
230 assertEquals(0, testedObject.getCurrentNumberOfTasks());
231 verify(consumerMock, times(1)).execute();
232 verify(fileCollectorMock, times(4)).execute(notNull(), anyLong(), notNull(), notNull());
233 verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull(), any());
234 verifyNoMoreInteractions(dataRouterMock);
235 verifyNoMoreInteractions(fileCollectorMock);
236 verifyNoMoreInteractions(consumerMock);
240 public void consume_publishFailedOnce() {
242 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
243 doReturn(fileReadyMessages).when(consumerMock).execute();
245 doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
247 Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
248 doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
250 Mono<Object> error = Mono.error(new Exception("problem"));
251 // One publish will fail, the rest will succeed
252 doReturn(collectedFile, error, collectedFile, collectedFile) //
253 .when(dataRouterMock) //
254 .execute(notNull(), anyLong(), notNull(), any());
256 StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
257 .expectNextCount(3) // 3 completed files
261 assertEquals(0, testedObject.getCurrentNumberOfTasks());
262 verify(consumerMock, times(1)).execute();
263 verify(fileCollectorMock, times(4)).execute(notNull(), anyLong(), notNull(), notNull());
264 verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull(), any());
265 verifyNoMoreInteractions(dataRouterMock);
266 verifyNoMoreInteractions(fileCollectorMock);
267 verifyNoMoreInteractions(consumerMock);
271 public void consume_successfulCase_sameFileNames() {
272 final int noOfEvents = 1;
273 final int noOfFilesPerEvent = 100;
275 // 100 files with the same name
276 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
277 doReturn(fileReadyMessages).when(consumerMock).execute();
279 doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
281 Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
282 doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
283 doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
285 StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
286 .expectNextCount(1) // 99 is skipped
290 assertEquals(0, testedObject.getCurrentNumberOfTasks());
291 verify(consumerMock, times(1)).execute();
292 verify(fileCollectorMock, times(1)).execute(notNull(), anyLong(), notNull(), notNull());
293 verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull(), any());
294 verifyNoMoreInteractions(dataRouterMock);
295 verifyNoMoreInteractions(fileCollectorMock);
296 verifyNoMoreInteractions(consumerMock);