09908f136ab32ed5ca354b0105db533448363fd9
[dcaegen2/collectors/datafile.git] /
1 /*
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2019 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.tasks;
18
19 import static org.junit.jupiter.api.Assertions.assertEquals;
20 import static org.mockito.ArgumentMatchers.any;
21 import static org.mockito.ArgumentMatchers.anyLong;
22 import static org.mockito.ArgumentMatchers.anyString;
23 import static org.mockito.ArgumentMatchers.notNull;
24 import static org.mockito.Mockito.doReturn;
25 import static org.mockito.Mockito.mock;
26 import static org.mockito.Mockito.spy;
27 import static org.mockito.Mockito.times;
28 import static org.mockito.Mockito.verify;
29 import static org.mockito.Mockito.verifyNoMoreInteractions;
30
31 import java.nio.file.Paths;
32 import java.time.Duration;
33 import java.util.HashMap;
34 import java.util.LinkedList;
35 import java.util.List;
36 import java.util.Map;
37
38 import org.junit.jupiter.api.BeforeEach;
39 import org.junit.jupiter.api.Test;
40 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
41 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
42 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
43 import org.onap.dcaegen2.collectors.datafile.model.FileData;
44 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
45 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
46 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
47 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
48 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
49 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
50 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
51 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
52
53 import reactor.core.publisher.Flux;
54 import reactor.core.publisher.Mono;
55 import reactor.test.StepVerifier;
56
57 public class ScheduledTasksTest {
58
59     private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
60
61     private AppConfig appConfig = mock(AppConfig.class);
62     private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
63
64     private int uniqueValue = 0;
65     private DMaaPMessageConsumerTask consumerMock;
66     private PublishedChecker publishedCheckerMock;
67     private FileCollector fileCollectorMock;
68     private DataRouterPublisher dataRouterMock;
69     private Map<String, String> contextMap = new HashMap<String, String>();
70
71     @BeforeEach
72     private void setUp() {
73         DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() //
74                 .dmaapContentType("application/json") //
75                 .dmaapHostName("54.45.33.2") //
76                 .dmaapPortNumber(1234) //
77                 .dmaapProtocol("https") //
78                 .dmaapUserName("DFC") //
79                 .dmaapUserPassword("DFC") //
80                 .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
81                 .trustStorePath("trustStorePath") //
82                 .trustStorePasswordPath("trustStorePasswordPath") //
83                 .keyStorePath("keyStorePath") //
84                 .keyStorePasswordPath("keyStorePasswordPath") //
85                 .enableDmaapCertAuth(true) //
86                 .build(); //
87         doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
88
89         consumerMock = mock(DMaaPMessageConsumerTask.class);
90         publishedCheckerMock = mock(PublishedChecker.class);
91         fileCollectorMock = mock(FileCollector.class);
92         dataRouterMock = mock(DataRouterPublisher.class);
93
94         doReturn(consumerMock).when(testedObject).createConsumerTask();
95         doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
96         doReturn(fileCollectorMock).when(testedObject).createFileCollector();
97         doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
98     }
99
100     private MessageMetaData messageMetaData() {
101         return ImmutableMessageMetaData.builder() //
102                 .productName("productName") //
103                 .vendorName("") //
104                 .lastEpochMicrosec("") //
105                 .sourceName("") //
106                 .startEpochMicrosec("") //
107                 .timeZoneOffset("") //
108                 .changeIdentifier("") //
109                 .changeType("") //
110                 .build();
111     }
112
113     private FileData fileData(int instanceNumber) {
114         return ImmutableFileData.builder() //
115                 .name("name" + instanceNumber) //
116                 .fileFormatType("") //
117                 .fileFormatVersion("") //
118                 .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
119                 .scheme(Scheme.FTPS) //
120                 .compression("") //
121                 .messageMetaData(messageMetaData())
122                 .build();
123     }
124
125     private List<FileData> files(int size, boolean uniqueNames) {
126         List<FileData> list = new LinkedList<FileData>();
127         for (int i = 0; i < size; ++i) {
128             if (uniqueNames) {
129                 ++uniqueValue;
130             }
131             list.add(fileData(uniqueValue));
132         }
133         return list;
134     }
135
136     private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
137         return ImmutableFileReadyMessage.builder().files(files(numberOfFiles, uniqueNames)).build();
138     }
139
140     private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
141         List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
142         for (int i = 0; i < numberOfEvents; ++i) {
143             list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
144         }
145         return Flux.fromIterable(list);
146     }
147
148     private ConsumerDmaapModel consumerData() {
149         return ImmutableConsumerDmaapModel //
150                 .builder() //
151                 .productName("") //
152                 .vendorName("") //
153                 .lastEpochMicrosec("") //
154                 .sourceName("") //
155                 .startEpochMicrosec("") //
156                 .timeZoneOffset("") //
157                 .name("") //
158                 .location("") //
159                 .internalLocation(Paths.get("internalLocation")) //
160                 .compression("") //
161                 .fileFormatType("") //
162                 .fileFormatVersion("") //
163                 .build();
164     }
165
166     @Test
167     public void notingToConsume() {
168         doReturn(consumerMock).when(testedObject).createConsumerTask();
169         doReturn(Flux.empty()).when(consumerMock).execute();
170
171         testedObject.executeDatafileMainTask();
172
173         assertEquals(0, testedObject.getCurrentNumberOfTasks());
174         verify(consumerMock, times(1)).execute();
175         verifyNoMoreInteractions(consumerMock);
176     }
177
178     @Test
179     public void consume_successfulCase() {
180         final int noOfEvents = 200;
181         final int noOfFilesPerEvent = 200;
182         final int noOfFiles = noOfEvents * noOfFilesPerEvent;
183
184         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
185         doReturn(fileReadyMessages).when(consumerMock).execute();
186
187         doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
188
189         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
190         doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
191         doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
192
193         StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
194                 .expectNextCount(noOfFiles) //
195                 .expectComplete() //
196                 .verify(); //
197
198         assertEquals(0, testedObject.getCurrentNumberOfTasks());
199         verify(consumerMock, times(1)).execute();
200         verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), notNull());
201         verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), any());
202         verifyNoMoreInteractions(dataRouterMock);
203         verifyNoMoreInteractions(fileCollectorMock);
204         verifyNoMoreInteractions(consumerMock);
205     }
206
207     @Test
208     public void consume_fetchFailedOnce() {
209         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
210         doReturn(fileReadyMessages).when(consumerMock).execute();
211
212         doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
213
214         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
215         Mono<Object> error = Mono.error(new Exception("problem"));
216
217         // First file collect will fail, 3 will succeed
218         doReturn(error, collectedFile, collectedFile, collectedFile) //
219                 .when(fileCollectorMock) //
220                 .execute(any(FileData.class), anyLong(), any(Duration.class), notNull());
221
222         doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
223         doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
224
225         StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
226                 .expectNextCount(3) //
227                 .expectComplete() //
228                 .verify(); //
229
230         assertEquals(0, testedObject.getCurrentNumberOfTasks());
231         verify(consumerMock, times(1)).execute();
232         verify(fileCollectorMock, times(4)).execute(notNull(), anyLong(), notNull(), notNull());
233         verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull(), any());
234         verifyNoMoreInteractions(dataRouterMock);
235         verifyNoMoreInteractions(fileCollectorMock);
236         verifyNoMoreInteractions(consumerMock);
237     }
238
239     @Test
240     public void consume_publishFailedOnce() {
241
242         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
243         doReturn(fileReadyMessages).when(consumerMock).execute();
244
245         doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
246
247         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
248         doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
249
250         Mono<Object> error = Mono.error(new Exception("problem"));
251         // One publish will fail, the rest will succeed
252         doReturn(collectedFile, error, collectedFile, collectedFile) //
253                 .when(dataRouterMock) //
254                 .execute(notNull(), anyLong(), notNull(), any());
255
256         StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
257                 .expectNextCount(3) // 3 completed files
258                 .expectComplete() //
259                 .verify(); //
260
261         assertEquals(0, testedObject.getCurrentNumberOfTasks());
262         verify(consumerMock, times(1)).execute();
263         verify(fileCollectorMock, times(4)).execute(notNull(), anyLong(), notNull(), notNull());
264         verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull(), any());
265         verifyNoMoreInteractions(dataRouterMock);
266         verifyNoMoreInteractions(fileCollectorMock);
267         verifyNoMoreInteractions(consumerMock);
268     }
269
270     @Test
271     public void consume_successfulCase_sameFileNames() {
272         final int noOfEvents = 1;
273         final int noOfFilesPerEvent = 100;
274
275         // 100 files with the same name
276         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
277         doReturn(fileReadyMessages).when(consumerMock).execute();
278
279         doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
280
281         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
282         doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
283         doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
284
285         StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
286                 .expectNextCount(1) // 99 is skipped
287                 .expectComplete() //
288                 .verify(); //
289
290         assertEquals(0, testedObject.getCurrentNumberOfTasks());
291         verify(consumerMock, times(1)).execute();
292         verify(fileCollectorMock, times(1)).execute(notNull(), anyLong(), notNull(), notNull());
293         verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull(), any());
294         verifyNoMoreInteractions(dataRouterMock);
295         verifyNoMoreInteractions(fileCollectorMock);
296         verifyNoMoreInteractions(consumerMock);
297     }
298
299
300 }