2 * ============LICENSE_START======================================================================
3 * Copyright (C) 2019 Nordix Foundation. All rights reserved.
4 * ===============================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6 * in compliance with the License. You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software distributed under the License
11 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12 * or implied. See the License for the specific language governing permissions and limitations under
14 * ============LICENSE_END========================================================================
17 package org.onap.dcaegen2.collectors.datafile.tasks;
19 import static org.junit.jupiter.api.Assertions.assertEquals;
20 import static org.mockito.ArgumentMatchers.any;
21 import static org.mockito.ArgumentMatchers.anyLong;
22 import static org.mockito.ArgumentMatchers.anyString;
23 import static org.mockito.ArgumentMatchers.notNull;
24 import static org.mockito.Mockito.doReturn;
25 import static org.mockito.Mockito.mock;
26 import static org.mockito.Mockito.spy;
27 import static org.mockito.Mockito.times;
28 import static org.mockito.Mockito.verify;
29 import static org.mockito.Mockito.verifyNoMoreInteractions;
31 import java.nio.file.Paths;
32 import java.time.Duration;
33 import java.util.LinkedList;
34 import java.util.List;
36 import org.junit.jupiter.api.BeforeEach;
37 import org.junit.jupiter.api.Test;
38 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
39 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
40 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
41 import org.onap.dcaegen2.collectors.datafile.model.FileData;
42 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
43 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
44 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
45 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
46 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
47 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
48 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
49 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
51 import reactor.core.publisher.Flux;
52 import reactor.core.publisher.Mono;
53 import reactor.test.StepVerifier;
55 public class ScheduledTasksTest {
// File name embedded in the location string that fileData(...) builds.
57 private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
// Mocked application configuration handed to the object under test.
59 private AppConfig appConfig = mock(AppConfig.class);
// Spy around a real ScheduledTasks, so its create* factory methods can be stubbed in setUp().
60 private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
// Value passed to fileData(...) by files(...) when generating file entries.
62 private int uniqueValue = 0;
// Collaborator mocks; each is (re)created in setUp().
63 private DMaaPMessageConsumerTask consumerMock;
64 private PublishedChecker publishedCheckerMock;
65 private FileCollector fileCollectorMock;
66 private DataRouterPublisher dataRouterMock;
// Test fixture: builds a DMaaP publisher configuration, makes the mocked AppConfig return it,
// creates fresh collaborator mocks and stubs the spy's create* factory methods to hand them out.
69 private void setUp() {
// Fixed dummy values; the tests in this class only need a configuration object to exist.
70 DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() //
71 .dmaapContentType("application/json") //
72 .dmaapHostName("54.45.33.2") //
73 .dmaapPortNumber(1234) //
74 .dmaapProtocol("https") //
75 .dmaapUserName("DFC") //
76 .dmaapUserPassword("DFC") //
77 .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
78 .trustStorePath("trustStorePath") //
79 .trustStorePasswordPath("trustStorePasswordPath") //
80 .keyStorePath("keyStorePath") //
81 .keyStorePasswordPath("keyStorePasswordPath") //
82 .enableDmaapCertAuth(true) //
// The mocked AppConfig returns the configuration built above.
84 doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
// Fresh mocks for every collaborator the tests stub and verify.
86 consumerMock = mock(DMaaPMessageConsumerTask.class);
87 publishedCheckerMock = mock(PublishedChecker.class);
88 fileCollectorMock = mock(FileCollector.class);
89 dataRouterMock = mock(DataRouterPublisher.class);
// doReturn(...).when(spy) is used so the real create* methods are not invoked while stubbing.
91 doReturn(consumerMock).when(testedObject).createConsumerTask();
92 doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
93 doReturn(fileCollectorMock).when(testedObject).createFileCollector();
94 doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
// Builds the MessageMetaData shared by all generated messages: product name "productName",
// empty strings for the remaining fields set below.
97 private MessageMetaData messageMetaData() {
98 return ImmutableMessageMetaData.builder() //
99 .productName("productName") //
101 .lastEpochMicrosec("") //
103 .startEpochMicrosec("") //
104 .timeZoneOffset("") //
105 .changeIdentifier("") //
// Builds one FileData entry. instanceNumber is appended to both the name and the location,
// so two calls with the same instanceNumber produce entries with the same name and location.
110 private FileData fileData(int instanceNumber) {
111 return ImmutableFileData.builder() //
112 .name("name" + instanceNumber) //
113 .fileFormatType("") //
114 .fileFormatVersion("") //
// NOTE(review): the URL prefix is "ftpes://" while the scheme below is Scheme.FTPS - confirm this pairing is intended.
115 .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
116 .scheme(Scheme.FTPS) //
// Builds a list of FileData entries, adding one per loop iteration for `size` iterations.
// Each entry is created from the current value of the uniqueValue field.
// NOTE(review): uniqueNames presumably decides whether uniqueValue changes between entries
// (callers pass false to get files with the same name) - confirm.
121 private List<FileData> files(int size, boolean uniqueNames) {
122 List<FileData> list = new LinkedList<FileData>();
123 for (int i = 0; i < size; ++i) {
127 list.add(fileData(uniqueValue));
// Builds one FileReadyMessage holding numberOfFiles file entries.
// The PNF name is taken from the sourceName of the shared message metadata.
132 private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
133 MessageMetaData md = messageMetaData();
134 return ImmutableFileReadyMessage.builder().pnfName(md.sourceName()).messageMetaData(md)
135 .files(files(numberOfFiles, uniqueNames)).build();
// Returns a Flux emitting numberOfEvents messages, each with filesPerEvent file entries.
// All messages are created eagerly and stored in a list before the Flux is built from it.
138 private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
139 List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
140 for (int i = 0; i < numberOfEvents; ++i) {
141 list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
143 return Flux.fromIterable(list);
// Builds the ConsumerDmaapModel that the stubbed collector and publisher return in the tests.
// The string fields set below are empty; the internal location is the relative path "internalLocation".
146 private ConsumerDmaapModel consumerData() {
147 return ImmutableConsumerDmaapModel //
151 .lastEpochMicrosec("") //
153 .startEpochMicrosec("") //
154 .timeZoneOffset("") //
157 .internalLocation(Paths.get("internalLocation")) //
159 .fileFormatType("") //
160 .fileFormatVersion("") //
// Scenario: the consumer emits nothing. Afterwards no task may be counted as running and the
// consumer must have been executed exactly once.
// NOTE(review): the method name looks like a typo for "nothingToConsume".
165 public void notingToConsume() {
// NOTE(review): this repeats the createConsumerTask() stubbing already done in setUp().
166 doReturn(consumerMock).when(testedObject).createConsumerTask();
167 doReturn(Flux.empty()).when(consumerMock).execute();
// NOTE(review): any() is a Mockito argument matcher, used here as a real call argument rather
// than inside stubbing/verification - consider passing an explicit value instead.
169 testedObject.scheduleMainDatafileEventTask(any());
171 assertEquals(0, testedObject.getCurrentNumberOfTasks());
172 verify(consumerMock, times(1)).execute();
173 verifyNoMoreInteractions(consumerMock);
// Scenario: 200 events with 200 files each (40000 files), every collect and publish succeeds.
// Expects one emitted item per file and exactly one collect and one publish call per file.
177 public void consume_successfulCase() {
178 final int noOfEvents = 200;
179 final int noOfFilesPerEvent = 200;
180 final int noOfFiles = noOfEvents * noOfFilesPerEvent;
182 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
183 doReturn(fileReadyMessages).when(consumerMock).execute();
// The published check always answers false (presumably "not published yet" - confirm).
185 doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
// Collector and publisher both return the same successful Mono for every call.
187 Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
188 doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
189 doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
// NOTE(review): any() is a Mockito matcher passed as a real argument to createMainTask - consider an explicit value.
191 StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
192 .expectNextCount(noOfFiles) //
// After the flow has been verified, no task may remain counted and the call counts must match.
196 assertEquals(0, testedObject.getCurrentNumberOfTasks());
197 verify(consumerMock, times(1)).execute();
198 verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull(), any());
199 verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), any());
200 verifyNoMoreInteractions(dataRouterMock);
201 verifyNoMoreInteractions(fileCollectorMock);
202 verifyNoMoreInteractions(consumerMock);
// Scenario: 4 files, the first collect call returns an error Mono and the next three succeed.
// Expects 3 emitted items, 4 collect calls and 3 publish calls.
206 public void consume_fetchFailedOnce() {
207 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
208 doReturn(fileReadyMessages).when(consumerMock).execute();
210 doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
212 Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
213 Mono<Object> error = Mono.error(new Exception("problem"));
215 // First file collect will fail, 3 will succeed
// (doReturn with several values returns them in order on consecutive calls.)
216 doReturn(error, collectedFile, collectedFile, collectedFile) //
217 .when(fileCollectorMock) //
218 .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class), any());
// NOTE(review): the following two stubbing lines are identical; the second one is redundant.
220 doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
221 doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
// NOTE(review): any() is a Mockito matcher passed as a real argument to createMainTask - consider an explicit value.
223 StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
224 .expectNextCount(3) //
// The publisher is verified for 3 calls only, one fewer than the 4 collect calls.
228 assertEquals(0, testedObject.getCurrentNumberOfTasks());
229 verify(consumerMock, times(1)).execute();
230 verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
231 verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull(), any());
232 verifyNoMoreInteractions(dataRouterMock);
233 verifyNoMoreInteractions(fileCollectorMock);
234 verifyNoMoreInteractions(consumerMock);
// Scenario: 4 files, every collect succeeds, one publish call returns an error Mono.
// Expects 3 emitted items, 4 collect calls and 4 publish calls.
238 public void consume_publishFailedOnce() {
240 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
241 doReturn(fileReadyMessages).when(consumerMock).execute();
243 doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
245 Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
246 doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
248 Mono<Object> error = Mono.error(new Exception("problem"));
249 // The second publish call will fail, the rest will succeed
250 doReturn(collectedFile, error, collectedFile, collectedFile) //
251 .when(dataRouterMock) //
252 .execute(notNull(), anyLong(), notNull(), any());
// NOTE(review): any() is a Mockito matcher passed as a real argument to createMainTask - consider an explicit value.
254 StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
255 .expectNextCount(3) // 3 completed files
// The publisher is called for all 4 files even though only 3 items are emitted.
259 assertEquals(0, testedObject.getCurrentNumberOfTasks());
260 verify(consumerMock, times(1)).execute();
261 verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
262 verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull(), any());
263 verifyNoMoreInteractions(dataRouterMock);
264 verifyNoMoreInteractions(fileCollectorMock);
265 verifyNoMoreInteractions(consumerMock);
// Scenario: one event carrying 100 files that all share the same name.
// Expects a single emitted item and exactly one collect and one publish call.
269 public void consume_successfulCase_sameFileNames() {
270 final int noOfEvents = 1;
271 final int noOfFilesPerEvent = 100;
273 // 100 files with the same name
274 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
275 doReturn(fileReadyMessages).when(consumerMock).execute();
277 doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
279 Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
280 doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
281 doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
// NOTE(review): any() is a Mockito matcher passed as a real argument to createMainTask - consider an explicit value.
283 StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
284 .expectNextCount(1) // the other 99 are skipped
288 assertEquals(0, testedObject.getCurrentNumberOfTasks());
289 verify(consumerMock, times(1)).execute();
290 verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull(), any());
291 verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull(), any());
292 verifyNoMoreInteractions(dataRouterMock);
293 verifyNoMoreInteractions(fileCollectorMock);
294 verifyNoMoreInteractions(consumerMock);