a29fb092d49d59287d8cff1a47b131bb84a48e24
[dcaegen2/collectors/datafile.git] /
1 /*
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.tasks;
18
19 import java.io.File;
20 import java.net.URI;
21
22 import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
23 import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
24 import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
25 import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
26 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
27 import org.onap.dcaegen2.collectors.datafile.model.FileData;
28 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import org.springframework.beans.factory.annotation.Autowired;
32 import org.springframework.stereotype.Component;
33
34 import reactor.core.publisher.Flux;
35
36 /**
37  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
38  */
39 @Component
40 public class XnfCollectorTaskImpl implements XnfCollectorTask {
41
42     private static final String FTPES = "ftpes";
43     private static final String FTPS = "ftps";
44     private static final String SFTP = "sftp";
45
46     private static final Logger logger = LoggerFactory.getLogger(XnfCollectorTaskImpl.class);
47
48     private final FtpsClient ftpsClient;
49     private final SftpClient sftpClient;
50
51     @Autowired
52     protected XnfCollectorTaskImpl(FtpsClient ftpsCleint, SftpClient sftpClient) {
53         this.ftpsClient = ftpsCleint;
54         this.sftpClient = sftpClient;
55     }
56
57     @Override
58     public Flux<ConsumerDmaapModel> execute(FileData fileData) {
59         logger.trace("Entering execute with {}", fileData);
60         String localFile = collectFile(fileData);
61
62         if (localFile != null) {
63             ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile);
64             logger.trace("Exiting execute with {}", consumerDmaapModel);
65             return Flux.just(consumerDmaapModel);
66         }
67         logger.trace("Exiting execute with empty");
68         return Flux.empty();
69     }
70
71     private String collectFile(FileData fileData) {
72         String location = fileData.location();
73         URI uri = URI.create(location);
74         String[] userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo());
75         FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(uri.getHost())
76                 .userId(userInfo != null ? userInfo[0] : "").password(userInfo != null ? userInfo[1] : "")
77                 .port(uri.getPort()).build();
78         String remoteFile = uri.getPath();
79         String localFile = "target" + File.separator + fileData.name();
80         String scheme = uri.getScheme();
81
82         boolean fileDownloaded = false;
83         if (FTPES.equals(scheme) || FTPS.equals(scheme)) {
84             fileDownloaded = ftpsClient.collectFile(fileServerData, remoteFile, localFile);
85         } else if (SFTP.equals(scheme)) {
86             fileDownloaded = sftpClient.collectFile(fileServerData, remoteFile, localFile);
87         } else {
88
89             logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme,
90                     FTPES, FTPS, SFTP, fileData);
91             localFile = null;
92         }
93         if (!fileDownloaded) {
94             localFile = null;
95         }
96         return localFile;
97     }
98
99     private String[] getUserNameAndPasswordIfGiven(String userInfoString) {
100         String[] userInfo = null;
101         if (userInfoString != null && !userInfoString.isEmpty()) {
102             userInfo = userInfoString.split(":");
103         }
104         return userInfo;
105     }
106
107     private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) {
108         String name = fileData.name();
109         String compression = fileData.compression();
110         String fileFormatType = fileData.fileFormatType();
111         String fileFormatVersion = fileData.fileFormatVersion();
112
113         return ImmutableConsumerDmaapModel.builder().name(name).location(localFile).compression(compression)
114                 .fileFormatType(fileFormatType).fileFormatVersion(fileFormatVersion).build();
115     }
116 }