922e5c7d002906187727b36a36465daf62fb7a57
[sdc.git] /
1 package org.openecomp.core.tools.importinfo;
2
3 import com.datastax.driver.core.BoundStatement;
4 import com.datastax.driver.core.DataType.Name;
5 import com.datastax.driver.core.PreparedStatement;
6 import com.datastax.driver.core.ResultSet;
7 import com.datastax.driver.core.ResultSetFuture;
8 import com.datastax.driver.core.Session;
9 import com.google.common.collect.ImmutableMap;
10 import com.google.common.collect.ImmutableMap.Builder;
11 import com.google.common.collect.Sets;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import org.apache.commons.lang3.StringUtils;
15 import org.codehaus.jackson.map.ObjectMapper;
16 import org.openecomp.core.nosqldb.impl.cassandra.CassandraSessionFactory;
17 import org.openecomp.core.tools.exportinfo.ExportDataCommand;
18 import org.openecomp.core.tools.model.ColumnDefinition;
19 import org.openecomp.core.tools.model.TableData;
20 import org.openecomp.core.tools.util.Utils;
21 import org.openecomp.sdc.logging.api.Logger;
22 import org.openecomp.sdc.logging.api.LoggerFactory;
23
24 import java.io.IOException;
25 import java.nio.ByteBuffer;
26 import java.nio.file.Path;
27 import java.util.Base64;
28 import java.util.Date;
29 import java.util.HashMap;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.stream.Collectors;
34
35 public class ImportSingleTable {
36
37     private static final Logger logger = LoggerFactory.getLogger(ImportSingleTable.class);
38
39     private static final String INSERT_INTO = "INSERT INTO ";
40     private static final String VALUES = " VALUES ";
41     private static final Map<String, PreparedStatement> statementsCache = new HashMap<>();
42
43     public void importFile(Path file) {
44         try {
45             ObjectMapper objectMapper = new ObjectMapper();
46             TableData tableData = objectMapper.readValue(file.toFile(), TableData.class);
47             Session session = CassandraSessionFactory.getSession();
48             PreparedStatement ps = getPrepareStatement(tableData, session);
49             tableData.rows.parallelStream().forEach(row -> executeQuery(session, ps, tableData.definitions, row));
50         } catch (IOException e) {
51             Utils.logError(logger, e);
52         }
53
54     }
55
56     private PreparedStatement getPrepareStatement(TableData tableData, Session session) {
57         String query = createQuery(tableData);
58         if (statementsCache.containsKey(query)) {
59             return statementsCache.get(query);
60         }
61         PreparedStatement preparedStatement = session.prepare(query);
62         statementsCache.put(query, preparedStatement);
63         return preparedStatement;
64     }
65
66     private void executeQuery(Session session, PreparedStatement ps, List<ColumnDefinition> definitions, List<String> rows) {
67         BoundStatement bind = ps.bind();
68         for (int i = 0; i < definitions.size(); i++) {
69             ColumnDefinition columnDefinition = definitions.get(i);
70             String rowData = rows.get(i);
71             Name name = dataTypesMap.get(columnDefinition.getType());
72             handleByType(bind, i, rowData, name);
73         }
74         ResultSetFuture resultSetFuture = session.executeAsync(bind);
75         Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
76             @Override
77             public void onSuccess(ResultSet resultSet) {
78                 Utils.printMessage(logger, "successful write ");
79             }
80
81             @Override
82             public void onFailure(Throwable t) {
83                 Utils.logError(logger, t);
84             }
85         });
86     }
87
88     private void handleByType(BoundStatement bind, int i, String rowData, Name name) {
89         switch (name) {
90             case VARCHAR:
91             case TEXT:
92             case ASCII:
93                 bind.setString(i, new String(Base64.getDecoder().decode(rowData)));
94                 break;
95             case BLOB:
96                 bind.setBytes(i, ByteBuffer.wrap(Base64.getDecoder().decode(rowData.getBytes())));
97                 break;
98             case TIMESTAMP:
99                 if (StringUtils.isEmpty(rowData)){
100                     bind.setSet(i, null);
101                 } else {
102                     bind.setDate(i, new Date(Long.parseLong(rowData)));
103                 }
104                 break;
105             case BOOLEAN:
106                 bind.setBool(i, Boolean.parseBoolean(rowData));
107                 break;
108             case COUNTER:
109                 bind.setLong(i, Long.parseLong(rowData));
110                 break;
111             case INT:
112                 bind.setInt(i, Integer.parseInt(rowData));
113                 break;
114             case FLOAT:
115                 bind.setFloat(i, Float.parseFloat(rowData));
116                 break;
117             case SET:
118                 byte[] decoded = Base64.getDecoder().decode(rowData);
119                 String decodedStr = new String(decoded);
120                 if (!StringUtils.isEmpty(decodedStr)) {
121                     String[] splitted = decodedStr.split(ExportDataCommand.JOIN_DELIMITER_SPLITTER);
122                     Set set = Sets.newHashSet(splitted);
123                     set.remove("");
124                     bind.setSet(i, set);
125                 } else {
126                     bind.setSet(i, null);
127                 }
128                 break;
129             case MAP:
130                 byte[] decodedMap = Base64.getDecoder().decode(rowData);
131                 String mapStr = new String(decodedMap);
132                 if (!StringUtils.isEmpty(mapStr)) {
133                     String[] splittedMap = mapStr.split(ExportDataCommand.JOIN_DELIMITER_SPLITTER);
134                     Map<String, String> map = new HashMap<>();
135                     for (String keyValue : splittedMap) {
136                         String[] split = keyValue.split(ExportDataCommand.MAP_DELIMITER_SPLITTER);
137                         map.put(split[0], split[1]);
138                     }
139                     bind.setMap(i, map);
140                 } else {
141                     bind.setMap(i, null);
142                 }
143                 break;
144             default:
145                 throw new UnsupportedOperationException("Name is not supported :" + name);
146
147         }
148     }
149
150     private String createQuery(TableData tableData) {
151         ColumnDefinition def = tableData.definitions.iterator().next();
152         StringBuilder sb = new StringBuilder();
153         sb.append(INSERT_INTO).append(def.getKeyspace()).append(".").append(def.getTable());
154         sb.append(tableData.definitions.stream().map(ColumnDefinition::getName).collect(Collectors.joining(" , ", " ( ", " ) ")));
155         sb.append(VALUES).append(tableData.definitions.stream().map(definition -> "?").collect(Collectors.joining(" , ", " ( ", " ) "))).append(";");
156         return sb.toString();
157     }
158
159     public static final ImmutableMap<String, Name> dataTypesMap;
160
161     static {
162         Builder<String, Name> builder = ImmutableMap.builder();
163         Name[] values = Name.values();
164         for (Name name : values) {
165             builder.put(name.name().toLowerCase(), name);
166         }
167         dataTypesMap = builder.build();
168     }
169
170 }