/*-
 * ============LICENSE_START======================================================================
 * Copyright (C) 2019 Nordix Foundation. All rights reserved.
 * ===============================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 * ============LICENSE_END========================================================================
 */
17 package org.onap.dcaegen2.collectors.datafile.tasks;
19 import static org.junit.jupiter.api.Assertions.assertEquals;
20 import static org.mockito.ArgumentMatchers.any;
21 import static org.mockito.ArgumentMatchers.anyLong;
22 import static org.mockito.ArgumentMatchers.notNull;
23 import static org.mockito.Mockito.doReturn;
24 import static org.mockito.Mockito.mock;
25 import static org.mockito.Mockito.spy;
26 import static org.mockito.Mockito.times;
27 import static org.mockito.Mockito.verify;
28 import static org.mockito.Mockito.verifyNoMoreInteractions;
30 import java.time.Duration;
31 import java.util.LinkedList;
32 import java.util.List;
34 import org.junit.jupiter.api.BeforeEach;
35 import org.junit.jupiter.api.Test;
36 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
37 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
38 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
39 import org.onap.dcaegen2.collectors.datafile.model.FileData;
40 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
41 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
42 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
43 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
44 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
45 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
46 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
47 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
49 import reactor.core.publisher.Flux;
50 import reactor.core.publisher.Mono;
51 import reactor.test.StepVerifier;
53 public class ScheduledTasksTest {
55 private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
57 private AppConfig appConfig = mock(AppConfig.class);
58 private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
60 private int uniqueValue = 0;
61 private DMaaPMessageConsumerTask consumerMock;
62 private FileCollector fileCollectorMock;
63 private DataRouterPublisher dataRouterMock;
66 private void setUp() {
67 DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() //
68 .dmaapContentType("application/json") //
69 .dmaapHostName("54.45.33.2") //
70 .dmaapPortNumber(1234) //
71 .dmaapProtocol("https") //
72 .dmaapUserName("DFC") //
73 .dmaapUserPassword("DFC") //
74 .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
75 .trustStorePath("trustStorePath") //
76 .trustStorePasswordPath("trustStorePasswordPath") //
77 .keyStorePath("keyStorePath") //
78 .keyStorePasswordPath("keyStorePasswordPath") //
79 .enableDmaapCertAuth(true) //
82 consumerMock = mock(DMaaPMessageConsumerTask.class);
83 fileCollectorMock = mock(FileCollector.class);
84 dataRouterMock = mock(DataRouterPublisher.class);
86 doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
87 doReturn(consumerMock).when(testedObject).createConsumerTask();
88 doReturn(fileCollectorMock).when(testedObject).createFileCollector(notNull());
89 doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
92 private MessageMetaData messageMetaData() {
93 return ImmutableMessageMetaData.builder() //
94 .productName("productName") //
96 .lastEpochMicrosec("") //
98 .startEpochMicrosec("") //
99 .timeZoneOffset("") //
100 .changeIdentifier("") //
105 private FileData fileData(int instanceNumber) {
106 return ImmutableFileData.builder() //
107 .name("name" + instanceNumber) //
108 .fileFormatType("") //
109 .fileFormatVersion("") //
110 .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
111 .scheme(Scheme.FTPS) //
116 private List<FileData> files(int size, boolean uniqueNames) {
117 List<FileData> list = new LinkedList<FileData>();
118 for (int i = 0; i < size; ++i) {
122 list.add(fileData(uniqueValue));
127 private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
128 MessageMetaData md = messageMetaData();
129 return ImmutableFileReadyMessage.builder().pnfName(md.sourceName()).messageMetaData(md)
130 .files(files(numberOfFiles, uniqueNames)).build();
133 private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
134 List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
135 for (int i = 0; i < numberOfEvents; ++i) {
136 list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
138 return Flux.fromIterable(list);
141 private ConsumerDmaapModel consumerData() {
142 return ImmutableConsumerDmaapModel //
146 .lastEpochMicrosec("") //
148 .startEpochMicrosec("") //
149 .timeZoneOffset("") //
152 .internalLocation("internalLocation") //
154 .fileFormatType("") //
155 .fileFormatVersion("") //
160 public void notingToConsume() {
161 doReturn(consumerMock).when(testedObject).createConsumerTask();
162 doReturn(Flux.empty()).when(consumerMock).execute();
164 testedObject.scheduleMainDatafileEventTask();
166 assertEquals(0, testedObject.getCurrentNumberOfTasks());
167 verify(consumerMock, times(1)).execute();
168 verifyNoMoreInteractions(consumerMock);
172 public void consume_successfulCase() {
173 final int noOfEvents = 200;
174 final int noOfFilesPerEvent = 200;
175 final int noOfFiles = noOfEvents * noOfFilesPerEvent;
177 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
178 doReturn(fileReadyMessages).when(consumerMock).execute();
180 Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
181 doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
182 doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
184 StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
185 .expectNextCount(noOfFiles) //
189 assertEquals(0, testedObject.getCurrentNumberOfTasks());
190 verify(consumerMock, times(1)).execute();
191 verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull());
192 verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull());
193 verifyNoMoreInteractions(dataRouterMock);
194 verifyNoMoreInteractions(fileCollectorMock);
195 verifyNoMoreInteractions(consumerMock);
199 public void consume_fetchFailedOnce() {
200 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
201 doReturn(fileReadyMessages).when(consumerMock).execute();
203 Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
204 Mono<Object> error = Mono.error(new Exception("problem"));
206 // First file collect will fail, 3 will succeed
207 doReturn(error, collectedFile, collectedFile, collectedFile) //
208 .when(fileCollectorMock) //
209 .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class));
211 doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
212 doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
214 StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
215 .expectNextCount(3) //
219 assertEquals(0, testedObject.getCurrentNumberOfTasks());
220 verify(consumerMock, times(1)).execute();
221 verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull());
222 verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull());
223 verifyNoMoreInteractions(dataRouterMock);
224 verifyNoMoreInteractions(fileCollectorMock);
225 verifyNoMoreInteractions(consumerMock);
229 public void consume_publishFailedOnce() {
231 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
232 doReturn(fileReadyMessages).when(consumerMock).execute();
234 Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
235 doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
237 Mono<Object> error = Mono.error(new Exception("problem"));
238 // One publish will fail, the rest will succeed
239 doReturn(collectedFile, error, collectedFile, collectedFile) //
240 .when(dataRouterMock) //
241 .execute(notNull(), anyLong(), notNull());
243 StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
244 .expectNextCount(3) // 3 completed files
248 assertEquals(0, testedObject.getCurrentNumberOfTasks());
249 verify(consumerMock, times(1)).execute();
250 verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull());
251 verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull());
252 verifyNoMoreInteractions(dataRouterMock);
253 verifyNoMoreInteractions(fileCollectorMock);
254 verifyNoMoreInteractions(consumerMock);
258 public void consume_successfulCase_sameFileNames() {
259 final int noOfEvents = 1;
260 final int noOfFilesPerEvent = 100;
262 // 100 files with the same name
263 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
264 doReturn(fileReadyMessages).when(consumerMock).execute();
266 Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
267 doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
268 doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
270 StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
271 .expectNextCount(1) // 99 is skipped
275 assertEquals(0, testedObject.getCurrentNumberOfTasks());
276 verify(consumerMock, times(1)).execute();
277 verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull());
278 verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull());
279 verifyNoMoreInteractions(dataRouterMock);
280 verifyNoMoreInteractions(fileCollectorMock);
281 verifyNoMoreInteractions(consumerMock);