fdd1bb495b2a51aa4c1fa063f91cc4b31d314501
[dcaegen2/collectors/datafile.git] /
1 /*
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.tasks;
18
19 import java.util.List;
20
21 import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
22 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
23 import org.onap.dcaegen2.collectors.datafile.configuration.Config;
24 import org.onap.dcaegen2.collectors.datafile.ftp.FileCollector;
25 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
26 import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
27 import org.onap.dcaegen2.collectors.datafile.service.FileData;
28 import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import org.springframework.beans.factory.annotation.Autowired;
32 import org.springframework.stereotype.Component;
33
34 import reactor.core.publisher.Mono;
35
36 /**
37  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
38  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
39  */
40 @Component
41 public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
42
43     private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
44
45     private Config datafileAppConfig;
46     private DmaapConsumerJsonParser dmaapConsumerJsonParser;
47     private DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
48     FileCollector fileCollector;
49
50     @Autowired
51     public DmaapConsumerTaskImpl(AppConfig datafileAppConfig, FileCollector fileCollector) {
52         this.datafileAppConfig = datafileAppConfig;
53         this.dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
54         this.fileCollector = fileCollector;
55     }
56
57     protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig,
58             DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
59             DmaapConsumerJsonParser dmaapConsumerJsonParser, FileCollector fileCollector) {
60         this.datafileAppConfig = datafileAppConfig;
61         this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
62         this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
63         this.fileCollector = fileCollector;
64     }
65
66     @Override
67     Mono<List<FileData>> consume(Mono<String> message) {
68         logger.trace("Method called with arg {}", message);
69         return dmaapConsumerJsonParser.getJsonObject(message);
70     }
71
72     private Mono<List<ConsumerDmaapModel>> getFilesFromSender(List<FileData> listOfFileData) {
73         Mono<List<ConsumerDmaapModel>> filesFromSender = fileCollector.getFilesFromSender(listOfFileData);
74         return filesFromSender;
75     }
76
77     @Override
78     protected Mono<List<ConsumerDmaapModel>> execute(String object) {
79         dmaaPConsumerReactiveHttpClient = resolveClient();
80         logger.trace("Method called with arg {}", object);
81         Mono<List<FileData>> consumerResult =
82                 consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse()));
83         return consumerResult.flatMap(this::getFilesFromSender);
84     }
85
86     @Override
87     void initConfigs() {
88         datafileAppConfig.initFileStreamReader();
89     }
90
91     @Override
92     protected DmaapConsumerConfiguration resolveConfiguration() {
93         return datafileAppConfig.getDmaapConsumerConfiguration();
94     }
95
96     @Override
97     protected DmaapConsumerReactiveHttpClient resolveClient() {
98         return dmaaPConsumerReactiveHttpClient == null
99                 ? new DmaapConsumerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient())
100                 : dmaaPConsumerReactiveHttpClient;
101     }
102 }