2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.asdctool.impl;
23 import java.io.BufferedReader;
24 import java.io.BufferedWriter;
26 import java.io.FileReader;
27 import java.io.FileWriter;
28 import java.io.IOException;
29 import java.io.PrintWriter;
30 import java.lang.reflect.Type;
31 import java.net.MalformedURLException;
32 import java.net.URISyntaxException;
34 import java.nio.file.Files;
35 import java.nio.file.Paths;
36 import java.text.ParseException;
37 import java.text.SimpleDateFormat;
38 import java.util.Date;
39 import java.util.EnumMap;
41 import java.util.Map.Entry;
42 import java.util.TimeZone;
44 import org.apache.commons.lang.SystemUtils;
45 import org.elasticsearch.action.search.SearchResponse;
46 import org.elasticsearch.cluster.metadata.IndexMetaData;
47 import org.elasticsearch.common.collect.ImmutableOpenMap;
48 import org.elasticsearch.common.settings.Settings;
49 import org.elasticsearch.common.unit.TimeValue;
50 import org.elasticsearch.index.query.QueryBuilder;
51 import org.elasticsearch.index.query.QueryBuilders;
52 import org.elasticsearch.search.SearchHit;
53 import org.openecomp.sdc.be.dao.cassandra.ArtifactCassandraDao;
54 import org.openecomp.sdc.be.dao.cassandra.AuditCassandraDao;
55 import org.openecomp.sdc.be.dao.cassandra.CassandraOperationStatus;
56 import org.openecomp.sdc.be.dao.cassandra.schema.Table;
57 import org.openecomp.sdc.be.dao.es.ElasticSearchClient;
58 import org.openecomp.sdc.be.resources.data.ESArtifactData;
59 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
60 import org.openecomp.sdc.be.resources.data.auditing.AuditingGenericEvent;
61 import org.openecomp.sdc.be.resources.data.auditing.AuditingGetUebClusterEvent;
62 import org.openecomp.sdc.be.resources.data.auditing.AuditingTypesConstants;
63 import org.openecomp.sdc.be.resources.data.auditing.AuthEvent;
64 import org.openecomp.sdc.be.resources.data.auditing.CategoryEvent;
65 import org.openecomp.sdc.be.resources.data.auditing.ConsumerEvent;
66 import org.openecomp.sdc.be.resources.data.auditing.DistributionDeployEvent;
67 import org.openecomp.sdc.be.resources.data.auditing.DistributionDownloadEvent;
68 import org.openecomp.sdc.be.resources.data.auditing.DistributionEngineEvent;
69 import org.openecomp.sdc.be.resources.data.auditing.DistributionNotificationEvent;
70 import org.openecomp.sdc.be.resources.data.auditing.DistributionStatusEvent;
71 import org.openecomp.sdc.be.resources.data.auditing.GetCategoryHierarchyEvent;
72 import org.openecomp.sdc.be.resources.data.auditing.GetUsersListEvent;
73 import org.openecomp.sdc.be.resources.data.auditing.ResourceAdminEvent;
74 import org.openecomp.sdc.be.resources.data.auditing.UserAccessEvent;
75 import org.openecomp.sdc.be.resources.data.auditing.UserAdminEvent;
76 import org.openecomp.sdc.common.datastructure.AuditingFieldsKeysEnum;
77 import org.slf4j.Logger;
78 import org.slf4j.LoggerFactory;
79 import org.springframework.beans.factory.annotation.Autowired;
81 import com.carrotsearch.hppc.cursors.ObjectCursor;
82 import com.fasterxml.jackson.databind.ObjectMapper;
83 import com.google.gson.Gson;
84 import com.google.gson.reflect.TypeToken;
86 import fj.data.Either;
89 * Created by mlando on 5/16/2016.
91 public class DataMigration {
93 private Gson gson = new Gson();
95 private ObjectMapper jsonMapper = new ObjectMapper();
97 private static Logger log = LoggerFactory.getLogger(DataMigration.class.getName());
99 private ElasticSearchClient elasticSearchClient;
102 protected AuditCassandraDao auditCassandraDao;
104 protected ArtifactCassandraDao artifactCassandraDao;
106 private static final String DATE_FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS z";
107 private SimpleDateFormat simpleDateFormat;
110 * the method exports and imports the records from ES to cassandra the flow
111 * will check to see if the files are not empty if the files are not empty
112 * the export will be skiped and the flow will use the existing files. the
113 * flow will check if the tables in cassandra are empty, if the tables are
114 * not empty the proces will stop and exit. if the tables are empty the
115 * method will import the records from the files. in case of a fail the flow
116 * will exit and clear all the Cassandra tables.
118 * @param appConfigDir
119 * the location of the dir in wich the output files will be
121 * @param exportFromEs
122 * should the es be exported again and overwrite the old export
123 * @param importToCassandra
124 * should we import the data into cassandra
125 * @return true in case the operation was successful.
127 public boolean migrateDataEsToCassandra(String appConfigDir, boolean exportFromEs, boolean importToCassandra) {
129 if (!initEsClient()) {
132 Map<Table, File> files = createOutPutFiles(appConfigDir, exportFromEs);
136 if (exportFromEs && filesEmpty(files)) {
137 Map<Table, PrintWriter> printerWritersMap = createWriters(files);
138 if (printerWritersMap == null) {
142 ImmutableOpenMap<String, IndexMetaData> indexData = getIndexData();
143 for (ObjectCursor<String> key : indexData.keys()) {
144 if (("resources".equalsIgnoreCase(key.value) && !exportArtifacts(key.value, printerWritersMap))
145 || (key.value.startsWith("auditingevents") && !exportAudit(key.value, printerWritersMap))) {
150 if (elasticSearchClient != null) {
151 elasticSearchClient.close();
153 for (PrintWriter writer : printerWritersMap.values()) {
158 return !importToCassandra || importToCassndra(files);
161 private void initFormater() {
162 simpleDateFormat = new SimpleDateFormat(DATE_FORMAT_PATTERN);
163 simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
166 private boolean initEsClient() {
167 String configHome = System.getProperty("config.home");
171 if (SystemUtils.IS_OS_WINDOWS) {
172 url = new URL("file:///" + configHome + "/elasticsearch.yml");
174 url = new URL("file:" + configHome + "/elasticsearch.yml");
176 log.debug("URL {}", url);
177 settings = Settings.settingsBuilder().loadFromPath(Paths.get(url.toURI())).build();
178 } catch (MalformedURLException | URISyntaxException e1) {
179 log.error("Failed to create URL in order to load elasticsearch yml", e1);
183 this.elasticSearchClient = new ElasticSearchClient();
184 this.elasticSearchClient.setClusterName(settings.get("cluster.name"));
185 this.elasticSearchClient.setLocal(settings.get("elasticSearch.local"));
186 this.elasticSearchClient.setTransportClient(settings.get("elasticSearch.transportclient"));
188 elasticSearchClient.initialize();
189 } catch (URISyntaxException e) {
190 log.error("Failed to initialize elasticSearchClient", e);
197 * the method clears all the cassandra tables.
199 private void truncateCassandraTable() {
200 log.info("import failed. truncating Cassandra tables.");
201 artifactCassandraDao.deleteAllArtifacts();
202 auditCassandraDao.deleteAllAudit();
206 * the method imports the records from the files into cassandra.
209 * a map of files holding
210 * @return true if the operation was successful
212 private boolean importToCassndra(Map<Table, File> files) {
213 log.info("starting to import date into Cassandra.");
214 if (!validtaTablsNotEmpty(files)) {
217 for (Table table : files.keySet()) {
218 log.info("importing recordes into {}", table.getTableDescription().getTableName());
219 if (!handleImport(files, table)) {
220 truncateCassandraTable();
224 log.info("finished to import date into Cassandra.");
228 private boolean validtaTablsNotEmpty(Map<Table, File> files) {
229 for (Table table : files.keySet()) {
230 Either<Boolean, CassandraOperationStatus> isTableEmptyRes = checkIfTableIsEmpty(table);
231 if (isTableEmptyRes.isRight() || !isTableEmptyRes.left().value()) {
232 log.error("Cassandra table {} is not empty operation aborted.",
233 table.getTableDescription().getTableName());
241 * the method retrieves the fields from the given map and praprs them for
242 * storage as an audit according to the table name
245 * the map from which we will retrive the fields enum values
247 * the table we are going to store the record in.
248 * @return a enummap representing the audit record that is going to be
251 private EnumMap<AuditingFieldsKeysEnum, Object> createAuditMap(Map<String, String> map, Table table) {
252 EnumMap<AuditingFieldsKeysEnum, Object> auditingFields = new EnumMap<>(AuditingFieldsKeysEnum.class);
254 case USER_ADMIN_EVENT:
255 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
256 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
257 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
258 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
259 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
260 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
261 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_USER_AFTER, map.get("USER_AFTER"));
262 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_USER_BEFORE, map.get("USER_BEFORE"));
263 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
265 case USER_ACCESS_EVENT:
266 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
267 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
268 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
269 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
270 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
271 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
272 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_USER_UID, map.get("USER"));
274 case RESOURCE_ADMIN_EVENT:
275 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
276 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
277 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
278 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_INVARIANT_UUID, map.get("INVARIANT_UUID"));
279 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
280 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
281 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
282 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_VERSION, map.get("CURR_VERSION"));
283 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_STATE, map.get("CURR_STATE"));
284 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ID, map.get("DID"));
285 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
286 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_PREV_VERSION, map.get("PREV_VERSION"));
287 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_PREV_STATE, map.get("PREV_STATE"));
288 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_NAME, map.get("RESOURCE_NAME"));
289 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TYPE, map.get("RESOURCE_TYPE"));
290 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_DPREV_STATUS, map.get("DPREV_STATUS"));
291 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_DCURR_STATUS, map.get("DCURR_STATUS"));
292 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TOSCA_NODE_TYPE, map.get("TOSCA_NODE_TYPE"));
293 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_COMMENT, map.get("COMMENT"));
294 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ARTIFACT_DATA, map.get("ARTIFACT_DATA"));
295 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_PREV_ARTIFACT_UUID, map.get("PREV_ARTIFACT_UUID"));
296 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_CURR_ARTIFACT_UUID, map.get("CURR_ARTIFACT_UUID"));
298 case DISTRIBUTION_DOWNLOAD_EVENT:
299 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
300 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
301 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
302 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
303 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
304 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
305 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_RESOURCE_URL, map.get("RESOURCE_URL"));
306 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_CONSUMER_ID, map.get("CONSUMER_ID"));
308 case DISTRIBUTION_ENGINE_EVENT:
309 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
310 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
311 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
312 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
313 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
314 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
315 if (map.get("TOPIC_NAME") != null) {
316 if (map.get("TOPIC_NAME").contains("-STATUS-")) {
317 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_STATUS_TOPIC_NAME,
318 map.get("TOPIC_NAME"));
320 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_NOTIFICATION_TOPIC_NAME,
321 map.get("TOPIC_NAME"));
324 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_STATUS_TOPIC_NAME,
325 map.get("DSTATUS_TOPIC"));
326 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_NOTIFICATION_TOPIC_NAME,
327 map.get("DNOTIF_TOPIC"));
329 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_TOPIC_NAME, map.get("TOPIC_NAME"));
330 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ROLE, map.get("ROLE"));
331 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_API_KEY, map.get("API_KEY"));
332 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ENVRIONMENT_NAME, map.get("D_ENV"));
333 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_CONSUMER_ID, map.get("CONSUMER_ID"));
335 case DISTRIBUTION_NOTIFICATION_EVENT:
336 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
337 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
338 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
339 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
340 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
341 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
342 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_STATE, map.get("CURR_STATE"));
343 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_VERSION, map.get("CURR_VERSION"));
344 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ID, map.get("DID"));
345 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
346 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_NAME, map.get("RESOURCE_NAME"));
347 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TYPE, map.get("RESOURCE_TYPE"));
348 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_TOPIC_NAME, map.get("TOPIC_NAME"));
350 case DISTRIBUTION_STATUS_EVENT:
351 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
352 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
353 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
354 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
355 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
356 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
357 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_RESOURCE_URL, map.get("RESOURCE_URL"));
358 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ID, map.get("DID"));
359 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_TOPIC_NAME, map.get("TOPIC_NAME"));
360 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_CONSUMER_ID, map.get("CONSUMER_ID"));
361 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_STATUS_TIME, map.get("STATUS_TIME"));
363 case DISTRIBUTION_DEPLOY_EVENT:
364 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
365 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
366 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
367 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
368 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
369 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
370 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_ID, map.get("DID"));
371 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_NAME, map.get("RESOURCE_NAME"));
372 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TYPE, map.get("RESOURCE_TYPE"));
373 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
374 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_CURR_VERSION, map.get("CURR_VERSION"));
376 case DISTRIBUTION_GET_UEB_CLUSTER_EVENT:
377 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
378 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
379 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
380 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
381 if (map.get("STATUS_DESC") != null) {
382 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("STATUS_DESC"));
384 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
386 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
387 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DISTRIBUTION_CONSUMER_ID, map.get("CONSUMER_ID"));
390 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
391 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
392 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
393 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
394 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
395 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_AUTH_USER, map.get("USER"));
396 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_AUTH_URL, map.get("URL"));
397 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_AUTH_STATUS, map.get("AUTH_STATUS"));
398 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_AUTH_REALM, map.get("REALM"));
401 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
402 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
403 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
404 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
405 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
406 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
407 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ECOMP_USER, map.get("ECOMP_USER"));
410 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
411 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
412 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
413 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
414 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
415 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
416 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SERVICE_INSTANCE_ID, map.get("SERVICE_INSTANCE_ID"));
417 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_CATEGORY_NAME, map.get("CATEGORY_NAME"));
418 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_SUB_CATEGORY_NAME, map.get("SUB_CATEGORY_NAME"));
419 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_GROUPING_NAME, map.get("GROUPING_NAME"));
420 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_RESOURCE_TYPE, map.get("RESOURCE_TYPE"));
422 case GET_USERS_LIST_EVENT:
423 case GET_CATEGORY_HIERARCHY_EVENT:
424 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP, map.get("TIMESTAMP"));
425 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_ACTION, map.get("ACTION"));
426 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DESC, map.get("DESC"));
427 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_STATUS, map.get("STATUS"));
428 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_MODIFIER_UID, map.get("MODIFIER"));
429 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_REQUEST_ID, map.get("REQUEST_ID"));
430 auditingFields.put(AuditingFieldsKeysEnum.AUDIT_DETAILS, map.get("DETAILS"));
433 auditingFields = null;
436 return auditingFields;
440 * the method reads the content of the file intended for a given table, and
441 * sores them in cassandra
444 * a map of files from which the recordes will be retrieved.
446 * the name of the table we want to look up in the files and sore
447 * in Cassandra // * @param store the function to call when
448 * storing recordes in cassndra
449 * @return true if the operation was successful
451 private boolean handleImport(Map<Table, File> files, Table table) {
452 try (BufferedReader br = new BufferedReader(new FileReader(files.get(table)))) {
454 while ((line = br.readLine()) != null) {
455 CassandraOperationStatus res;
456 if (Table.ARTIFACT.equals(table)) {
457 res = artifactCassandraDao.saveArtifact(jsonMapper.readValue(line, ESArtifactData.class));
459 Type type = new TypeToken<Map<String, String>>() {}.getType();
460 Map<String, String> map = gson.fromJson(line, type);
461 EnumMap<AuditingFieldsKeysEnum, Object> auditingFields = createAuditMap(map, table);
462 AuditingGenericEvent recordForCassandra;
464 recordForCassandra = createAuditRecord(auditingFields);
465 } catch (ParseException e) {
466 log.error("filed to parse time stemp in recored {}", auditingFields);
469 res = auditCassandraDao.saveRecord(recordForCassandra);
471 if (!res.equals(CassandraOperationStatus.OK)) {
472 log.error("save recored to cassndra {} failed with status {} aborting.",
473 table.getTableDescription().getTableName(), res);
478 } catch (IOException e) {
479 log.error("failed to read file", e);
485 * the method checks if the given table is empty.
488 * the name of the table we want to check
489 * @return true if the table is empty
491 private Either<Boolean, CassandraOperationStatus> checkIfTableIsEmpty(Table table) {
492 if (Table.ARTIFACT.equals(table)) {
493 return artifactCassandraDao.isTableEmpty(table.getTableDescription().getTableName());
495 return auditCassandraDao.isTableEmpty(table.getTableDescription().getTableName());
499 private boolean filesEmpty(Map<Table, File> files) {
500 for (Entry<Table, File> entry : files.entrySet()) {
501 File file = entry.getValue();
502 if (file.length() != 0) {
503 log.info("file:{} is not empty skipping export", entry.getKey().getTableDescription().getTableName());
511 * the method reads the records from es index of audit's into a file as
515 * the name of the index we want
516 * @param printerWritersMap
517 * a map of the writers we use to write to a file.
518 * @return true in case the export was successful.
520 private boolean exportAudit(String value, Map<Table, PrintWriter> printerWritersMap) {
521 log.info("stratng to export audit data from es index{} to file.", value);
522 QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
523 SearchResponse scrollResp = elasticSearchClient.getClient().prepareSearch(value).setScroll(new TimeValue(60000))
524 .setQuery(queryBuilder).setSize(100).execute().actionGet();
526 for (SearchHit hit : scrollResp.getHits().getHits()) {
527 PrintWriter out = printerWritersMap.get(TypeToTableMapping.getTableByType(hit.getType()));
528 out.println(hit.getSourceAsString());
530 scrollResp = elasticSearchClient.getClient().prepareSearchScroll(scrollResp.getScrollId())
531 .setScroll(new TimeValue(60000)).execute().actionGet();
532 if (scrollResp.getHits().getHits().length == 0) {
537 log.info("export audit data from es to file. finished succsesfully");
542 * the method reads the records from es index of resources into a file as
546 * the name of the index we want to read
547 * @param printerWritersMap
548 * a map of the writers we use to write to a file.
549 * @return true in case the export was successful.
551 private boolean exportArtifacts(String index, Map<Table, PrintWriter> printerWritersMap) {
552 log.info("stratng to export artifact data from es to file.");
553 PrintWriter out = printerWritersMap.get(Table.ARTIFACT);
554 QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
555 SearchResponse scrollResp = elasticSearchClient.getClient().prepareSearch(index).setScroll(new TimeValue(60000))
556 .setQuery(queryBuilder).setSize(100).execute().actionGet();
558 for (SearchHit hit : scrollResp.getHits().getHits()) {
559 out.println(hit.getSourceAsString());
561 scrollResp = elasticSearchClient.getClient().prepareSearchScroll(scrollResp.getScrollId())
562 .setScroll(new TimeValue(60000)).execute().actionGet();
563 if (scrollResp.getHits().getHits().length == 0) {
567 log.info("export artifact data from es to file. finished succsesfully");
572 * the method retrieves all the indexes from elasticsearch.
574 * @return a map of indexes and there metadata
576 private ImmutableOpenMap<String, IndexMetaData> getIndexData() {
577 return elasticSearchClient.getClient().admin().cluster().prepareState().get().getState().getMetaData()
582 * the method creates all the files and dir which holds them. in case the
583 * files exist they will not be created again.
585 * @param appConfigDir
586 * the base path under which the output dir will be created and
587 * the export result files the created filesa are named according
588 * to the name of the table into which it will be imported.
590 * if true all the export files will be recreated
591 * @return returns a map of tables and the files representing them them
593 private Map<Table, File> createOutPutFiles(String appConfigDir, boolean exportToEs) {
594 Map<Table, File> result = new EnumMap<>(Table.class);
595 File outputDir = new File(appConfigDir + "/output/");
596 if (!createOutPutFolder(outputDir)) {
599 for (Table table : Table.values()) {
600 File file = new File(outputDir + "/" + table.getTableDescription().getTableName());
604 Files.delete(file.toPath());
606 } catch (IOException e) {
607 log.error("failed to delete output file {}", file.getAbsolutePath(), e);
610 file = new File(outputDir + "/" + table.getTableDescription().getTableName());
612 if (!file.exists()) {
614 file.createNewFile();
615 } catch (IOException e) {
616 log.error("failed to create output file {}", file.getAbsolutePath(), e);
620 result.put(table, file);
626 * the method create the writers to each file
629 * a map of the files according to table
630 * @return returns a map of writers according to table.
632 private Map<Table, PrintWriter> createWriters(Map<Table, File> files) {
633 Map<Table, PrintWriter> printerWritersMap = new EnumMap<>(Table.class);
635 for (Entry<Table, File> entry : files.entrySet()) {
636 log.info("creating writer for {}", entry.getKey());
637 File file = entry.getValue();
638 FileWriter fw = new FileWriter(file, true);
639 BufferedWriter bw = new BufferedWriter(fw);
640 PrintWriter out = new PrintWriter(bw);
641 printerWritersMap.put(entry.getKey(), out);
642 log.info("creating writer for {} was successful", entry.getKey());
644 } catch (IOException e) {
645 log.error("create writer to file failed", e);
648 return printerWritersMap;
652 * the method creates the output dir in case it does not exist
655 * the path under wich the directory will be created.
656 * @return true in case the create was succsesful or the dir already exists
658 private boolean createOutPutFolder(File outputDir) {
659 if (!outputDir.exists()) {
660 log.info("creating output dir {}", outputDir.getAbsolutePath());
662 Files.createDirectories(outputDir.toPath());
663 } catch (IOException e) {
664 log.error("failed to create output dir {}", outputDir.getAbsolutePath(), e);
671 public enum TypeToTableMapping {
672 USER_ADMIN_EVENT_TYPE(AuditingTypesConstants.USER_ADMIN_EVENT_TYPE,
673 Table.USER_ADMIN_EVENT), USER_ACCESS_EVENT_TYPE(AuditingTypesConstants.USER_ACCESS_EVENT_TYPE,
674 Table.USER_ACCESS_EVENT), RESOURCE_ADMIN_EVENT_TYPE(
675 AuditingTypesConstants.RESOURCE_ADMIN_EVENT_TYPE,
676 Table.RESOURCE_ADMIN_EVENT), DISTRIBUTION_DOWNLOAD_EVENT_TYPE(
677 AuditingTypesConstants.DISTRIBUTION_DOWNLOAD_EVENT_TYPE,
678 Table.DISTRIBUTION_DOWNLOAD_EVENT), DISTRIBUTION_ENGINE_EVENT_TYPE(
679 AuditingTypesConstants.DISTRIBUTION_ENGINE_EVENT_TYPE,
680 Table.DISTRIBUTION_ENGINE_EVENT), DISTRIBUTION_NOTIFICATION_EVENT_TYPE(
681 AuditingTypesConstants.DISTRIBUTION_NOTIFICATION_EVENT_TYPE,
682 Table.DISTRIBUTION_NOTIFICATION_EVENT), DISTRIBUTION_STATUS_EVENT_TYPE(
683 AuditingTypesConstants.DISTRIBUTION_STATUS_EVENT_TYPE,
684 Table.DISTRIBUTION_STATUS_EVENT), DISTRIBUTION_DEPLOY_EVENT_TYPE(
685 AuditingTypesConstants.DISTRIBUTION_DEPLOY_EVENT_TYPE,
686 Table.DISTRIBUTION_DEPLOY_EVENT), DISTRIBUTION_GET_UEB_CLUSTER_EVENT_TYPE(
687 AuditingTypesConstants.DISTRIBUTION_GET_UEB_CLUSTER_EVENT_TYPE,
688 Table.DISTRIBUTION_GET_UEB_CLUSTER_EVENT), AUTH_EVENT_TYPE(
689 AuditingTypesConstants.AUTH_EVENT_TYPE,
690 Table.AUTH_EVENT), CONSUMER_EVENT_TYPE(
691 AuditingTypesConstants.CONSUMER_EVENT_TYPE,
692 Table.CONSUMER_EVENT), CATEGORY_EVENT_TYPE(
693 AuditingTypesConstants.CATEGORY_EVENT_TYPE,
694 Table.CATEGORY_EVENT), GET_USERS_LIST_EVENT_TYPE(
695 AuditingTypesConstants.GET_USERS_LIST_EVENT_TYPE,
696 Table.GET_USERS_LIST_EVENT), GET_CATEGORY_HIERARCHY_EVENT_TYPE(
697 AuditingTypesConstants.GET_CATEGORY_HIERARCHY_EVENT_TYPE,
698 Table.GET_CATEGORY_HIERARCHY_EVENT);
703 TypeToTableMapping(String typeName, Table table) {
704 this.typeName = typeName;
708 public String getTypeName() {
712 public Table getTable() {
716 public static Table getTableByType(String type) {
717 for (TypeToTableMapping mapping : TypeToTableMapping.values()) {
718 if (mapping.getTypeName().equalsIgnoreCase(type)) {
719 return mapping.getTable();
726 public AuditingGenericEvent createAuditRecord(EnumMap<AuditingFieldsKeysEnum, Object> auditingFields)
727 throws ParseException {
728 AuditingActionEnum actionEnum = AuditingActionEnum
729 .getActionByName((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_ACTION));
730 String tableName = actionEnum.getAuditingEsType();
731 AuditingGenericEvent event;
734 case AuditingTypesConstants.USER_ADMIN_EVENT_TYPE:
735 UserAdminEvent userAdminEvent = new UserAdminEvent(auditingFields);
736 date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
737 userAdminEvent.setTimestamp1(date);
738 event = userAdminEvent;
740 case AuditingTypesConstants.AUTH_EVENT_TYPE:
741 AuthEvent authEvent = new AuthEvent(auditingFields);
742 date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
743 authEvent.setTimestamp1(date);
746 case AuditingTypesConstants.CATEGORY_EVENT_TYPE:
747 CategoryEvent categoryEvent = new CategoryEvent(auditingFields);
748 date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
749 categoryEvent.setTimestamp1(date);
750 event = categoryEvent;
752 case AuditingTypesConstants.RESOURCE_ADMIN_EVENT_TYPE:
753 ResourceAdminEvent resourceAdminEvent = new ResourceAdminEvent(auditingFields);
754 date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
755 resourceAdminEvent.setTimestamp1(date);
756 event = resourceAdminEvent;
758 case AuditingTypesConstants.USER_ACCESS_EVENT_TYPE:
759 UserAccessEvent userAccessEvent = new UserAccessEvent(auditingFields);
760 date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
761 userAccessEvent.setTimestamp1(date);
762 event = userAccessEvent;
764 case AuditingTypesConstants.DISTRIBUTION_STATUS_EVENT_TYPE:
765 DistributionStatusEvent distributionStatusEvent = new DistributionStatusEvent(auditingFields);
766 date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
767 distributionStatusEvent.setTimestamp1(date);
768 event = distributionStatusEvent;
770 case AuditingTypesConstants.DISTRIBUTION_DOWNLOAD_EVENT_TYPE:
771 DistributionDownloadEvent distributionDownloadEvent = new DistributionDownloadEvent(auditingFields);
772 date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
773 distributionDownloadEvent.setTimestamp1(date);
774 event = distributionDownloadEvent;
776 case AuditingTypesConstants.DISTRIBUTION_ENGINE_EVENT_TYPE:
777 DistributionEngineEvent distributionEngineEvent = new DistributionEngineEvent(auditingFields);
778 date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
779 distributionEngineEvent.setTimestamp1(date);
780 event = distributionEngineEvent;
782 case AuditingTypesConstants.DISTRIBUTION_NOTIFICATION_EVENT_TYPE:
783 DistributionNotificationEvent distributionNotificationEvent = new DistributionNotificationEvent(
785 date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
786 distributionNotificationEvent.setTimestamp1(date);
787 event = distributionNotificationEvent;
789 case AuditingTypesConstants.DISTRIBUTION_DEPLOY_EVENT_TYPE:
790 DistributionDeployEvent distributionDeployEvent = new DistributionDeployEvent(auditingFields);
791 date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
792 distributionDeployEvent.setTimestamp1(date);
793 event = distributionDeployEvent;
795 case AuditingTypesConstants.DISTRIBUTION_GET_UEB_CLUSTER_EVENT_TYPE:
796 AuditingGetUebClusterEvent auditingGetUebClusterEvent = new AuditingGetUebClusterEvent(auditingFields);
797 date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
798 auditingGetUebClusterEvent.setTimestamp1(date);
799 event = auditingGetUebClusterEvent;
801 case AuditingTypesConstants.CONSUMER_EVENT_TYPE:
802 ConsumerEvent consumerEvent = new ConsumerEvent(auditingFields);
803 date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
804 consumerEvent.setTimestamp1(date);
805 event = consumerEvent;
807 case AuditingTypesConstants.GET_USERS_LIST_EVENT_TYPE:
808 GetUsersListEvent getUsersListEvent = new GetUsersListEvent(auditingFields);
809 date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
810 getUsersListEvent.setTimestamp1(date);
811 event = getUsersListEvent;
813 case AuditingTypesConstants.GET_CATEGORY_HIERARCHY_EVENT_TYPE:
814 GetCategoryHierarchyEvent getCategoryHierarchyEvent = new GetCategoryHierarchyEvent(auditingFields);
815 date = simpleDateFormat.parse((String) auditingFields.get(AuditingFieldsKeysEnum.AUDIT_TIMESTAMP));
816 getCategoryHierarchyEvent.setTimestamp1(date);
817 event = getCategoryHierarchyEvent;