1 package org.openecomp.core.tools.importinfo;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.DataType.Name;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.apache.commons.lang3.StringUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.openecomp.core.nosqldb.impl.cassandra.CassandraSessionFactory;
import org.openecomp.core.tools.exportinfo.ExportDataCommand;
import org.openecomp.core.tools.model.ColumnDefinition;
import org.openecomp.core.tools.model.TableData;
import org.openecomp.core.tools.util.Utils;
import org.openecomp.sdc.logging.api.Logger;
import org.openecomp.sdc.logging.api.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.Base64;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
35 public class ImportSingleTable {
37 private static final Logger logger = LoggerFactory.getLogger(ImportSingleTable.class);
39 public static final String INSERT_INTO = "INSERT INTO ";
40 public static final String VALUES = " VALUES ";
41 private static final Map<String, PreparedStatement> statementsCache = new HashMap<>();
43 public void importFile(Path file) {
45 ObjectMapper objectMapper = new ObjectMapper();
46 TableData tableData = objectMapper.readValue(file.toFile(), TableData.class);
47 Session session = CassandraSessionFactory.getSession();
48 PreparedStatement ps = getPrepareStatement(tableData, session);
49 tableData.rows.parallelStream().forEach(row -> executeQuery(session, ps, tableData.definitions, row));
50 } catch (IOException e) {
51 Utils.logError(logger, e);
56 private PreparedStatement getPrepareStatement(TableData tableData, Session session) {
57 String query = createQuery(tableData);
58 if (statementsCache.containsKey(query)) {
59 return statementsCache.get(query);
61 PreparedStatement preparedStatement = session.prepare(query);
62 statementsCache.put(query, preparedStatement);
63 return preparedStatement;
66 private void executeQuery(Session session, PreparedStatement ps, List<ColumnDefinition> definitions, List<String> rows) {
67 BoundStatement bind = ps.bind();
68 for (int i = 0; i < definitions.size(); i++) {
69 ColumnDefinition columnDefinition = definitions.get(i);
70 String rowData = rows.get(i);
71 Name name = dataTypesMap.get(columnDefinition.getType());
72 handleByType(bind, i, rowData, name);
74 ResultSetFuture resultSetFuture = session.executeAsync(bind);
75 Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
77 public void onSuccess(ResultSet resultSet) {
78 Utils.printMessage(logger, "successful write ");
82 public void onFailure(Throwable t) {
83 Utils.logError(logger, t);
88 private void handleByType(BoundStatement bind, int i, String rowData, Name name) {
93 bind.setString(i, new String(Base64.getDecoder().decode(rowData)));
96 bind.setBytes(i, ByteBuffer.wrap(Base64.getDecoder().decode(rowData.getBytes())));
99 bind.setDate(i, new Date(Long.parseLong(rowData)));
102 bind.setBool(i, Boolean.parseBoolean(rowData));
105 bind.setLong(i, Long.parseLong(rowData));
108 bind.setInt(i, Integer.parseInt(rowData));
111 bind.setFloat(i, Float.parseFloat(rowData));
114 byte[] decoded = Base64.getDecoder().decode(rowData);
115 String decodedStr = new String(decoded);
116 if (!StringUtils.isEmpty(decodedStr)) {
117 String[] splitted = decodedStr.split(ExportDataCommand.JOIN_DELIMITER_SPILTTER);
118 Set set = Sets.newHashSet(splitted);
122 bind.setSet(i, null);
126 byte[] decodedMap = Base64.getDecoder().decode(rowData);
127 String mapStr = new String(decodedMap);
128 if (!StringUtils.isEmpty(mapStr)) {
129 String[] splittedMap = mapStr.split(ExportDataCommand.JOIN_DELIMITER_SPILTTER);
130 Map<String, String> map = new HashMap<>();
131 for (String keyValue : splittedMap) {
132 String[] split = keyValue.split(ExportDataCommand.MAP_DELIMITER_SPLITTER);
133 map.put(split[0], split[1]);
137 bind.setMap(i, null);
141 throw new UnsupportedOperationException("Name is not supported :" + name);
146 private String createQuery(TableData tableData) {
147 ColumnDefinition def = tableData.definitions.iterator().next();
148 StringBuilder sb = new StringBuilder();
149 sb.append(INSERT_INTO).append(def.getKeyspace()).append(".").append(def.getTable());
150 sb.append(tableData.definitions.stream().map(definition -> definition.getName()).collect(Collectors.joining(" , ", " ( ", " ) ")));
151 sb.append(VALUES).append(tableData.definitions.stream().map(definition -> "?").collect(Collectors.joining(" , ", " ( ", " ) "))).append(";");
152 return sb.toString();
155 public static final ImmutableMap<String, Name> dataTypesMap;
158 Builder<String, Name> builder = ImmutableMap.builder();
159 Name[] values = Name.values();
160 for (Name name : values) {
161 builder.put(name.name().toLowerCase(), name);
163 dataTypesMap = builder.build();