Sonar fixes in DataMigration 89/31489/2
authorwejs <maciej.wejs@nokia.com>
Mon, 12 Feb 2018 16:58:09 +0000 (17:58 +0100)
committerMichael Lando <ml636r@att.com>
Thu, 15 Feb 2018 17:48:51 +0000 (17:48 +0000)
Blocker and other sonar issues fix and Check style corrections.

Change-Id: I508a0e33666caaaa350bb7aa6e26f74b112cd6b7
Issue-ID: SDC-1035
Signed-off-by: wejs <maciej.wejs@nokia.com>
asdctool/src/main/java/org/openecomp/sdc/asdctool/impl/DataMigration.java
asdctool/src/main/java/org/openecomp/sdc/asdctool/main/EsToCassandraDataMigrationMenu.java

index 9b1e3dc..c399e58 100644 (file)
@@ -7,9 +7,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
 package org.openecomp.sdc.asdctool.impl;
 
+import com.carrotsearch.hppc.cursors.ObjectCursor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import fj.data.Either;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
@@ -38,8 +43,8 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.EnumMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.TimeZone;
-
 import org.apache.commons.lang.SystemUtils;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -77,778 +82,742 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 
-import com.carrotsearch.hppc.cursors.ObjectCursor;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-
-import fj.data.Either;
-
 /**
  * Created by mlando on 5/16/2016.
  */
 public class DataMigration {
 
-       private Gson gson = new Gson();
-
-       private ObjectMapper jsonMapper = new ObjectMapper();
-
-       private static Logger log = LoggerFactory.getLogger(DataMigration.class.getName());
-
-       protected ElasticSearchClient elasticSearchClient;
-       @Autowired
-       protected AuditCassandraDao auditCassandraDao;
-       @Autowired
-       protected ArtifactCassandraDao artifactCassandraDao;
-       private static final String dateFormatPattern = "yyyy-MM-dd HH:mm:ss.SSS z";
-       private static SimpleDateFormat simpleDateFormat;
-
-       /**
-        * the method exports and imports the records from ES to cassandra the flow
-        * will check to see if the files are not empty if the files are not empty
-        * the export will be skiped and the flow will use the existing files. the
-        * flow will check if the tables in cassandra are empty, if the tables are
-        * not empty the proces will stop and exit. if the tables are empty the
-        * method will import the records from the files. in case of a fail the flow
-        * will exit and clear all the Cassandra tables.
-        *
-        * @param appConfigDir
-        *            the location of the dir in wich the output files will be
-        *            stored
-        * @param exportFromEs
-        *            should the es be exported again and overwrite the old export
-        * @param importToCassandra
-        *            should we import the data into cassandra
-        * @return true in case the operation was successful.
-        */
-       public boolean migrateDataESToCassndra(String appConfigDir, boolean exportFromEs, boolean importToCassandra) {
-               initFormater();
-               if (!initEsClient())
-                       return false;
-               Map<Table, File> files = createOutPutFiles(appConfigDir, exportFromEs);
-               if (files == null) {
-                       return false;
-               }
-               if (exportFromEs && filesEmpty(files)) {
-                       Map<Table, PrintWriter> printerWritersMap = createWriters(files);
-                       if (printerWritersMap == null) {
-                               return false;
-                       }
-                       try {
-                               ImmutableOpenMap<String, IndexMetaData> indexData = getIndexData();
-                               for (ObjectCursor<String> key : indexData.keys()) {
-                                       if ("resources".equalsIgnoreCase(key.value)) {
-                                               if (!exportArtifacts(key.value, printerWritersMap)) {
-                                                       return false;
-                                               }
-                                       } else if (key.value.startsWith("auditingevents")) {
-                                               if (!exportAudit(key.value, printerWritersMap)) {
-                                                       return false;
-                                               }
-                                       }
-                               }
-                       } finally {
-                               if (elasticSearchClient != null) {
-                                       elasticSearchClient.close();
-                               }
-                               for (PrintWriter writer : printerWritersMap.values()) {
-                                       writer.close();
-                               }
-                       }
-               }
-               if (importToCassandra && !importToCassndra(files)) {
-                       return false;
-               }
-
-               return true;
-       }
-
-       private void initFormater() {
-               simpleDateFormat = new SimpleDateFormat(dateFormatPattern);
-               simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
-       }
-
-       private boolean initEsClient() {
-               String configHome = System.getProperty("config.home");
-               URL url = null;
-               Settings settings = null;
-               try {
-                       if (SystemUtils.IS_OS_WINDOWS) {
-                               url = new URL("file:///" + configHome + "/elasticsearch.yml");
-                       } else {
-                               url = new URL("file:" + configHome + "/elasticsearch.yml");
-                       }
-                       log.debug("URL {}", url);
-                       settings = Settings.settingsBuilder().loadFromPath(Paths.get(url.toURI())).build();
-               } catch (MalformedURLException | URISyntaxException e1) {
-                       log.error("Failed to create URL in order to load elasticsearch yml", e1);
-                       return true;
-               }
-
-               this.elasticSearchClient = new ElasticSearchClient();
-               this.elasticSearchClient.setClusterName(settings.get("cluster.name"));
-               this.elasticSearchClient.setLocal(settings.get("elasticSearch.local"));
-               this.elasticSearchClient.setTransportClient(settings.get("elasticSearch.transportclient"));
-               try {
-                       elasticSearchClient.initialize();
-               } catch (URISyntaxException e) {
-                       e.printStackTrace();
-                       return false;
-               }
-               return true;
-       }
-
-       /**
-        * the method clears all the cassandra tables
-        */
-       private void truncateCassandraTable() {
-               log.info("import failed. truncating Cassandra tables.");
-               artifactCassandraDao.deleteAllArtifacts();
-               auditCassandraDao.deleteAllAudit();
-       }
-
-       /**
-        * the method imports the records from the files into cassandra
-        * 
-        * @param files
-        *            a map of files holding
-        * @return true if the operation was successful
-        */
-       private boolean importToCassndra(Map<Table, File> files) {
-               log.info("starting to import date into Cassandra.");
-               if (!validtaTablsNotEmpty(files))
-                       return true;
-               for (Table table : files.keySet()) {
-                       log.info("importing recordes into {}", table.getTableDescription().getTableName());
-                       if (!handleImport(files, table)) {
-                               truncateCassandraTable();
-                               return false;
-                       }
-               }
-               log.info("finished to import date into Cassandra.");
-               return true;
-       }
-
-       private boolean validtaTablsNotEmpty(Map<Table, File> files) {
-               for (Table table : files.keySet()) {
-                       Either<Boolean, CassandraOperationStatus> isTableEmptyRes = checkIfTableIsEmpty(table);
-                       if (isTableEmptyRes.isRight() || !isTableEmptyRes.left().value()) {
-                               log.error("Cassandra table {} is not empty operation aborted.",
-                                               table.getTableDescription().getTableName());
-                               return false;
-                       }
-               }
-               return true;
-       }
-
-       /**
-        * the method retrieves the fields from the given map and praprs them for
-        * storage as an audit according to the table name
-        * 
-        * @param map
-        *            the map from which we will retrive the fields enum values
-        * @param table
-        *            the table we are going to store the record in.
-        * @return a enummap representing the audit record that is going to be
-        *         created.
-        */
-       private EnumMap<AuditingFieldsKeysEnum, Object> createAuditMap(Map<String, String> map, Table table) {
-               EnumMap<AuditingFieldsKeysEnum, Object> auditingFields = new EnumMap<>(AuditingFieldsKeysEnum.class);
-               switch (table) {
-               case USER_ADMIN_EVENT:
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_USER_AFTER, map.get("USER_AFTER"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_USER_BEFORE, map.get("USER_BEFORE"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
-                       break;
-               case USER_ACCESS_EVENT:
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_USER_UID, map.get("USER"));
-                       break;
-               case RESOURCE_ADMIN_EVENT:
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_INVARIANT_UUID, map.get("INVARIANT_UUID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_VERSION, map.get("CURR_VERSION"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_STATE, map.get("CURR_STATE"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ID, map.get("DID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_PREV_VERSION, map.get("PREV_VERSION"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_PREV_STATE, map.get("PREV_STATE"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_NAME, map.get("RESOURCE_NAME"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TYPE, map.get("RESOURCE_TYPE"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_DPREV_STATUS, map.get("DPREV_STATUS"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_DCURR_STATUS, map.get("DCURR_STATUS"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TOSCA_NODE_TYPE, map.get("TOSCA_NODE_TYPE"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_COMMENT, map.get("COMMENT"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ARTIFACT_DATA, map.get("ARTIFACT_DATA"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_PREV_ARTIFACT_UUID, map.get("PREV_ARTIFACT_UUID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_CURR_ARTIFACT_UUID, map.get("CURR_ARTIFACT_UUID"));
-                       break;
-               case DISTRIBUTION_DOWNLOAD_EVENT:
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_RESOURCE_URL, map.get("RESOURCE_URL"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_CONSUMER_ID, map.get("CONSUMER_ID"));
-                       break;
-               case DISTRIBUTION_ENGINE_EVENT:
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
-                       if (map.get("TOPIC_NAME") != null) {
-                               if (map.get("TOPIC_NAME").contains("-STATUS-")) {
-                                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_STATUS_TOPIC_NAME,
-                                                       map.get("TOPIC_NAME"));
-                               } else {
-                                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_NOTIFICATION_TOPIC_NAME,
-                                                       map.get("TOPIC_NAME"));
-                               }
-                       } else {
-                               auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_STATUS_TOPIC_NAME,
-                                               map.get("DSTATUS_TOPIC"));
-                               auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_NOTIFICATION_TOPIC_NAME,
-                                               map.get("DNOTIF_TOPIC"));
-                       }
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_TOPIC_NAME, map.get("TOPIC_NAME"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ROLE, map.get("ROLE"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_API_KEY, map.get("API_KEY"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ENVRIONMENT_NAME, map.get("D_ENV"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_CONSUMER_ID, map.get("CONSUMER_ID"));
-                       break;
-               case DISTRIBUTION_NOTIFICATION_EVENT:
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_STATE, map.get("CURR_STATE"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_VERSION, map.get("CURR_VERSION"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ID, map.get("DID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_NAME, map.get("RESOURCE_NAME"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TYPE, map.get("RESOURCE_TYPE"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_TOPIC_NAME, map.get("TOPIC_NAME"));
-                       break;
-               case DISTRIBUTION_STATUS_EVENT:
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_RESOURCE_URL, map.get("RESOURCE_URL"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ID, map.get("DID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_TOPIC_NAME, map.get("TOPIC_NAME"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_CONSUMER_ID, map.get("CONSUMER_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_STATUS_TIME, map.get("STATUS_TIME"));
-                       break;
-               case DISTRIBUTION_DEPLOY_EVENT:
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ID, map.get("DID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_NAME, map.get("RESOURCE_NAME"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TYPE, map.get("RESOURCE_TYPE"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_VERSION, map.get("CURR_VERSION"));
-                       break;
-               case DISTRIBUTION_GET_UEB_CLUSTER_EVENT:
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
-                       if (map.get("STATUS_DESC") != null) {
-                               auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("STATUS_DESC"));
-                       } else {
-                               auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
-                       }
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_CONSUMER_ID, map.get("CONSUMER_ID"));
-                       break;
-               case AUTH_EVENT:
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_AUTH_USER, map.get("USER"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_AUTH_URL, map.get("URL"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_AUTH_STATUS, map.get("AUTH_STATUS"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_AUTH_REALM, map.get("REALM"));
-                       break;
-               case CONSUMER_EVENT:
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ECOMP_USER, map.get("ECOMP_USER"));
-                       break;
-               case CATEGORY_EVENT:
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_CATEGORY_NAME, map.get("CATEGORY_NAME"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SUB_CATEGORY_NAME, map.get("SUB_CATEGORY_NAME"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_GROUPING_NAME, map.get("GROUPING_NAME"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TYPE, map.get("RESOURCE_TYPE"));
-                       break;
-               case GET_USERS_LIST_EVENT:
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DETAILS, map.get("DETAILS"));
-                       break;
-               case GET_CATEGORY_HIERARCHY_EVENT:
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
-                       auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DETAILS, map.get("DETAILS"));
-                       break;
-               default:
-                       auditingFields = null;
-                       break;
-               }
-               return auditingFields;
-       }
-
-       /**
-        * the method reads the content of the file intended for a given table, and
-        * sores them in cassandra
-        * 
-        * @param files
-        *            a map of files from which the recordes will be retrieved.
-        * @param table
-        *            the name of the table we want to look up in the files and sore
-        *            in Cassandra // * @param store the function to call when
-        *            storing recordes in cassndra
-        * @return true if the operation was successful
-        */
-       private boolean handleImport(Map<Table, File> files, Table table) {
-               BufferedReader br = null;
-               try {
-                       br = new BufferedReader(new FileReader(files.get(table)));
-                       String line = null;
-                       while ((line = br.readLine()) != null) {
-                               CassandraOperationStatus res = null;
-                               if (Table.ARTIFACT.equals(table)) {
-                                       res = artifactCassandraDao.saveArtifact(jsonMapper.readValue(line, ESArtifactData.class));
-                               } else {
-                                       Type type = new TypeToken<Map<String, String>>() {
-                                       }.getType();
-                                       Map<String, String> map = gson.fromJson(line, type);
-                                       EnumMap<AuditingFieldsKeysEnum, Object> auditingFields = createAuditMap(map, table);
-                                       AuditingGenericEvent recordForCassandra = null;
-                                       try {
-                                               recordForCassandra = createAuditRecord(auditingFields);
-                                       } catch (ParseException e) {
-                                               log.error("filed to parse time stemp in recored {}", auditingFields);
-                                               return false;
-                                       }
-
-                                       res = auditCassandraDao.saveRecord(recordForCassandra);
-                               }
-                               if (!res.equals(CassandraOperationStatus.OK)) {
-                                       log.error("save recored to cassndra {} failed with status {} aborting.",
-                                                       table.getTableDescription().getTableName(), res);
-                                       return false;
-                               }
-                       }
-                       return true;
-               } catch (IOException e) {
-                       log.error("failed to read file", e);
-                       return false;
-               } finally {
-                       if (br != null) {
-                               try {
-                                       br.close();
-                               } catch (IOException e) {
-                                       log.error("failed to close file reader", e);
-                               }
-                       }
-               }
-       }
-
-       /**
-        * the method checks if the given table is empty
-        * 
-        * @param table
-        *            the name of the table we want to check
-        * @return true if the table is empty
-        */
-       private Either<Boolean, CassandraOperationStatus> checkIfTableIsEmpty(Table table) {
-               if (Table.ARTIFACT.equals(table)) {
-                       return artifactCassandraDao.isTableEmpty(table.getTableDescription().getTableName());
-               } else {
-                       return auditCassandraDao.isTableEmpty(table.getTableDescription().getTableName());
-               }
-       }
-
-       private boolean filesEmpty(Map<Table, File> files) {
-               for (Table table : files.keySet()) {
-                       File file = files.get(table);
-                       if (file.length() != 0) {
-                               log.info("file:{} is not empty skipping export", table.getTableDescription().getTableName());
-                               return false;
-                       }
-               }
-               return true;
-       }
-
-       /**
-        * the method reads the records from es index of audit's into a file as
-        * json's.
-        * 
-        * @param value
-        *            the name of the index we want
-        * @param printerWritersMap
-        *            a map of the writers we use to write to a file.
-        * @return true in case the export was successful.
-        */
-       private boolean exportAudit(String value, Map<Table, PrintWriter> printerWritersMap) {
-               log.info("stratng to export audit data from es index{} to file.", value);
-               QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
-               SearchResponse scrollResp = elasticSearchClient.getClient().prepareSearch(value).setScroll(new TimeValue(60000))
-                               .setQuery(queryBuilder).setSize(100).execute().actionGet();
-               while (true) {
-                       for (SearchHit hit : scrollResp.getHits().getHits()) {
-                               PrintWriter out = printerWritersMap.get(TypeToTableMapping.getTableByType(hit.getType()));
-                               out.println(hit.getSourceAsString());
-                       }
-                       scrollResp = elasticSearchClient.getClient().prepareSearchScroll(scrollResp.getScrollId())
-                                       .setScroll(new TimeValue(60000)).execute().actionGet();
-                       if (scrollResp.getHits().getHits().length == 0) {
-                               break;
-
-                       }
-               }
-
-               log.info("export audit data from es to file. finished succsesfully");
-               return true;
-       }
-
-       /**
-        * the method reads the records from es index of resources into a file as
-        * json's.
-        *
-        * @param index
-        *            the name of the index we want to read
-        * @param printerWritersMap
-        *            a map of the writers we use to write to a file.
-        * @return true in case the export was successful.
-        */
-       private boolean exportArtifacts(String index, Map<Table, PrintWriter> printerWritersMap) {
-               log.info("stratng to export artifact data from es to file.");
-               PrintWriter out = printerWritersMap.get(Table.ARTIFACT);
-               QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
-               SearchResponse scrollResp = elasticSearchClient.getClient().prepareSearch(index).setScroll(new TimeValue(60000))
-                               .setQuery(queryBuilder).setSize(100).execute().actionGet();
-               while (true) {
-                       for (SearchHit hit : scrollResp.getHits().getHits()) {
-                               ;
-                               out.println(hit.getSourceAsString());
-                       }
-                       scrollResp = elasticSearchClient.getClient().prepareSearchScroll(scrollResp.getScrollId())
-                                       .setScroll(new TimeValue(60000)).execute().actionGet();
-                       if (scrollResp.getHits().getHits().length == 0) {
-                               break;
-
-                       }
-               }
-
-               log.info("export artifact data from es to file. finished succsesfully");
-               return true;
-       }
-
-       /**
-        * the method retrieves all the indexes from elasticsearch
-        * 
-        * @return a map of indexes and there metadata
-        */
-       private ImmutableOpenMap<String, IndexMetaData> getIndexData() {
-               return elasticSearchClient.getClient().admin().cluster().prepareState().get().getState().getMetaData()
-                               .getIndices();
-       }
-
-       /**
-        * the method creates all the files and dir which holds them. in case the
-        * files exist they will not be created again.
-        * 
-        * @param appConfigDir
-        *            the base path under which the output dir will be created and
-        *            the export result files the created filesa are named according
-        *            to the name of the table into which it will be imported.
-        * @param exportToEs
-        *            if true all the export files will be recreated
-        * @returnthe returns a map of tables and the files representing them them
-        */
-       private Map<Table, File> createOutPutFiles(String appConfigDir, boolean exportToEs) {
-               Map<Table, File> result = new EnumMap<Table, File>(Table.class);
-               File outputDir = new File(appConfigDir + "/output/");
-               if (!createOutPutFolder(outputDir)) {
-                       return null;
-               }
-               for (Table table : Table.values()) {
-                       File file = new File(outputDir + "/" + table.getTableDescription().getTableName());
-                       if (exportToEs) {
-                               try {
-                                       if (file.exists()) {
-                                               Files.delete(file.toPath());
-                                       }
-                               } catch (IOException e) {
-                                       log.error("failed to delete output file {}", file.getAbsolutePath(), e);
-                                       return null;
-                               }
-                               file = new File(outputDir + "/" + table.getTableDescription().getTableName());
-                       }
-                       if (!file.exists()) {
-                               try {
-                                       file.createNewFile();
-                               } catch (IOException e) {
-                                       log.error("failed to create output file {}", file.getAbsolutePath(), e);
-                                       return null;
-                               }
-                       }
-                       result.put(table, file);
-
-               }
-               return result;
-       }
-
-       /**
-        * the method create the writers to each file
-        * 
-        * @param files
-        *            a map of the files according to table
-        * @return returns a map of writers according to table.
-        */
-       private Map<Table, PrintWriter> createWriters(Map<Table, File> files) {
-               Map<Table, PrintWriter> printerWritersMap = new EnumMap<>(Table.class);
-               try {
-                       for (Table table : files.keySet()) {
-                               log.info("creating writer for {}", table);
-                               File file = files.get(table);
-                               FileWriter fw = new FileWriter(file, true);
-                               BufferedWriter bw = new BufferedWriter(fw);
-                               PrintWriter out = new PrintWriter(bw);
-                               printerWritersMap.put(table, out);
-                               log.info("creating writer for {} was successful", table);
-                       }
-               } catch (IOException e) {
-                       log.error("create writer to file failed", e);
-                       return null;
-               }
-               return printerWritersMap;
-       }
-
-       /**
-        * the method creates the output dir in case it does not exist
-        * 
-        * @param outputDir
-        *            the path under wich the directory will be created.
-        * @return true in case the create was succsesful or the dir already exists
-        */
-       private boolean createOutPutFolder(File outputDir) {
-               if (!outputDir.exists()) {
-                       log.info("creating output dir {}", outputDir.getAbsolutePath());
-                       try {
-                               Files.createDirectories(outputDir.toPath());
-                       } catch (IOException e) {
-                               log.error("failed to create output dir {}", outputDir.getAbsolutePath(), e);
-                               return false;
-                       }
-               }
-               return true;
-       }
-
-       public enum TypeToTableMapping {
-               USER_ADMIN_EVENT_TYPE(AuditingTypesConstants.USER_ADMIN_EVENT_TYPE,
-                               Table.USER_ADMIN_EVENT), USER_ACCESS_EVENT_TYPE(AuditingTypesConstants.USER_ACCESS_EVENT_TYPE,
-                                               Table.USER_ACCESS_EVENT), RESOURCE_ADMIN_EVENT_TYPE(
-                                                               AuditingTypesConstants.RESOURCE_ADMIN_EVENT_TYPE,
-                                                               Table.RESOURCE_ADMIN_EVENT), DISTRIBUTION_DOWNLOAD_EVENT_TYPE(
-                                                                               AuditingTypesConstants.DISTRIBUTION_DOWNLOAD_EVENT_TYPE,
-                                                                               Table.DISTRIBUTION_DOWNLOAD_EVENT), DISTRIBUTION_ENGINE_EVENT_TYPE(
-                                                                                               AuditingTypesConstants.DISTRIBUTION_ENGINE_EVENT_TYPE,
-                                                                                               Table.DISTRIBUTION_ENGINE_EVENT), DISTRIBUTION_NOTIFICATION_EVENT_TYPE(
-                                                                                                               AuditingTypesConstants.DISTRIBUTION_NOTIFICATION_EVENT_TYPE,
-                                                                                                               Table.DISTRIBUTION_NOTIFICATION_EVENT), DISTRIBUTION_STATUS_EVENT_TYPE(
-                                                                                                                               AuditingTypesConstants.DISTRIBUTION_STATUS_EVENT_TYPE,
-                                                                                                                               Table.DISTRIBUTION_STATUS_EVENT), DISTRIBUTION_DEPLOY_EVENT_TYPE(
-                                                                                                                                               AuditingTypesConstants.DISTRIBUTION_DEPLOY_EVENT_TYPE,
-                                                                                                                                               Table.DISTRIBUTION_DEPLOY_EVENT), DISTRIBUTION_GET_UEB_CLUSTER_EVENT_TYPE(
-                                                                                                                                                               AuditingTypesConstants.DISTRIBUTION_GET_UEB_CLUSTER_EVENT_TYPE,
-                                                                                                                                                               Table.DISTRIBUTION_GET_UEB_CLUSTER_EVENT), AUTH_EVENT_TYPE(
-                                                                                                                                                                               AuditingTypesConstants.AUTH_EVENT_TYPE,
-                                                                                                                                                                               Table.AUTH_EVENT), CONSUMER_EVENT_TYPE(
-                                                                                                                                                                                               AuditingTypesConstants.CONSUMER_EVENT_TYPE,
-                                                                                                                                                                                               Table.CONSUMER_EVENT), CATEGORY_EVENT_TYPE(
-                                                                                                                                                                                                               AuditingTypesConstants.CATEGORY_EVENT_TYPE,
-                                                                                                                                                                                                               Table.CATEGORY_EVENT), GET_USERS_LIST_EVENT_TYPE(
-                                                                                                                                                                                                                               AuditingTypesConstants.GET_USERS_LIST_EVENT_TYPE,
-                                                                                                                                                                                                                               Table.GET_USERS_LIST_EVENT), GET_CATEGORY_HIERARCHY_EVENT_TYPE(
-                                                                                                                                                                                                                                               AuditingTypesConstants.GET_CATEGORY_HIERARCHY_EVENT_TYPE,
-                                                                                                                                                                                                                                               Table.GET_CATEGORY_HIERARCHY_EVENT);
-
-               String typeName;
-               Table table;
-
-               TypeToTableMapping(String typeName, Table table) {
-                       this.typeName = typeName;
-                       this.table = table;
-               }
-
-               public String getTypeName() {
-                       return typeName;
-               }
-
-               public Table getTable() {
-                       return table;
-               }
-
-               public static Table getTableByType(String type) {
-                       for (TypeToTableMapping mapping : TypeToTableMapping.values()) {
-                               if (mapping.getTypeName().equalsIgnoreCase(type)) {
-                                       return mapping.getTable();
-                               }
-                       }
-                       return null;
-               }
-       }
-
-       public static AuditingGenericEvent createAuditRecord(EnumMap<AuditingFieldsKeysEnum, Object> auditingFields)
-                       throws ParseException {
-               AuditingActionEnum actionEnum = AuditingActionEnum
-                               .getActionByName((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_ACTION));
-               String tableName = actionEnum.getAuditingEsType();
-               AuditingGenericEvent event = null;
-               Date date = null;
-               switch (tableName) {
-               case AuditingTypesConstants.USER_ADMIN_EVENT_TYPE:
-                       UserAdminEvent userAdminEvent = new UserAdminEvent(auditingFields);
-                       date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
-                       userAdminEvent.setTimestamp1(date);
-                       event = userAdminEvent;
-                       break;
-               case AuditingTypesConstants.AUTH_EVENT_TYPE:
-                       AuthEvent authEvent = new AuthEvent(auditingFields);
-                       date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
-                       authEvent.setTimestamp1(date);
-                       event = authEvent;
-                       break;
-               case AuditingTypesConstants.CATEGORY_EVENT_TYPE:
-                       CategoryEvent categoryEvent = new CategoryEvent(auditingFields);
-                       date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
-                       categoryEvent.setTimestamp1(date);
-                       event = categoryEvent;
-                       break;
-               case AuditingTypesConstants.RESOURCE_ADMIN_EVENT_TYPE:
-                       ResourceAdminEvent resourceAdminEvent = new ResourceAdminEvent(auditingFields);
-                       date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
-                       resourceAdminEvent.setTimestamp1(date);
-                       event = resourceAdminEvent;
-                       break;
-               case AuditingTypesConstants.USER_ACCESS_EVENT_TYPE:
-                       event = new UserAccessEvent(auditingFields);
-                       UserAccessEvent userAccessEvent = new UserAccessEvent(auditingFields);
-                       date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
-                       userAccessEvent.setTimestamp1(date);
-                       event = userAccessEvent;
-                       break;
-               case AuditingTypesConstants.DISTRIBUTION_STATUS_EVENT_TYPE:
-                       DistributionStatusEvent distributionStatusEvent = new DistributionStatusEvent(auditingFields);
-                       date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
-                       distributionStatusEvent.setTimestamp1(date);
-                       event = distributionStatusEvent;
-                       break;
-               case AuditingTypesConstants.DISTRIBUTION_DOWNLOAD_EVENT_TYPE:
-                       DistributionDownloadEvent distributionDownloadEvent = new DistributionDownloadEvent(auditingFields);
-                       date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
-                       distributionDownloadEvent.setTimestamp1(date);
-                       event = distributionDownloadEvent;
-                       break;
-               case AuditingTypesConstants.DISTRIBUTION_ENGINE_EVENT_TYPE:
-                       DistributionEngineEvent distributionEngineEvent = new DistributionEngineEvent(auditingFields);
-                       date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
-                       distributionEngineEvent.setTimestamp1(date);
-                       event = distributionEngineEvent;
-                       break;
-               case AuditingTypesConstants.DISTRIBUTION_NOTIFICATION_EVENT_TYPE:
-                       DistributionNotificationEvent distributionNotificationEvent = new DistributionNotificationEvent(
-                                       auditingFields);
-                       date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
-                       distributionNotificationEvent.setTimestamp1(date);
-                       event = distributionNotificationEvent;
-                       break;
-               case AuditingTypesConstants.DISTRIBUTION_DEPLOY_EVENT_TYPE:
-                       DistributionDeployEvent distributionDeployEvent = new DistributionDeployEvent(auditingFields);
-                       date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
-                       distributionDeployEvent.setTimestamp1(date);
-                       event = distributionDeployEvent;
-                       break;
-               case AuditingTypesConstants.DISTRIBUTION_GET_UEB_CLUSTER_EVENT_TYPE:
-                       AuditingGetUebClusterEvent auditingGetUebClusterEvent = new AuditingGetUebClusterEvent(auditingFields);
-                       date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
-                       auditingGetUebClusterEvent.setTimestamp1(date);
-                       event = auditingGetUebClusterEvent;
-                       break;
-               case AuditingTypesConstants.CONSUMER_EVENT_TYPE:
-                       ConsumerEvent consumerEvent = new ConsumerEvent(auditingFields);
-                       date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
-                       consumerEvent.setTimestamp1(date);
-                       event = consumerEvent;
-                       break;
-               case AuditingTypesConstants.GET_USERS_LIST_EVENT_TYPE:
-                       GetUsersListEvent getUsersListEvent = new GetUsersListEvent(auditingFields);
-                       date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
-                       getUsersListEvent.setTimestamp1(date);
-                       event = getUsersListEvent;
-                       break;
-               case AuditingTypesConstants.GET_CATEGORY_HIERARCHY_EVENT_TYPE:
-                       GetCategoryHierarchyEvent getCategoryHierarchyEvent = new GetCategoryHierarchyEvent(auditingFields);
-                       date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
-                       getCategoryHierarchyEvent.setTimestamp1(date);
-                       event = getCategoryHierarchyEvent;
-                       break;
-
-               }
-               return event;
-       }
+    private Gson gson = new Gson();
+
+    private ObjectMapper jsonMapper = new ObjectMapper();
+
+    private static Logger log = LoggerFactory.getLogger(DataMigration.class.getName());
+
+    private ElasticSearchClient elasticSearchClient;
+
+    @Autowired
+    protected AuditCassandraDao auditCassandraDao;
+    @Autowired
+    protected ArtifactCassandraDao artifactCassandraDao;
+
+    private static final String DATE_FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS z";
+    private SimpleDateFormat simpleDateFormat;
+
+    /**
+     * the method exports and imports the records from ES to cassandra the flow
+     * will check to see if the files are not empty if the files are not empty
+     * the export will be skiped and the flow will use the existing files. the
+     * flow will check if the tables in cassandra are empty, if the tables are
+     * not empty the proces will stop and exit. if the tables are empty the
+     * method will import the records from the files. in case of a fail the flow
+     * will exit and clear all the Cassandra tables.
+     *
+     * @param appConfigDir
+     *            the location of the dir in wich the output files will be
+     *            stored
+     * @param exportFromEs
+     *            should the es be exported again and overwrite the old export
+     * @param importToCassandra
+     *            should we import the data into cassandra
+     * @return true in case the operation was successful.
+     */
+    public boolean migrateDataEsToCassandra(String appConfigDir, boolean exportFromEs, boolean importToCassandra) {
+        initFormater();
+        if (!initEsClient()) {
+            return false;
+        }
+        Map<Table, File> files = createOutPutFiles(appConfigDir, exportFromEs);
+        if (files == null) {
+            return false;
+        }
+        if (exportFromEs && filesEmpty(files)) {
+            Map<Table, PrintWriter> printerWritersMap = createWriters(files);
+            if (printerWritersMap == null) {
+                return false;
+            }
+            try {
+                ImmutableOpenMap<String, IndexMetaData> indexData = getIndexData();
+                for (ObjectCursor<String> key : indexData.keys()) {
+                    if (("resources".equalsIgnoreCase(key.value) && !exportArtifacts(key.value, printerWritersMap))
+                        || (key.value.startsWith("auditingevents") && !exportAudit(key.value, printerWritersMap))) {
+                        return false;
+                    }
+                }
+            } finally {
+                if (elasticSearchClient != null) {
+                    elasticSearchClient.close();
+                }
+                for (PrintWriter writer : printerWritersMap.values()) {
+                    writer.close();
+                }
+            }
+        }
+        return !importToCassandra || importToCassndra(files);
+    }
+
+    private void initFormater() {
+        simpleDateFormat = new SimpleDateFormat(DATE_FORMAT_PATTERN);
+        simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+    }
+
+    private boolean initEsClient() {
+        String configHome = System.getProperty("config.home");
+        URL url;
+        Settings settings;
+        try {
+            if (SystemUtils.IS_OS_WINDOWS) {
+                url = new URL("file:///" + configHome + "/elasticsearch.yml");
+            } else {
+                url = new URL("file:" + configHome + "/elasticsearch.yml");
+            }
+            log.debug("URL {}", url);
+            settings = Settings.settingsBuilder().loadFromPath(Paths.get(url.toURI())).build();
+        } catch (MalformedURLException | URISyntaxException e1) {
+            log.error("Failed to create URL in order to load elasticsearch yml", e1);
+            return true;
+        }
+
+        this.elasticSearchClient = new ElasticSearchClient();
+        this.elasticSearchClient.setClusterName(settings.get("cluster.name"));
+        this.elasticSearchClient.setLocal(settings.get("elasticSearch.local"));
+        this.elasticSearchClient.setTransportClient(settings.get("elasticSearch.transportclient"));
+        try {
+            elasticSearchClient.initialize();
+        } catch (URISyntaxException e) {
+            log.error("Failed to initialize elasticSearchClient", e);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * the method clears all the cassandra tables.
+     */
+    private void truncateCassandraTable() {
+        log.info("import failed. truncating Cassandra tables.");
+        artifactCassandraDao.deleteAllArtifacts();
+        auditCassandraDao.deleteAllAudit();
+    }
+
+    /**
+     * the method imports the records from the files into cassandra.
+     *
+     * @param files
+     *            a map of files holding
+     * @return true if the operation was successful
+     */
+    private boolean importToCassndra(Map<Table, File> files) {
+        log.info("starting to import date into Cassandra.");
+        if (!validtaTablsNotEmpty(files)) {
+            return true;
+        }
+        for (Table table : files.keySet()) {
+            log.info("importing recordes into {}", table.getTableDescription().getTableName());
+            if (!handleImport(files, table)) {
+                truncateCassandraTable();
+                return false;
+            }
+        }
+        log.info("finished to import date into Cassandra.");
+        return true;
+    }
+
+    private boolean validtaTablsNotEmpty(Map<Table, File> files) {
+        for (Table table : files.keySet()) {
+            Either<Boolean, CassandraOperationStatus> isTableEmptyRes = checkIfTableIsEmpty(table);
+            if (isTableEmptyRes.isRight() || !isTableEmptyRes.left().value()) {
+                log.error("Cassandra table {} is not empty operation aborted.",
+                    table.getTableDescription().getTableName());
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * the method retrieves the fields from the given map and praprs them for
+     * storage as an audit according to the table name
+     *
+     * @param map
+     *            the map from which we will retrive the fields enum values
+     * @param table
+     *            the table we are going to store the record in.
+     * @return a enummap representing the audit record that is going to be
+     *         created.
+     */
+    private EnumMap<AuditingFieldsKeysEnum, Object> createAuditMap(Map<String, String> map, Table table) {
+        EnumMap<AuditingFieldsKeysEnum, Object> auditingFields = new EnumMap<>(AuditingFieldsKeysEnum.class);
+        switch (table) {
+            case USER_ADMIN_EVENT:
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_USER_AFTER, map.get("USER_AFTER"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_USER_BEFORE, map.get("USER_BEFORE"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
+                break;
+            case USER_ACCESS_EVENT:
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_USER_UID, map.get("USER"));
+                break;
+            case RESOURCE_ADMIN_EVENT:
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_INVARIANT_UUID, map.get("INVARIANT_UUID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_VERSION, map.get("CURR_VERSION"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_STATE, map.get("CURR_STATE"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ID, map.get("DID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_PREV_VERSION, map.get("PREV_VERSION"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_PREV_STATE, map.get("PREV_STATE"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_NAME, map.get("RESOURCE_NAME"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TYPE, map.get("RESOURCE_TYPE"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_DPREV_STATUS, map.get("DPREV_STATUS"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_DCURR_STATUS, map.get("DCURR_STATUS"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TOSCA_NODE_TYPE, map.get("TOSCA_NODE_TYPE"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_COMMENT, map.get("COMMENT"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ARTIFACT_DATA, map.get("ARTIFACT_DATA"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_PREV_ARTIFACT_UUID, map.get("PREV_ARTIFACT_UUID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_CURR_ARTIFACT_UUID, map.get("CURR_ARTIFACT_UUID"));
+                break;
+            case DISTRIBUTION_DOWNLOAD_EVENT:
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_RESOURCE_URL, map.get("RESOURCE_URL"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_CONSUMER_ID, map.get("CONSUMER_ID"));
+                break;
+            case DISTRIBUTION_ENGINE_EVENT:
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+                if (map.get("TOPIC_NAME") != null) {
+                    if (map.get("TOPIC_NAME").contains("-STATUS-")) {
+                        auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_STATUS_TOPIC_NAME,
+                            map.get("TOPIC_NAME"));
+                    } else {
+                        auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_NOTIFICATION_TOPIC_NAME,
+                            map.get("TOPIC_NAME"));
+                    }
+                } else {
+                    auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_STATUS_TOPIC_NAME,
+                        map.get("DSTATUS_TOPIC"));
+                    auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_NOTIFICATION_TOPIC_NAME,
+                        map.get("DNOTIF_TOPIC"));
+                }
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_TOPIC_NAME, map.get("TOPIC_NAME"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ROLE, map.get("ROLE"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_API_KEY, map.get("API_KEY"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ENVRIONMENT_NAME, map.get("D_ENV"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_CONSUMER_ID, map.get("CONSUMER_ID"));
+                break;
+            case DISTRIBUTION_NOTIFICATION_EVENT:
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_STATE, map.get("CURR_STATE"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_VERSION, map.get("CURR_VERSION"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ID, map.get("DID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_NAME, map.get("RESOURCE_NAME"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TYPE, map.get("RESOURCE_TYPE"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_TOPIC_NAME, map.get("TOPIC_NAME"));
+                break;
+            case DISTRIBUTION_STATUS_EVENT:
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_RESOURCE_URL, map.get("RESOURCE_URL"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ID, map.get("DID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_TOPIC_NAME, map.get("TOPIC_NAME"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_CONSUMER_ID, map.get("CONSUMER_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_STATUS_TIME, map.get("STATUS_TIME"));
+                break;
+            case DISTRIBUTION_DEPLOY_EVENT:
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ID, map.get("DID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_NAME, map.get("RESOURCE_NAME"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TYPE, map.get("RESOURCE_TYPE"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_VERSION, map.get("CURR_VERSION"));
+                break;
+            case DISTRIBUTION_GET_UEB_CLUSTER_EVENT:
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+                if (map.get("STATUS_DESC") != null) {
+                    auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("STATUS_DESC"));
+                } else {
+                    auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+                }
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_CONSUMER_ID, map.get("CONSUMER_ID"));
+                break;
+            case AUTH_EVENT:
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_AUTH_USER, map.get("USER"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_AUTH_URL, map.get("URL"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_AUTH_STATUS, map.get("AUTH_STATUS"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_AUTH_REALM, map.get("REALM"));
+                break;
+            case CONSUMER_EVENT:
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ECOMP_USER, map.get("ECOMP_USER"));
+                break;
+            case CATEGORY_EVENT:
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_CATEGORY_NAME, map.get("CATEGORY_NAME"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SUB_CATEGORY_NAME, map.get("SUB_CATEGORY_NAME"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_GROUPING_NAME, map.get("GROUPING_NAME"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TYPE, map.get("RESOURCE_TYPE"));
+                break;
+            case GET_USERS_LIST_EVENT:
+            case GET_CATEGORY_HIERARCHY_EVENT:
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
+                auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DETAILS, map.get("DETAILS"));
+                break;
+            default:
+                auditingFields = null;
+                break;
+        }
+        return auditingFields;
+    }
+
+    /**
+     * the method reads the content of the file intended for a given table, and
+     * sores them in cassandra
+     *
+     * @param files
+     *            a map of files from which the recordes will be retrieved.
+     * @param table
+     *            the name of the table we want to look up in the files and sore
+     *            in Cassandra // * @param store the function to call when
+     *            storing recordes in cassndra
+     * @return true if the operation was successful
+     */
+    private boolean handleImport(Map<Table, File> files, Table table) {
+        try (BufferedReader br = new BufferedReader(new FileReader(files.get(table)))) {
+            String line;
+            while ((line = br.readLine()) != null) {
+                CassandraOperationStatus res;
+                if (Table.ARTIFACT.equals(table)) {
+                    res = artifactCassandraDao.saveArtifact(jsonMapper.readValue(line, ESArtifactData.class));
+                } else {
+                    Type type = new TypeToken<Map<String, String>>() {}.getType();
+                    Map<String, String> map = gson.fromJson(line, type);
+                    EnumMap<AuditingFieldsKeysEnum, Object> auditingFields = createAuditMap(map, table);
+                    AuditingGenericEvent recordForCassandra;
+                    try {
+                        recordForCassandra = createAuditRecord(auditingFields);
+                    } catch (ParseException e) {
+                        log.error("filed to parse time stemp in recored {}", auditingFields);
+                        return false;
+                    }
+                    res = auditCassandraDao.saveRecord(recordForCassandra);
+                }
+                if (!res.equals(CassandraOperationStatus.OK)) {
+                    log.error("save recored to cassndra {} failed with status {} aborting.",
+                        table.getTableDescription().getTableName(), res);
+                    return false;
+                }
+            }
+            return true;
+        } catch (IOException e) {
+            log.error("failed to read file", e);
+            return false;
+        }
+    }
+
+    /**
+     * the method checks if the given table is empty.
+     *
+     * @param table
+     *            the name of the table we want to check
+     * @return true if the table is empty
+     */
+    private Either<Boolean, CassandraOperationStatus> checkIfTableIsEmpty(Table table) {
+        if (Table.ARTIFACT.equals(table)) {
+            return artifactCassandraDao.isTableEmpty(table.getTableDescription().getTableName());
+        } else {
+            return auditCassandraDao.isTableEmpty(table.getTableDescription().getTableName());
+        }
+    }
+
+    private boolean filesEmpty(Map<Table, File> files) {
+        for (Entry<Table, File> entry : files.entrySet()) {
+            File file = entry.getValue();
+            if (file.length() != 0) {
+                log.info("file:{} is not empty skipping export", entry.getKey().getTableDescription().getTableName());
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * the method reads the records from es index of audit's into a file as
+     * json's.
+     *
+     * @param value
+     *            the name of the index we want
+     * @param printerWritersMap
+     *            a map of the writers we use to write to a file.
+     * @return true in case the export was successful.
+     */
+    private boolean exportAudit(String value, Map<Table, PrintWriter> printerWritersMap) {
+        log.info("stratng to export audit data from es index{} to file.", value);
+        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
+        SearchResponse scrollResp = elasticSearchClient.getClient().prepareSearch(value).setScroll(new TimeValue(60000))
+            .setQuery(queryBuilder).setSize(100).execute().actionGet();
+        while (true) {
+            for (SearchHit hit : scrollResp.getHits().getHits()) {
+                PrintWriter out = printerWritersMap.get(TypeToTableMapping.getTableByType(hit.getType()));
+                out.println(hit.getSourceAsString());
+            }
+            scrollResp = elasticSearchClient.getClient().prepareSearchScroll(scrollResp.getScrollId())
+                .setScroll(new TimeValue(60000)).execute().actionGet();
+            if (scrollResp.getHits().getHits().length == 0) {
+                break;
+
+            }
+        }
+        log.info("export audit data from es to file. finished succsesfully");
+        return true;
+    }
+
+    /**
+     * the method reads the records from es index of resources into a file as
+     * json's.
+     *
+     * @param index
+     *            the name of the index we want to read
+     * @param printerWritersMap
+     *            a map of the writers we use to write to a file.
+     * @return true in case the export was successful.
+     */
+    private boolean exportArtifacts(String index, Map<Table, PrintWriter> printerWritersMap) {
+        log.info("stratng to export artifact data from es to file.");
+        PrintWriter out = printerWritersMap.get(Table.ARTIFACT);
+        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
+        SearchResponse scrollResp = elasticSearchClient.getClient().prepareSearch(index).setScroll(new TimeValue(60000))
+            .setQuery(queryBuilder).setSize(100).execute().actionGet();
+        while (true) {
+            for (SearchHit hit : scrollResp.getHits().getHits()) {
+                out.println(hit.getSourceAsString());
+            }
+            scrollResp = elasticSearchClient.getClient().prepareSearchScroll(scrollResp.getScrollId())
+                .setScroll(new TimeValue(60000)).execute().actionGet();
+            if (scrollResp.getHits().getHits().length == 0) {
+                break;
+            }
+        }
+        log.info("export artifact data from es to file. finished succsesfully");
+        return true;
+    }
+
+    /**
+     * the method retrieves all the indexes from elasticsearch.
+     *
+     * @return a map of indexes and there metadata
+     */
+    private ImmutableOpenMap<String, IndexMetaData> getIndexData() {
+        return elasticSearchClient.getClient().admin().cluster().prepareState().get().getState().getMetaData()
+            .getIndices();
+    }
+
+    /**
+     * the method creates all the files and dir which holds them. in case the
+     * files exist they will not be created again.
+     *
+     * @param appConfigDir
+     *            the base path under which the output dir will be created and
+     *            the export result files the created filesa are named according
+     *            to the name of the table into which it will be imported.
+     * @param exportToEs
+     *            if true all the export files will be recreated
+     * @return returns a map of tables and the files representing them them
+     */
+    private Map<Table, File> createOutPutFiles(String appConfigDir, boolean exportToEs) {
+        Map<Table, File> result = new EnumMap<>(Table.class);
+        File outputDir = new File(appConfigDir + "/output/");
+        if (!createOutPutFolder(outputDir)) {
+            return null;
+        }
+        for (Table table : Table.values()) {
+            File file = new File(outputDir + "/" + table.getTableDescription().getTableName());
+            if (exportToEs) {
+                try {
+                    if (file.exists()) {
+                        Files.delete(file.toPath());
+                    }
+                } catch (IOException e) {
+                    log.error("failed to delete output file {}", file.getAbsolutePath(), e);
+                    return null;
+                }
+                file = new File(outputDir + "/" + table.getTableDescription().getTableName());
+            }
+            if (!file.exists()) {
+                try {
+                    file.createNewFile();
+                } catch (IOException e) {
+                    log.error("failed to create output file {}", file.getAbsolutePath(), e);
+                    return null;
+                }
+            }
+            result.put(table, file);
+        }
+        return result;
+    }
+
+    /**
+     * the method create the writers to each file
+     *
+     * @param files
+     *            a map of the files according to table
+     * @return returns a map of writers according to table.
+     */
+    private Map<Table, PrintWriter> createWriters(Map<Table, File> files) {
+        Map<Table, PrintWriter> printerWritersMap = new EnumMap<>(Table.class);
+        try {
+            for (Entry<Table, File> entry : files.entrySet()) {
+                log.info("creating writer for {}", entry.getKey());
+                File file = entry.getValue();
+                FileWriter fw = new FileWriter(file, true);
+                BufferedWriter bw = new BufferedWriter(fw);
+                PrintWriter out = new PrintWriter(bw);
+                printerWritersMap.put(entry.getKey(), out);
+                log.info("creating writer for {} was successful", entry.getKey());
+            }
+        } catch (IOException e) {
+            log.error("create writer to file failed", e);
+            return null;
+        }
+        return printerWritersMap;
+    }
+
+    /**
+     * the method creates the output dir in case it does not exist
+     *
+     * @param outputDir
+     *            the path under wich the directory will be created.
+     * @return true in case the create was succsesful or the dir already exists
+     */
+    private boolean createOutPutFolder(File outputDir) {
+        if (!outputDir.exists()) {
+            log.info("creating output dir {}", outputDir.getAbsolutePath());
+            try {
+                Files.createDirectories(outputDir.toPath());
+            } catch (IOException e) {
+                log.error("failed to create output dir {}", outputDir.getAbsolutePath(), e);
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public enum TypeToTableMapping {
+        USER_ADMIN_EVENT_TYPE(AuditingTypesConstants.USER_ADMIN_EVENT_TYPE,
+            Table.USER_ADMIN_EVENT), USER_ACCESS_EVENT_TYPE(AuditingTypesConstants.USER_ACCESS_EVENT_TYPE,
+            Table.USER_ACCESS_EVENT), RESOURCE_ADMIN_EVENT_TYPE(
+            AuditingTypesConstants.RESOURCE_ADMIN_EVENT_TYPE,
+            Table.RESOURCE_ADMIN_EVENT), DISTRIBUTION_DOWNLOAD_EVENT_TYPE(
+            AuditingTypesConstants.DISTRIBUTION_DOWNLOAD_EVENT_TYPE,
+            Table.DISTRIBUTION_DOWNLOAD_EVENT), DISTRIBUTION_ENGINE_EVENT_TYPE(
+            AuditingTypesConstants.DISTRIBUTION_ENGINE_EVENT_TYPE,
+            Table.DISTRIBUTION_ENGINE_EVENT), DISTRIBUTION_NOTIFICATION_EVENT_TYPE(
+            AuditingTypesConstants.DISTRIBUTION_NOTIFICATION_EVENT_TYPE,
+            Table.DISTRIBUTION_NOTIFICATION_EVENT), DISTRIBUTION_STATUS_EVENT_TYPE(
+            AuditingTypesConstants.DISTRIBUTION_STATUS_EVENT_TYPE,
+            Table.DISTRIBUTION_STATUS_EVENT), DISTRIBUTION_DEPLOY_EVENT_TYPE(
+            AuditingTypesConstants.DISTRIBUTION_DEPLOY_EVENT_TYPE,
+            Table.DISTRIBUTION_DEPLOY_EVENT), DISTRIBUTION_GET_UEB_CLUSTER_EVENT_TYPE(
+            AuditingTypesConstants.DISTRIBUTION_GET_UEB_CLUSTER_EVENT_TYPE,
+            Table.DISTRIBUTION_GET_UEB_CLUSTER_EVENT), AUTH_EVENT_TYPE(
+            AuditingTypesConstants.AUTH_EVENT_TYPE,
+            Table.AUTH_EVENT), CONSUMER_EVENT_TYPE(
+            AuditingTypesConstants.CONSUMER_EVENT_TYPE,
+            Table.CONSUMER_EVENT), CATEGORY_EVENT_TYPE(
+            AuditingTypesConstants.CATEGORY_EVENT_TYPE,
+            Table.CATEGORY_EVENT), GET_USERS_LIST_EVENT_TYPE(
+            AuditingTypesConstants.GET_USERS_LIST_EVENT_TYPE,
+            Table.GET_USERS_LIST_EVENT), GET_CATEGORY_HIERARCHY_EVENT_TYPE(
+            AuditingTypesConstants.GET_CATEGORY_HIERARCHY_EVENT_TYPE,
+            Table.GET_CATEGORY_HIERARCHY_EVENT);
+
+        String typeName;
+        Table table;
+
+        TypeToTableMapping(String typeName, Table table) {
+            this.typeName = typeName;
+            this.table = table;
+        }
+
+        public String getTypeName() {
+            return typeName;
+        }
+
+        public Table getTable() {
+            return table;
+        }
+
+        public static Table getTableByType(String type) {
+            for (TypeToTableMapping mapping : TypeToTableMapping.values()) {
+                if (mapping.getTypeName().equalsIgnoreCase(type)) {
+                    return mapping.getTable();
+                }
+            }
+            return null;
+        }
+    }
+
+    public AuditingGenericEvent createAuditRecord(EnumMap<AuditingFieldsKeysEnum, Object> auditingFields)
+        throws ParseException {
+        AuditingActionEnum actionEnum = AuditingActionEnum
+            .getActionByName((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_ACTION));
+        String tableName = actionEnum.getAuditingEsType();
+        AuditingGenericEvent event;
+        Date date;
+        switch (tableName) {
+            case AuditingTypesConstants.USER_ADMIN_EVENT_TYPE:
+                UserAdminEvent userAdminEvent = new UserAdminEvent(auditingFields);
+                date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+                userAdminEvent.setTimestamp1(date);
+                event = userAdminEvent;
+                break;
+            case AuditingTypesConstants.AUTH_EVENT_TYPE:
+                AuthEvent authEvent = new AuthEvent(auditingFields);
+                date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+                authEvent.setTimestamp1(date);
+                event = authEvent;
+                break;
+            case AuditingTypesConstants.CATEGORY_EVENT_TYPE:
+                CategoryEvent categoryEvent = new CategoryEvent(auditingFields);
+                date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+                categoryEvent.setTimestamp1(date);
+                event = categoryEvent;
+                break;
+            case AuditingTypesConstants.RESOURCE_ADMIN_EVENT_TYPE:
+                ResourceAdminEvent resourceAdminEvent = new ResourceAdminEvent(auditingFields);
+                date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+                resourceAdminEvent.setTimestamp1(date);
+                event = resourceAdminEvent;
+                break;
+            case AuditingTypesConstants.USER_ACCESS_EVENT_TYPE:
+                UserAccessEvent userAccessEvent = new UserAccessEvent(auditingFields);
+                date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+                userAccessEvent.setTimestamp1(date);
+                event = userAccessEvent;
+                break;
+            case AuditingTypesConstants.DISTRIBUTION_STATUS_EVENT_TYPE:
+                DistributionStatusEvent distributionStatusEvent = new DistributionStatusEvent(auditingFields);
+                date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+                distributionStatusEvent.setTimestamp1(date);
+                event = distributionStatusEvent;
+                break;
+            case AuditingTypesConstants.DISTRIBUTION_DOWNLOAD_EVENT_TYPE:
+                DistributionDownloadEvent distributionDownloadEvent = new DistributionDownloadEvent(auditingFields);
+                date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+                distributionDownloadEvent.setTimestamp1(date);
+                event = distributionDownloadEvent;
+                break;
+            case AuditingTypesConstants.DISTRIBUTION_ENGINE_EVENT_TYPE:
+                DistributionEngineEvent distributionEngineEvent = new DistributionEngineEvent(auditingFields);
+                date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+                distributionEngineEvent.setTimestamp1(date);
+                event = distributionEngineEvent;
+                break;
+            case AuditingTypesConstants.DISTRIBUTION_NOTIFICATION_EVENT_TYPE:
+                DistributionNotificationEvent distributionNotificationEvent = new DistributionNotificationEvent(
+                    auditingFields);
+                date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+                distributionNotificationEvent.setTimestamp1(date);
+                event = distributionNotificationEvent;
+                break;
+            case AuditingTypesConstants.DISTRIBUTION_DEPLOY_EVENT_TYPE:
+                DistributionDeployEvent distributionDeployEvent = new DistributionDeployEvent(auditingFields);
+                date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+                distributionDeployEvent.setTimestamp1(date);
+                event = distributionDeployEvent;
+                break;
+            case AuditingTypesConstants.DISTRIBUTION_GET_UEB_CLUSTER_EVENT_TYPE:
+                AuditingGetUebClusterEvent auditingGetUebClusterEvent = new AuditingGetUebClusterEvent(auditingFields);
+                date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+                auditingGetUebClusterEvent.setTimestamp1(date);
+                event = auditingGetUebClusterEvent;
+                break;
+            case AuditingTypesConstants.CONSUMER_EVENT_TYPE:
+                ConsumerEvent consumerEvent = new ConsumerEvent(auditingFields);
+                date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+                consumerEvent.setTimestamp1(date);
+                event = consumerEvent;
+                break;
+            case AuditingTypesConstants.GET_USERS_LIST_EVENT_TYPE:
+                GetUsersListEvent getUsersListEvent = new GetUsersListEvent(auditingFields);
+                date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+                getUsersListEvent.setTimestamp1(date);
+                event = getUsersListEvent;
+                break;
+            case AuditingTypesConstants.GET_CATEGORY_HIERARCHY_EVENT_TYPE:
+                GetCategoryHierarchyEvent getCategoryHierarchyEvent = new GetCategoryHierarchyEvent(auditingFields);
+                date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
+                getCategoryHierarchyEvent.setTimestamp1(date);
+                event = getCategoryHierarchyEvent;
+                break;
+            default:
+                event = null;
+                break;
+        }
+        return event;
+    }
 
 }
index d563f2c..0bac1df 100644 (file)
@@ -55,7 +55,7 @@ public class EsToCassandraDataMigrationMenu {
                        case "es-to-cassndra-migration":
                                dataMigration = (DataMigration) context.getBean("DataMigrationBean");
                                log.debug("Start migration from ES to C* ");
-                               if (dataMigration.migrateDataESToCassndra(appConfigDir, true, true)) {
+                               if (dataMigration.migrateDataEsToCassandra(appConfigDir, true, true)) {
                                        log.debug("migration from ES to C* was finished successfull");
                                        System.exit(0);
                                } else {
@@ -66,7 +66,7 @@ public class EsToCassandraDataMigrationMenu {
                        case "es-to-cassndra-migration-export-only":
                                dataMigration = (DataMigration) context.getBean("DataMigrationBean");
                                log.debug("Start migration export only from ES to C* ");
-                               if (dataMigration.migrateDataESToCassndra(appConfigDir, true, false)) {
+                               if (dataMigration.migrateDataEsToCassandra(appConfigDir, true, false)) {
                                        log.debug("migration export only from ES to C* was finished successfull");
                                        System.exit(0);
                                } else {
@@ -77,7 +77,7 @@ public class EsToCassandraDataMigrationMenu {
                        case "es-to-cassndra-migration-import-only":
                                dataMigration = (DataMigration) context.getBean("DataMigrationBean");
                                log.debug("Start migration import only from ES to C* ");
-                               if (dataMigration.migrateDataESToCassndra(appConfigDir, false, true)) {
+                               if (dataMigration.migrateDataEsToCassandra(appConfigDir, false, true)) {
                                        log.debug("migration import only from ES to C* was finished successfull");
                                        System.exit(0);
                                } else {