d781cea3060124a5b25b407c9cc19d81f5e56887
[dcaegen2/collectors/datafile.git] /
1 /*
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2019 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.tasks;
18
19 import static org.junit.jupiter.api.Assertions.assertEquals;
20 import static org.mockito.ArgumentMatchers.any;
21 import static org.mockito.ArgumentMatchers.anyLong;
22 import static org.mockito.ArgumentMatchers.anyString;
23 import static org.mockito.ArgumentMatchers.notNull;
24 import static org.mockito.Mockito.doReturn;
25 import static org.mockito.Mockito.mock;
26 import static org.mockito.Mockito.spy;
27 import static org.mockito.Mockito.times;
28 import static org.mockito.Mockito.verify;
29 import static org.mockito.Mockito.verifyNoMoreInteractions;
30
31 import java.nio.file.Paths;
32 import java.time.Duration;
33 import java.util.LinkedList;
34 import java.util.List;
35
36 import org.junit.jupiter.api.BeforeEach;
37 import org.junit.jupiter.api.Test;
38 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
39 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
40 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
41 import org.onap.dcaegen2.collectors.datafile.model.FileData;
42 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
43 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
44 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
45 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
46 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
47 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
48 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
49 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
50
51 import reactor.core.publisher.Flux;
52 import reactor.core.publisher.Mono;
53 import reactor.test.StepVerifier;
54
55 public class ScheduledTasksTest {
56
57     private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
58
59     private AppConfig appConfig = mock(AppConfig.class);
60     private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
61
62     private int uniqueValue = 0;
63     private DMaaPMessageConsumerTask consumerMock;
64     private PublishedChecker publishedCheckerMock;
65     private FileCollector fileCollectorMock;
66     private DataRouterPublisher dataRouterMock;
67
68     @BeforeEach
69     private void setUp() {
70         DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() //
71                 .dmaapContentType("application/json") //
72                 .dmaapHostName("54.45.33.2") //
73                 .dmaapPortNumber(1234) //
74                 .dmaapProtocol("https") //
75                 .dmaapUserName("DFC") //
76                 .dmaapUserPassword("DFC") //
77                 .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
78                 .trustStorePath("trustStorePath") //
79                 .trustStorePasswordPath("trustStorePasswordPath") //
80                 .keyStorePath("keyStorePath") //
81                 .keyStorePasswordPath("keyStorePasswordPath") //
82                 .enableDmaapCertAuth(true) //
83                 .build(); //
84         doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
85
86         consumerMock = mock(DMaaPMessageConsumerTask.class);
87         publishedCheckerMock = mock(PublishedChecker.class);
88         fileCollectorMock = mock(FileCollector.class);
89         dataRouterMock = mock(DataRouterPublisher.class);
90
91         doReturn(consumerMock).when(testedObject).createConsumerTask();
92         doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
93         doReturn(fileCollectorMock).when(testedObject).createFileCollector();
94         doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
95     }
96
97     private MessageMetaData messageMetaData() {
98         return ImmutableMessageMetaData.builder() //
99                 .productName("productName") //
100                 .vendorName("") //
101                 .lastEpochMicrosec("") //
102                 .sourceName("") //
103                 .startEpochMicrosec("") //
104                 .timeZoneOffset("") //
105                 .changeIdentifier("") //
106                 .changeType("") //
107                 .build();
108     }
109
110     private FileData fileData(int instanceNumber) {
111         return ImmutableFileData.builder() //
112                 .name("name" + instanceNumber) //
113                 .fileFormatType("") //
114                 .fileFormatVersion("") //
115                 .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
116                 .scheme(Scheme.FTPS) //
117                 .compression("") //
118                 .build();
119     }
120
121     private List<FileData> files(int size, boolean uniqueNames) {
122         List<FileData> list = new LinkedList<FileData>();
123         for (int i = 0; i < size; ++i) {
124             if (uniqueNames) {
125                 ++uniqueValue;
126             }
127             list.add(fileData(uniqueValue));
128         }
129         return list;
130     }
131
132     private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
133         MessageMetaData md = messageMetaData();
134         return ImmutableFileReadyMessage.builder().pnfName(md.sourceName()).messageMetaData(md)
135                 .files(files(numberOfFiles, uniqueNames)).build();
136     }
137
138     private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
139         List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
140         for (int i = 0; i < numberOfEvents; ++i) {
141             list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
142         }
143         return Flux.fromIterable(list);
144     }
145
146     private ConsumerDmaapModel consumerData() {
147         return ImmutableConsumerDmaapModel //
148                 .builder() //
149                 .productName("") //
150                 .vendorName("") //
151                 .lastEpochMicrosec("") //
152                 .sourceName("") //
153                 .startEpochMicrosec("") //
154                 .timeZoneOffset("") //
155                 .name("") //
156                 .location("") //
157                 .internalLocation(Paths.get("internalLocation")) //
158                 .compression("") //
159                 .fileFormatType("") //
160                 .fileFormatVersion("") //
161                 .build();
162     }
163
164     @Test
165     public void notingToConsume() {
166         doReturn(consumerMock).when(testedObject).createConsumerTask();
167         doReturn(Flux.empty()).when(consumerMock).execute();
168
169         testedObject.scheduleMainDatafileEventTask(any());
170
171         assertEquals(0, testedObject.getCurrentNumberOfTasks());
172         verify(consumerMock, times(1)).execute();
173         verifyNoMoreInteractions(consumerMock);
174     }
175
176     @Test
177     public void consume_successfulCase() {
178         final int noOfEvents = 200;
179         final int noOfFilesPerEvent = 200;
180         final int noOfFiles = noOfEvents * noOfFilesPerEvent;
181
182         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
183         doReturn(fileReadyMessages).when(consumerMock).execute();
184
185         doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
186
187         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
188         doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
189         doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
190
191         StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
192                 .expectNextCount(noOfFiles) //
193                 .expectComplete() //
194                 .verify(); //
195
196         assertEquals(0, testedObject.getCurrentNumberOfTasks());
197         verify(consumerMock, times(1)).execute();
198         verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull(), any());
199         verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), any());
200         verifyNoMoreInteractions(dataRouterMock);
201         verifyNoMoreInteractions(fileCollectorMock);
202         verifyNoMoreInteractions(consumerMock);
203     }
204
205     @Test
206     public void consume_fetchFailedOnce() {
207         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
208         doReturn(fileReadyMessages).when(consumerMock).execute();
209
210         doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
211
212         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
213         Mono<Object> error = Mono.error(new Exception("problem"));
214
215         // First file collect will fail, 3 will succeed
216         doReturn(error, collectedFile, collectedFile, collectedFile) //
217                 .when(fileCollectorMock) //
218                 .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class), any());
219
220         doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
221         doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
222
223         StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
224                 .expectNextCount(3) //
225                 .expectComplete() //
226                 .verify(); //
227
228         assertEquals(0, testedObject.getCurrentNumberOfTasks());
229         verify(consumerMock, times(1)).execute();
230         verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
231         verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull(), any());
232         verifyNoMoreInteractions(dataRouterMock);
233         verifyNoMoreInteractions(fileCollectorMock);
234         verifyNoMoreInteractions(consumerMock);
235     }
236
237     @Test
238     public void consume_publishFailedOnce() {
239
240         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
241         doReturn(fileReadyMessages).when(consumerMock).execute();
242
243         doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
244
245         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
246         doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
247
248         Mono<Object> error = Mono.error(new Exception("problem"));
249         // One publish will fail, the rest will succeed
250         doReturn(collectedFile, error, collectedFile, collectedFile) //
251                 .when(dataRouterMock) //
252                 .execute(notNull(), anyLong(), notNull(), any());
253
254         StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
255                 .expectNextCount(3) // 3 completed files
256                 .expectComplete() //
257                 .verify(); //
258
259         assertEquals(0, testedObject.getCurrentNumberOfTasks());
260         verify(consumerMock, times(1)).execute();
261         verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
262         verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull(), any());
263         verifyNoMoreInteractions(dataRouterMock);
264         verifyNoMoreInteractions(fileCollectorMock);
265         verifyNoMoreInteractions(consumerMock);
266     }
267
268     @Test
269     public void consume_successfulCase_sameFileNames() {
270         final int noOfEvents = 1;
271         final int noOfFilesPerEvent = 100;
272
273         // 100 files with the same name
274         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
275         doReturn(fileReadyMessages).when(consumerMock).execute();
276
277         doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
278
279         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
280         doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
281         doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
282
283         StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
284                 .expectNextCount(1) // 99 is skipped
285                 .expectComplete() //
286                 .verify(); //
287
288         assertEquals(0, testedObject.getCurrentNumberOfTasks());
289         verify(consumerMock, times(1)).execute();
290         verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull(), any());
291         verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull(), any());
292         verifyNoMoreInteractions(dataRouterMock);
293         verifyNoMoreInteractions(fileCollectorMock);
294         verifyNoMoreInteractions(consumerMock);
295     }
296
297
298 }