2 * ============LICENSE_START======================================================================
3 * Copyright (C) 2018 Nordix Foundation. All rights reserved.
4 * ===============================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6 * in compliance with the License. You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software distributed under the License
11 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12 * or implied. See the License for the specific language governing permissions and limitations under
14 * ============LICENSE_END========================================================================
17 package org.onap.dcaegen2.collectors.datafile.tasks;
22 import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
23 import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
24 import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
25 import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
26 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
27 import org.onap.dcaegen2.collectors.datafile.model.FileData;
28 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import org.springframework.beans.factory.annotation.Autowired;
32 import org.springframework.stereotype.Component;
34 import reactor.core.publisher.Flux;
37 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
40 public class XnfCollectorTaskImpl implements XnfCollectorTask {
42 private static final String FTPES = "ftpes";
43 private static final String FTPS = "ftps";
44 private static final String SFTP = "sftp";
46 private static final Logger logger = LoggerFactory.getLogger(XnfCollectorTaskImpl.class);
48 private final FtpsClient ftpsClient;
49 private final SftpClient sftpClient;
/**
 * Creates the collector task with the clients it delegates file downloads to.
 *
 * @param ftpsClient client used for file locations with the {@code ftpes} or {@code ftps} scheme
 * @param sftpClient client used for file locations with the {@code sftp} scheme
 */
protected XnfCollectorTaskImpl(FtpsClient ftpsClient, SftpClient sftpClient) {
    this.ftpsClient = ftpsClient;
    this.sftpClient = sftpClient;
}
58 public Flux<ConsumerDmaapModel> execute(FileData fileData) {
59 logger.trace("Entering execute with {}", fileData);
60 String localFile = collectFile(fileData);
62 if (localFile != null) {
63 ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile);
64 logger.trace("Exiting execute with {}", consumerDmaapModel);
65 return Flux.just(consumerDmaapModel);
67 logger.trace("Exiting execute with empty");
71 private String collectFile(FileData fileData) {
72 String location = fileData.location();
73 URI uri = URI.create(location);
74 String[] userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo());
75 FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(uri.getHost())
76 .userId(userInfo != null ? userInfo[0] : "").password(userInfo != null ? userInfo[1] : "")
77 .port(uri.getPort()).build();
78 String remoteFile = uri.getPath();
79 String localFile = "target" + File.separator + fileData.name();
80 String scheme = uri.getScheme();
82 boolean fileDownloaded = false;
83 if (FTPES.equals(scheme) || FTPS.equals(scheme)) {
84 fileDownloaded = ftpsClient.collectFile(fileServerData, remoteFile, localFile);
85 } else if (SFTP.equals(scheme)) {
86 fileDownloaded = sftpClient.collectFile(fileServerData, remoteFile, localFile);
89 logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme,
90 FTPES, FTPS, SFTP, fileData);
93 if (!fileDownloaded) {
99 private String[] getUserNameAndPasswordIfGiven(String userInfoString) {
100 String[] userInfo = null;
101 if (userInfoString != null && !userInfoString.isEmpty()) {
102 userInfo = userInfoString.split(":");
107 private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) {
108 String name = fileData.name();
109 String compression = fileData.compression();
110 String fileFormatType = fileData.fileFormatType();
111 String fileFormatVersion = fileData.fileFormatVersion();
113 return ImmutableConsumerDmaapModel.builder().name(name).location(localFile).compression(compression)
114 .fileFormatType(fileFormatType).fileFormatVersion(fileFormatVersion).build();