8a572be49847d41891f3808b2339305f5341a2db
[dcaegen2/collectors/datafile.git] /
1 /*
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2019 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.tasks;
18
19 import static org.junit.jupiter.api.Assertions.assertEquals;
20 import static org.mockito.ArgumentMatchers.any;
21 import static org.mockito.ArgumentMatchers.anyLong;
22 import static org.mockito.ArgumentMatchers.notNull;
23 import static org.mockito.Mockito.doReturn;
24 import static org.mockito.Mockito.mock;
25 import static org.mockito.Mockito.spy;
26 import static org.mockito.Mockito.times;
27 import static org.mockito.Mockito.verify;
28 import static org.mockito.Mockito.verifyNoMoreInteractions;
29 import java.time.Duration;
30 import java.util.LinkedList;
31 import java.util.List;
32 import org.junit.jupiter.api.BeforeEach;
33 import org.junit.jupiter.api.Test;
34 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
35 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
36 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
37 import org.onap.dcaegen2.collectors.datafile.model.FileData;
38 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
39 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
40 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
41 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
42 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
43 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
44 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
45 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
46 import reactor.core.publisher.Flux;
47 import reactor.core.publisher.Mono;
48 import reactor.test.StepVerifier;
49
50 public class ScheduledTasksTest {
51
52     private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
53
54     private AppConfig appConfig = mock(AppConfig.class);
55     private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
56
57     private int uniqueValue = 0;
58     private DMaaPMessageConsumerTask consumerMock;
59     private FileCollector fileCollectorMock;
60     private DataRouterPublisher dataRouterMock;
61
62     @BeforeEach
63     private void setUp() {
64         DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() //
65                 .dmaapContentType("application/json") //
66                 .dmaapHostName("54.45.33.2") //
67                 .dmaapPortNumber(1234) //
68                 .dmaapProtocol("https") //
69                 .dmaapUserName("DFC") //
70                 .dmaapUserPassword("DFC") //
71                 .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
72                 .trustStorePath("trustStorePath") //
73                 .trustStorePasswordPath("trustStorePasswordPath") //
74                 .keyStorePath("keyStorePath") //
75                 .keyStorePasswordPath("keyStorePasswordPath") //
76                 .enableDmaapCertAuth(true) //
77                 .build(); //
78
79         consumerMock = mock(DMaaPMessageConsumerTask.class);
80         fileCollectorMock = mock(FileCollector.class);
81         dataRouterMock = mock(DataRouterPublisher.class);
82
83         doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
84         doReturn(consumerMock).when(testedObject).createConsumerTask();
85         doReturn(fileCollectorMock).when(testedObject).createFileCollector(notNull());
86         doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
87     }
88
89     private MessageMetaData messageMetaData() {
90         return ImmutableMessageMetaData.builder() //
91                 .productName("productName") //
92                 .vendorName("") //
93                 .lastEpochMicrosec("") //
94                 .sourceName("") //
95                 .startEpochMicrosec("") //
96                 .timeZoneOffset("") //
97                 .changeIdentifier("") //
98                 .changeType("") //
99                 .build();
100     }
101
102     private FileData fileData(int instanceNumber) {
103         return ImmutableFileData.builder() //
104                 .name("name" + instanceNumber) //
105                 .fileFormatType("") //
106                 .fileFormatVersion("") //
107                 .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
108                 .scheme(Scheme.FTPS) //
109                 .compression("") //
110                 .build();
111     }
112
113     private List<FileData> files(int size, boolean uniqueNames) {
114         List<FileData> list = new LinkedList<FileData>();
115         for (int i = 0; i < size; ++i) {
116             if (uniqueNames) {
117                 ++uniqueValue;
118             }
119             list.add(fileData(uniqueValue));
120         }
121         return list;
122     }
123
124     private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
125         MessageMetaData md = messageMetaData();
126         return ImmutableFileReadyMessage.builder().pnfName(md.sourceName()).messageMetaData(md)
127                 .files(files(numberOfFiles, uniqueNames)).build();
128     }
129
130     private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
131         List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
132         for (int i = 0; i < numberOfEvents; ++i) {
133             list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
134         }
135         return Flux.fromIterable(list);
136     }
137
138     private ConsumerDmaapModel consumerData() {
139         return ImmutableConsumerDmaapModel //
140                 .builder() //
141                 .productName("") //
142                 .vendorName("") //
143                 .lastEpochMicrosec("") //
144                 .sourceName("") //
145                 .startEpochMicrosec("") //
146                 .timeZoneOffset("") //
147                 .name("") //
148                 .location("") //
149                 .internalLocation("internalLocation") //
150                 .compression("") //
151                 .fileFormatType("") //
152                 .fileFormatVersion("") //
153                 .build();
154     }
155
156     @Test
157     public void notingToConsume() {
158         doReturn(consumerMock).when(testedObject).createConsumerTask();
159         doReturn(Flux.empty()).when(consumerMock).execute();
160
161         testedObject.scheduleMainDatafileEventTask(any());
162
163         assertEquals(0, testedObject.getCurrentNumberOfTasks());
164         verify(consumerMock, times(1)).execute();
165         verifyNoMoreInteractions(consumerMock);
166     }
167
168     @Test
169     public void consume_successfulCase() {
170         final int noOfEvents = 200;
171         final int noOfFilesPerEvent = 200;
172         final int noOfFiles = noOfEvents * noOfFilesPerEvent;
173
174         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
175         doReturn(fileReadyMessages).when(consumerMock).execute();
176
177         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
178         doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
179         doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
180
181         StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
182                 .expectNextCount(noOfFiles) //
183                 .expectComplete() //
184                 .verify(); //
185
186         assertEquals(0, testedObject.getCurrentNumberOfTasks());
187         verify(consumerMock, times(1)).execute();
188         verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull(), any());
189         verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), any());
190         verifyNoMoreInteractions(dataRouterMock);
191         verifyNoMoreInteractions(fileCollectorMock);
192         verifyNoMoreInteractions(consumerMock);
193     }
194
195     @Test
196     public void consume_fetchFailedOnce() {
197         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
198         doReturn(fileReadyMessages).when(consumerMock).execute();
199
200         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
201         Mono<Object> error = Mono.error(new Exception("problem"));
202
203         // First file collect will fail, 3 will succeed
204         doReturn(error, collectedFile, collectedFile, collectedFile) //
205                 .when(fileCollectorMock) //
206                 .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class), any());
207
208         doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
209         doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
210
211         StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
212                 .expectNextCount(3) //
213                 .expectComplete() //
214                 .verify(); //
215
216         assertEquals(0, testedObject.getCurrentNumberOfTasks());
217         verify(consumerMock, times(1)).execute();
218         verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
219         verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull(), any());
220         verifyNoMoreInteractions(dataRouterMock);
221         verifyNoMoreInteractions(fileCollectorMock);
222         verifyNoMoreInteractions(consumerMock);
223     }
224
225     @Test
226     public void consume_publishFailedOnce() {
227
228         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
229         doReturn(fileReadyMessages).when(consumerMock).execute();
230
231         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
232         doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
233
234         Mono<Object> error = Mono.error(new Exception("problem"));
235         // One publish will fail, the rest will succeed
236         doReturn(collectedFile, error, collectedFile, collectedFile) //
237                 .when(dataRouterMock) //
238                 .execute(notNull(), anyLong(), notNull(), any());
239
240         StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
241                 .expectNextCount(3) // 3 completed files
242                 .expectComplete() //
243                 .verify(); //
244
245         assertEquals(0, testedObject.getCurrentNumberOfTasks());
246         verify(consumerMock, times(1)).execute();
247         verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
248         verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull(), any());
249         verifyNoMoreInteractions(dataRouterMock);
250         verifyNoMoreInteractions(fileCollectorMock);
251         verifyNoMoreInteractions(consumerMock);
252     }
253
254     @Test
255     public void consume_successfulCase_sameFileNames() {
256         final int noOfEvents = 1;
257         final int noOfFilesPerEvent = 100;
258
259         // 100 files with the same name
260         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
261         doReturn(fileReadyMessages).when(consumerMock).execute();
262
263         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
264         doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
265         doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
266
267         StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
268                 .expectNextCount(1) // 99 is skipped
269                 .expectComplete() //
270                 .verify(); //
271
272         assertEquals(0, testedObject.getCurrentNumberOfTasks());
273         verify(consumerMock, times(1)).execute();
274         verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull(), any());
275         verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull(), any());
276         verifyNoMoreInteractions(dataRouterMock);
277         verifyNoMoreInteractions(fileCollectorMock);
278         verifyNoMoreInteractions(consumerMock);
279     }
280
281
282 }