Add collaboration feature
sdc.git: openecomp-be/tools/zusammen-tools/src/main/java/org/openecomp/core/tools/importinfo/ImportSingleTable.java
package org.openecomp.core.tools.importinfo;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.DataType.Name;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.apache.commons.lang3.StringUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.openecomp.core.nosqldb.impl.cassandra.CassandraSessionFactory;
import org.openecomp.core.tools.exportinfo.ExportDataCommand;
import org.openecomp.core.tools.model.ColumnDefinition;
import org.openecomp.core.tools.model.TableData;
import org.openecomp.core.tools.util.Utils;
import org.openecomp.sdc.logging.api.Logger;
import org.openecomp.sdc.logging.api.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.Base64;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

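/**
 * Imports a single table's data from a JSON file produced by the export tool
 * and writes the rows back into Cassandra using asynchronous inserts.
 */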
public class ImportSingleTable {

    private static final Logger logger = LoggerFactory.getLogger(ImportSingleTable.class);

    public static final String INSERT_INTO = "INSERT INTO ";
    public static final String VALUES = " VALUES ";
    private static final Map<String, PreparedStatement> statementsCache = new HashMap<>();

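    /**
     * Reads the exported table data from the given JSON file and inserts every row
     * into Cassandra, processing the rows in parallel.
     */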
    public void importFile(Path file) {
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            TableData tableData = objectMapper.readValue(file.toFile(), TableData.class);
            Session session = CassandraSessionFactory.getSession();
            PreparedStatement ps = getPrepareStatement(tableData, session);
            tableData.rows.parallelStream().forEach(row -> executeQuery(session, ps, tableData.definitions, row));
        } catch (IOException e) {
            Utils.logError(logger, e);
        }
    }

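    /**
     * Builds the INSERT query for the table and returns a cached PreparedStatement,
     * preparing and caching it on first use.
     */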
    private PreparedStatement getPrepareStatement(TableData tableData, Session session) {
        String query = createQuery(tableData);
        return statementsCache.computeIfAbsent(query, session::prepare);
    }

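    /**
     * Binds a single row's values to the prepared statement according to each column's
     * type and executes the insert asynchronously, logging success or failure.
     */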
    private void executeQuery(Session session, PreparedStatement ps, List<ColumnDefinition> definitions, List<String> row) {
        BoundStatement bind = ps.bind();
        for (int i = 0; i < definitions.size(); i++) {
            ColumnDefinition columnDefinition = definitions.get(i);
            String rowData = row.get(i);
            Name name = dataTypesMap.get(columnDefinition.getType());
            handleByType(bind, i, rowData, name);
        }
        ResultSetFuture resultSetFuture = session.executeAsync(bind);
        Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(ResultSet resultSet) {
                Utils.printMessage(logger, "successful write");
            }

            @Override
            public void onFailure(Throwable t) {
                Utils.logError(logger, t);
            }
        });
    }

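    /**
     * Binds the value at position i using the setter that matches the column's Cassandra type.
     * Text, blob, set and map values are expected to be Base64-encoded, as written by the export tool.
     */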
    private void handleByType(BoundStatement bind, int i, String rowData, Name name) {
        switch (name) {
            case VARCHAR:
            case TEXT:
            case ASCII:
                bind.setString(i, new String(Base64.getDecoder().decode(rowData)));
                break;
            case BLOB:
                bind.setBytes(i, ByteBuffer.wrap(Base64.getDecoder().decode(rowData)));
                break;
            case TIMESTAMP:
                bind.setDate(i, new Date(Long.parseLong(rowData)));
                break;
            case BOOLEAN:
                bind.setBool(i, Boolean.parseBoolean(rowData));
                break;
            case COUNTER:
                bind.setLong(i, Long.parseLong(rowData));
                break;
            case INT:
                bind.setInt(i, Integer.parseInt(rowData));
                break;
            case FLOAT:
                bind.setFloat(i, Float.parseFloat(rowData));
                break;
            case SET:
                byte[] decoded = Base64.getDecoder().decode(rowData);
                String decodedStr = new String(decoded);
                if (!StringUtils.isEmpty(decodedStr)) {
                    String[] splitted = decodedStr.split(ExportDataCommand.JOIN_DELIMITER_SPILTTER);
                    Set<String> set = Sets.newHashSet(splitted);
                    set.remove("");
                    bind.setSet(i, set);
                } else {
                    bind.setSet(i, null);
                }
                break;
            case MAP:
                byte[] decodedMap = Base64.getDecoder().decode(rowData);
                String mapStr = new String(decodedMap);
                if (!StringUtils.isEmpty(mapStr)) {
                    String[] splittedMap = mapStr.split(ExportDataCommand.JOIN_DELIMITER_SPILTTER);
                    Map<String, String> map = new HashMap<>();
                    for (String keyValue : splittedMap) {
                        String[] split = keyValue.split(ExportDataCommand.MAP_DELIMITER_SPLITTER);
                        map.put(split[0], split[1]);
                    }
                    bind.setMap(i, map);
                } else {
                    bind.setMap(i, null);
                }
                break;
            default:
                throw new UnsupportedOperationException("Column type is not supported: " + name);
        }
    }

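    /**
     * Builds an INSERT statement of the form
     * "INSERT INTO keyspace.table ( col1 , col2 ) VALUES ( ? , ? );"
     * from the table's column definitions.
     */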
    private String createQuery(TableData tableData) {
        ColumnDefinition def = tableData.definitions.iterator().next();
        StringBuilder sb = new StringBuilder();
        sb.append(INSERT_INTO).append(def.getKeyspace()).append(".").append(def.getTable());
        sb.append(tableData.definitions.stream().map(ColumnDefinition::getName).collect(Collectors.joining(" , ", " ( ", " ) ")));
        sb.append(VALUES).append(tableData.definitions.stream().map(definition -> "?").collect(Collectors.joining(" , ", " ( ", " ) "))).append(";");
        return sb.toString();
    }

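    /** Maps lowercase Cassandra type names to the driver's DataType.Name constants. */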
    public static final ImmutableMap<String, Name> dataTypesMap;

    static {
        Builder<String, Name> builder = ImmutableMap.builder();
        Name[] values = Name.values();
        for (Name name : values) {
            builder.put(name.name().toLowerCase(), name);
        }
        dataTypesMap = builder.build();
    }

}