/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * Copyright © 2017 Amdocs
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
 */
23 package org.onap.aai.sparky.sync;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import javax.ws.rs.core.MediaType;

import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.restclient.client.OperationResult;
import org.onap.aai.sparky.dal.ElasticSearchAdapter;
import org.onap.aai.sparky.logging.AaiUiMsgs;
import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig;
import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
import org.onap.aai.sparky.sync.entity.ObjectIdCollection;
import org.onap.aai.sparky.sync.entity.SearchableEntity;
import org.onap.aai.sparky.sync.enumeration.OperationState;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
 * The Class ElasticSearchIndexCleaner. Compares the set of document ids present in an
 * Elasticsearch index before and after a synchronization run, and bulk-deletes the
 * orphans (documents that existed before but were not re-imported by the sync).
 */
53 public class ElasticSearchIndexCleaner implements IndexCleaner {
55 private static final Logger LOG =
56 LoggerFactory.getInstance().getLogger(ElasticSearchIndexCleaner.class);
58 private static final String BULK_OP_LINE_TEMPLATE = "%s\n";
59 private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
61 private ObjectIdCollection before;
62 private ObjectIdCollection after;
64 private ObjectMapper mapper;
65 private ElasticSearchAdapter esAdapter;
66 private ElasticSearchEndpointConfig endpointConfig;
67 private ElasticSearchSchemaConfig schemaConfig;
70 * Instantiates a new elastic search index cleaner.
72 * @param restDataProvider the rest data provider
73 * @param indexName the index name
74 * @param indexType the index type
75 * @param host the host
76 * @param port the port
77 * @param scrollContextTimeToLiveInMinutes the scroll context time to live in minutes
78 * @param numItemsToGetBulkRequest the num items to get bulk request
80 public ElasticSearchIndexCleaner(ElasticSearchAdapter esAdapter,
81 ElasticSearchEndpointConfig endpointConfig, ElasticSearchSchemaConfig schemaConfig) {
82 this.esAdapter = esAdapter;
85 this.endpointConfig = endpointConfig;
86 this.schemaConfig = schemaConfig;
87 this.mapper = new ObjectMapper();
93 * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePreOperationCollection()
96 public OperationState populatePreOperationCollection() {
99 before = retrieveAllDocumentIdentifiers();
100 return OperationState.OK;
101 } catch (Exception exc) {
102 LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage());
103 return OperationState.ERROR;
111 * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePostOperationCollection()
114 public OperationState populatePostOperationCollection() {
116 after = retrieveAllDocumentIdentifiers();
117 return OperationState.OK;
118 } catch (Exception exc) {
119 LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage());
120 return OperationState.ERROR;
127 * @see org.openecomp.sparky.synchronizer.IndexCleaner#performCleanup()
130 public OperationState performCleanup() {
131 // TODO Auto-generated method stub
132 LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP, schemaConfig.getIndexName());
134 int sizeBefore = before.getSize();
135 int sizeAfter = after.getSize();
137 LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP_SIZE, String.valueOf(sizeBefore),
138 String.valueOf(sizeAfter));
141 * If the processedImportIds size <= 0, then something has failed in the sync operation and we
142 * shouldn't do the selective delete right now.
147 Collection<String> presyncIds = before.getImportedObjectIds();
148 presyncIds.removeAll(after.getImportedObjectIds());
151 LOG.info(AaiUiMsgs.ES_SYNC_SELECTIVE_DELETE, schemaConfig.getIndexName(),
152 schemaConfig.getIndexDocType(), String.valueOf(presyncIds.size()));
154 ObjectIdCollection bulkIds = new ObjectIdCollection();
156 Iterator<String> it = presyncIds.iterator();
157 int numItemsInBulkRequest = 0;
158 int numItemsRemainingToBeDeleted = presyncIds.size();
160 while (it.hasNext()) {
162 bulkIds.addObjectId(it.next());
163 numItemsInBulkRequest++;
165 if (numItemsInBulkRequest >= endpointConfig.getScrollContextBatchRequestSize()) {
166 LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(),
167 String.valueOf(bulkIds.getSize()));
168 bulkDelete(bulkIds.getImportedObjectIds());
169 numItemsRemainingToBeDeleted -= numItemsInBulkRequest;
170 numItemsInBulkRequest = 0;
175 if (numItemsRemainingToBeDeleted > 0) {
176 LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(),
177 String.valueOf(bulkIds.getSize()));
178 bulkDelete(bulkIds.getImportedObjectIds());
182 } catch (Exception exc) {
183 LOG.error(AaiUiMsgs.ES_BULK_DELETE_ERROR, schemaConfig.getIndexName(),
184 exc.getLocalizedMessage());
189 return OperationState.OK;
193 public String getIndexName() {
194 return schemaConfig.getIndexName();
198 * Builds the initial scroll request payload.
200 * @param numItemsToGetPerRequest the num items to get per request
201 * @param fieldList the field list
203 * @throws JsonProcessingException the json processing exception
205 protected String buildInitialScrollRequestPayload(int numItemsToGetPerRequest,
206 List<String> fieldList) throws JsonProcessingException {
208 ObjectNode rootNode = mapper.createObjectNode();
209 rootNode.put("size", numItemsToGetPerRequest);
211 ArrayNode fields = mapper.createArrayNode();
213 for (String f : fieldList) {
217 rootNode.set("fields", fields);
219 ObjectNode queryNode = mapper.createObjectNode();
220 queryNode.set("match_all", mapper.createObjectNode());
222 rootNode.set("query", queryNode);
224 return mapper.writeValueAsString(rootNode);
229 * Builds the subsequent scroll context request payload.
231 * @param scrollId the scroll id
232 * @param contextTimeToLiveInMinutes the context time to live in minutes
234 * @throws JsonProcessingException the json processing exception
236 protected String buildSubsequentScrollContextRequestPayload(String scrollId,
237 int contextTimeToLiveInMinutes) throws JsonProcessingException {
239 ObjectNode rootNode = mapper.createObjectNode();
241 rootNode.put("scroll", contextTimeToLiveInMinutes + "m");
242 rootNode.put("scroll_id", scrollId);
244 return mapper.writeValueAsString(rootNode);
249 * Parses the elastic search result.
251 * @param jsonResult the json result
252 * @return the json node
253 * @throws JsonProcessingException the json processing exception
254 * @throws IOException Signals that an I/O exception has occurred.
256 protected JsonNode parseElasticSearchResult(String jsonResult)
257 throws JsonProcessingException, IOException {
258 ObjectMapper mapper = new ObjectMapper();
259 return mapper.readTree(jsonResult);
266 * @param docs the docs
267 * @return the array list
269 protected ArrayList<SearchableEntity> lookupIndexDoc(ArrayList<String> ids,
270 List<SearchableEntity> docs) {
271 ArrayList<SearchableEntity> objs = new ArrayList<SearchableEntity>();
273 if (ids != null && docs != null) {
274 for (SearchableEntity d : docs) {
275 if (ids.contains(d.getId())) {
285 * Builds the delete data object.
287 * @param index the index
288 * @param type the type
290 * @return the object node
292 protected ObjectNode buildDeleteDataObject(String index, String type, String id) {
294 ObjectNode indexDocProperties = mapper.createObjectNode();
296 indexDocProperties.put("_index", index);
297 indexDocProperties.put("_type", type);
298 indexDocProperties.put("_id", id);
300 ObjectNode rootNode = mapper.createObjectNode();
301 rootNode.set("delete", indexDocProperties);
307 * This method might appear to be a little strange, and is simply an optimization to take an
308 * elipsed JsonNode key path and retrieve the node at the end of the path, if it exists.
310 * @param startNode the start node
311 * @param fieldPath the field path
312 * @return the node path
314 protected JsonNode getNodePath(JsonNode startNode, String... fieldPath) {
316 JsonNode jsonNode = null;
318 for (String field : fieldPath) {
319 if (jsonNode == null) {
320 jsonNode = startNode.get(field);
322 jsonNode = jsonNode.get(field);
326 * This is our safety net in case any intermediate path returns a null
329 if (jsonNode == null) {
341 * @param resourceUrl the resource url
342 * @return the full url
344 private String getFullUrl(String resourceUrl) {
345 return String.format("http://%s:%s%s", endpointConfig.getEsIpAddress(),
346 endpointConfig.getEsServerPort(), resourceUrl);
350 * Retrieve all document identifiers.
352 * @return the object id collection
353 * @throws IOException Signals that an I/O exception has occurred.
355 public ObjectIdCollection retrieveAllDocumentIdentifiers() throws IOException {
357 ObjectIdCollection currentDocumentIds = new ObjectIdCollection();
359 long opStartTimeInMs = System.currentTimeMillis();
361 List<String> fields = new ArrayList<String>();
363 // fields.add("entityType");
365 String scrollRequestPayload =
366 buildInitialScrollRequestPayload(endpointConfig.getScrollContextBatchRequestSize(), fields);
368 final String fullUrlStr =
369 getFullUrl("/" + schemaConfig.getIndexName() + "/" + schemaConfig.getIndexDocType()
370 + "/_search?scroll=" + endpointConfig.getScrollContextTimeToLiveInMinutes() + "m");
372 OperationResult result =
373 esAdapter.doPost(fullUrlStr, scrollRequestPayload, MediaType.APPLICATION_JSON_TYPE);
375 if (result.wasSuccessful()) {
377 JsonNode rootNode = parseElasticSearchResult(result.getResult());
380 * Check the result for success / failure, and enumerate all the index ids that resulted in
381 * success, and ignore the ones that failed or log them so we have a record of the failure.
383 int totalRecordsAvailable = 0;
384 String scrollId = null;
385 int numRecordsFetched = 0;
387 if (rootNode != null) {
389 scrollId = getFieldValue(rootNode, "_scroll_id");
390 final String tookStr = getFieldValue(rootNode, "took");
391 int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
392 boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
395 LOG.error(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "all document Identifiers",
396 String.valueOf(tookInMs));
398 LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "all document Identifiers",
399 String.valueOf(tookInMs));
402 JsonNode hitsNode = rootNode.get("hits");
403 totalRecordsAvailable = Integer.parseInt(hitsNode.get("total").asText());
405 LOG.info(AaiUiMsgs.COLLECT_TOTAL, "all document Identifiers",
406 String.valueOf(totalRecordsAvailable));
409 * Collect all object ids
412 ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");
414 Iterator<JsonNode> nodeIterator = hitsArray.iterator();
418 JsonNode jsonNode = null;
420 while (nodeIterator.hasNext()) {
422 jsonNode = nodeIterator.next();
424 key = getFieldValue(jsonNode, "_id");
427 currentDocumentIds.addObjectId(key);
432 int totalRecordsRemainingToFetch = (totalRecordsAvailable - numRecordsFetched);
434 int numRequiredAdditionalFetches =
435 (totalRecordsRemainingToFetch / endpointConfig.getScrollContextBatchRequestSize());
438 * Do an additional fetch for the remaining items (if needed)
441 if (totalRecordsRemainingToFetch % endpointConfig.getScrollContextBatchRequestSize() != 0) {
442 numRequiredAdditionalFetches += 1;
445 if (LOG.isDebugEnabled()) {
446 LOG.debug(AaiUiMsgs.SYNC_NUMBER_REQ_FETCHES,
447 String.valueOf(numRequiredAdditionalFetches));
451 for (int x = 0; x < numRequiredAdditionalFetches; x++) {
453 if (collectItemsFromScrollContext(scrollId, currentDocumentIds) != OperationState.OK) {
454 // abort the whole thing because now we can't reliably cleanup the orphans.
455 throw new IOException(
456 "Failed to collect pre-sync doc collection from index. Aborting operation");
458 if (LOG.isDebugEnabled()) {
459 LOG.debug(AaiUiMsgs.SYNC_NUMBER_TOTAL_FETCHES,
460 String.valueOf(currentDocumentIds.getSize()),
461 String.valueOf(totalRecordsAvailable));
469 // scroll context get failed, nothing else to do
470 LOG.error(AaiUiMsgs.ERROR_GENERIC, result.toString());
473 LOG.info(AaiUiMsgs.COLLECT_TOTAL_TIME, "all document Identifiers",
474 String.valueOf((System.currentTimeMillis() - opStartTimeInMs)));
476 return currentDocumentIds;
481 * Collect items from scroll context.
483 * @param scrollId the scroll id
484 * @param objectIds the object ids
485 * @return the operation state
486 * @throws IOException Signals that an I/O exception has occurred.
488 private OperationState collectItemsFromScrollContext(String scrollId,
489 ObjectIdCollection objectIds) throws IOException {
491 String requestPayload = buildSubsequentScrollContextRequestPayload(scrollId,
492 endpointConfig.getScrollContextTimeToLiveInMinutes());
494 final String fullUrlStr = getFullUrl("/_search/scroll");
496 OperationResult opResult =
497 esAdapter.doPost(fullUrlStr, requestPayload, MediaType.APPLICATION_JSON_TYPE);
499 if (opResult.getResultCode() >= 300) {
500 LOG.warn(AaiUiMsgs.ES_SCROLL_CONTEXT_ERROR, opResult.getResult());
501 return OperationState.ERROR;
504 JsonNode rootNode = parseElasticSearchResult(opResult.getResult());
505 boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
506 final String tookStr = getFieldValue(rootNode, "took");
507 int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
509 JsonNode hitsNode = rootNode.get("hits");
512 * Check the result for success / failure, and enumerate all the index ids that resulted in
513 * success, and ignore the ones that failed or log them so we have a record of the failure.
516 if (rootNode != null) {
519 LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "Scroll Context", String.valueOf(tookInMs));
521 LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "Scroll Context", String.valueOf(tookInMs));
525 * Collect all object ids
528 ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");
531 JsonNode jsonNode = null;
533 Iterator<JsonNode> nodeIterator = hitsArray.iterator();
535 while (nodeIterator.hasNext()) {
537 jsonNode = nodeIterator.next();
539 key = getFieldValue(jsonNode, "_id");
542 objectIds.addObjectId(key);
549 return OperationState.OK;
553 * Gets the field value.
555 * @param node the node
556 * @param fieldName the field name
557 * @return the field value
559 protected String getFieldValue(JsonNode node, String fieldName) {
561 JsonNode field = node.get(fieldName);
564 return field.asText();
574 * @param docIds the doc ids
575 * @return the operation result
576 * @throws IOException Signals that an I/O exception has occurred.
578 public OperationResult bulkDelete(Collection<String> docIds) throws IOException {
580 if (docIds == null || docIds.size() == 0) {
581 LOG.info(AaiUiMsgs.ES_BULK_DELETE_SKIP);
582 return new OperationResult(500,
583 "Skipping bulkDelete(); operation because docs to delete list is empty");
586 LOG.info(AaiUiMsgs.ES_BULK_DELETE_START, String.valueOf(docIds.size()));
588 StringBuilder sb = new StringBuilder(128);
590 for (String id : docIds) {
591 sb.append(String.format(BULK_OP_LINE_TEMPLATE,
592 buildDeleteDataObject(schemaConfig.getIndexName(), schemaConfig.getIndexDocType(), id)));
597 final String fullUrlStr = getFullUrl("/_bulk");
599 return esAdapter.doPost(fullUrlStr, sb.toString(), MediaType.APPLICATION_FORM_URLENCODED_TYPE);