0662216b2daf631e2f1e6c79d1f0c0b75c826b1f
[dcaegen2/collectors/datafile.git] /
1 /*
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2019 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.tasks;
18
19 import static org.junit.jupiter.api.Assertions.assertEquals;
20 import static org.mockito.ArgumentMatchers.any;
21 import static org.mockito.ArgumentMatchers.anyLong;
22 import static org.mockito.ArgumentMatchers.notNull;
23 import static org.mockito.Mockito.doReturn;
24 import static org.mockito.Mockito.mock;
25 import static org.mockito.Mockito.spy;
26 import static org.mockito.Mockito.times;
27 import static org.mockito.Mockito.verify;
28 import static org.mockito.Mockito.verifyNoMoreInteractions;
29
30 import java.time.Duration;
31 import java.util.LinkedList;
32 import java.util.List;
33
34 import org.junit.jupiter.api.BeforeEach;
35 import org.junit.jupiter.api.Test;
36 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
37 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
38 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
39 import org.onap.dcaegen2.collectors.datafile.model.FileData;
40 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
41 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
42 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
43 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
44 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
45 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
46 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
47 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
48
49 import reactor.core.publisher.Flux;
50 import reactor.core.publisher.Mono;
51 import reactor.test.StepVerifier;
52
53 public class ScheduledTasksTest {
54
55     private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
56
57     private AppConfig appConfig = mock(AppConfig.class);
58     private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
59
60     private int uniqueValue = 0;
61     private DMaaPMessageConsumerTask consumerMock;
62     private FileCollector fileCollectorMock;
63     private DataRouterPublisher dataRouterMock;
64
65     @BeforeEach
66     private void setUp() {
67         DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() //
68                 .dmaapContentType("application/json") //
69                 .dmaapHostName("54.45.33.2") //
70                 .dmaapPortNumber(1234) //
71                 .dmaapProtocol("https") //
72                 .dmaapUserName("DFC") //
73                 .dmaapUserPassword("DFC") //
74                 .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
75                 .trustStorePath("trustStorePath") //
76                 .trustStorePasswordPath("trustStorePasswordPath") //
77                 .keyStorePath("keyStorePath") //
78                 .keyStorePasswordPath("keyStorePasswordPath") //
79                 .enableDmaapCertAuth(true) //
80                 .build(); //
81
82         consumerMock = mock(DMaaPMessageConsumerTask.class);
83         fileCollectorMock = mock(FileCollector.class);
84         dataRouterMock = mock(DataRouterPublisher.class);
85
86         doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
87         doReturn(consumerMock).when(testedObject).createConsumerTask();
88         doReturn(fileCollectorMock).when(testedObject).createFileCollector(notNull());
89         doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
90     }
91
92     private MessageMetaData messageMetaData() {
93         return ImmutableMessageMetaData.builder() //
94                 .productName("productName") //
95                 .vendorName("") //
96                 .lastEpochMicrosec("") //
97                 .sourceName("") //
98                 .startEpochMicrosec("") //
99                 .timeZoneOffset("") //
100                 .changeIdentifier("") //
101                 .changeType("") //
102                 .build();
103     }
104
105     private FileData fileData(int instanceNumber) {
106         return ImmutableFileData.builder() //
107                 .name("name" + instanceNumber) //
108                 .fileFormatType("") //
109                 .fileFormatVersion("") //
110                 .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
111                 .scheme(Scheme.FTPS) //
112                 .compression("") //
113                 .build();
114     }
115
116     private List<FileData> files(int size, boolean uniqueNames) {
117         List<FileData> list = new LinkedList<FileData>();
118         for (int i = 0; i < size; ++i) {
119             if (uniqueNames) {
120                 ++uniqueValue;
121             }
122             list.add(fileData(uniqueValue));
123         }
124         return list;
125     }
126
127     private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
128         MessageMetaData md = messageMetaData();
129         return ImmutableFileReadyMessage.builder().pnfName(md.sourceName()).messageMetaData(md)
130                 .files(files(numberOfFiles, uniqueNames)).build();
131     }
132
133     private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
134         List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
135         for (int i = 0; i < numberOfEvents; ++i) {
136             list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
137         }
138         return Flux.fromIterable(list);
139     }
140
141     private ConsumerDmaapModel consumerData() {
142         return ImmutableConsumerDmaapModel //
143                 .builder() //
144                 .productName("") //
145                 .vendorName("") //
146                 .lastEpochMicrosec("") //
147                 .sourceName("") //
148                 .startEpochMicrosec("") //
149                 .timeZoneOffset("") //
150                 .name("") //
151                 .location("") //
152                 .internalLocation("internalLocation") //
153                 .compression("") //
154                 .fileFormatType("") //
155                 .fileFormatVersion("") //
156                 .build();
157     }
158
159     @Test
160     public void notingToConsume() {
161         doReturn(consumerMock).when(testedObject).createConsumerTask();
162         doReturn(Flux.empty()).when(consumerMock).execute();
163
164         testedObject.scheduleMainDatafileEventTask();
165
166         assertEquals(0, testedObject.getCurrentNumberOfTasks());
167         verify(consumerMock, times(1)).execute();
168         verifyNoMoreInteractions(consumerMock);
169     }
170
171     @Test
172     public void consume_successfulCase() {
173         final int noOfEvents = 200;
174         final int noOfFilesPerEvent = 200;
175         final int noOfFiles = noOfEvents * noOfFilesPerEvent;
176
177         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
178         doReturn(fileReadyMessages).when(consumerMock).execute();
179
180         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
181         doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
182         doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
183
184         StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
185                 .expectNextCount(noOfFiles) //
186                 .expectComplete() //
187                 .verify(); //
188
189         assertEquals(0, testedObject.getCurrentNumberOfTasks());
190         verify(consumerMock, times(1)).execute();
191         verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull());
192         verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull());
193         verifyNoMoreInteractions(dataRouterMock);
194         verifyNoMoreInteractions(fileCollectorMock);
195         verifyNoMoreInteractions(consumerMock);
196     }
197
198     @Test
199     public void consume_fetchFailedOnce() {
200         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
201         doReturn(fileReadyMessages).when(consumerMock).execute();
202
203         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
204         Mono<Object> error = Mono.error(new Exception("problem"));
205
206         // First file collect will fail, 3 will succeed
207         doReturn(error, collectedFile, collectedFile, collectedFile) //
208                 .when(fileCollectorMock) //
209                 .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class));
210
211         doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
212         doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
213
214         StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
215                 .expectNextCount(3) //
216                 .expectComplete() //
217                 .verify(); //
218
219         assertEquals(0, testedObject.getCurrentNumberOfTasks());
220         verify(consumerMock, times(1)).execute();
221         verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull());
222         verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull());
223         verifyNoMoreInteractions(dataRouterMock);
224         verifyNoMoreInteractions(fileCollectorMock);
225         verifyNoMoreInteractions(consumerMock);
226     }
227
228     @Test
229     public void consume_publishFailedOnce() {
230
231         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
232         doReturn(fileReadyMessages).when(consumerMock).execute();
233
234         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
235         doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
236
237         Mono<Object> error = Mono.error(new Exception("problem"));
238         // One publish will fail, the rest will succeed
239         doReturn(collectedFile, error, collectedFile, collectedFile) //
240                 .when(dataRouterMock) //
241                 .execute(notNull(), anyLong(), notNull());
242
243         StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
244                 .expectNextCount(3) // 3 completed files
245                 .expectComplete() //
246                 .verify(); //
247
248         assertEquals(0, testedObject.getCurrentNumberOfTasks());
249         verify(consumerMock, times(1)).execute();
250         verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull());
251         verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull());
252         verifyNoMoreInteractions(dataRouterMock);
253         verifyNoMoreInteractions(fileCollectorMock);
254         verifyNoMoreInteractions(consumerMock);
255     }
256
257     @Test
258     public void consume_successfulCase_sameFileNames() {
259         final int noOfEvents = 1;
260         final int noOfFilesPerEvent = 100;
261
262         // 100 files with the same name
263         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
264         doReturn(fileReadyMessages).when(consumerMock).execute();
265
266         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
267         doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
268         doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
269
270         StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
271                 .expectNextCount(1) // 99 is skipped
272                 .expectComplete() //
273                 .verify(); //
274
275         assertEquals(0, testedObject.getCurrentNumberOfTasks());
276         verify(consumerMock, times(1)).execute();
277         verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull());
278         verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull());
279         verifyNoMoreInteractions(dataRouterMock);
280         verifyNoMoreInteractions(fileCollectorMock);
281         verifyNoMoreInteractions(consumerMock);
282     }
283
284
285 }