/*
 * ============LICENSE_START======================================================================
 * Copyright (C) 2018 Nordix Foundation. All rights reserved.
 * ===============================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 * ============LICENSE_END========================================================================
 */
17 package org.onap.dcaegen2.collectors.datafile.tasks;
22 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
23 import org.onap.dcaegen2.collectors.datafile.configuration.Config;
24 import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
25 import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
26 import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
27 import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
28 import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
29 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
30 import org.onap.dcaegen2.collectors.datafile.model.FileData;
31 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 import org.springframework.beans.factory.annotation.Autowired;
35 import org.springframework.stereotype.Component;
37 import reactor.core.publisher.Flux;
/**
 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
 */
43 public class XnfCollectorTaskImpl implements XnfCollectorTask {
45 private static final String FTPES = "ftpes";
46 private static final String FTPS = "ftps";
47 private static final String SFTP = "sftp";
49 private static final Logger logger = LoggerFactory.getLogger(XnfCollectorTaskImpl.class);
50 private Config datafileAppConfig;
52 private final FtpsClient ftpsClient;
53 private final SftpClient sftpClient;
/**
 * Creates the task with the collaborators it needs to fetch files.
 *
 * @param datafileAppConfig application configuration from which the FTPES settings are read
 * @param ftpsCleint client used for locations with the ftpes or ftps scheme
 * @param sftpClient client used for locations with the sftp scheme
 */
protected XnfCollectorTaskImpl(AppConfig datafileAppConfig, FtpsClient ftpsCleint, SftpClient sftpClient) {
    this.sftpClient = sftpClient;
    this.ftpsClient = ftpsCleint;
    this.datafileAppConfig = datafileAppConfig;
}
63 public Flux<ConsumerDmaapModel> execute(FileData fileData) {
64 logger.trace("Entering execute with {}", fileData);
67 String localFile = collectFile(fileData);
69 if (localFile != null) {
70 ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile);
71 logger.trace("Exiting execute with {}", consumerDmaapModel);
72 return Flux.just(consumerDmaapModel);
74 logger.trace("Exiting execute with empty");
79 public FtpesConfig resolveConfiguration() {
80 return datafileAppConfig.getFtpesConfiguration();
83 private void resolveKeyStore() {
84 FtpesConfig ftpesConfig = resolveConfiguration();
85 ftpsClient.setKeyCertPath(ftpesConfig.keyCert());
86 ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword());
87 ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA());
88 ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
91 private String collectFile(FileData fileData) {
92 logger.trace("starting to collectFile");
93 String location = fileData.location();
94 URI uri = URI.create(location);
95 FileServerData fileServerData = getFileServerData(uri);
96 String remoteFile = uri.getPath();
97 String localFile = "target" + File.separator + fileData.name();
98 String scheme = uri.getScheme();
99 boolean fileDownloaded = false;
100 if (FTPES.equals(scheme) || FTPS.equals(scheme)) {
101 fileDownloaded = ftpsClient.collectFile(fileServerData, remoteFile, localFile);
102 } else if (SFTP.equals(scheme)) {
103 fileDownloaded = sftpClient.collectFile(fileServerData, remoteFile, localFile);
106 logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme,
107 FTPES, FTPS, SFTP, fileData);
110 if (!fileDownloaded) {
116 private FileServerData getFileServerData(URI uri) {
117 String[] userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo());
118 return ImmutableFileServerData.builder().serverAddress(uri.getHost())
119 .userId(userInfo != null ? userInfo[0] : "").password(userInfo != null ? userInfo[1] : "")
120 .port(uri.getPort()).build();
123 private String[] getUserNameAndPasswordIfGiven(String userInfoString) {
124 String[] userInfo = null;
125 if (userInfoString != null && !userInfoString.isEmpty()) {
126 userInfo = userInfoString.split(":");
131 private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) {
132 String name = fileData.name();
133 String compression = fileData.compression();
134 String fileFormatType = fileData.fileFormatType();
135 String fileFormatVersion = fileData.fileFormatVersion();
137 return ImmutableConsumerDmaapModel.builder().name(name).location(localFile).compression(compression)
138 .fileFormatType(fileFormatType).fileFormatVersion(fileFormatVersion).build();