2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.core.tools.importinfo;
23 import static org.openecomp.core.tools.exportinfo.ExportDataCommand.NULL_REPRESENTATION;
25 import com.datastax.driver.core.BoundStatement;
26 import com.datastax.driver.core.DataType.Name;
27 import com.datastax.driver.core.PreparedStatement;
28 import com.datastax.driver.core.Session;
29 import com.google.common.collect.ImmutableMap;
30 import com.google.common.collect.ImmutableMap.Builder;
31 import com.google.common.collect.Sets;
32 import org.apache.commons.lang3.StringUtils;
33 import org.codehaus.jackson.map.ObjectMapper;
34 import org.openecomp.core.nosqldb.impl.cassandra.CassandraSessionFactory;
35 import org.openecomp.core.tools.exportinfo.ExportDataCommand;
36 import org.openecomp.core.tools.model.ColumnDefinition;
37 import org.openecomp.core.tools.model.TableData;
38 import org.openecomp.core.tools.util.Utils;
39 import org.openecomp.sdc.logging.api.Logger;
40 import org.openecomp.sdc.logging.api.LoggerFactory;
42 import java.io.IOException;
43 import java.nio.ByteBuffer;
44 import java.nio.file.Path;
46 import java.util.stream.Collectors;
48 public class ImportSingleTable {
50 private static final Logger logger = LoggerFactory.getLogger(ImportSingleTable.class);
52 private static final String INSERT_INTO = "INSERT INTO ";
53 private static final String VALUES = " VALUES ";
54 private static final Map<String, PreparedStatement> statementsCache = new HashMap<>();
56 public static final ImmutableMap<String, Name> dataTypesMap;
58 public void importFile(Path file) {
60 ObjectMapper objectMapper = new ObjectMapper();
61 TableData tableData = objectMapper.readValue(file.toFile(), TableData.class);
62 Session session = CassandraSessionFactory.getSession();
63 PreparedStatement ps = getPrepareStatement(tableData, session);
64 tableData.rows.forEach(row -> executeQuery(session, ps, tableData.definitions, row));
65 } catch (IOException e) {
66 Utils.logError(logger, e);
71 private PreparedStatement getPrepareStatement(TableData tableData, Session session) {
72 String query = createQuery(tableData);
73 if (statementsCache.containsKey(query)) {
74 return statementsCache.get(query);
76 PreparedStatement preparedStatement = session.prepare(query);
77 statementsCache.put(query, preparedStatement);
78 return preparedStatement;
81 private void executeQuery(Session session, PreparedStatement ps, List<ColumnDefinition> definitions, List<String> rows) {
82 BoundStatement bind = ps.bind();
83 for (int i = 0; i < definitions.size(); i++) {
84 ColumnDefinition columnDefinition = definitions.get(i);
85 String rowData = rows.get(i);
86 Name name = dataTypesMap.get(columnDefinition.getType());
87 handleByType(bind, i, rowData, name);
89 session.execute(bind);
92 private void handleByType(BoundStatement bind, int i, String rowData, Name name) {
97 String string = new String(Base64.getDecoder().decode(rowData));
98 bind.setString(i, NULL_REPRESENTATION.equals(string) ? null : string);
101 bind.setBytes(i, ByteBuffer.wrap(Base64.getDecoder().decode(rowData.getBytes())));
104 if (StringUtils.isEmpty(rowData)){
105 bind.setTimestamp(i, null);
107 bind.setTimestamp(i, new Date(Long.parseLong(rowData)));
111 bind.setBool(i, Boolean.parseBoolean(rowData));
114 bind.setLong(i, Long.parseLong(rowData));
117 bind.setInt(i, Integer.parseInt(rowData));
120 bind.setFloat(i, Float.parseFloat(rowData));
123 byte[] decoded = Base64.getDecoder().decode(rowData);
124 String decodedStr = new String(decoded);
125 if (!StringUtils.isEmpty(decodedStr)) {
126 String[] splitted = decodedStr.split(ExportDataCommand.JOIN_DELIMITER_SPLITTER);
127 Set set = Sets.newHashSet(splitted);
131 bind.setSet(i, null);
135 byte[] decodedMap = Base64.getDecoder().decode(rowData);
136 String mapStr = new String(decodedMap);
137 if (!StringUtils.isEmpty(mapStr)) {
138 String[] splittedMap = mapStr.split(ExportDataCommand.JOIN_DELIMITER_SPLITTER);
139 Map<String, String> map = new HashMap<>();
140 for (String keyValue : splittedMap) {
141 String[] split = keyValue.split(ExportDataCommand.MAP_DELIMITER_SPLITTER);
142 map.put(split[0], split[1]);
146 bind.setMap(i, null);
150 throw new UnsupportedOperationException("Name is not supported :" + name);
155 private String createQuery(TableData tableData) {
156 ColumnDefinition def = tableData.definitions.iterator().next();
157 StringBuilder sb = new StringBuilder(1024);
158 sb.append(INSERT_INTO).append(def.getKeyspace()).append(".").append(def.getTable());
159 sb.append(tableData.definitions.stream().map(ColumnDefinition::getName)
160 .collect(Collectors.joining(" , ", " ( ", " ) ")));
161 sb.append(VALUES).append(tableData.definitions.stream().map(definition -> "?")
162 .collect(Collectors.joining(" , ", " ( ", " ) "))).append(";");
163 return sb.toString();
168 Builder<String, Name> builder = ImmutableMap.builder();
169 Name[] values = Name.values();
170 for (Name name : values) {
171 builder.put(name.name().toLowerCase(), name);
173 dataTypesMap = builder.build();