re base code
[sdc.git] / asdctool / src / main / java / org / openecomp / sdc / asdctool / impl / DataMigration.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.asdctool.impl;
22
23
24 import com.carrotsearch.hppc.cursors.ObjectCursor;
25 import com.fasterxml.jackson.core.type.TypeReference;
26 import com.fasterxml.jackson.databind.ObjectMapper;
27 import fj.data.Either;
28 import org.apache.commons.lang.SystemUtils;
29 import org.elasticsearch.action.search.SearchResponse;
30 import org.elasticsearch.cluster.metadata.IndexMetaData;
31 import org.elasticsearch.common.collect.ImmutableOpenMap;
32 import org.elasticsearch.common.settings.Settings;
33 import org.elasticsearch.common.unit.TimeValue;
34 import org.elasticsearch.index.query.QueryBuilder;
35 import org.elasticsearch.index.query.QueryBuilders;
36 import org.elasticsearch.search.SearchHit;
37 import org.openecomp.sdc.be.auditing.api.AuditEventFactory;
38 import org.openecomp.sdc.be.auditing.impl.AuditAuthRequestEventFactory;
39 import org.openecomp.sdc.be.auditing.impl.AuditConsumerEventFactory;
40 import org.openecomp.sdc.be.auditing.impl.AuditGetUebClusterEventFactory;
41 import org.openecomp.sdc.be.auditing.impl.category.AuditCategoryEventFactory;
42 import org.openecomp.sdc.be.auditing.impl.category.AuditGetCategoryHierarchyEventFactory;
43 import org.openecomp.sdc.be.auditing.impl.distribution.*;
44 import org.openecomp.sdc.be.auditing.impl.resourceadmin.AuditResourceAdminEventMigrationFactory;
45 import org.openecomp.sdc.be.auditing.impl.usersadmin.AuditGetUsersListEventFactory;
46 import org.openecomp.sdc.be.auditing.impl.usersadmin.AuditUserAccessEventFactory;
47 import org.openecomp.sdc.be.auditing.impl.usersadmin.AuditUserAdminEventFactory;
48 import org.openecomp.sdc.be.dao.cassandra.ArtifactCassandraDao;
49 import org.openecomp.sdc.be.dao.cassandra.AuditCassandraDao;
50 import org.openecomp.sdc.be.dao.cassandra.CassandraOperationStatus;
51 import org.openecomp.sdc.be.dao.cassandra.schema.Table;
52 import org.openecomp.sdc.be.dao.es.ElasticSearchClient;
53 import org.openecomp.sdc.be.resources.data.ESArtifactData;
54 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
55 import org.openecomp.sdc.be.resources.data.auditing.AuditingGenericEvent;
56 import org.openecomp.sdc.be.resources.data.auditing.AuditingTypesConstants;
57 import org.openecomp.sdc.be.resources.data.auditing.model.*;
58 import org.openecomp.sdc.common.datastructure.AuditingFieldsKey;
59 import org.openecomp.sdc.common.log.wrappers.Logger;
60 import org.springframework.beans.factory.annotation.Autowired;
61
62 import java.io.*;
63 import java.net.MalformedURLException;
64 import java.net.URISyntaxException;
65 import java.net.URL;
66 import java.nio.file.Files;
67 import java.nio.file.Paths;
68 import java.util.EnumMap;
69 import java.util.Map;
70
71 /**
72  * Created by mlando on 5/16/2016.
73  */
74 public class DataMigration {
75
76         private ObjectMapper jsonMapper = new ObjectMapper();
77
78         private static Logger log = Logger.getLogger(DataMigration.class.getName());
79
80         private ElasticSearchClient elasticSearchClient;
81         @Autowired
82         private AuditCassandraDao auditCassandraDao;
83         @Autowired
84         private ArtifactCassandraDao artifactCassandraDao;
85
86     /**
87          * the method exports and imports the records from ES to cassandra the flow
88          * will check to see if the files are not empty if the files are not empty
89          * the export will be skiped and the flow will use the existing files. the
90          * flow will check if the tables in cassandra are empty, if the tables are
91          * not empty the proces will stop and exit. if the tables are empty the
92          * method will import the records from the files. in case of a fail the flow
93          * will exit and clear all the Cassandra tables.
94          *
95          * @param appConfigDir
96          *            the location of the dir in wich the output files will be
97          *            stored
98          * @param exportFromEs
99          *            should the es be exported again and overwrite the old export
100          * @param importToCassandra
101          *            should we import the data into cassandra
102          * @return true in case the operation was successful.
103          */
104         public boolean migrateDataESToCassndra(String appConfigDir, boolean exportFromEs, boolean importToCassandra) {
105                 if (!initEsClient()) {
106                         return false;
107                 }
108                 Map<Table, File> files = createOutPutFiles(appConfigDir, exportFromEs);
109                 if (files == null) {
110                         return false;
111                 }
112                 if (exportFromEs && filesEmpty(files)) {
113                         Map<Table, PrintWriter> printerWritersMap = createWriters(files);
114                         if (printerWritersMap == null) {
115                                 return false;
116                         }
117                         try {
118                                 ImmutableOpenMap<String, IndexMetaData> indexData = getIndexData();
119                                 for (ObjectCursor<String> key : indexData.keys()) {
120                                         if (("resources".equalsIgnoreCase(key.value) || key.value.startsWith("auditingevents"))
121                         && !exportArtifacts(key.value, printerWritersMap)) {
122                         return false;
123                     }
124                                 }
125                         } finally {
126                                 if (elasticSearchClient != null) {
127                                         elasticSearchClient.close();
128                                 }
129                                 for (PrintWriter writer : printerWritersMap.values()) {
130                                         writer.close();
131                                 }
132                         }
133                 }
134                 if (importToCassandra && !importToCassndra(files)) {
135                         return false;
136                 }
137
138                 return true;
139         }
140
141         private boolean initEsClient() {
142                 String configHome = System.getProperty("config.home");
143                 URL url = null;
144                 Settings settings = null;
145                 try {
146                         if (SystemUtils.IS_OS_WINDOWS) {
147                                 url = new URL("file:///" + configHome + "/elasticsearch.yml");
148                         } else {
149                                 url = new URL("file:" + configHome + "/elasticsearch.yml");
150                         }
151                         log.debug("URL {}", url);
152                         settings = Settings.settingsBuilder().loadFromPath(Paths.get(url.toURI())).build();
153                 } catch (MalformedURLException | URISyntaxException e1) {
154                         log.error("Failed to create URL in order to load elasticsearch yml", e1);
155                         return true;
156                 }
157
158                 this.elasticSearchClient = new ElasticSearchClient();
159                 this.elasticSearchClient.setClusterName(settings.get("cluster.name"));
160                 this.elasticSearchClient.setLocal(settings.get("elasticSearch.local"));
161                 this.elasticSearchClient.setTransportClient(settings.get("elasticSearch.transportclient"));
162                 try {
163                         elasticSearchClient.initialize();
164                 } catch (URISyntaxException e) {
165                         e.printStackTrace();
166                         return false;
167                 }
168                 return true;
169         }
170
171         /**
172          * the method clears all the cassandra tables
173          */
174         private void truncateCassandraTable() {
175                 log.info("import failed. truncating Cassandra tables.");
176                 artifactCassandraDao.deleteAllArtifacts();
177                 auditCassandraDao.deleteAllAudit();
178         }
179
180         /**
181          * the method imports the records from the files into cassandra
182          * 
183          * @param files
184          *            a map of files holding
185          * @return true if the operation was successful
186          */
187         private boolean importToCassndra(Map<Table, File> files) {
188                 log.info("starting to import date into Cassandra.");
189                 if (!validtaTablsNotEmpty(files))
190                         return true;
191                 for (Table table : files.keySet()) {
192                         log.info("importing recordes into {}", table.getTableDescription().getTableName());
193                         if (!handleImport(files, table)) {
194                                 truncateCassandraTable();
195                                 return false;
196                         }
197                 }
198                 log.info("finished to import date into Cassandra.");
199                 return true;
200         }
201
202         private boolean validtaTablsNotEmpty(Map<Table, File> files) {
203                 for (Table table : files.keySet()) {
204                         Either<Boolean, CassandraOperationStatus> isTableEmptyRes = checkIfTableIsEmpty(table);
205                         if (isTableEmptyRes.isRight() || !isTableEmptyRes.left().value()) {
206                                 log.error("Cassandra table {} is not empty operation aborted.",
207                                                 table.getTableDescription().getTableName());
208                                 return false;
209                         }
210                 }
211                 return true;
212         }
213
214         /**
215          * the method retrieves the fields from the given map and generates
216      * corresponding audit event according to the table name
217          * 
218          * @param map
219          *            the map from which we will retrive the fields enum values
220          * @param table
221          *            the table we are going to store the record in.
222          * @return an AuditingGenericEvent event representing the audit record that is going to be
223          *         created.
224          */
225         AuditingGenericEvent createAuditEvent(Map<AuditingFieldsKey, String> map, Table table) {
226                 AuditEventFactory factory = null;
227                 switch (table) {
228                         case USER_ADMIN_EVENT:
229                                 factory = getAuditUserAdminEventFactory(map);
230                                 break;
231                         case USER_ACCESS_EVENT:
232                                 factory = getAuditUserAccessEventFactory(map);
233                                 break;
234                         case RESOURCE_ADMIN_EVENT:
235                                 factory = getAuditResourceAdminEventMigrationFactory(map);
236                                 break;
237                         case DISTRIBUTION_DOWNLOAD_EVENT:
238                                 factory = getAuditDistributionDownloadEventFactory(map);
239                                 break;
240                         case DISTRIBUTION_ENGINE_EVENT:
241                                 factory = getAuditDistributionEngineEventMigrationFactory(map);
242                                 break;
243                         case DISTRIBUTION_NOTIFICATION_EVENT:
244                                 factory = getAuditDistributionNotificationEventFactory(map);
245                                 break;
246                         case DISTRIBUTION_STATUS_EVENT:
247                                 factory = getAuditDistributionStatusEventFactory(map);
248                                 break;
249                         case DISTRIBUTION_DEPLOY_EVENT:
250                                 factory = getAuditDistributionDeployEventFactory(map);
251                                 break;
252                         case DISTRIBUTION_GET_UEB_CLUSTER_EVENT:
253                                 factory = getAuditGetUebClusterEventFactory(map);
254                                 break;
255                         case AUTH_EVENT:
256                                 factory = getAuditAuthRequestEventFactory(map);
257                                 break;
258                         case CONSUMER_EVENT:
259                                 factory = getAuditConsumerEventFactory(map);
260                                 break;
261                         case CATEGORY_EVENT:
262                                 factory = getAuditCategoryEventFactory(map);
263                                 break;
264                         case GET_USERS_LIST_EVENT:
265                                 factory = getAuditGetUsersListEventFactory(map);
266                                 break;
267                         case GET_CATEGORY_HIERARCHY_EVENT:
268                                 factory = getAuditGetCategoryHierarchyEventFactory(map);
269                                 break;
270                         default:
271                                 break;
272                 }
273                 return factory != null ? factory.getDbEvent() : null;
274         }
275
276         private AuditEventFactory getAuditGetCategoryHierarchyEventFactory(Map<AuditingFieldsKey, String> map) {
277                 return new AuditGetCategoryHierarchyEventFactory(
278                         CommonAuditData.newBuilder()
279                                         .description(map.get(AuditingFieldsKey.AUDIT_DESC))
280                                         .status(map.get(AuditingFieldsKey.AUDIT_STATUS))
281                                         .requestId(map.get(AuditingFieldsKey.AUDIT_REQUEST_ID))
282                                         .serviceInstanceId(map.get(AuditingFieldsKey.AUDIT_SERVICE_INSTANCE_ID))
283                                         .build(),
284                         map.get(AuditingFieldsKey.AUDIT_MODIFIER_UID),
285                         map.get(AuditingFieldsKey.AUDIT_DETAILS),
286                         map.get(AuditingFieldsKey.AUDIT_TIMESTAMP));
287         }
288
289         private AuditEventFactory getAuditGetUsersListEventFactory(Map<AuditingFieldsKey, String> map) {
290                 return new AuditGetUsersListEventFactory(
291                         CommonAuditData.newBuilder()
292                                         .description(map.get(AuditingFieldsKey.AUDIT_DESC))
293                                         .status(map.get(AuditingFieldsKey.AUDIT_STATUS))
294                                         .requestId(map.get(AuditingFieldsKey.AUDIT_REQUEST_ID))
295                                         .serviceInstanceId(map.get(AuditingFieldsKey.AUDIT_SERVICE_INSTANCE_ID))
296                                         .build(),
297                         map.get(AuditingFieldsKey.AUDIT_MODIFIER_UID),
298                         map.get(AuditingFieldsKey.AUDIT_USER_DETAILS),
299                         map.get(AuditingFieldsKey.AUDIT_TIMESTAMP));
300         }
301
302         private AuditEventFactory getAuditCategoryEventFactory(Map<AuditingFieldsKey, String> map) {
303                 return new AuditCategoryEventFactory(
304                         AuditingActionEnum.fromName(map.get(AuditingFieldsKey.AUDIT_ACTION)),
305                         CommonAuditData.newBuilder()
306                                         .description(map.get(AuditingFieldsKey.AUDIT_DESC))
307                                         .status(map.get(AuditingFieldsKey.AUDIT_STATUS))
308                                         .requestId(map.get(AuditingFieldsKey.AUDIT_REQUEST_ID))
309                                         .serviceInstanceId(map.get(AuditingFieldsKey.AUDIT_SERVICE_INSTANCE_ID))
310                                         .build(),
311                         map.get(AuditingFieldsKey.AUDIT_MODIFIER_UID),
312                         map.get(AuditingFieldsKey.AUDIT_CATEGORY_NAME),
313                         map.get(AuditingFieldsKey.AUDIT_SUB_CATEGORY_NAME),
314                         map.get(AuditingFieldsKey.AUDIT_GROUPING_NAME),
315                         map.get(AuditingFieldsKey.AUDIT_RESOURCE_TYPE),
316                         map.get(AuditingFieldsKey.AUDIT_TIMESTAMP));
317         }
318
319         private AuditEventFactory getAuditUserAccessEventFactory(Map<AuditingFieldsKey, String> map) {
320                 return new AuditUserAccessEventFactory(
321                         CommonAuditData.newBuilder()
322                                         .description(map.get(AuditingFieldsKey.AUDIT_DESC))
323                                         .status(map.get(AuditingFieldsKey.AUDIT_STATUS))
324                                         .requestId(map.get(AuditingFieldsKey.AUDIT_REQUEST_ID))
325                                         .serviceInstanceId(map.get(AuditingFieldsKey.AUDIT_SERVICE_INSTANCE_ID))
326                                         .build(),
327                         map.get(AuditingFieldsKey.AUDIT_USER_UID),
328                         map.get(AuditingFieldsKey.AUDIT_TIMESTAMP));
329         }
330
331         private AuditEventFactory getAuditUserAdminEventFactory(Map<AuditingFieldsKey, String> map) {
332                 return new AuditUserAdminEventFactory(
333             AuditingActionEnum.fromName(map.get(AuditingFieldsKey.AUDIT_ACTION)),
334                         CommonAuditData.newBuilder()
335                                         .description(map.get(AuditingFieldsKey.AUDIT_DESC))
336                                         .status(map.get(AuditingFieldsKey.AUDIT_STATUS))
337                                         .requestId(map.get(AuditingFieldsKey.AUDIT_REQUEST_ID))
338                                         .serviceInstanceId(map.get(AuditingFieldsKey.AUDIT_SERVICE_INSTANCE_ID))
339                                         .build(),
340                         map.get(AuditingFieldsKey.AUDIT_MODIFIER_UID),
341                         map.get(AuditingFieldsKey.AUDIT_USER_BEFORE),
342                         map.get(AuditingFieldsKey.AUDIT_USER_AFTER),
343                         map.get(AuditingFieldsKey.AUDIT_TIMESTAMP));
344         }
345
346         private AuditEventFactory getAuditConsumerEventFactory(Map<AuditingFieldsKey, String> map) {
347                 return new AuditConsumerEventFactory(
348                     AuditingActionEnum.fromName(map.get(AuditingFieldsKey.AUDIT_ACTION)),
349                         CommonAuditData.newBuilder()
350                                         .description(map.get(AuditingFieldsKey.AUDIT_DESC))
351                                         .status(map.get(AuditingFieldsKey.AUDIT_STATUS))
352                                         .requestId(map.get(AuditingFieldsKey.AUDIT_REQUEST_ID))
353                                         .serviceInstanceId(map.get(AuditingFieldsKey.AUDIT_SERVICE_INSTANCE_ID))
354                                         .build(),
355                         map.get(AuditingFieldsKey.AUDIT_MODIFIER_UID),
356                         map.get(AuditingFieldsKey.AUDIT_ECOMP_USER),
357                         map.get(AuditingFieldsKey.AUDIT_TIMESTAMP));
358         }
359
360         private AuditEventFactory getAuditAuthRequestEventFactory(Map<AuditingFieldsKey, String> map) {
361                 return new AuditAuthRequestEventFactory(
362                         CommonAuditData.newBuilder()
363                                         .description(map.get(AuditingFieldsKey.AUDIT_DESC))
364                                         .status(map.get(AuditingFieldsKey.AUDIT_STATUS))
365                                         .requestId(map.get(AuditingFieldsKey.AUDIT_REQUEST_ID))
366                                         .serviceInstanceId(map.get(AuditingFieldsKey.AUDIT_SERVICE_INSTANCE_ID))
367                                         .build(),
368                         map.get(AuditingFieldsKey.AUDIT_USER_UID),
369                         map.get(AuditingFieldsKey.AUDIT_AUTH_URL),
370                         map.get(AuditingFieldsKey.AUDIT_AUTH_REALM),
371                         map.get(AuditingFieldsKey.AUDIT_AUTH_STATUS),
372                         map.get(AuditingFieldsKey.AUDIT_TIMESTAMP));
373         }
374
375         private AuditEventFactory getAuditGetUebClusterEventFactory(Map<AuditingFieldsKey, String> map) {
376                 return new AuditGetUebClusterEventFactory(
377                         CommonAuditData.newBuilder()
378                                         .description(map.get(AuditingFieldsKey.AUDIT_DESC))
379                                         .status(map.get(AuditingFieldsKey.AUDIT_STATUS))
380                                         .requestId(map.get(AuditingFieldsKey.AUDIT_REQUEST_ID))
381                                         .serviceInstanceId(map.get(AuditingFieldsKey.AUDIT_SERVICE_INSTANCE_ID))
382                                         .build(),
383                         map.get(AuditingFieldsKey.AUDIT_DISTRIBUTION_CONSUMER_ID),
384                         map.get(AuditingFieldsKey.AUDIT_TIMESTAMP));
385         }
386
387         private AuditEventFactory getAuditResourceAdminEventMigrationFactory(Map<AuditingFieldsKey, String> map) {
388                 return new AuditResourceAdminEventMigrationFactory(
389                 AuditingActionEnum.fromName(map.get(AuditingFieldsKey.AUDIT_ACTION)),
390                         CommonAuditData.newBuilder()
391                                         .description(map.get(AuditingFieldsKey.AUDIT_DESC))
392                                         .status(map.get(AuditingFieldsKey.AUDIT_STATUS))
393                                         .requestId(map.get(AuditingFieldsKey.AUDIT_REQUEST_ID))
394                                         .serviceInstanceId(map.get(AuditingFieldsKey.AUDIT_SERVICE_INSTANCE_ID))
395                                         .build(),
396                         new ResourceCommonInfo(map.get(AuditingFieldsKey.AUDIT_RESOURCE_NAME),
397                                         map.get(AuditingFieldsKey.AUDIT_RESOURCE_TYPE)),
398                         ResourceVersionInfo.newBuilder()
399                                         .artifactUuid(map.get(AuditingFieldsKey.AUDIT_PREV_ARTIFACT_UUID))
400                                         .state(map.get(AuditingFieldsKey.AUDIT_RESOURCE_PREV_STATE))
401                                         .version(map.get(AuditingFieldsKey.AUDIT_RESOURCE_PREV_VERSION))
402                                         .distributionStatus(map.get(AuditingFieldsKey.AUDIT_RESOURCE_DPREV_STATUS))
403                                         .build(),
404                         ResourceVersionInfo.newBuilder()
405                                         .artifactUuid(map.get(AuditingFieldsKey.AUDIT_CURR_ARTIFACT_UUID))
406                                         .state(map.get(AuditingFieldsKey.AUDIT_RESOURCE_CURR_STATE))
407                                         .version(map.get(AuditingFieldsKey.AUDIT_RESOURCE_CURR_VERSION))
408                                         .distributionStatus(map.get(AuditingFieldsKey.AUDIT_RESOURCE_DCURR_STATUS))
409                                         .build(),
410                         map.get(AuditingFieldsKey.AUDIT_INVARIANT_UUID),
411                         map.get(AuditingFieldsKey.AUDIT_MODIFIER_UID),
412                         map.get(AuditingFieldsKey.AUDIT_ARTIFACT_DATA),
413                         map.get(AuditingFieldsKey.AUDIT_RESOURCE_COMMENT),
414                         map.get(AuditingFieldsKey.AUDIT_DISTRIBUTION_ID),
415                         map.get(AuditingFieldsKey.AUDIT_RESOURCE_TOSCA_NODE_TYPE),
416                         map.get(AuditingFieldsKey.AUDIT_TIMESTAMP));
417         }
418
419         private AuditEventFactory getAuditDistributionDownloadEventFactory(Map<AuditingFieldsKey, String> map) {
420                 return new AuditDistributionDownloadEventFactory(
421                         CommonAuditData.newBuilder()
422                                         .description(map.get(AuditingFieldsKey.AUDIT_DESC))
423                                         .status(map.get(AuditingFieldsKey.AUDIT_STATUS))
424                                         .requestId(map.get(AuditingFieldsKey.AUDIT_REQUEST_ID))
425                                         .serviceInstanceId(map.get(AuditingFieldsKey.AUDIT_SERVICE_INSTANCE_ID))
426                                         .build(),
427                         new DistributionData(map.get(AuditingFieldsKey.AUDIT_DISTRIBUTION_CONSUMER_ID),
428                                         map.get(AuditingFieldsKey.AUDIT_RESOURCE_URL)),
429                                 map.get(AuditingFieldsKey.AUDIT_TIMESTAMP));
430         }
431
432         private AuditEventFactory getAuditDistributionEngineEventMigrationFactory(Map<AuditingFieldsKey, String> map) {
433                 return new AuditDistributionEngineEventMigrationFactory(
434                     AuditingActionEnum.fromName(map.get(AuditingFieldsKey.AUDIT_ACTION)),
435                         CommonAuditData.newBuilder()
436                                         .description(map.get(AuditingFieldsKey.AUDIT_DESC))
437                                         .status(map.get(AuditingFieldsKey.AUDIT_STATUS))
438                                         .requestId(map.get(AuditingFieldsKey.AUDIT_REQUEST_ID))
439                                         .serviceInstanceId(map.get(AuditingFieldsKey.AUDIT_SERVICE_INSTANCE_ID))
440                                         .build(),
441                         DistributionTopicData.newBuilder()
442                                         .notificationTopic(map.get(AuditingFieldsKey.AUDIT_DISTRIBUTION_NOTIFICATION_TOPIC_NAME))
443                                         .statusTopic(map.get(AuditingFieldsKey.AUDIT_DISTRIBUTION_STATUS_TOPIC_NAME))
444                                         .build(),
445                         map.get(AuditingFieldsKey.AUDIT_DISTRIBUTION_CONSUMER_ID),
446                         map.get(AuditingFieldsKey.AUDIT_DISTRIBUTION_API_KEY),
447                         map.get(AuditingFieldsKey.AUDIT_DISTRIBUTION_ENVRIONMENT_NAME),
448                         map.get(AuditingFieldsKey.AUDIT_DISTRIBUTION_ROLE),
449                         map.get(AuditingFieldsKey.AUDIT_TIMESTAMP));
450         }
451
452         private AuditEventFactory getAuditDistributionDeployEventFactory(Map<AuditingFieldsKey, String> map) {
453                 return new AuditDistributionDeployEventFactory(
454                         CommonAuditData.newBuilder()
455                                         .description(map.get(AuditingFieldsKey.AUDIT_DESC))
456                                         .status(map.get(AuditingFieldsKey.AUDIT_STATUS))
457                                         .requestId(map.get(AuditingFieldsKey.AUDIT_REQUEST_ID))
458                                         .serviceInstanceId(map.get(AuditingFieldsKey.AUDIT_SERVICE_INSTANCE_ID))
459                                         .build(),
460                         new ResourceCommonInfo(map.get(AuditingFieldsKey.AUDIT_RESOURCE_NAME),
461                                         map.get(AuditingFieldsKey.AUDIT_RESOURCE_TYPE)),
462                         map.get(AuditingFieldsKey.AUDIT_RESOURCE_CURR_VERSION),
463                         map.get(AuditingFieldsKey.AUDIT_DISTRIBUTION_ID),
464                         map.get(AuditingFieldsKey.AUDIT_MODIFIER_UID),
465                         map.get(AuditingFieldsKey.AUDIT_TIMESTAMP));
466         }
467
468         private AuditEventFactory getAuditDistributionStatusEventFactory(Map<AuditingFieldsKey, String> map) {
469                 return new AuditDistributionStatusEventFactory(
470                         CommonAuditData.newBuilder()
471                                         .description(map.get(AuditingFieldsKey.AUDIT_DESC))
472                                         .status(map.get(AuditingFieldsKey.AUDIT_STATUS))
473                                         .requestId(map.get(AuditingFieldsKey.AUDIT_REQUEST_ID))
474                                         .serviceInstanceId(map.get(AuditingFieldsKey.AUDIT_SERVICE_INSTANCE_ID))
475                                         .build(),
476                         new DistributionData(map.get(AuditingFieldsKey.AUDIT_DISTRIBUTION_CONSUMER_ID),
477                                         map.get(AuditingFieldsKey.AUDIT_RESOURCE_URL)),
478                         map.get(AuditingFieldsKey.AUDIT_DISTRIBUTION_ID),
479                         map.get(AuditingFieldsKey.AUDIT_DISTRIBUTION_TOPIC_NAME),
480                         map.get(AuditingFieldsKey.AUDIT_DISTRIBUTION_STATUS_TIME),
481                         map.get(AuditingFieldsKey.AUDIT_TIMESTAMP));
482         }
483
484         private AuditEventFactory getAuditDistributionNotificationEventFactory(Map<AuditingFieldsKey, String> map) {
485                 return new AuditDistributionNotificationEventFactory(
486                         CommonAuditData.newBuilder()
487                                         .description(map.get(AuditingFieldsKey.AUDIT_DESC))
488                                         .status(map.get(AuditingFieldsKey.AUDIT_STATUS))
489                                         .requestId(map.get(AuditingFieldsKey.AUDIT_REQUEST_ID))
490                                         .serviceInstanceId(map.get(AuditingFieldsKey.AUDIT_SERVICE_INSTANCE_ID))
491                                         .build(),
492                         new ResourceCommonInfo(map.get(AuditingFieldsKey.AUDIT_RESOURCE_NAME),
493                                         map.get(AuditingFieldsKey.AUDIT_RESOURCE_TYPE)),
494                         ResourceVersionInfo.newBuilder()
495                                         .state(map.get(AuditingFieldsKey.AUDIT_RESOURCE_CURR_STATE))
496                                         .version(map.get(AuditingFieldsKey.AUDIT_RESOURCE_CURR_VERSION))
497                                         .build(),
498                         map.get(AuditingFieldsKey.AUDIT_DISTRIBUTION_ID),
499                         map.get(AuditingFieldsKey.AUDIT_MODIFIER_UID),
500                         map.get(AuditingFieldsKey.AUDIT_DISTRIBUTION_TOPIC_NAME),
501                         new OperationalEnvAuditData(map.get(AuditingFieldsKey.AUDIT_DISTRIBUTION_ENVIRONMENT_ID),
502                                         map.get(AuditingFieldsKey.AUDIT_DISTRIBUTION_VNF_WORKLOAD_CONTEXT),
503                                         map.get(AuditingFieldsKey.AUDIT_DISTRIBUTION_TENANT)),
504                         map.get(AuditingFieldsKey.AUDIT_TIMESTAMP));
505         }
506
507
508
509         /**
510          * the method reads the content of the file intended for a given table, and
511          * sores them in cassandra
512          * 
513          * @param files
514          *            a map of files from which the recordes will be retrieved.
515          * @param table
516          *            the name of the table we want to look up in the files and sore
517          *            in Cassandra // * @param store the function to call when
518          *            storing recordes in cassndra
519          * @return true if the operation was successful
520          */
521         private boolean handleImport(Map<Table, File> files, Table table) {
522                 BufferedReader br = null;
523                 try {
524                         br = new BufferedReader(new FileReader(files.get(table)));
525                         String line = null;
526                         while ((line = br.readLine()) != null) {
527                                 CassandraOperationStatus res = CassandraOperationStatus.GENERAL_ERROR;
528                                 if (Table.ARTIFACT.equals(table)) {
529                                         res = artifactCassandraDao.saveArtifact(jsonMapper.readValue(line, ESArtifactData.class));
530                                 }
531                                 else {
532                     AuditingGenericEvent recordForCassandra = createAuditRecordForCassandra(line, table);
533                                         if (recordForCassandra != null) {
534                         res = auditCassandraDao.saveRecord(recordForCassandra);
535                     }
536                                 }
537                                 if (!res.equals(CassandraOperationStatus.OK)) {
538                                         log.error("save recored to cassndra {} failed with status {} aborting.",
539                                                         table.getTableDescription().getTableName(), res);
540                                         return false;
541                                 }
542                         }
543                         return true;
544                 } catch (IOException e) {
545                         log.error("failed to read file", e);
546                         return false;
547                 } finally {
548                         if (br != null) {
549                                 try {
550                                         br.close();
551                                 } catch (IOException e) {
552                                         log.error("failed to close file reader", e);
553                                 }
554                         }
555                 }
556         }
557
558     AuditingGenericEvent createAuditRecordForCassandra(String json, Table table) throws IOException{
559         return createAuditEvent(parseToMap(json), table);
560     }
561
562         private Map<AuditingFieldsKey, String> parseToMap(String json) throws IOException {
563                 return jsonMapper.readValue(json, new TypeReference<Map<AuditingFieldsKey, String>>(){});
564         }
565
566         /**
567          * the method checks if the given table is empty
568          * 
569          * @param table
570          *            the name of the table we want to check
571          * @return true if the table is empty
572          */
573         private Either<Boolean, CassandraOperationStatus> checkIfTableIsEmpty(Table table) {
574                 if (Table.ARTIFACT.equals(table)) {
575                         return artifactCassandraDao.isTableEmpty(table.getTableDescription().getTableName());
576                 } else {
577                         return auditCassandraDao.isTableEmpty(table.getTableDescription().getTableName());
578                 }
579         }
580
581         private boolean filesEmpty(Map<Table, File> files) {
582                 for (Table table : files.keySet()) {
583                         File file = files.get(table);
584                         if (file.length() != 0) {
585                                 log.info("file:{} is not empty skipping export", table.getTableDescription().getTableName());
586                                 return false;
587                         }
588                 }
589                 return true;
590         }
591
592         /**
593          * the method reads the records from es index of audit's into a file as
594          * json's.
595          * 
596          * @param value
597          *            the name of the index we want
598          * @param printerWritersMap
599          *            a map of the writers we use to write to a file.
600          * @return true in case the export was successful.
601          */
602         private boolean exportAudit(String value, Map<Table, PrintWriter> printerWritersMap) {
603                 log.info("stratng to export audit data from es index{} to file.", value);
604                 QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
605                 SearchResponse scrollResp = elasticSearchClient.getClient().prepareSearch(value).setScroll(new TimeValue(60000))
606                                 .setQuery(queryBuilder).setSize(100).execute().actionGet();
607                 while (true) {
608                         for (SearchHit hit : scrollResp.getHits().getHits()) {
609                                 PrintWriter out = printerWritersMap.get(TypeToTableMapping.getTableByType(hit.getType()));
610                                 out.println(hit.getSourceAsString());
611                         }
612                         scrollResp = elasticSearchClient.getClient().prepareSearchScroll(scrollResp.getScrollId())
613                                         .setScroll(new TimeValue(60000)).execute().actionGet();
614                         if (scrollResp.getHits().getHits().length == 0) {
615                                 break;
616
617                         }
618                 }
619
620                 log.info("export audit data from es to file. finished succsesfully");
621                 return true;
622         }
623
624         /**
625          * the method reads the records from es index of resources into a file as
626          * json's.
627          *
628          * @param index
629          *            the name of the index we want to read
630          * @param printerWritersMap
631          *            a map of the writers we use to write to a file.
632          * @return true in case the export was successful.
633          */
634         private boolean exportArtifacts(String index, Map<Table, PrintWriter> printerWritersMap) {
635                 log.info("stratng to export artifact data from es to file.");
636                 PrintWriter out = printerWritersMap.get(Table.ARTIFACT);
637                 QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
638                 SearchResponse scrollResp = elasticSearchClient.getClient().prepareSearch(index).setScroll(new TimeValue(60000))
639                                 .setQuery(queryBuilder).setSize(100).execute().actionGet();
640                 while (true) {
641                         for (SearchHit hit : scrollResp.getHits().getHits()) {
642                                 ;
643                                 out.println(hit.getSourceAsString());
644                         }
645                         scrollResp = elasticSearchClient.getClient().prepareSearchScroll(scrollResp.getScrollId())
646                                         .setScroll(new TimeValue(60000)).execute().actionGet();
647                         if (scrollResp.getHits().getHits().length == 0) {
648                                 break;
649
650                         }
651                 }
652
653                 log.info("export artifact data from es to file. finished succsesfully");
654                 return true;
655         }
656
657         /**
658          * the method retrieves all the indexes from elasticsearch
659          * 
660          * @return a map of indexes and there metadata
661          */
662         private ImmutableOpenMap<String, IndexMetaData> getIndexData() {
663                 return elasticSearchClient.getClient().admin().cluster().prepareState().get().getState().getMetaData()
664                                 .getIndices();
665         }
666
667         /**
668          * the method creates all the files and dir which holds them. in case the
669          * files exist they will not be created again.
670          * 
671          * @param appConfigDir
672          *            the base path under which the output dir will be created and
673          *            the export result files the created filesa are named according
674          *            to the name of the table into which it will be imported.
675          * @param exportToEs
676          *            if true all the export files will be recreated
677          * @returnthe returns a map of tables and the files representing them them
678          */
679         private Map<Table, File> createOutPutFiles(String appConfigDir, boolean exportToEs) {
680                 Map<Table, File> result = new EnumMap<Table, File>(Table.class);
681                 File outputDir = new File(appConfigDir + "/output/");
682                 if (!createOutPutFolder(outputDir)) {
683                         return null;
684                 }
685                 for (Table table : Table.values()) {
686                         File file = new File(outputDir + "/" + table.getTableDescription().getTableName());
687                         if (exportToEs) {
688                                 try {
689                                         if (file.exists()) {
690                                                 Files.delete(file.toPath());
691                                         }
692                                 } catch (IOException e) {
693                                         log.error("failed to delete output file {}", file.getAbsolutePath(), e);
694                                         return null;
695                                 }
696                                 file = new File(outputDir + "/" + table.getTableDescription().getTableName());
697                         }
698                         if (!file.exists()) {
699                                 try {
700                                         file.createNewFile();
701                                 } catch (IOException e) {
702                                         log.error("failed to create output file {}", file.getAbsolutePath(), e);
703                                         return null;
704                                 }
705                         }
706                         result.put(table, file);
707
708                 }
709                 return result;
710         }
711
712         /**
713          * the method create the writers to each file
714          * 
715          * @param files
716          *            a map of the files according to table
717          * @return returns a map of writers according to table.
718          */
719         private Map<Table, PrintWriter> createWriters(Map<Table, File> files) {
720                 Map<Table, PrintWriter> printerWritersMap = new EnumMap<>(Table.class);
721                 try {
722                         for (Table table : files.keySet()) {
723                                 log.info("creating writer for {}", table);
724                                 File file = files.get(table);
725                                 FileWriter fw = new FileWriter(file, true);
726                                 BufferedWriter bw = new BufferedWriter(fw);
727                                 PrintWriter out = new PrintWriter(bw);
728                                 printerWritersMap.put(table, out);
729                                 log.info("creating writer for {} was successful", table);
730                         }
731                 } catch (IOException e) {
732                         log.error("create writer to file failed", e);
733                         return null;
734                 }
735                 return printerWritersMap;
736         }
737
738         /**
739          * the method creates the output dir in case it does not exist
740          * 
741          * @param outputDir
742          *            the path under wich the directory will be created.
743          * @return true in case the create was succsesful or the dir already exists
744          */
745         private boolean createOutPutFolder(File outputDir) {
746                 if (!outputDir.exists()) {
747                         log.info("creating output dir {}", outputDir.getAbsolutePath());
748                         try {
749                                 Files.createDirectories(outputDir.toPath());
750                         } catch (IOException e) {
751                                 log.error("failed to create output dir {}", outputDir.getAbsolutePath(), e);
752                                 return false;
753                         }
754                 }
755                 return true;
756         }
757
758         public enum TypeToTableMapping {
759                 USER_ADMIN_EVENT_TYPE(AuditingTypesConstants.USER_ADMIN_EVENT_TYPE,
760                                 Table.USER_ADMIN_EVENT), USER_ACCESS_EVENT_TYPE(AuditingTypesConstants.USER_ACCESS_EVENT_TYPE,
761                                                 Table.USER_ACCESS_EVENT), RESOURCE_ADMIN_EVENT_TYPE(
762                                                                 AuditingTypesConstants.RESOURCE_ADMIN_EVENT_TYPE,
763                                                                 Table.RESOURCE_ADMIN_EVENT), DISTRIBUTION_DOWNLOAD_EVENT_TYPE(
764                                                                                 AuditingTypesConstants.DISTRIBUTION_DOWNLOAD_EVENT_TYPE,
765                                                                                 Table.DISTRIBUTION_DOWNLOAD_EVENT), DISTRIBUTION_ENGINE_EVENT_TYPE(
766                                                                                                 AuditingTypesConstants.DISTRIBUTION_ENGINE_EVENT_TYPE,
767                                                                                                 Table.DISTRIBUTION_ENGINE_EVENT), DISTRIBUTION_NOTIFICATION_EVENT_TYPE(
768                                                                                                                 AuditingTypesConstants.DISTRIBUTION_NOTIFICATION_EVENT_TYPE,
769                                                                                                                 Table.DISTRIBUTION_NOTIFICATION_EVENT), DISTRIBUTION_STATUS_EVENT_TYPE(
770                                                                                                                                 AuditingTypesConstants.DISTRIBUTION_STATUS_EVENT_TYPE,
771                                                                                                                                 Table.DISTRIBUTION_STATUS_EVENT), DISTRIBUTION_DEPLOY_EVENT_TYPE(
772                                                                                                                                                 AuditingTypesConstants.DISTRIBUTION_DEPLOY_EVENT_TYPE,
773                                                                                                                                                 Table.DISTRIBUTION_DEPLOY_EVENT), DISTRIBUTION_GET_UEB_CLUSTER_EVENT_TYPE(
774                                                                                                                                                                 AuditingTypesConstants.DISTRIBUTION_GET_UEB_CLUSTER_EVENT_TYPE,
775                                                                                                                                                                 Table.DISTRIBUTION_GET_UEB_CLUSTER_EVENT), AUTH_EVENT_TYPE(
776                                                                                                                                                                                 AuditingTypesConstants.AUTH_EVENT_TYPE,
777                                                                                                                                                                                 Table.AUTH_EVENT), CONSUMER_EVENT_TYPE(
778                                                                                                                                                                                                 AuditingTypesConstants.CONSUMER_EVENT_TYPE,
779                                                                                                                                                                                                 Table.CONSUMER_EVENT), CATEGORY_EVENT_TYPE(
780                                                                                                                                                                                                                 AuditingTypesConstants.CATEGORY_EVENT_TYPE,
781                                                                                                                                                                                                                 Table.CATEGORY_EVENT), GET_USERS_LIST_EVENT_TYPE(
782                                                                                                                                                                                                                                 AuditingTypesConstants.GET_USERS_LIST_EVENT_TYPE,
783                                                                                                                                                                                                                                 Table.GET_USERS_LIST_EVENT), GET_CATEGORY_HIERARCHY_EVENT_TYPE(
784                                                                                                                                                                                                                                                 AuditingTypesConstants.GET_CATEGORY_HIERARCHY_EVENT_TYPE,
785                                                                                                                                                                                                                                                 Table.GET_CATEGORY_HIERARCHY_EVENT);
786
787                 String typeName;
788                 Table table;
789
790                 TypeToTableMapping(String typeName, Table table) {
791                         this.typeName = typeName;
792                         this.table = table;
793                 }
794
795                 public String getTypeName() {
796                         return typeName;
797                 }
798
799                 public Table getTable() {
800                         return table;
801                 }
802
803                 public static Table getTableByType(String type) {
804                         for (TypeToTableMapping mapping : TypeToTableMapping.values()) {
805                                 if (mapping.getTypeName().equalsIgnoreCase(type)) {
806                                         return mapping.getTable();
807                                 }
808                         }
809                         return null;
810                 }
811         }
812
813 }