5bd0bf3045e2c2befc21feb6bf0b02fe1e203aa0
[dcaegen2/collectors/datafile.git] /
1 /*
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.tasks;
18
19
20 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
21 import org.onap.dcaegen2.collectors.datafile.configuration.Config;
22 import org.onap.dcaegen2.collectors.datafile.model.FileData;
23 import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
24 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
25 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import org.springframework.beans.factory.annotation.Autowired;
29 import org.springframework.stereotype.Component;
30
31 import reactor.core.publisher.Flux;
32 import reactor.core.publisher.Mono;
33
34 /**
35  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
36  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
37  */
38 @Component
39 public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
40
41     private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
42
43     private Config datafileAppConfig;
44     private DmaapConsumerJsonParser dmaapConsumerJsonParser;
45     private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
46
47     @Autowired
48     public DmaapConsumerTaskImpl(AppConfig datafileAppConfig) {
49         this.datafileAppConfig = datafileAppConfig;
50         this.dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
51     }
52
53     protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig,
54                                     DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
55                                     DmaapConsumerJsonParser dmaapConsumerJsonParser) {
56         this.datafileAppConfig = datafileAppConfig;
57         this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
58         this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
59     }
60
61     @Override
62     Flux<FileData> consume(Mono<String> message) {
63         logger.trace("consume called with arg {}", message);
64         return dmaapConsumerJsonParser.getJsonObject(message);
65     }
66
67     @Override
68     protected Flux<FileData> execute(String object) {
69         dmaaPConsumerReactiveHttpClient = resolveClient();
70         logger.trace("execute called with arg {}", object);
71         return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()));
72     }
73
74     @Override
75     void initConfigs() {
76         datafileAppConfig.initFileStreamReader();
77     }
78
79     @Override
80     protected DmaapConsumerConfiguration resolveConfiguration() {
81         return datafileAppConfig.getDmaapConsumerConfiguration();
82     }
83
84     @Override
85     protected DMaaPConsumerReactiveHttpClient resolveClient() {
86         return new DMaaPConsumerReactiveHttpClient(resolveConfiguration(), buildWebClient());
87     }
88 }