be6ac9ccdbba10e62858ae0d440384a9197d55bb
[dcaegen2/collectors/datafile.git] /
1 /*
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.tasks;
18
19 import java.io.File;
20 import java.net.URI;
21
22 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
23 import org.onap.dcaegen2.collectors.datafile.configuration.Config;
24 import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
25 import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
26 import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
27 import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
28 import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
29 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
30 import org.onap.dcaegen2.collectors.datafile.model.FileData;
31 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 import org.springframework.beans.factory.annotation.Autowired;
35 import org.springframework.stereotype.Component;
36
37 import reactor.core.publisher.Flux;
38
39 /**
40  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
41  */
42 @Component
43 public class XnfCollectorTaskImpl implements XnfCollectorTask {
44
45     private static final String FTPES = "ftpes";
46     private static final String FTPS = "ftps";
47     private static final String SFTP = "sftp";
48
49     private static final Logger logger = LoggerFactory.getLogger(XnfCollectorTaskImpl.class);
50     private Config datafileAppConfig;
51
52     private final FtpsClient ftpsClient;
53     private final SftpClient sftpClient;
54
55     @Autowired
56     protected XnfCollectorTaskImpl(AppConfig datafileAppConfig, FtpsClient ftpsCleint, SftpClient sftpClient) {
57         this.datafileAppConfig = datafileAppConfig;
58         this.ftpsClient = ftpsCleint;
59         this.sftpClient = sftpClient;
60     }
61
62     @Override
63     public Flux<ConsumerDmaapModel> execute(FileData fileData) {
64         logger.trace("Entering execute with {}", fileData);
65         resolveKeyStore();
66
67         String localFile = collectFile(fileData);
68
69         if (localFile != null) {
70             ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile);
71             logger.trace("Exiting execute with {}", consumerDmaapModel);
72             return Flux.just(consumerDmaapModel);
73         }
74         logger.trace("Exiting execute with empty");
75         return Flux.empty();
76     }
77
78     @Override
79     public FtpesConfig resolveConfiguration() {
80         return datafileAppConfig.getFtpesConfiguration();
81     }
82
83     private void resolveKeyStore() {
84         FtpesConfig ftpesConfig = resolveConfiguration();
85         ftpsClient.setKeyCertPath(ftpesConfig.keyCert());
86         ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword());
87         ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA());
88         ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
89     }
90
91     private String collectFile(FileData fileData) {
92         logger.trace("starting to collectFile");
93         String location = fileData.location();
94         URI uri = URI.create(location);
95         FileServerData fileServerData = getFileServerData(uri);
96         String remoteFile = uri.getPath();
97         String localFile = "target" + File.separator + fileData.name();
98         String scheme = uri.getScheme();
99         boolean fileDownloaded = false;
100         if (FTPES.equals(scheme) || FTPS.equals(scheme)) {
101             fileDownloaded = ftpsClient.collectFile(fileServerData, remoteFile, localFile);
102         } else if (SFTP.equals(scheme)) {
103             fileDownloaded = sftpClient.collectFile(fileServerData, remoteFile, localFile);
104         } else {
105
106             logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme,
107                     FTPES, FTPS, SFTP, fileData);
108             localFile = null;
109         }
110         if (!fileDownloaded) {
111             localFile = null;
112         }
113         return localFile;
114     }
115
116     private FileServerData getFileServerData(URI uri) {
117         String[] userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo());
118         return ImmutableFileServerData.builder().serverAddress(uri.getHost())
119                 .userId(userInfo != null ? userInfo[0] : "").password(userInfo != null ? userInfo[1] : "")
120                 .port(uri.getPort()).build();
121     }
122
123     private String[] getUserNameAndPasswordIfGiven(String userInfoString) {
124         String[] userInfo = null;
125         if (userInfoString != null && !userInfoString.isEmpty()) {
126             userInfo = userInfoString.split(":");
127         }
128         return userInfo;
129     }
130
131     private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) {
132         String name = fileData.name();
133         String compression = fileData.compression();
134         String fileFormatType = fileData.fileFormatType();
135         String fileFormatVersion = fileData.fileFormatVersion();
136
137         return ImmutableConsumerDmaapModel.builder().name(name).location(localFile).compression(compression)
138                 .fileFormatType(fileFormatType).fileFormatVersion(fileFormatVersion).build();
139     }
140 }