c03d903a31e5e68ebe77b23490e8cc93b1bfe4d1
[dcaegen2/collectors/datafile.git] /
1 /*
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.tasks;
18
19 import java.io.File;
20 import java.net.URI;
21
22 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
23 import org.onap.dcaegen2.collectors.datafile.configuration.Config;
24 import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
25 import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
26 import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectResult;
27 import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
28 import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
29 import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
30 import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
31 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
32 import org.onap.dcaegen2.collectors.datafile.model.FileData;
33 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 import org.springframework.beans.factory.annotation.Autowired;
37 import org.springframework.stereotype.Component;
38
39 import reactor.core.publisher.Flux;
40
41 /**
42  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
43  */
44 @Component
45 public class XnfCollectorTaskImpl implements XnfCollectorTask {
46
47     private static final String FTPES = "ftpes";
48     private static final String FTPS = "ftps";
49     private static final String SFTP = "sftp";
50     private static final Logger logger = LoggerFactory.getLogger(XnfCollectorTaskImpl.class);
51     private Config datafileAppConfig;
52     private final FtpsClient ftpsClient;
53     private final SftpClient sftpClient;
54     private RetryTimer retryTimer;
55
56     @Autowired
57     protected XnfCollectorTaskImpl(AppConfig datafileAppConfig, FtpsClient ftpsCleint, SftpClient sftpClient) {
58         this.datafileAppConfig = datafileAppConfig;
59         this.ftpsClient = ftpsCleint;
60         this.sftpClient = sftpClient;
61     }
62
63     @Override
64     public Flux<ConsumerDmaapModel> execute(FileData fileData) {
65         logger.trace("Entering execute with {}", fileData);
66         resolveKeyStore();
67
68         String localFile = collectFile(fileData);
69
70         if (localFile != null) {
71             ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile);
72             logger.trace("Exiting execute with {}", consumerDmaapModel);
73             return Flux.just(consumerDmaapModel);
74         }
75         logger.trace("Exiting execute with empty");
76         return Flux.empty();
77     }
78
79     @Override
80     public FtpesConfig resolveConfiguration() {
81         return datafileAppConfig.getFtpesConfiguration();
82     }
83
84     private void resolveKeyStore() {
85         FtpesConfig ftpesConfig = resolveConfiguration();
86         ftpsClient.setKeyCertPath(ftpesConfig.keyCert());
87         ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword());
88         ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA());
89         ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
90     }
91
92     private String collectFile(FileData fileData) {
93         logger.trace("starting to collectFile");
94         String location = fileData.location();
95         URI uri = URI.create(location);
96         FileServerData fileServerData = getFileServerData(uri);
97         String remoteFile = uri.getPath();
98         String localFile = "target" + File.separator + fileData.name();
99
100         FileCollectClient currentClient = selectClient(fileData, uri);
101
102         if (currentClient != null) {
103             FileCollectResult fileCollectResult = currentClient.collectFile(fileServerData, remoteFile, localFile);
104             if (!fileCollectResult.downloadSuccessful()) {
105                 fileCollectResult = retry(fileCollectResult, currentClient);
106             }
107             if (!fileCollectResult.downloadSuccessful()) {
108                 localFile = null;
109                 logger.error("Download of file aborted after maximum number of retries. Data: {} Error causes {}",
110                         fileServerData, fileCollectResult.getErrorData());
111             }
112         } else {
113             localFile = null;
114         }
115         return localFile;
116     }
117
118     private FileServerData getFileServerData(URI uri) {
119         String[] userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo());
120         // @formatter:off
121         return ImmutableFileServerData.builder()
122                 .serverAddress(uri.getHost())
123                 .userId(userInfo != null ? userInfo[0] : "")
124                 .password(userInfo != null ? userInfo[1] : "")
125                 .port(uri.getPort())
126                 .build();
127         // @formatter:on
128     }
129
130     private String[] getUserNameAndPasswordIfGiven(String userInfoString) {
131         String[] userInfo = null;
132         if (userInfoString != null && !userInfoString.isEmpty()) {
133             userInfo = userInfoString.split(":");
134         }
135         return userInfo;
136     }
137
138     private FileCollectClient selectClient(FileData fileData, URI uri) {
139         FileCollectClient selectedClient = null;
140         String scheme = uri.getScheme();
141         if (FTPES.equals(scheme) || FTPS.equals(scheme)) {
142             selectedClient = ftpsClient;
143         } else if (SFTP.equals(scheme)) {
144             selectedClient = sftpClient;
145         } else {
146             logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme,
147                     FTPES, FTPS, SFTP, fileData);
148         }
149         return selectedClient;
150     }
151
152     private FileCollectResult retry(FileCollectResult fileCollectResult, FileCollectClient fileCollectClient) {
153         int retryCount = 1;
154         FileCollectResult newResult = fileCollectResult;
155         while (!newResult.downloadSuccessful() && retryCount++ < 3) {
156             getRetryTimer().waitRetryTime();
157             newResult = fileCollectClient.retryCollectFile();
158         }
159         return newResult;
160     }
161
162     private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) {
163         String productName = fileData.fileMetaData().productName();
164         String vendorName = fileData.fileMetaData().vendorName();
165         String lastEpochMicrosec = fileData.fileMetaData().lastEpochMicrosec();
166         String sourceName = fileData.fileMetaData().sourceName();
167         String startEpochMicrosec = fileData.fileMetaData().startEpochMicrosec();
168         String timeZoneOffset = fileData.fileMetaData().timeZoneOffset();
169         String name = fileData.name();
170         String location = fileData.location();
171         String internalLocation = localFile;
172         String compression = fileData.compression();
173         String fileFormatType = fileData.fileFormatType();
174         String fileFormatVersion = fileData.fileFormatVersion();
175
176         // @formatter:off
177         return ImmutableConsumerDmaapModel.builder()
178                 .productName(productName)
179                 .vendorName(vendorName)
180                 .lastEpochMicrosec(lastEpochMicrosec)
181                 .sourceName(sourceName)
182                 .startEpochMicrosec(startEpochMicrosec)
183                 .timeZoneOffset(timeZoneOffset)
184                 .name(name)
185                 .location(location)
186                 .internalLocation(internalLocation)
187                 .compression(compression)
188                 .fileFormatType(fileFormatType)
189                 .fileFormatVersion(fileFormatVersion)
190                 .build();
191         // @formatter:on
192     }
193
194     private RetryTimer getRetryTimer() {
195         if (retryTimer == null) {
196             retryTimer = new RetryTimer();
197         }
198         return retryTimer;
199     }
200
201     protected void setRetryTimer(RetryTimer retryTimer) {
202         this.retryTimer = retryTimer;
203     }
204 }