1 package org.openecomp.core.tools.importinfo;
 
   3 import com.datastax.driver.core.BoundStatement;
 
   4 import com.datastax.driver.core.DataType.Name;
 
   5 import com.datastax.driver.core.PreparedStatement;
 
   6 import com.datastax.driver.core.ResultSet;
 
   7 import com.datastax.driver.core.ResultSetFuture;
 
   8 import com.datastax.driver.core.Session;
 
   9 import com.google.common.collect.ImmutableMap;
 
  10 import com.google.common.collect.ImmutableMap.Builder;
 
  11 import com.google.common.collect.Sets;
 
  12 import com.google.common.util.concurrent.FutureCallback;
 
  13 import com.google.common.util.concurrent.Futures;
 
  14 import org.apache.commons.lang3.StringUtils;
 
  15 import org.codehaus.jackson.map.ObjectMapper;
 
  16 import org.openecomp.core.nosqldb.impl.cassandra.CassandraSessionFactory;
 
  17 import org.openecomp.core.tools.exportinfo.ExportDataCommand;
 
  18 import org.openecomp.core.tools.model.ColumnDefinition;
 
  19 import org.openecomp.core.tools.model.TableData;
 
  20 import org.openecomp.core.tools.util.Utils;
 
  21 import org.openecomp.sdc.logging.api.Logger;
 
  22 import org.openecomp.sdc.logging.api.LoggerFactory;
 
  24 import java.io.IOException;
 
  25 import java.nio.ByteBuffer;
 
  26 import java.nio.file.Path;
 
  27 import java.util.Base64;
 
  28 import java.util.Date;
 
  29 import java.util.HashMap;
 
  30 import java.util.List;
 
  33 import java.util.stream.Collectors;
 
  35 public class ImportSingleTable {
 
  // Logger handed to the Utils.logError / Utils.printMessage helpers used throughout this class.
  37     private static final Logger logger = LoggerFactory.getLogger(ImportSingleTable.class);
 
  // Leading fragment of the statement text assembled in createQuery.
  39     public static final String INSERT_INTO = "INSERT INTO ";
 
  // Fragment placed between the column list and the placeholder list in createQuery.
  40     public static final String VALUES = " VALUES ";
 
  // Prepared statements keyed by their query text; read and filled by getPrepareStatement.
  // NOTE(review): this is a plain HashMap and no locking is visible in this class - confirm
  // that getPrepareStatement is never reached from more than one thread at a time.
  41     private static final Map<String, PreparedStatement> statementsCache = new HashMap<>();
 
  43     public void importFile(Path file) {
 
  45             ObjectMapper objectMapper = new ObjectMapper();
 
  46             TableData tableData = objectMapper.readValue(file.toFile(), TableData.class);
 
  47             Session session = CassandraSessionFactory.getSession();
 
  48             PreparedStatement ps = getPrepareStatement(tableData, session);
 
  49             tableData.rows.parallelStream().forEach(row -> executeQuery(session, ps, tableData.definitions, row));
 
  50         } catch (IOException e) {
 
  51             Utils.logError(logger, e);
 
  56     private PreparedStatement getPrepareStatement(TableData tableData, Session session) {
 
  57         String query = createQuery(tableData);
 
  58         if (statementsCache.containsKey(query)) {
 
  59             return statementsCache.get(query);
 
  61         PreparedStatement preparedStatement = session.prepare(query);
 
  62         statementsCache.put(query, preparedStatement);
 
  63         return preparedStatement;
 
  66     private void executeQuery(Session session, PreparedStatement ps, List<ColumnDefinition> definitions, List<String> rows) {
 
  67         BoundStatement bind = ps.bind();
 
  68         for (int i = 0; i < definitions.size(); i++) {
 
  69             ColumnDefinition columnDefinition = definitions.get(i);
 
  70             String rowData = rows.get(i);
 
  71             Name name = dataTypesMap.get(columnDefinition.getType());
 
  72             handleByType(bind, i, rowData, name);
 
  74         ResultSetFuture resultSetFuture = session.executeAsync(bind);
 
  75         Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
 
  77             public void onSuccess(ResultSet resultSet) {
 
  78                 Utils.printMessage(logger, "successful write ");
 
  82             public void onFailure(Throwable t) {
 
  83                 Utils.logError(logger, t);
 
  /**
   * Binds one exported cell value to the parameter at index {@code i} of {@code bind}.
   * Each visible branch decodes or parses {@code rowData} and calls a typed setter on the statement.
   *
   * NOTE(review): the lines that choose between the branches below (presumably a switch on
   * {@code name} with case labels) are absent from this copy of the file, so the mapping from
   * type name to setter cannot be read off here - confirm against the complete source.
   *
   * @param bind    statement receiving the value
   * @param i       index of the parameter to set
   * @param rowData textual form of the value (Base64 text or a parsable literal, depending on the branch)
   * @param name    driver type name; presumably what selects the branch - confirm
   * @throws UnsupportedOperationException raised by the final statement of this method for an unsupported {@code name}
   */
  88     private void handleByType(BoundStatement bind, int i, String rowData, Name name) {
 
                     // Base64 text -> String. No charset is passed to new String(byte[]), so the JVM default applies.
  93                 bind.setString(i, new String(Base64.getDecoder().decode(rowData)));
 
                     // Base64 text -> raw bytes, wrapped in a ByteBuffer for the driver.
  96                 bind.setBytes(i, ByteBuffer.wrap(Base64.getDecoder().decode(rowData.getBytes())));
 
                     // rowData is parsed as a long and passed to Date(long), i.e. milliseconds since the epoch.
  99                 bind.setDate(i, new Date(Long.parseLong(rowData)));
 
                     // The next four branches parse rowData directly as a boolean / long / int / float literal.
 102                 bind.setBool(i, Boolean.parseBoolean(rowData));
 
 105                 bind.setLong(i, Long.parseLong(rowData));
 
 108                 bind.setInt(i, Integer.parseInt(rowData));
 
 111                 bind.setFloat(i, Float.parseFloat(rowData));
 
                     // Set value: Base64 text -> String, then split on the export join delimiter.
 114                 byte[] decoded = Base64.getDecoder().decode(rowData);
 
 115                 String decodedStr = new String(decoded);
 
 116                 if (!StringUtils.isEmpty(decodedStr)) {
 
 117                     String[] splitted = decodedStr.split(ExportDataCommand.JOIN_DELIMITER_SPLITTER);
 
 118                     Set set = Sets.newHashSet(splitted);
 
                         // NOTE(review): the lines between building the set and this call are not visible;
                         // this looks like the branch taken for an empty decoded string - confirm.
 122                     bind.setSet(i, null);
 
                     // Map value: Base64 text -> String, split into entries, each entry split into key and value.
 126                 byte[] decodedMap = Base64.getDecoder().decode(rowData);
 
 127                 String mapStr = new String(decodedMap);
 
 128                 if (!StringUtils.isEmpty(mapStr)) {
 
 129                     String[] splittedMap = mapStr.split(ExportDataCommand.JOIN_DELIMITER_SPLITTER);
 
 130                     Map<String, String> map = new HashMap<>();
 
 131                     for (String keyValue : splittedMap) {
 
 132                         String[] split = keyValue.split(ExportDataCommand.MAP_DELIMITER_SPLITTER);
 
                             // NOTE(review): split[1] throws ArrayIndexOutOfBoundsException when an entry
                             // yields fewer than two parts after splitting.
 133                         map.put(split[0], split[1]);
 
                         // NOTE(review): as with the set branch, the preceding lines are not visible;
                         // presumably the branch for an empty decoded string - confirm.
 137                     bind.setMap(i, null);
 
                     // Reached when none of the branches above applies to name (selection lines not visible here).
 141                 throw new UnsupportedOperationException("Name is not supported :" + name);
 
 146     private String createQuery(TableData tableData) {
 
 147         ColumnDefinition def = tableData.definitions.iterator().next();
 
 148         StringBuilder sb = new StringBuilder();
 
 149         sb.append(INSERT_INTO).append(def.getKeyspace()).append(".").append(def.getTable());
 
 150         sb.append(tableData.definitions.stream().map(definition -> definition.getName()).collect(Collectors.joining(" , ", " ( ", " ) ")));
 
 151         sb.append(VALUES).append(tableData.definitions.stream().map(definition -> "?").collect(Collectors.joining(" , ", " ( ", " ) "))).append(";");
 
 152         return sb.toString();
 
 155     public static final ImmutableMap<String, Name> dataTypesMap;
 
 158         Builder<String, Name> builder = ImmutableMap.builder();
 
 159         Name[] values = Name.values();
 
 160         for (Name name : values) {
 
 161             builder.put(name.name().toLowerCase(), name);
 
 163         dataTypesMap = builder.build();