2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 - 2019 European Support Limited. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.openecomp.core.tools.exportinfo;
22 import static java.nio.file.Files.createDirectories;
23 import static org.openecomp.core.tools.commands.CommandName.EXPORT;
25 import com.datastax.driver.core.ResultSet;
26 import com.datastax.driver.core.ResultSetFuture;
27 import com.datastax.driver.core.Session;
28 import com.google.common.collect.Sets;
29 import com.google.common.util.concurrent.FutureCallback;
30 import com.google.common.util.concurrent.Futures;
31 import java.io.IOException;
32 import java.io.InputStream;
33 import java.nio.file.Path;
34 import java.nio.file.Paths;
35 import java.time.LocalDateTime;
36 import java.time.format.DateTimeFormatter;
37 import java.util.HashSet;
38 import java.util.List;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.Executor;
43 import java.util.concurrent.ExecutorService;
44 import java.util.concurrent.Executors;
45 import java.util.stream.Collectors;
46 import org.apache.commons.cli.CommandLine;
47 import org.apache.commons.cli.Option;
48 import org.apache.commons.io.FileUtils;
49 import org.openecomp.core.nosqldb.impl.cassandra.CassandraSessionFactory;
50 import org.openecomp.core.tools.commands.Command;
51 import org.openecomp.core.tools.commands.CommandName;
52 import org.openecomp.core.tools.importinfo.ImportProperties;
53 import org.openecomp.core.tools.util.Utils;
54 import org.openecomp.core.zusammen.impl.CassandraConnectionInitializer;
55 import org.openecomp.sdc.common.zip.ZipUtils;
56 import org.openecomp.sdc.common.zip.exception.ZipException;
57 import org.openecomp.sdc.logging.api.Logger;
58 import org.openecomp.sdc.logging.api.LoggerFactory;
59 import org.yaml.snakeyaml.Yaml;
62 public final class ExportDataCommand extends Command {
// Class-wide logger; handed to Utils.printMessage / Utils.logError by the methods of this class.
64 private static final Logger LOGGER = LoggerFactory.getLogger(ExportDataCommand.class);
// Short name of the command-line option that carries the id of the item to export.
65 private static final String ITEM_ID_OPTION = "i";
// NOTE(review): each *_SPLITTER below holds the same two characters as its *_DELIMITER with every
// character backslash-escaped - presumably so it can be used as a regular expression (e.g. with
// String.split) by code outside this file. Usages are not visible here; confirm.
66 static final String JOIN_DELIMITER = "$#";
67 public static final String JOIN_DELIMITER_SPLITTER = "\\$\\#";
68 static final String MAP_DELIMITER = "!@";
69 public static final String MAP_DELIMITER_SPLITTER = "\\!\\@";
// Number of threads in the fixed pool that execute() creates for the query callbacks.
70 private static final int THREAD_POOL_SIZE = 6;
// NOTE(review): presumably the textual stand-in written for null values during export; it is not
// referenced in the visible part of this file - confirm against the serializer/importer.
71 public static final String NULL_REPRESENTATION = "nnuullll";
/**
 * Builds the command and defines its single option: ITEM_ID_OPTION ("i"), which takes an argument
 * named "item id".
 *
 * NOTE(review): the statement that receives the built Option (and the end of the constructor) is
 * not visible in this view, so how the option is registered is not documented here.
 */
73 public ExportDataCommand() {
// Option description states the item id is mandatory; execute() enforces that at run time.
75 Option.builder(ITEM_ID_OPTION).hasArg().argName("item id").desc("id of item to export, mandatory").build());
/**
 * Runs the export for the item id given on the command line.
 *
 * Visible steps: parse the arguments, reject a missing "i" option, open a Cassandra session, load
 * the "queries" and "item_columns" lists from the classpath resource /queries.yaml, run every query
 * through executeQuery, run them a second time filtered by the collected vlms (if any), and delete
 * the root directory. Exceptions are logged through Utils.logError.
 *
 * NOTE(review): several statements of this method are not visible in this view (the embedded line
 * numbering skips) - among them the return statements, the declaration that receives the cleaned
 * item ids ("fis"), any waiting on the latches, any zipping of the output and the executor
 * shutdown. They are intentionally not described here.
 *
 * @param args raw command-line arguments, parsed with parseArgs
 * @return presumably true on success and false otherwise - the return statements are not visible,
 *         confirm
 */
79 public boolean execute(String[] args) {
80 CommandLine cmd = parseArgs(args);
// The item id option must be present and must carry a value.
82 if (!cmd.hasOption(ITEM_ID_OPTION) || cmd.getOptionValue(ITEM_ID_OPTION) == null) {
83 LOGGER.error("Argument i is mandatory");
// Declared outside the try block so the null check near the end of the method can reach it.
87 ExecutorService executor = null;
89 CassandraConnectionInitializer.setCassandraConnectionPropertiesToSystem();
90 Path rootDir = Paths.get(ImportProperties.ROOT_DIRECTORY);
// The session is opened in try-with-resources and is therefore closed when that block ends.
92 try (Session session = CassandraSessionFactory.getSession()) {
// Set containing the single option value.
93 final Set<String> filteredItems = Sets.newHashSet(cmd.getOptionValue(ITEM_ID_OPTION));
// Removes carriage-return characters from every id; the variable receiving the result is declared
// on a line not visible here (presumably "fis", which is used below).
95 filteredItems.stream().map(fi -> fi.replaceAll("\\r", "")).collect(Collectors.toSet());
96 Map<String, List<String>> queries;
97 Yaml yaml = new Yaml();
// The query definitions come from a resource bundled on the classpath; the cast is unchecked.
98 try (InputStream is = ExportDataCommand.class.getResourceAsStream("/queries.yaml")) {
99 queries = (Map<String, List<String>>) yaml.load(is);
// The two lists are read in parallel by index: itemsColumns.get(i) is the filter column passed
// along with queriesList.get(i).
101 List<String> queriesList = queries.get("queries");
102 List<String> itemsColumns = queries.get("item_columns");
// Filled by the first pass (passed to executeQuery as its vlms argument).
103 Set<String> vlms = new HashSet<>();
// One count per query; executeQuery counts down after a query result has been serialized.
104 CountDownLatch doneQueries = new CountDownLatch(queriesList.size());
105 executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
// First pass: every query, filtered by the requested item ids.
106 for (int i = 0; i < queriesList.size(); i++) {
107 executeQuery(session, queriesList.get(i), fis, itemsColumns.get(i), vlms, doneQueries, executor);
// Second pass: the same queries, now filtered by the vlms collected in the first pass. null is
// passed as the vlms argument, so this pass has no collecting set.
110 if (!vlms.isEmpty()) {
111 CountDownLatch doneVmls = new CountDownLatch(queriesList.size());
112 for (int i = 0; i < queriesList.size(); i++) {
113 executeQuery(session, queriesList.get(i), vlms, itemsColumns.get(i), null, doneVmls, executor);
// Removes the root directory.
120 FileUtils.forceDelete(rootDir.toFile());
121 } catch (Exception ex) {
122 Utils.logError(LOGGER, ex);
// NOTE(review): the body of this check is not visible; presumably the executor is shut down here.
124 if (executor != null) {
/**
 * Returns the CommandName identifying this command.
 *
 * NOTE(review): the method body is not visible in this view; the file statically imports
 * CommandName.EXPORT, so that is presumably the value returned - confirm.
 */
132 public CommandName getCommandName() {
/**
 * Submits one query asynchronously and registers a callback, run on the given executor, that
 * serializes the result.
 *
 * On success the callback logs "Start to serialize", hands the ResultSet to a new ExportSerializer
 * together with the filter arguments, and then counts the latch down; an exception thrown in that
 * sequence is logged as "Serialization failed". A failed query is logged as "Query failed".
 *
 * NOTE(review): in the visible lines the latch is counted down only after a successful
 * serialization. The remaining lines of both callbacks are not visible in this view, so whether the
 * failure paths release the latch (or end the process) cannot be told from here - confirm.
 *
 * @param session        Cassandra session used to execute the query
 * @param query          query text to execute
 * @param filteredItems  values passed to the serializer as its filter set
 * @param filteredColumn column name passed to the serializer alongside the filter set
 * @param vlms           set passed through to the serializer; execute() passes null for it on its
 *                       second pass
 * @param donequerying   latch counted down once the result has been serialized
 * @param executor       executor on which the callback runs
 */
136 private static void executeQuery(final Session session, final String query, final Set<String> filteredItems,
137 final String filteredColumn, final Set<String> vlms, final CountDownLatch donequerying, Executor executor) {
// Non-blocking submit; the outcome is delivered to the callback registered below.
138 ResultSetFuture resultSetFuture = session.executeAsync(query);
139 Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
141 public void onSuccess(ResultSet resultSet) {
143 Utils.printMessage(LOGGER, "Start to serialize " + query);
// A fresh serializer instance is created for every query result.
144 new ExportSerializer().serializeResult(resultSet, filteredItems, filteredColumn, vlms);
// Reached only when serializeResult returned without throwing.
145 donequerying.countDown();
146 } catch (Exception e) {
147 Utils.logError(LOGGER, "Serialization failed :" + query, e);
153 public void onFailure(Throwable t) {
154 Utils.logError(LOGGER, "Query failed :" + query, t);
/**
 * Zips the given directory into a timestamped archive in the user's home directory and logs the
 * resulting path.
 *
 * The archive is named onboarding_import followed by the current local date-time and ".zip".
 *
 * @param rootDir directory whose content is zipped
 * @throws ZipException declared by this method; presumably raised by ZipUtils.createZipFromPath
 */
160 private static void zipPath(final Path rootDir) throws ZipException {
161 final LocalDateTime date = LocalDateTime.now();
162 final DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME;
// Colons of the ISO time part are replaced with underscores before the text is used in a file name.
163 final String dateStr = date.format(formatter).replace(":", "_");
// Target location: the directory named by the "user.home" system property.
164 final Path zipFile = Paths.get(System.getProperty("user.home"),String.format("onboarding_import%s.zip", dateStr));
165 ZipUtils.createZipFromPath(rootDir, zipFile);
// Two messages are emitted, both containing the path of the created archive.
166 Utils.printMessage(LOGGER, "Zip file was created " + zipFile.toString());
167 Utils.printMessage(LOGGER, "Exported file :" + zipFile.toString());
/**
 * Prepares the given directory: an existing one is deleted with FileUtils.forceDelete, and the
 * directory is created with Files.createDirectories.
 *
 * NOTE(review): the line closing the if-block is not visible in this view; the creation presumably
 * runs unconditionally after the check - confirm.
 *
 * @param rootDir directory to (re)create
 * @throws IOException declared by this method; both the delete and the create call can raise it
 */
171 public static void initDir(Path rootDir) throws IOException {
// Remove whatever a previous run left behind.
172 if (rootDir.toFile().exists()) {
173 FileUtils.forceDelete(rootDir.toFile());
// Statically imported java.nio.file.Files.createDirectories.
175 createDirectories(rootDir);