2 * ============LICENSE_START======================================================================
3 * Copyright (C) 2019 Nordix Foundation. All rights reserved.
4 * ===============================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6 * in compliance with the License. You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software distributed under the License
11 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12 * or implied. See the License for the specific language governing permissions and limitations under
14 * ============LICENSE_END========================================================================
17 package org.onap.dcaegen2.collectors.datafile.tasks;
19 import static org.junit.jupiter.api.Assertions.assertEquals;
20 import static org.mockito.ArgumentMatchers.any;
21 import static org.mockito.ArgumentMatchers.anyLong;
22 import static org.mockito.ArgumentMatchers.notNull;
23 import static org.mockito.Mockito.doReturn;
24 import static org.mockito.Mockito.mock;
25 import static org.mockito.Mockito.spy;
26 import static org.mockito.Mockito.times;
27 import static org.mockito.Mockito.verify;
28 import static org.mockito.Mockito.verifyNoMoreInteractions;
29 import java.time.Duration;
30 import java.util.LinkedList;
31 import java.util.List;
32 import org.junit.jupiter.api.BeforeEach;
33 import org.junit.jupiter.api.Test;
34 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
35 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
36 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
37 import org.onap.dcaegen2.collectors.datafile.model.FileData;
38 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
39 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
40 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
41 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
42 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
43 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
44 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
45 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
46 import reactor.core.publisher.Flux;
47 import reactor.core.publisher.Mono;
48 import reactor.test.StepVerifier;
50 public class ScheduledTasksTest {
52 private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
54 private AppConfig appConfig = mock(AppConfig.class);
55 private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
57 private int uniqueValue = 0;
58 private DMaaPMessageConsumerTask consumerMock;
59 private FileCollector fileCollectorMock;
60 private DataRouterPublisher dataRouterMock;
63 private void setUp() {
64 DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() //
65 .dmaapContentType("application/json") //
66 .dmaapHostName("54.45.33.2") //
67 .dmaapPortNumber(1234) //
68 .dmaapProtocol("https") //
69 .dmaapUserName("DFC") //
70 .dmaapUserPassword("DFC") //
71 .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
72 .trustStorePath("trustStorePath") //
73 .trustStorePasswordPath("trustStorePasswordPath") //
74 .keyStorePath("keyStorePath") //
75 .keyStorePasswordPath("keyStorePasswordPath") //
76 .enableDmaapCertAuth(true) //
79 consumerMock = mock(DMaaPMessageConsumerTask.class);
80 fileCollectorMock = mock(FileCollector.class);
81 dataRouterMock = mock(DataRouterPublisher.class);
83 doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
84 doReturn(consumerMock).when(testedObject).createConsumerTask();
85 doReturn(fileCollectorMock).when(testedObject).createFileCollector(notNull());
86 doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
89 private MessageMetaData messageMetaData() {
90 return ImmutableMessageMetaData.builder() //
91 .productName("productName") //
93 .lastEpochMicrosec("") //
95 .startEpochMicrosec("") //
96 .timeZoneOffset("") //
97 .changeIdentifier("") //
102 private FileData fileData(int instanceNumber) {
103 return ImmutableFileData.builder() //
104 .name("name" + instanceNumber) //
105 .fileFormatType("") //
106 .fileFormatVersion("") //
107 .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
108 .scheme(Scheme.FTPS) //
113 private List<FileData> files(int size, boolean uniqueNames) {
114 List<FileData> list = new LinkedList<FileData>();
115 for (int i = 0; i < size; ++i) {
119 list.add(fileData(uniqueValue));
124 private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
125 MessageMetaData md = messageMetaData();
126 return ImmutableFileReadyMessage.builder().pnfName(md.sourceName()).messageMetaData(md)
127 .files(files(numberOfFiles, uniqueNames)).build();
130 private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
131 List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
132 for (int i = 0; i < numberOfEvents; ++i) {
133 list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
135 return Flux.fromIterable(list);
138 private ConsumerDmaapModel consumerData() {
139 return ImmutableConsumerDmaapModel //
143 .lastEpochMicrosec("") //
145 .startEpochMicrosec("") //
146 .timeZoneOffset("") //
149 .internalLocation("internalLocation") //
151 .fileFormatType("") //
152 .fileFormatVersion("") //
157 public void notingToConsume() {
158 doReturn(consumerMock).when(testedObject).createConsumerTask();
159 doReturn(Flux.empty()).when(consumerMock).execute();
161 testedObject.scheduleMainDatafileEventTask(any());
163 assertEquals(0, testedObject.getCurrentNumberOfTasks());
164 verify(consumerMock, times(1)).execute();
165 verifyNoMoreInteractions(consumerMock);
169 public void consume_successfulCase() {
170 final int noOfEvents = 200;
171 final int noOfFilesPerEvent = 200;
172 final int noOfFiles = noOfEvents * noOfFilesPerEvent;
174 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
175 doReturn(fileReadyMessages).when(consumerMock).execute();
177 Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
178 doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
179 doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
181 StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
182 .expectNextCount(noOfFiles) //
186 assertEquals(0, testedObject.getCurrentNumberOfTasks());
187 verify(consumerMock, times(1)).execute();
188 verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull(), any());
189 verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), any());
190 verifyNoMoreInteractions(dataRouterMock);
191 verifyNoMoreInteractions(fileCollectorMock);
192 verifyNoMoreInteractions(consumerMock);
196 public void consume_fetchFailedOnce() {
197 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
198 doReturn(fileReadyMessages).when(consumerMock).execute();
200 Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
201 Mono<Object> error = Mono.error(new Exception("problem"));
203 // First file collect will fail, 3 will succeed
204 doReturn(error, collectedFile, collectedFile, collectedFile) //
205 .when(fileCollectorMock) //
206 .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class), any());
208 doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
209 doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
211 StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
212 .expectNextCount(3) //
216 assertEquals(0, testedObject.getCurrentNumberOfTasks());
217 verify(consumerMock, times(1)).execute();
218 verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
219 verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull(), any());
220 verifyNoMoreInteractions(dataRouterMock);
221 verifyNoMoreInteractions(fileCollectorMock);
222 verifyNoMoreInteractions(consumerMock);
226 public void consume_publishFailedOnce() {
228 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
229 doReturn(fileReadyMessages).when(consumerMock).execute();
231 Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
232 doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
234 Mono<Object> error = Mono.error(new Exception("problem"));
235 // One publish will fail, the rest will succeed
236 doReturn(collectedFile, error, collectedFile, collectedFile) //
237 .when(dataRouterMock) //
238 .execute(notNull(), anyLong(), notNull(), any());
240 StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
241 .expectNextCount(3) // 3 completed files
245 assertEquals(0, testedObject.getCurrentNumberOfTasks());
246 verify(consumerMock, times(1)).execute();
247 verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
248 verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull(), any());
249 verifyNoMoreInteractions(dataRouterMock);
250 verifyNoMoreInteractions(fileCollectorMock);
251 verifyNoMoreInteractions(consumerMock);
255 public void consume_successfulCase_sameFileNames() {
256 final int noOfEvents = 1;
257 final int noOfFilesPerEvent = 100;
259 // 100 files with the same name
260 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
261 doReturn(fileReadyMessages).when(consumerMock).execute();
263 Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
264 doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
265 doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
267 StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
268 .expectNextCount(1) // 99 is skipped
272 assertEquals(0, testedObject.getCurrentNumberOfTasks());
273 verify(consumerMock, times(1)).execute();
274 verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull(), any());
275 verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull(), any());
276 verifyNoMoreInteractions(dataRouterMock);
277 verifyNoMoreInteractions(fileCollectorMock);
278 verifyNoMoreInteractions(consumerMock);