2 * ============LICENSE_START======================================================================
3 * Copyright (C) 2018 Nordix Foundation. All rights reserved.
4 * ===============================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6 * in compliance with the License. You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software distributed under the License
11 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12 * or implied. See the License for the specific language governing permissions and limitations under
14 * ============LICENSE_END========================================================================
17 package org.onap.dcaegen2.collectors.datafile.tasks;
22 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
23 import org.onap.dcaegen2.collectors.datafile.configuration.Config;
24 import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
25 import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
26 import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectResult;
27 import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
28 import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
29 import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
30 import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
31 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
32 import org.onap.dcaegen2.collectors.datafile.model.FileData;
33 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 import org.springframework.beans.factory.annotation.Autowired;
37 import org.springframework.stereotype.Component;
39 import reactor.core.publisher.Flux;
42 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
45 public class XnfCollectorTaskImpl implements XnfCollectorTask {
47 private static final String FTPES = "ftpes";
48 private static final String FTPS = "ftps";
49 private static final String SFTP = "sftp";
50 private static final Logger logger = LoggerFactory.getLogger(XnfCollectorTaskImpl.class);
51 private Config datafileAppConfig;
52 private final FtpsClient ftpsClient;
53 private final SftpClient sftpClient;
54 private RetryTimer retryTimer;
/**
 * Creates the collector task.
 *
 * @param datafileAppConfig application configuration the FTPES settings are read from
 * @param ftpsCleint client stored for FTPES/FTPS downloads
 * @param sftpClient client stored for SFTP downloads
 */
protected XnfCollectorTaskImpl(AppConfig datafileAppConfig, FtpsClient ftpsCleint, SftpClient sftpClient) {
    this.sftpClient = sftpClient;
    this.ftpsClient = ftpsCleint;
    this.datafileAppConfig = datafileAppConfig;
}
64 public Flux<ConsumerDmaapModel> execute(FileData fileData) {
65 logger.trace("Entering execute with {}", fileData);
68 String localFile = collectFile(fileData);
70 if (localFile != null) {
71 ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile);
72 logger.trace("Exiting execute with {}", consumerDmaapModel);
73 return Flux.just(consumerDmaapModel);
75 logger.trace("Exiting execute with empty");
80 public FtpesConfig resolveConfiguration() {
81 return datafileAppConfig.getFtpesConfiguration();
84 private void resolveKeyStore() {
85 FtpesConfig ftpesConfig = resolveConfiguration();
86 ftpsClient.setKeyCertPath(ftpesConfig.keyCert());
87 ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword());
88 ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA());
89 ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
92 private String collectFile(FileData fileData) {
93 logger.trace("starting to collectFile");
94 String location = fileData.location();
95 URI uri = URI.create(location);
96 FileServerData fileServerData = getFileServerData(uri);
97 String remoteFile = uri.getPath();
98 String localFile = "target" + File.separator + fileData.name();
100 FileCollectClient currentClient = selectClient(fileData, uri);
102 if (currentClient != null) {
103 FileCollectResult fileCollectResult = currentClient.collectFile(fileServerData, remoteFile, localFile);
104 if (!fileCollectResult.downloadSuccessful()) {
105 fileCollectResult = retry(fileCollectResult, currentClient);
107 if (!fileCollectResult.downloadSuccessful()) {
109 logger.error("Download of file aborted after maximum number of retries. Data: {} Error causes {}",
110 fileServerData, fileCollectResult.getErrorData());
118 private FileServerData getFileServerData(URI uri) {
119 String[] userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo());
121 return ImmutableFileServerData.builder()
122 .serverAddress(uri.getHost())
123 .userId(userInfo != null ? userInfo[0] : "")
124 .password(userInfo != null ? userInfo[1] : "")
130 private String[] getUserNameAndPasswordIfGiven(String userInfoString) {
131 String[] userInfo = null;
132 if (userInfoString != null && !userInfoString.isEmpty()) {
133 userInfo = userInfoString.split(":");
138 private FileCollectClient selectClient(FileData fileData, URI uri) {
139 FileCollectClient selectedClient = null;
140 String scheme = uri.getScheme();
141 if (FTPES.equals(scheme) || FTPS.equals(scheme)) {
142 selectedClient = ftpsClient;
143 } else if (SFTP.equals(scheme)) {
144 selectedClient = sftpClient;
146 logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme,
147 FTPES, FTPS, SFTP, fileData);
149 return selectedClient;
152 private FileCollectResult retry(FileCollectResult fileCollectResult, FileCollectClient fileCollectClient) {
154 FileCollectResult newResult = fileCollectResult;
155 while (!newResult.downloadSuccessful() && retryCount++ < 3) {
156 getRetryTimer().waitRetryTime();
157 newResult = fileCollectClient.retryCollectFile();
162 private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) {
163 String productName = fileData.fileMetaData().productName();
164 String vendorName = fileData.fileMetaData().vendorName();
165 String lastEpochMicrosec = fileData.fileMetaData().lastEpochMicrosec();
166 String sourceName = fileData.fileMetaData().sourceName();
167 String startEpochMicrosec = fileData.fileMetaData().startEpochMicrosec();
168 String timeZoneOffset = fileData.fileMetaData().timeZoneOffset();
169 String name = fileData.name();
170 String location = fileData.location();
171 String internalLocation = localFile;
172 String compression = fileData.compression();
173 String fileFormatType = fileData.fileFormatType();
174 String fileFormatVersion = fileData.fileFormatVersion();
177 return ImmutableConsumerDmaapModel.builder()
178 .productName(productName)
179 .vendorName(vendorName)
180 .lastEpochMicrosec(lastEpochMicrosec)
181 .sourceName(sourceName)
182 .startEpochMicrosec(startEpochMicrosec)
183 .timeZoneOffset(timeZoneOffset)
186 .internalLocation(internalLocation)
187 .compression(compression)
188 .fileFormatType(fileFormatType)
189 .fileFormatVersion(fileFormatVersion)
194 private RetryTimer getRetryTimer() {
195 if (retryTimer == null) {
196 retryTimer = new RetryTimer();
201 protected void setRetryTimer(RetryTimer retryTimer) {
202 this.retryTimer = retryTimer;