a1021868d8040016ef25eb9b3c2c2c5fc534d786
[dcaegen2/collectors/datafile.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.dcaegen2.collectors.datafile.tasks;
22
23 import static org.junit.jupiter.api.Assertions.assertEquals;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.ArgumentMatchers.anyLong;
26 import static org.mockito.ArgumentMatchers.anyString;
27 import static org.mockito.ArgumentMatchers.notNull;
28 import static org.mockito.Mockito.doReturn;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.spy;
31 import static org.mockito.Mockito.times;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.verifyNoMoreInteractions;
34
35 import java.nio.file.Paths;
36 import java.time.Duration;
37 import java.util.HashMap;
38 import java.util.LinkedList;
39 import java.util.List;
40 import java.util.Map;
41
42 import org.junit.jupiter.api.BeforeEach;
43 import org.junit.jupiter.api.Test;
44 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
45 import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
46 import org.onap.dcaegen2.collectors.datafile.configuration.ImmutableConsumerConfiguration;
47 import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration;
48 import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
49 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
50 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
51 import org.onap.dcaegen2.collectors.datafile.model.FileData;
52 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
53 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
54 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
55 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
56 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
57 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
58 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
59
60 import reactor.core.publisher.Flux;
61 import reactor.core.publisher.Mono;
62 import reactor.test.StepVerifier;
63
64 public class ScheduledTasksTest {
65
66     private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
67     private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
68
69     private AppConfig appConfig = mock(AppConfig.class);
70     private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
71
72     private int uniqueValue = 0;
73     private DMaaPMessageConsumer consumerMock;
74     private PublishedChecker publishedCheckerMock;
75     private FileCollector fileCollectorMock;
76     private DataRouterPublisher dataRouterMock;
77     private Map<String, String> contextMap = new HashMap<String, String>();
78
79     private final String publishUrl = "https://54.45.33.2:1234/unauthenticated.VES_NOTIFICATION_OUTPUT";
80
81     @BeforeEach
82     private void setUp() throws DatafileTaskException {
83         final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
84                 .publishUrl(publishUrl) //
85                 .logUrl("") //
86                 .userName("userName") //
87                 .passWord("passWord") //
88                 .trustStorePath("trustStorePath") //
89                 .trustStorePasswordPath("trustStorePasswordPath") //
90                 .keyStorePath("keyStorePath") //
91                 .keyStorePasswordPath("keyStorePasswordPath") //
92                 .enableDmaapCertAuth(true) //
93                 .changeIdentifier(CHANGE_IDENTIFIER) //
94                 .build(); //
95         final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
96                 .topicUrl("topicUrl").trustStorePath("trustStorePath") //
97                 .trustStorePasswordPath("trustStorePasswordPath") //
98                 .keyStorePath("keyStorePath") //
99                 .keyStorePasswordPath("keyStorePasswordPath") //
100                 .enableDmaapCertAuth(true) //
101                 .build();
102
103         doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
104         doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
105         doReturn(true).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
106
107         consumerMock = mock(DMaaPMessageConsumer.class);
108         publishedCheckerMock = mock(PublishedChecker.class);
109         fileCollectorMock = mock(FileCollector.class);
110         dataRouterMock = mock(DataRouterPublisher.class);
111
112         doReturn(consumerMock).when(testedObject).createConsumerTask();
113         doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
114         doReturn(fileCollectorMock).when(testedObject).createFileCollector();
115         doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
116     }
117
118     private MessageMetaData messageMetaData() {
119         return ImmutableMessageMetaData.builder() //
120                 .productName("productName") //
121                 .vendorName("") //
122                 .lastEpochMicrosec("") //
123                 .sourceName("") //
124                 .startEpochMicrosec("") //
125                 .timeZoneOffset("") //
126                 .changeIdentifier(CHANGE_IDENTIFIER) //
127                 .changeType("") //
128                 .build();
129     }
130
131     private FileData fileData(int instanceNumber) {
132         return ImmutableFileData.builder() //
133                 .name("name" + instanceNumber) //
134                 .fileFormatType("") //
135                 .fileFormatVersion("") //
136                 .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
137                 .scheme(Scheme.FTPS) //
138                 .compression("") //
139                 .messageMetaData(messageMetaData()) //
140                 .build();
141     }
142
143     private List<FileData> files(int size, boolean uniqueNames) {
144         List<FileData> list = new LinkedList<FileData>();
145         for (int i = 0; i < size; ++i) {
146             if (uniqueNames) {
147                 ++uniqueValue;
148             }
149             list.add(fileData(uniqueValue));
150         }
151         return list;
152     }
153
154     private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
155         return ImmutableFileReadyMessage.builder().files(files(numberOfFiles, uniqueNames)).build();
156     }
157
158     private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
159         List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
160         for (int i = 0; i < numberOfEvents; ++i) {
161             list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
162         }
163         return Flux.fromIterable(list);
164     }
165
166     private FilePublishInformation filePublishInformation() {
167         return ImmutableFilePublishInformation //
168                 .builder() //
169                 .productName("") //
170                 .vendorName("") //
171                 .lastEpochMicrosec("") //
172                 .sourceName("") //
173                 .startEpochMicrosec("") //
174                 .timeZoneOffset("") //
175                 .name("") //
176                 .location("") //
177                 .internalLocation(Paths.get("internalLocation")) //
178                 .compression("") //
179                 .fileFormatType("") //
180                 .fileFormatVersion("") //
181                 .changeIdentifier(CHANGE_IDENTIFIER) //
182                 .context(new HashMap<String, String>()).build();
183     }
184
185     @Test
186     public void notingToConsume() throws DatafileTaskException {
187         doReturn(consumerMock).when(testedObject).createConsumerTask();
188         doReturn(Flux.empty()).when(consumerMock).getMessageRouterResponse();
189
190         testedObject.executeDatafileMainTask();
191
192         assertEquals(0, testedObject.getCurrentNumberOfTasks());
193         verify(consumerMock, times(1)).getMessageRouterResponse();
194         verifyNoMoreInteractions(consumerMock);
195     }
196
197     @Test
198     public void consume_successfulCase() throws DatafileTaskException {
199         final int noOfEvents = 200;
200         final int noOfFilesPerEvent = 200;
201         final int noOfFiles = noOfEvents * noOfFilesPerEvent;
202
203         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
204         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
205
206         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
207
208         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
209         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
210         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
211
212         StepVerifier //
213                 .create(testedObject.createMainTask(contextMap)) //
214                 .expectSubscription() //
215                 .expectNextCount(noOfFiles) //
216                 .expectComplete() //
217                 .verify(); //
218
219         assertEquals(0, testedObject.getCurrentNumberOfTasks());
220         assertEquals(0, testedObject.getThreadPoolQueueSize());
221         verify(consumerMock, times(1)).getMessageRouterResponse();
222         verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull());
223         verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull());
224         verifyNoMoreInteractions(dataRouterMock);
225         verifyNoMoreInteractions(fileCollectorMock);
226         verifyNoMoreInteractions(consumerMock);
227     }
228
229     @Test
230     public void consume_fetchFailedOnce() throws DatafileTaskException {
231         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
232         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
233
234         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
235
236         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
237         Mono<Object> error = Mono.error(new Exception("problem"));
238
239         // First file collect will fail, 3 will succeed
240         doReturn(error, collectedFile, collectedFile, collectedFile) //
241                 .when(fileCollectorMock) //
242                 .collectFile(any(FileData.class), anyLong(), any(Duration.class), notNull());
243
244         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
245         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
246
247         StepVerifier //
248                 .create(testedObject.createMainTask(contextMap)) //
249                 .expectSubscription() //
250                 .expectNextCount(3) //
251                 .expectComplete() //
252                 .verify(); //
253
254         assertEquals(0, testedObject.getCurrentNumberOfTasks());
255         verify(consumerMock, times(1)).getMessageRouterResponse();
256         verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
257         verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull());
258         verifyNoMoreInteractions(dataRouterMock);
259         verifyNoMoreInteractions(fileCollectorMock);
260         verifyNoMoreInteractions(consumerMock);
261     }
262
263     @Test
264     public void consume_publishFailedOnce() throws DatafileTaskException {
265
266         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
267         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
268
269         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
270
271         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
272         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
273
274         Mono<Object> error = Mono.error(new Exception("problem"));
275         // One publish will fail, the rest will succeed
276         doReturn(collectedFile, error, collectedFile, collectedFile) //
277                 .when(dataRouterMock) //
278                 .publishFile(notNull(), anyLong(), notNull());
279
280         StepVerifier //
281                 .create(testedObject.createMainTask(contextMap)) //
282                 .expectSubscription() //
283                 .expectNextCount(3) // 3 completed files
284                 .expectComplete() //
285                 .verify(); //
286
287         assertEquals(0, testedObject.getCurrentNumberOfTasks());
288         verify(consumerMock, times(1)).getMessageRouterResponse();
289         verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
290         verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull());
291         verifyNoMoreInteractions(dataRouterMock);
292         verifyNoMoreInteractions(fileCollectorMock);
293         verifyNoMoreInteractions(consumerMock);
294     }
295
296     @Test
297     public void consume_successfulCase_sameFileNames() throws DatafileTaskException {
298         final int noOfEvents = 1;
299         final int noOfFilesPerEvent = 100;
300
301         // 100 files with the same name
302         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
303         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
304
305         doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
306
307         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
308         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
309         doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
310
311         StepVerifier //
312                 .create(testedObject.createMainTask(contextMap)).expectSubscription() //
313                 .expectNextCount(1) // 99 is skipped
314                 .expectComplete() //
315                 .verify(); //
316
317         assertEquals(0, testedObject.getCurrentNumberOfTasks());
318         verify(consumerMock, times(1)).getMessageRouterResponse();
319         verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
320         verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull());
321         verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), anyString(), notNull());
322         verifyNoMoreInteractions(dataRouterMock);
323         verifyNoMoreInteractions(fileCollectorMock);
324         verifyNoMoreInteractions(consumerMock);
325         verifyNoMoreInteractions(dataRouterMock);
326     }
327 }