2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.dcaegen2.collectors.datafile.tasks;
23 import static org.junit.jupiter.api.Assertions.assertEquals;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.ArgumentMatchers.anyLong;
26 import static org.mockito.ArgumentMatchers.anyString;
27 import static org.mockito.ArgumentMatchers.notNull;
28 import static org.mockito.Mockito.doReturn;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.spy;
31 import static org.mockito.Mockito.times;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.verifyNoMoreInteractions;
35 import java.nio.file.Paths;
36 import java.time.Duration;
37 import java.util.HashMap;
38 import java.util.LinkedList;
39 import java.util.List;
42 import org.junit.jupiter.api.BeforeEach;
43 import org.junit.jupiter.api.Test;
44 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
45 import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
46 import org.onap.dcaegen2.collectors.datafile.configuration.ImmutableConsumerConfiguration;
47 import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration;
48 import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
49 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
50 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
51 import org.onap.dcaegen2.collectors.datafile.model.FileData;
52 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
53 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
54 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
55 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
56 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
57 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
58 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
60 import reactor.core.publisher.Flux;
61 import reactor.core.publisher.Mono;
62 import reactor.test.StepVerifier;
64 public class ScheduledTasksTest {
66 private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
67 private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
69 private AppConfig appConfig = mock(AppConfig.class);
70 private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
72 private int uniqueValue = 0;
73 private DMaaPMessageConsumer consumerMock;
74 private PublishedChecker publishedCheckerMock;
75 private FileCollector fileCollectorMock;
76 private DataRouterPublisher dataRouterMock;
77 private Map<String, String> contextMap = new HashMap<String, String>();
79 private final String publishUrl = "https://54.45.33.2:1234/unauthenticated.VES_NOTIFICATION_OUTPUT";
82 private void setUp() throws DatafileTaskException {
83 final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
84 .publishUrl(publishUrl) //
86 .userName("userName") //
87 .passWord("passWord") //
88 .trustStorePath("trustStorePath") //
89 .trustStorePasswordPath("trustStorePasswordPath") //
90 .keyStorePath("keyStorePath") //
91 .keyStorePasswordPath("keyStorePasswordPath") //
92 .enableDmaapCertAuth(true) //
93 .changeIdentifier(CHANGE_IDENTIFIER) //
95 final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
96 .topicUrl("topicUrl").trustStorePath("trustStorePath") //
97 .trustStorePasswordPath("trustStorePasswordPath") //
98 .keyStorePath("keyStorePath") //
99 .keyStorePasswordPath("keyStorePasswordPath") //
100 .enableDmaapCertAuth(true) //
103 doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
104 doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
105 doReturn(true).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
107 consumerMock = mock(DMaaPMessageConsumer.class);
108 publishedCheckerMock = mock(PublishedChecker.class);
109 fileCollectorMock = mock(FileCollector.class);
110 dataRouterMock = mock(DataRouterPublisher.class);
112 doReturn(consumerMock).when(testedObject).createConsumerTask();
113 doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
114 doReturn(fileCollectorMock).when(testedObject).createFileCollector();
115 doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
118 private MessageMetaData messageMetaData() {
119 return ImmutableMessageMetaData.builder() //
120 .productName("productName") //
122 .lastEpochMicrosec("") //
124 .startEpochMicrosec("") //
125 .timeZoneOffset("") //
126 .changeIdentifier(CHANGE_IDENTIFIER) //
131 private FileData fileData(int instanceNumber) {
132 return ImmutableFileData.builder() //
133 .name("name" + instanceNumber) //
134 .fileFormatType("") //
135 .fileFormatVersion("") //
136 .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
137 .scheme(Scheme.FTPS) //
139 .messageMetaData(messageMetaData()) //
143 private List<FileData> files(int size, boolean uniqueNames) {
144 List<FileData> list = new LinkedList<FileData>();
145 for (int i = 0; i < size; ++i) {
149 list.add(fileData(uniqueValue));
154 private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
155 return ImmutableFileReadyMessage.builder().files(files(numberOfFiles, uniqueNames)).build();
158 private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
159 List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
160 for (int i = 0; i < numberOfEvents; ++i) {
161 list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
163 return Flux.fromIterable(list);
166 private FilePublishInformation filePublishInformation() {
167 return ImmutableFilePublishInformation //
171 .lastEpochMicrosec("") //
173 .startEpochMicrosec("") //
174 .timeZoneOffset("") //
177 .internalLocation(Paths.get("internalLocation")) //
179 .fileFormatType("") //
180 .fileFormatVersion("") //
181 .changeIdentifier(CHANGE_IDENTIFIER) //
182 .context(new HashMap<String, String>()).build();
186 public void notingToConsume() throws DatafileTaskException {
187 doReturn(consumerMock).when(testedObject).createConsumerTask();
188 doReturn(Flux.empty()).when(consumerMock).getMessageRouterResponse();
190 testedObject.executeDatafileMainTask();
192 assertEquals(0, testedObject.getCurrentNumberOfTasks());
193 verify(consumerMock, times(1)).getMessageRouterResponse();
194 verifyNoMoreInteractions(consumerMock);
198 public void consume_successfulCase() throws DatafileTaskException {
199 final int noOfEvents = 200;
200 final int noOfFilesPerEvent = 200;
201 final int noOfFiles = noOfEvents * noOfFilesPerEvent;
203 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
204 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
206 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
208 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
209 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
210 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
213 .create(testedObject.createMainTask(contextMap)) //
214 .expectSubscription() //
215 .expectNextCount(noOfFiles) //
219 assertEquals(0, testedObject.getCurrentNumberOfTasks());
220 assertEquals(0, testedObject.getThreadPoolQueueSize());
221 verify(consumerMock, times(1)).getMessageRouterResponse();
222 verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull());
223 verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull());
224 verifyNoMoreInteractions(dataRouterMock);
225 verifyNoMoreInteractions(fileCollectorMock);
226 verifyNoMoreInteractions(consumerMock);
230 public void consume_fetchFailedOnce() throws DatafileTaskException {
231 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
232 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
234 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
236 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
237 Mono<Object> error = Mono.error(new Exception("problem"));
239 // First file collect will fail, 3 will succeed
240 doReturn(error, collectedFile, collectedFile, collectedFile) //
241 .when(fileCollectorMock) //
242 .collectFile(any(FileData.class), anyLong(), any(Duration.class), notNull());
244 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
245 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
248 .create(testedObject.createMainTask(contextMap)) //
249 .expectSubscription() //
250 .expectNextCount(3) //
254 assertEquals(0, testedObject.getCurrentNumberOfTasks());
255 verify(consumerMock, times(1)).getMessageRouterResponse();
256 verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
257 verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull());
258 verifyNoMoreInteractions(dataRouterMock);
259 verifyNoMoreInteractions(fileCollectorMock);
260 verifyNoMoreInteractions(consumerMock);
264 public void consume_publishFailedOnce() throws DatafileTaskException {
266 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
267 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
269 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
271 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
272 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
274 Mono<Object> error = Mono.error(new Exception("problem"));
275 // One publish will fail, the rest will succeed
276 doReturn(collectedFile, error, collectedFile, collectedFile) //
277 .when(dataRouterMock) //
278 .publishFile(notNull(), anyLong(), notNull());
281 .create(testedObject.createMainTask(contextMap)) //
282 .expectSubscription() //
283 .expectNextCount(3) // 3 completed files
287 assertEquals(0, testedObject.getCurrentNumberOfTasks());
288 verify(consumerMock, times(1)).getMessageRouterResponse();
289 verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
290 verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull());
291 verifyNoMoreInteractions(dataRouterMock);
292 verifyNoMoreInteractions(fileCollectorMock);
293 verifyNoMoreInteractions(consumerMock);
297 public void consume_successfulCase_sameFileNames() throws DatafileTaskException {
298 final int noOfEvents = 1;
299 final int noOfFilesPerEvent = 100;
301 // 100 files with the same name
302 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
303 doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
305 doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
307 Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
308 doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
309 doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
312 .create(testedObject.createMainTask(contextMap)).expectSubscription() //
313 .expectNextCount(1) // 99 is skipped
317 assertEquals(0, testedObject.getCurrentNumberOfTasks());
318 verify(consumerMock, times(1)).getMessageRouterResponse();
319 verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
320 verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull());
321 verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), anyString(), notNull());
322 verifyNoMoreInteractions(dataRouterMock);
323 verifyNoMoreInteractions(fileCollectorMock);
324 verifyNoMoreInteractions(consumerMock);
325 verifyNoMoreInteractions(dataRouterMock);