2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 package org.onap.aai.sparky.synchronizer;
25 import com.fasterxml.jackson.core.JsonProcessingException;
26 import com.fasterxml.jackson.databind.JsonNode;
27 import com.fasterxml.jackson.databind.ObjectMapper;
28 import com.fasterxml.jackson.databind.node.ArrayNode;
29 import com.fasterxml.jackson.databind.node.ObjectNode;
31 import java.io.IOException;
32 import java.util.ArrayList;
33 import java.util.Collection;
34 import java.util.Iterator;
35 import java.util.List;
37 import org.onap.aai.sparky.dal.rest.OperationResult;
38 import org.onap.aai.sparky.dal.rest.RestDataProvider;
39 import org.onap.aai.sparky.logging.AaiUiMsgs;
40 import org.onap.aai.sparky.synchronizer.entity.ObjectIdCollection;
41 import org.onap.aai.sparky.synchronizer.entity.SearchableEntity;
42 import org.onap.aai.sparky.synchronizer.enumeration.OperationState;
43 import org.onap.aai.cl.api.Logger;
44 import org.onap.aai.cl.eelf.LoggerFactory;
47 * The Class ElasticSearchIndexCleaner.
49 public class ElasticSearchIndexCleaner implements IndexCleaner {
51 private static final Logger LOG =
52 LoggerFactory.getInstance().getLogger(ElasticSearchIndexCleaner.class);
54 private static final String BULK_OP_LINE_TEMPLATE = "%s\n";
55 private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
57 private ObjectIdCollection before;
58 private ObjectIdCollection after;
63 private String indexName;
64 private String indexType;
65 private int scrollContextTimeToLiveInMinutes;
66 private int numItemsToGetBulkRequest;
68 private RestDataProvider restDataProvider;
69 private ObjectMapper mapper;
/**
 * Instantiates a new elastic search index cleaner.
 *
 * @param restDataProvider the rest data provider used to post every request
 * @param indexName the index name
 * @param indexType the index type
 * @param host the host placed in every request URL
 * @param port the port placed in every request URL
 * @param scrollContextTimeToLiveInMinutes the scroll context time to live in minutes
 * @param numItemsToGetBulkRequest the number of items per scroll page and per bulk delete
 */
protected ElasticSearchIndexCleaner(RestDataProvider restDataProvider, String indexName,
    String indexType, String host, String port, int scrollContextTimeToLiveInMinutes,
    int numItemsToGetBulkRequest) {
  this.restDataProvider = restDataProvider;
  this.indexName = indexName;
  this.indexType = indexType;
  this.mapper = new ObjectMapper();
  // getFullUrl() formats every request URL from these two values, so they must be retained.
  this.host = host;
  this.port = port;
  this.scrollContextTimeToLiveInMinutes = scrollContextTimeToLiveInMinutes;
  this.numItemsToGetBulkRequest = numItemsToGetBulkRequest;
}
100 * @see org.onap.aai.sparky.synchronizer.IndexCleaner#populatePreOperationCollection()
103 public OperationState populatePreOperationCollection() {
106 before = retrieveAllDocumentIdentifiers();
107 return OperationState.OK;
108 } catch (Exception exc) {
109 LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, indexName, exc.getMessage());
110 return OperationState.ERROR;
118 * @see org.onap.aai.sparky.synchronizer.IndexCleaner#populatePostOperationCollection()
121 public OperationState populatePostOperationCollection() {
123 after = retrieveAllDocumentIdentifiers();
124 return OperationState.OK;
125 } catch (Exception exc) {
126 LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, indexName, exc.getMessage());
127 return OperationState.ERROR;
134 * @see org.onap.aai.sparky.synchronizer.IndexCleaner#performCleanup()
137 public OperationState performCleanup() {
138 // TODO Auto-generated method stub
139 LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP, indexName);
141 int sizeBefore = before.getSize();
142 int sizeAfter = after.getSize();
144 LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP_SIZE, String.valueOf(sizeBefore),
145 String.valueOf(sizeAfter));
148 * If the processedImportIds size <= 0, then something has failed in the sync operation and we
149 * shouldn't do the selective delete right now.
154 Collection<String> presyncIds = before.getImportedObjectIdsAsValues();
155 presyncIds.removeAll(after.getImportedObjectIdsAsValues());
158 LOG.info(AaiUiMsgs.ES_SYNC_SELECTIVE_DELETE, indexName, indexType,
159 String.valueOf(presyncIds.size()));
161 ObjectIdCollection bulkIds = new ObjectIdCollection();
163 Iterator<String> it = presyncIds.iterator();
164 int numItemsInBulkRequest = 0;
165 int numItemsRemainingToBeDeleted = presyncIds.size();
167 while (it.hasNext()) {
169 bulkIds.addObjectId(it.next());
170 numItemsInBulkRequest++;
172 if (numItemsInBulkRequest >= this.numItemsToGetBulkRequest) {
173 LOG.info(AaiUiMsgs.ES_BULK_DELETE, indexName, String.valueOf(bulkIds.getSize()));
174 OperationResult bulkDeleteResult = bulkDelete(bulkIds.getImportedObjectIdsAsValues());
175 // pegCountersForElasticBulkDelete(bulkDeleteResult);
176 numItemsRemainingToBeDeleted -= numItemsInBulkRequest;
177 numItemsInBulkRequest = 0;
182 if (numItemsRemainingToBeDeleted > 0) {
183 LOG.info(AaiUiMsgs.ES_BULK_DELETE, indexName, String.valueOf(bulkIds.getSize()));
184 OperationResult bulkDeleteResult = bulkDelete(bulkIds.getImportedObjectIdsAsValues());
185 // pegCountersForElasticBulkDelete(bulkDeleteResult);
189 } catch (Exception exc) {
190 LOG.error(AaiUiMsgs.ES_BULK_DELETE_ERROR, indexName, exc.getLocalizedMessage());
195 return OperationState.OK;
199 public String getIndexName() {
203 public void setIndexName(String indexName) {
204 this.indexName = indexName;
208 * Builds the initial scroll request payload.
210 * @param numItemsToGetPerRequest the num items to get per request
211 * @param fieldList the field list
213 * @throws JsonProcessingException the json processing exception
215 protected String buildInitialScrollRequestPayload(int numItemsToGetPerRequest,
216 List<String> fieldList) throws JsonProcessingException {
218 ObjectNode rootNode = mapper.createObjectNode();
219 rootNode.put("size", numItemsToGetPerRequest);
221 ArrayNode fields = mapper.createArrayNode();
223 for (String f : fieldList) {
227 rootNode.set("fields", fields);
229 ObjectNode queryNode = mapper.createObjectNode();
230 queryNode.set("match_all", mapper.createObjectNode());
232 rootNode.set("query", queryNode);
234 return mapper.writeValueAsString(rootNode);
239 * Builds the subsequent scroll context request payload.
241 * @param scrollId the scroll id
242 * @param contextTimeToLiveInMinutes the context time to live in minutes
244 * @throws JsonProcessingException the json processing exception
246 protected String buildSubsequentScrollContextRequestPayload(String scrollId,
247 int contextTimeToLiveInMinutes) throws JsonProcessingException {
249 ObjectNode rootNode = mapper.createObjectNode();
251 rootNode.put("scroll", contextTimeToLiveInMinutes + "m");
252 rootNode.put("scroll_id", scrollId);
254 return mapper.writeValueAsString(rootNode);
259 * Parses the elastic search result.
261 * @param jsonResult the json result
262 * @return the json node
263 * @throws JsonProcessingException the json processing exception
264 * @throws IOException Signals that an I/O exception has occurred.
266 protected JsonNode parseElasticSearchResult(String jsonResult)
267 throws JsonProcessingException, IOException {
268 ObjectMapper mapper = new ObjectMapper();
269 return mapper.readTree(jsonResult);
276 * @param docs the docs
277 * @return the array list
279 protected ArrayList<SearchableEntity> lookupIndexDoc(ArrayList<String> ids,
280 List<SearchableEntity> docs) {
281 ArrayList<SearchableEntity> objs = new ArrayList<SearchableEntity>();
283 if (ids != null && docs != null) {
284 for (SearchableEntity d : docs) {
285 if (ids.contains(d.getId())) {
295 * Builds the delete data object.
297 * @param index the index
298 * @param type the type
300 * @return the object node
302 protected ObjectNode buildDeleteDataObject(String index, String type, String id) {
304 ObjectNode indexDocProperties = mapper.createObjectNode();
306 indexDocProperties.put("_index", index);
307 indexDocProperties.put("_type", type);
308 indexDocProperties.put("_id", id);
310 ObjectNode rootNode = mapper.createObjectNode();
311 rootNode.set("delete", indexDocProperties);
317 * This method might appear to be a little strange, and is simply an optimization to take an
318 * elipsed JsonNode key path and retrieve the node at the end of the path, if it exists.
320 * @param startNode the start node
321 * @param fieldPath the field path
322 * @return the node path
324 protected JsonNode getNodePath(JsonNode startNode, String... fieldPath) {
326 JsonNode jsonNode = null;
328 for (String field : fieldPath) {
329 if (jsonNode == null) {
330 jsonNode = startNode.get(field);
332 jsonNode = jsonNode.get(field);
336 * This is our safety net in case any intermediate path returns a null
339 if (jsonNode == null) {
351 * @param resourceUrl the resource url
352 * @return the full url
354 private String getFullUrl(String resourceUrl) {
355 return String.format("http://%s:%s%s", host, port, resourceUrl);
359 * Retrieve all document identifiers.
361 * @return the object id collection
362 * @throws IOException Signals that an I/O exception has occurred.
364 public ObjectIdCollection retrieveAllDocumentIdentifiers() throws IOException {
366 ObjectIdCollection currentDocumentIds = new ObjectIdCollection();
368 long opStartTimeInMs = System.currentTimeMillis();
370 List<String> fields = new ArrayList<String>();
372 // fields.add("entityType");
374 String scrollRequestPayload =
375 buildInitialScrollRequestPayload(this.numItemsToGetBulkRequest, fields);
377 final String fullUrlStr = getFullUrl("/" + indexName + "/" + indexType + "/_search?scroll="
378 + this.scrollContextTimeToLiveInMinutes + "m");
380 OperationResult result =
381 restDataProvider.doPost(fullUrlStr, scrollRequestPayload, "application/json");
383 if (result.wasSuccessful()) {
385 JsonNode rootNode = parseElasticSearchResult(result.getResult());
388 * Check the result for success / failure, and enumerate all the index ids that resulted in
389 * success, and ignore the ones that failed or log them so we have a record of the failure.
391 int totalRecordsAvailable = 0;
392 String scrollId = null;
393 int numRecordsFetched = 0;
395 if (rootNode != null) {
397 scrollId = getFieldValue(rootNode, "_scroll_id");
398 final String tookStr = getFieldValue(rootNode, "took");
399 int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
400 boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
403 LOG.error(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "all document Identifiers",
404 String.valueOf(tookInMs));
406 LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "all document Identifiers",
407 String.valueOf(tookInMs));
410 JsonNode hitsNode = rootNode.get("hits");
411 totalRecordsAvailable = Integer.parseInt(hitsNode.get("total").asText());
413 LOG.info(AaiUiMsgs.COLLECT_TOTAL, "all document Identifiers",
414 String.valueOf(totalRecordsAvailable));
417 * Collect all object ids
420 ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");
422 Iterator<JsonNode> nodeIterator = hitsArray.iterator();
426 JsonNode jsonNode = null;
428 while (nodeIterator.hasNext()) {
430 jsonNode = nodeIterator.next();
432 key = getFieldValue(jsonNode, "_id");
435 currentDocumentIds.addObjectId(key);
441 * JsonNode fieldsNode = jNode.get("fields");
443 * if (fieldsNode != null) {
445 * JsonNode entityTypeNode = fieldsNode.get("entityType");
447 * if (entityTypeNode != null) { ArrayNode aNode = (ArrayNode) entityTypeNode;
449 * if (aNode.size() > 0) { value = aNode.get(0).asText(); objAndtTypesMap.put(key, value);
450 * numRecordsFetched++; } } } }
455 int totalRecordsRemainingToFetch = (totalRecordsAvailable - numRecordsFetched);
457 int numRequiredAdditionalFetches =
458 (totalRecordsRemainingToFetch / this.numItemsToGetBulkRequest);
461 * Do an additional fetch for the remaining items (if needed)
464 if (totalRecordsRemainingToFetch % numItemsToGetBulkRequest != 0) {
465 numRequiredAdditionalFetches += 1;
468 if (LOG.isDebugEnabled()) {
469 LOG.debug(AaiUiMsgs.SYNC_NUMBER_REQ_FETCHES,
470 String.valueOf(numRequiredAdditionalFetches));
474 for (int x = 0; x < numRequiredAdditionalFetches; x++) {
476 if (collectItemsFromScrollContext(scrollId, currentDocumentIds) != OperationState.OK) {
477 // abort the whole thing because now we can't reliably cleanup the orphans.
478 throw new IOException(
479 "Failed to collect pre-sync doc collection from index. Aborting operation");
481 if (LOG.isDebugEnabled()) {
482 LOG.debug(AaiUiMsgs.SYNC_NUMBER_TOTAL_FETCHES,
483 String.valueOf(currentDocumentIds.getSize()),
484 String.valueOf(totalRecordsAvailable));
492 // scroll context get failed, nothing else to do
493 LOG.error(AaiUiMsgs.ERROR_GENERIC, result.toString());
496 LOG.info(AaiUiMsgs.COLLECT_TOTAL_TIME, "all document Identifiers",
497 String.valueOf((System.currentTimeMillis() - opStartTimeInMs)));
499 return currentDocumentIds;
504 * Collect items from scroll context.
506 * @param scrollId the scroll id
507 * @param objectIds the object ids
508 * @return the operation state
509 * @throws IOException Signals that an I/O exception has occurred.
511 private OperationState collectItemsFromScrollContext(String scrollId,
512 ObjectIdCollection objectIds) throws IOException {
514 // ObjectIdCollection documentIdCollection = new ObjectIdCollection();
516 String requestPayload =
517 buildSubsequentScrollContextRequestPayload(scrollId, scrollContextTimeToLiveInMinutes);
519 final String fullUrlStr = getFullUrl("/_search/scroll");
521 OperationResult opResult =
522 restDataProvider.doPost(fullUrlStr, requestPayload, "application/json");
524 if (opResult.getResultCode() >= 300) {
525 LOG.warn(AaiUiMsgs.ES_SCROLL_CONTEXT_ERROR, opResult.getResult());
526 return OperationState.ERROR;
529 JsonNode rootNode = parseElasticSearchResult(opResult.getResult());
532 * Check the result for success / failure, and enumerate all the index ids that resulted in
533 * success, and ignore the ones that failed or log them so we have a record of the failure.
536 if (rootNode != null) {
537 boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
538 final String tookStr = getFieldValue(rootNode, "took");
539 int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
541 JsonNode hitsNode = rootNode.get("hits");
544 LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "Scroll Context", String.valueOf(tookInMs));
546 LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "Scroll Context", String.valueOf(tookInMs));
550 * Collect all object ids
553 ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");
556 JsonNode jsonNode = null;
558 Iterator<JsonNode> nodeIterator = hitsArray.iterator();
560 while (nodeIterator.hasNext()) {
562 jsonNode = nodeIterator.next();
564 key = getFieldValue(jsonNode, "_id");
567 objectIds.addObjectId(key);
570 * JsonNode fieldsNode = jNode.get("fields");
572 * if (fieldsNode != null) {
574 * JsonNode entityTypeNode = fieldsNode.get("entityType");
576 * if (entityTypeNode != null) { ArrayNode aNode = (ArrayNode) entityTypeNode;
578 * if (aNode.size() > 0) { value = aNode.get(0).asText(); objectIdsAndTypes.put(key,
586 // scroll context get failed, nothing else to do
587 LOG.error(AaiUiMsgs.ERROR_GENERIC, opResult.toString());
590 return OperationState.OK;
594 * Gets the field value.
596 * @param node the node
597 * @param fieldName the field name
598 * @return the field value
600 protected String getFieldValue(JsonNode node, String fieldName) {
602 JsonNode field = node.get(fieldName);
605 return field.asText();
615 * @param docIds the doc ids
616 * @return the operation result
617 * @throws IOException Signals that an I/O exception has occurred.
619 public OperationResult bulkDelete(Collection<String> docIds) throws IOException {
621 if (docIds == null || docIds.size() == 0) {
622 LOG.info(AaiUiMsgs.ES_BULK_DELETE_SKIP);
623 return new OperationResult(500,
624 "Skipping bulkDelete(); operation because docs to delete list is empty");
627 LOG.info(AaiUiMsgs.ES_BULK_DELETE_START, String.valueOf(docIds.size()));
629 StringBuilder sb = new StringBuilder(128);
631 for (String id : docIds) {
633 String.format(BULK_OP_LINE_TEMPLATE, buildDeleteDataObject(indexName, indexType, id)));
638 final String fullUrlStr = getFullUrl("/_bulk");
640 return restDataProvider.doPost(fullUrlStr, sb.toString(), "application/x-www-form-urlencoded");
647 public ObjectIdCollection getBefore() {
652 * @param before the before to set
654 public void setBefore(ObjectIdCollection before) {
655 this.before = before;
661 public ObjectIdCollection getAfter() {
666 * @param after the after to set
668 public void setAfter(ObjectIdCollection after) {
675 public String getHost() {
680 * @param host the host to set
682 public void setHost(String host) {
689 public String getPort() {
694 * @param port the port to set
696 public void setPort(String port) {
701 * @return the indexType
703 public String getIndexType() {
708 * @param indexType the indexType to set
710 public void setIndexType(String indexType) {
711 this.indexType = indexType;
715 * @return the scrollContextTimeToLiveInMinutes
717 public int getScrollContextTimeToLiveInMinutes() {
718 return scrollContextTimeToLiveInMinutes;
722 * @param scrollContextTimeToLiveInMinutes the scrollContextTimeToLiveInMinutes to set
724 public void setScrollContextTimeToLiveInMinutes(int scrollContextTimeToLiveInMinutes) {
725 this.scrollContextTimeToLiveInMinutes = scrollContextTimeToLiveInMinutes;
729 * @return the numItemsToGetBulkRequest
731 public int getNumItemsToGetBulkRequest() {
732 return numItemsToGetBulkRequest;
736 * @param numItemsToGetBulkRequest the numItemsToGetBulkRequest to set
738 public void setNumItemsToGetBulkRequest(int numItemsToGetBulkRequest) {
739 this.numItemsToGetBulkRequest = numItemsToGetBulkRequest;
743 * @return the restDataProvider
745 public RestDataProvider getRestDataProvider() {
746 return restDataProvider;
750 * @param restDataProvider the restDataProvider to set
752 public void setRestDataProvider(RestDataProvider restDataProvider) {
753 this.restDataProvider = restDataProvider;
759 public ObjectMapper getMapper() {
764 * @param mapper the mapper to set
766 public void setMapper(ObjectMapper mapper) {
767 this.mapper = mapper;
773 public static Logger getLog() {
778 * @return the bulkOpLineTemplate
780 public static String getBulkOpLineTemplate() {
781 return BULK_OP_LINE_TEMPLATE;
785 * @return the timestampFormat
787 public static String getTimestampFormat() {
788 return TIMESTAMP_FORMAT;