import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.DataType.Name;
import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import org.apache.commons.lang3.StringUtils;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.openecomp.core.nosqldb.impl.cassandra.CassandraSessionFactory;
-import org.openecomp.core.tools.exportinfo.ExportDataCommand;
-import org.openecomp.core.tools.model.ColumnDefinition;
-import org.openecomp.core.tools.model.TableData;
-import org.openecomp.core.tools.util.Utils;
-import org.openecomp.sdc.logging.api.Logger;
-import org.openecomp.sdc.logging.api.LoggerFactory;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.openecomp.core.nosqldb.impl.cassandra.CassandraSessionFactory;
+import org.openecomp.core.tools.exportinfo.ExportDataCommand;
+import org.openecomp.core.tools.model.ColumnDefinition;
+import org.openecomp.core.tools.model.TableData;
+import org.openecomp.core.tools.util.Utils;
+import org.openecomp.sdc.logging.api.Logger;
+import org.openecomp.sdc.logging.api.LoggerFactory;
public class ImportSingleTable {
private static final Logger logger = LoggerFactory.getLogger(ImportSingleTable.class);
- public static final String INSERT_INTO = "INSERT INTO ";
- public static final String VALUES = " VALUES ";
+ private static final String INSERT_INTO = "INSERT INTO ";
+ private static final String VALUES = " VALUES ";
private static final Map<String, PreparedStatement> statementsCache = new HashMap<>();
public void importFile(Path file) {
TableData tableData = objectMapper.readValue(file.toFile(), TableData.class);
Session session = CassandraSessionFactory.getSession();
PreparedStatement ps = getPrepareStatement(tableData, session);
- tableData.rows.parallelStream().forEach(row -> executeQuery(session, ps, tableData.definitions, row));
+ tableData.rows.forEach(row -> executeQuery(session, ps, tableData.definitions, row));
} catch (IOException e) {
Utils.logError(logger, e);
}
Name name = dataTypesMap.get(columnDefinition.getType());
handleByType(bind, i, rowData, name);
}
- ResultSetFuture resultSetFuture = session.executeAsync(bind);
- Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
- @Override
- public void onSuccess(ResultSet resultSet) {
- Utils.printMessage(logger, "successful write ");
- }
-
- @Override
- public void onFailure(Throwable t) {
- Utils.logError(logger, t);
- }
- });
+ session.execute(bind);
}
private void handleByType(BoundStatement bind, int i, String rowData, Name name) {
bind.setBytes(i, ByteBuffer.wrap(Base64.getDecoder().decode(rowData.getBytes())));
break;
case TIMESTAMP:
- bind.setDate(i, new Date(Long.parseLong(rowData)));
+ if (StringUtils.isEmpty(rowData)){
+ bind.setDate(i, null);
+ } else {
+ bind.setDate(i, new Date(Long.parseLong(rowData)));
+ }
break;
case BOOLEAN:
bind.setBool(i, Boolean.parseBoolean(rowData));
ColumnDefinition def = tableData.definitions.iterator().next();
StringBuilder sb = new StringBuilder();
sb.append(INSERT_INTO).append(def.getKeyspace()).append(".").append(def.getTable());
- sb.append(tableData.definitions.stream().map(definition -> definition.getName()).collect(Collectors.joining(" , ", " ( ", " ) ")));
+ sb.append(tableData.definitions.stream().map(ColumnDefinition::getName).collect(Collectors.joining(" , ", " ( ", " ) ")));
sb.append(VALUES).append(tableData.definitions.stream().map(definition -> "?").collect(Collectors.joining(" , ", " ( ", " ) "))).append(";");
return sb.toString();
}