2 * ============LICENSE_START===================================================
3 * SPARKY (AAI UI service)
4 * ============================================================================
5 * Copyright © 2017 AT&T Intellectual Property.
6 * Copyright © 2017 Amdocs
8 * ============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=====================================================
22 * ECOMP and OpenECOMP are trademarks
23 * and service marks of AT&T Intellectual Property.
26 package org.openecomp.sparky.synchronizer;
28 import com.fasterxml.jackson.core.JsonProcessingException;
29 import com.fasterxml.jackson.databind.JsonNode;
30 import com.fasterxml.jackson.databind.ObjectMapper;
31 import com.fasterxml.jackson.databind.node.ArrayNode;
32 import com.fasterxml.jackson.databind.node.ObjectNode;
34 import java.io.IOException;
35 import java.util.ArrayList;
36 import java.util.Collection;
37 import java.util.Iterator;
38 import java.util.List;
40 import org.openecomp.sparky.dal.rest.OperationResult;
41 import org.openecomp.sparky.dal.rest.RestDataProvider;
42 import org.openecomp.sparky.synchronizer.entity.ObjectIdCollection;
43 import org.openecomp.sparky.synchronizer.entity.SearchableEntity;
44 import org.openecomp.sparky.synchronizer.enumeration.OperationState;
45 import org.openecomp.cl.api.Logger;
46 import org.openecomp.cl.eelf.LoggerFactory;
47 import org.openecomp.sparky.logging.AaiUiMsgs;
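/**
 * The Class ElasticSearchIndexCleaner. Collects the document ids of an index before and after a
 * synchronization and bulk-deletes the ids that are only present in the "before" collection.
 */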
52 public class ElasticSearchIndexCleaner implements IndexCleaner {
private static final Logger LOG =
    LoggerFactory.getInstance().getLogger(ElasticSearchIndexCleaner.class);

/** Format of one bulk-request line: the JSON action followed by a newline. */
private static final String BULK_OP_LINE_TEMPLATE = "%s\n";
private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";

/** Document ids collected by populatePreOperationCollection(); null until it succeeds. */
private ObjectIdCollection before;
/** Document ids collected by populatePostOperationCollection(); null until it succeeds. */
private ObjectIdCollection after;

/** Host inserted into every URL built by getFullUrl(). */
private String host;
/** Port inserted into every URL built by getFullUrl(). */
private String port;

private String indexName;
private String indexType;
private int scrollContextTimeToLiveInMinutes;
private int numItemsToGetBulkRequest;

private RestDataProvider restDataProvider;
private ObjectMapper mapper;

/**
 * Instantiates a new elastic search index cleaner.
 *
 * @param restDataProvider the rest data provider used for every request
 * @param indexName the index name
 * @param indexType the index type
 * @param host the host placed in request URLs
 * @param port the port placed in request URLs
 * @param scrollContextTimeToLiveInMinutes the scroll context time to live in minutes
 * @param numItemsToGetBulkRequest the page size for scroll requests and the batch size for bulk
 *        deletes
 */
protected ElasticSearchIndexCleaner(RestDataProvider restDataProvider, String indexName,
    String indexType, String host, String port, int scrollContextTimeToLiveInMinutes,
    int numItemsToGetBulkRequest) {
  this.restDataProvider = restDataProvider;
  this.before = null;
  this.after = null;
  this.indexName = indexName;
  this.indexType = indexType;
  this.mapper = new ObjectMapper();
  // host and port must be retained: getFullUrl() reads them for every request.
  this.host = host;
  this.port = port;
  this.scrollContextTimeToLiveInMinutes = scrollContextTimeToLiveInMinutes;
  this.numItemsToGetBulkRequest = numItemsToGetBulkRequest;
}
101 * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePreOperationCollection()
104 public OperationState populatePreOperationCollection() {
107 before = retrieveAllDocumentIdentifiers();
108 return OperationState.OK;
109 } catch (Exception exc) {
110 LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, indexName, exc.getMessage());
111 return OperationState.ERROR;
117 * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePostOperationCollection()
120 public OperationState populatePostOperationCollection() {
122 after = retrieveAllDocumentIdentifiers();
123 return OperationState.OK;
124 } catch (Exception exc) {
125 LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, indexName, exc.getMessage());
126 return OperationState.ERROR;
131 * @see org.openecomp.sparky.synchronizer.IndexCleaner#performCleanup()
134 public OperationState performCleanup() {
135 // TODO Auto-generated method stub
136 LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP, indexName);
138 int sizeBefore = before.getSize();
139 int sizeAfter = after.getSize();
141 LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP_SIZE, String.valueOf(sizeBefore),
142 String.valueOf(sizeAfter));
145 * If the processedImportIds size <= 0, then something has failed in the sync operation and we
146 * shouldn't do the selective delete right now.
151 Collection<String> presyncIds = before.getImportedObjectIds();
152 presyncIds.removeAll(after.getImportedObjectIds());
155 LOG.info(AaiUiMsgs.ES_SYNC_SELECTIVE_DELETE, indexName, indexType,
156 String.valueOf(presyncIds.size()));
158 ObjectIdCollection bulkIds = new ObjectIdCollection();
160 Iterator<String> it = presyncIds.iterator();
161 int numItemsInBulkRequest = 0;
162 int numItemsRemainingToBeDeleted = presyncIds.size();
164 while (it.hasNext()) {
166 bulkIds.addObjectId(it.next());
167 numItemsInBulkRequest++;
169 if (numItemsInBulkRequest >= this.numItemsToGetBulkRequest) {
170 LOG.info(AaiUiMsgs.ES_BULK_DELETE, indexName, String.valueOf(bulkIds.getSize()));
171 OperationResult bulkDeleteResult = bulkDelete(bulkIds.getImportedObjectIds());
172 // pegCountersForElasticBulkDelete(bulkDeleteResult);
173 numItemsRemainingToBeDeleted -= numItemsInBulkRequest;
174 numItemsInBulkRequest = 0;
179 if (numItemsRemainingToBeDeleted > 0) {
180 LOG.info(AaiUiMsgs.ES_BULK_DELETE, indexName, String.valueOf(bulkIds.getSize()));
181 OperationResult bulkDeleteResult = bulkDelete(bulkIds.getImportedObjectIds());
182 // pegCountersForElasticBulkDelete(bulkDeleteResult);
186 } catch (Exception exc) {
187 LOG.error(AaiUiMsgs.ES_BULK_DELETE_ERROR, indexName, exc.getLocalizedMessage());
192 return OperationState.OK;
196 public String getIndexName() {
200 public void setIndexName(String indexName) {
201 this.indexName = indexName;
205 * Builds the initial scroll request payload.
207 * @param numItemsToGetPerRequest the num items to get per request
208 * @param fieldList the field list
210 * @throws JsonProcessingException the json processing exception
212 protected String buildInitialScrollRequestPayload(int numItemsToGetPerRequest,
213 List<String> fieldList) throws JsonProcessingException {
215 ObjectNode rootNode = mapper.createObjectNode();
216 rootNode.put("size", numItemsToGetPerRequest);
218 ArrayNode fields = mapper.createArrayNode();
220 for (String f : fieldList) {
224 rootNode.set("fields", fields);
226 ObjectNode queryNode = mapper.createObjectNode();
227 queryNode.set("match_all", mapper.createObjectNode());
229 rootNode.set("query", queryNode);
231 return mapper.writeValueAsString(rootNode);
236 * Builds the subsequent scroll context request payload.
238 * @param scrollId the scroll id
239 * @param contextTimeToLiveInMinutes the context time to live in minutes
241 * @throws JsonProcessingException the json processing exception
243 protected String buildSubsequentScrollContextRequestPayload(String scrollId,
244 int contextTimeToLiveInMinutes) throws JsonProcessingException {
246 ObjectNode rootNode = mapper.createObjectNode();
248 rootNode.put("scroll", contextTimeToLiveInMinutes + "m");
249 rootNode.put("scroll_id", scrollId);
251 return mapper.writeValueAsString(rootNode);
256 * Parses the elastic search result.
258 * @param jsonResult the json result
259 * @return the json node
260 * @throws JsonProcessingException the json processing exception
261 * @throws IOException Signals that an I/O exception has occurred.
263 protected JsonNode parseElasticSearchResult(String jsonResult)
264 throws JsonProcessingException, IOException {
265 ObjectMapper mapper = new ObjectMapper();
266 return mapper.readTree(jsonResult);
273 * @param docs the docs
274 * @return the array list
276 protected ArrayList<SearchableEntity> lookupIndexDoc(ArrayList<String> ids,
277 List<SearchableEntity> docs) {
278 ArrayList<SearchableEntity> objs = new ArrayList<SearchableEntity>();
280 if (ids != null && docs != null) {
281 for (SearchableEntity d : docs) {
282 if (ids.contains(d.getId())) {
292 * Builds the delete data object.
294 * @param index the index
295 * @param type the type
297 * @return the object node
299 protected ObjectNode buildDeleteDataObject(String index, String type, String id) {
301 ObjectNode indexDocProperties = mapper.createObjectNode();
303 indexDocProperties.put("_index", index);
304 indexDocProperties.put("_type", type);
305 indexDocProperties.put("_id", id);
307 ObjectNode rootNode = mapper.createObjectNode();
308 rootNode.set("delete", indexDocProperties);
314 * This method might appear to be a little strange, and is simply an optimization to take an
315 * elipsed JsonNode key path and retrieve the node at the end of the path, if it exists.
317 * @param startNode the start node
318 * @param fieldPath the field path
319 * @return the node path
321 protected JsonNode getNodePath(JsonNode startNode, String... fieldPath) {
323 JsonNode jsonNode = null;
325 for (String field : fieldPath) {
326 if (jsonNode == null) {
327 jsonNode = startNode.get(field);
329 jsonNode = jsonNode.get(field);
333 * This is our safety net in case any intermediate path returns a null
336 if (jsonNode == null) {
348 * @param resourceUrl the resource url
349 * @return the full url
351 private String getFullUrl(String resourceUrl) {
352 return String.format("http://%s:%s%s", host, port, resourceUrl);
356 * Retrieve all document identifiers.
358 * @return the object id collection
359 * @throws IOException Signals that an I/O exception has occurred.
361 public ObjectIdCollection retrieveAllDocumentIdentifiers() throws IOException {
363 ObjectIdCollection currentDocumentIds = new ObjectIdCollection();
365 long opStartTimeInMs = System.currentTimeMillis();
367 List<String> fields = new ArrayList<String>();
369 // fields.add("entityType");
371 String scrollRequestPayload =
372 buildInitialScrollRequestPayload(this.numItemsToGetBulkRequest, fields);
374 final String fullUrlStr = getFullUrl("/" + indexName + "/" + indexType + "/_search?scroll="
375 + this.scrollContextTimeToLiveInMinutes + "m");
377 OperationResult result =
378 restDataProvider.doPost(fullUrlStr, scrollRequestPayload, "application/json");
380 if (result.wasSuccessful()) {
382 JsonNode rootNode = parseElasticSearchResult(result.getResult());
385 * Check the result for success / failure, and enumerate all the index ids that resulted in
386 * success, and ignore the ones that failed or log them so we have a record of the failure.
388 int totalRecordsAvailable = 0;
389 String scrollId = null;
390 int numRecordsFetched = 0;
392 if (rootNode != null) {
394 scrollId = getFieldValue(rootNode, "_scroll_id");
395 final String tookStr = getFieldValue(rootNode, "took");
396 int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
397 boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
400 LOG.error(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "all document Identifiers",
401 String.valueOf(tookInMs));
403 LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "all document Identifiers",
404 String.valueOf(tookInMs));
407 JsonNode hitsNode = rootNode.get("hits");
408 totalRecordsAvailable = Integer.parseInt(hitsNode.get("total").asText());
410 LOG.info(AaiUiMsgs.COLLECT_TOTAL, "all document Identifiers",
411 String.valueOf(totalRecordsAvailable));
414 * Collect all object ids
417 ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");
419 Iterator<JsonNode> nodeIterator = hitsArray.iterator();
423 JsonNode jsonNode = null;
425 while (nodeIterator.hasNext()) {
427 jsonNode = nodeIterator.next();
429 key = getFieldValue(jsonNode, "_id");
432 currentDocumentIds.addObjectId(key);
438 * JsonNode fieldsNode = jNode.get("fields");
440 * if (fieldsNode != null) {
442 * JsonNode entityTypeNode = fieldsNode.get("entityType");
444 * if (entityTypeNode != null) { ArrayNode aNode = (ArrayNode) entityTypeNode;
446 * if (aNode.size() > 0) { value = aNode.get(0).asText(); objAndtTypesMap.put(key, value);
447 * numRecordsFetched++; } } } }
452 int totalRecordsRemainingToFetch = (totalRecordsAvailable - numRecordsFetched);
454 int numRequiredAdditionalFetches =
455 (totalRecordsRemainingToFetch / this.numItemsToGetBulkRequest);
458 * Do an additional fetch for the remaining items (if needed)
461 if (totalRecordsRemainingToFetch % numItemsToGetBulkRequest != 0) {
462 numRequiredAdditionalFetches += 1;
465 if (LOG.isDebugEnabled()) {
466 LOG.debug(AaiUiMsgs.SYNC_NUMBER_REQ_FETCHES,
467 String.valueOf(numRequiredAdditionalFetches));
471 for (int x = 0; x < numRequiredAdditionalFetches; x++) {
473 if (collectItemsFromScrollContext(scrollId, currentDocumentIds) != OperationState.OK) {
474 // abort the whole thing because now we can't reliably cleanup the orphans.
475 throw new IOException(
476 "Failed to collect pre-sync doc collection from index. Aborting operation");
478 if (LOG.isDebugEnabled()) {
479 LOG.debug(AaiUiMsgs.SYNC_NUMBER_TOTAL_FETCHES,
480 String.valueOf(currentDocumentIds.getSize()),
481 String.valueOf(totalRecordsAvailable));
489 // scroll context get failed, nothing else to do
490 LOG.error(AaiUiMsgs.ERROR_GENERIC, result.toString());
493 LOG.info(AaiUiMsgs.COLLECT_TOTAL_TIME, "all document Identifiers",
494 String.valueOf((System.currentTimeMillis() - opStartTimeInMs)));
496 return currentDocumentIds;
501 * Collect items from scroll context.
503 * @param scrollId the scroll id
504 * @param objectIds the object ids
505 * @return the operation state
506 * @throws IOException Signals that an I/O exception has occurred.
508 private OperationState collectItemsFromScrollContext(String scrollId,
509 ObjectIdCollection objectIds) throws IOException {
511 // ObjectIdCollection documentIdCollection = new ObjectIdCollection();
513 String requestPayload =
514 buildSubsequentScrollContextRequestPayload(scrollId, scrollContextTimeToLiveInMinutes);
516 final String fullUrlStr = getFullUrl("/_search/scroll");
518 OperationResult opResult =
519 restDataProvider.doPost(fullUrlStr, requestPayload, "application/json");
521 if (opResult.getResultCode() >= 300) {
522 LOG.warn(AaiUiMsgs.ES_SCROLL_CONTEXT_ERROR, opResult.getResult());
523 return OperationState.ERROR;
526 JsonNode rootNode = parseElasticSearchResult(opResult.getResult());
527 boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
528 final String tookStr = getFieldValue(rootNode, "took");
529 int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
531 JsonNode hitsNode = rootNode.get("hits");
534 * Check the result for success / failure, and enumerate all the index ids that resulted in
535 * success, and ignore the ones that failed or log them so we have a record of the failure.
538 if (rootNode != null) {
541 LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "Scroll Context", String.valueOf(tookInMs));
543 LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "Scroll Context", String.valueOf(tookInMs));
547 * Collect all object ids
550 ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");
553 JsonNode jsonNode = null;
555 Iterator<JsonNode> nodeIterator = hitsArray.iterator();
557 while (nodeIterator.hasNext()) {
559 jsonNode = nodeIterator.next();
561 key = getFieldValue(jsonNode, "_id");
564 objectIds.addObjectId(key);
567 * JsonNode fieldsNode = jNode.get("fields");
569 * if (fieldsNode != null) {
571 * JsonNode entityTypeNode = fieldsNode.get("entityType");
573 * if (entityTypeNode != null) { ArrayNode aNode = (ArrayNode) entityTypeNode;
575 * if (aNode.size() > 0) { value = aNode.get(0).asText(); objectIdsAndTypes.put(key,
584 return OperationState.OK;
588 * Gets the field value.
590 * @param node the node
591 * @param fieldName the field name
592 * @return the field value
594 protected String getFieldValue(JsonNode node, String fieldName) {
596 JsonNode field = node.get(fieldName);
599 return field.asText();
609 * @param docIds the doc ids
610 * @return the operation result
611 * @throws IOException Signals that an I/O exception has occurred.
613 public OperationResult bulkDelete(Collection<String> docIds) throws IOException {
615 if (docIds == null || docIds.size() == 0) {
616 LOG.info(AaiUiMsgs.ES_BULK_DELETE_SKIP);
617 return new OperationResult(500,
618 "Skipping bulkDelete(); operation because docs to delete list is empty");
621 LOG.info(AaiUiMsgs.ES_BULK_DELETE_START, String.valueOf(docIds.size()));
623 StringBuilder sb = new StringBuilder(128);
625 for (String id : docIds) {
627 String.format(BULK_OP_LINE_TEMPLATE, buildDeleteDataObject(indexName, indexType, id)));
632 final String fullUrlStr = getFullUrl("/_bulk");
634 return restDataProvider.doPost(fullUrlStr, sb.toString(), "application/x-www-form-urlencoded");