2 * ============LICENSE_START===================================================
3 * SPARKY (AAI UI service)
4 * ============================================================================
5 * Copyright © 2017 AT&T Intellectual Property.
6 * Copyright © 2017 Amdocs
8 * ============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=====================================================
22 * ECOMP and OpenECOMP are trademarks
23 * and service marks of AT&T Intellectual Property.
25 package org.onap.aai.sparky.sync;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;

import javax.ws.rs.core.MediaType;

import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.restclient.client.OperationResult;
import org.onap.aai.sparky.dal.ElasticSearchAdapter;
import org.onap.aai.sparky.logging.AaiUiMsgs;
import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig;
import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
import org.onap.aai.sparky.sync.entity.ObjectIdCollection;
import org.onap.aai.sparky.sync.entity.SearchableEntity;
import org.onap.aai.sparky.sync.enumeration.OperationState;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
53 * The Class ElasticSearchIndexCleaner.
public class ElasticSearchIndexCleaner implements IndexCleaner {

  private static final Logger LOG =
      LoggerFactory.getInstance().getLogger(ElasticSearchIndexCleaner.class);

  // Each Elasticsearch _bulk action line must be terminated by a newline.
  private static final String BULK_OP_LINE_TEMPLATE = "%s\n";
  // NOTE(review): not referenced anywhere in the visible code of this file —
  // candidate for removal; confirm no reflective/external use first.
  private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";

  // Snapshot of document ids present in the index before the sync ran.
  private ObjectIdCollection before;
  // Snapshot of document ids present in the index after the sync ran.
  private ObjectIdCollection after;

  // Shared Jackson mapper used to build request payloads.
  private ObjectMapper mapper;
  private ElasticSearchAdapter esAdapter;
  private ElasticSearchEndpointConfig endpointConfig;
  private ElasticSearchSchemaConfig schemaConfig;
 * Instantiates a new elastic search index cleaner.
 *
 * @param esAdapter the adapter used to issue REST operations against Elasticsearch
 * @param endpointConfig the Elasticsearch endpoint (address, port, scroll-context) configuration
 * @param schemaConfig the index name and document type configuration
82 public ElasticSearchIndexCleaner(ElasticSearchAdapter esAdapter, ElasticSearchEndpointConfig endpointConfig, ElasticSearchSchemaConfig schemaConfig) {
83 this.esAdapter = esAdapter;
86 this.endpointConfig = endpointConfig;
87 this.schemaConfig = schemaConfig;
88 this.mapper = new ObjectMapper();
 * @see org.onap.aai.sparky.sync.IndexCleaner#populatePreOperationCollection()
97 public OperationState populatePreOperationCollection() {
100 before = retrieveAllDocumentIdentifiers();
101 return OperationState.OK;
102 } catch (Exception exc) {
103 LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage());
104 return OperationState.ERROR;
 * @see org.onap.aai.sparky.sync.IndexCleaner#populatePostOperationCollection()
115 public OperationState populatePostOperationCollection() {
117 after = retrieveAllDocumentIdentifiers();
118 return OperationState.OK;
119 } catch (Exception exc) {
120 LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage());
121 return OperationState.ERROR;
 * @see org.onap.aai.sparky.sync.IndexCleaner#performCleanup()
  /**
   * Deletes the documents orphaned by the sync: every id captured by
   * {@link #populatePreOperationCollection()} that is no longer present in the
   * snapshot captured by {@link #populatePostOperationCollection()}. Deletes are
   * issued in bulk batches bounded by the endpoint's scroll-context batch size.
   *
   * @return OperationState.OK (bulk-delete failures are logged, not propagated)
   */
  public OperationState performCleanup() {
    // TODO Auto-generated method stub
    LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP, schemaConfig.getIndexName());

    int sizeBefore = before.getSize();
    int sizeAfter = after.getSize();

    LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP_SIZE, String.valueOf(sizeBefore),
        String.valueOf(sizeAfter));

    /*
     * If the processedImportIds size <= 0, then something has failed in the sync operation and we
     * shouldn't do the selective delete right now.
     */

      // presyncIds = ids seen before the sync minus ids still present after it,
      // i.e. exactly the documents the sync no longer produced.
      Collection<String> presyncIds = before.getImportedObjectIds();
      presyncIds.removeAll(after.getImportedObjectIds());

      LOG.info(AaiUiMsgs.ES_SYNC_SELECTIVE_DELETE, schemaConfig.getIndexName(), schemaConfig.getIndexDocType(),
          String.valueOf(presyncIds.size()));

      ObjectIdCollection bulkIds = new ObjectIdCollection();

      Iterator<String> it = presyncIds.iterator();
      int numItemsInBulkRequest = 0;
      int numItemsRemainingToBeDeleted = presyncIds.size();

      while (it.hasNext()) {

        bulkIds.addObjectId(it.next());
        numItemsInBulkRequest++;

        // Flush a batch once it reaches the configured bulk-request size.
        if (numItemsInBulkRequest >= endpointConfig.getScrollContextBatchRequestSize()) {
          LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(), String.valueOf(bulkIds.getSize()));
          bulkDelete(bulkIds.getImportedObjectIds());
          numItemsRemainingToBeDeleted -= numItemsInBulkRequest;
          numItemsInBulkRequest = 0;
          // NOTE(review): no bulkIds.clear() is visible between batches; if it is
          // truly absent, later batches re-send every earlier id — confirm against
          // the unabridged source before relying on batch boundaries.

      // Flush the final partial batch, if any.
      if (numItemsRemainingToBeDeleted > 0) {
        LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(), String.valueOf(bulkIds.getSize()));
        bulkDelete(bulkIds.getImportedObjectIds());

    } catch (Exception exc) {
      // Best-effort cleanup: log and fall through to OK rather than failing the sync.
      LOG.error(AaiUiMsgs.ES_BULK_DELETE_ERROR, schemaConfig.getIndexName(), exc.getLocalizedMessage());

    return OperationState.OK;
191 public String getIndexName() {
192 return schemaConfig.getIndexName();
196 * Builds the initial scroll request payload.
198 * @param numItemsToGetPerRequest the num items to get per request
199 * @param fieldList the field list
201 * @throws JsonProcessingException the json processing exception
203 protected String buildInitialScrollRequestPayload(int numItemsToGetPerRequest,
204 List<String> fieldList) throws JsonProcessingException {
206 ObjectNode rootNode = mapper.createObjectNode();
207 rootNode.put("size", numItemsToGetPerRequest);
209 ArrayNode fields = mapper.createArrayNode();
211 for (String f : fieldList) {
215 rootNode.set("fields", fields);
217 ObjectNode queryNode = mapper.createObjectNode();
218 queryNode.set("match_all", mapper.createObjectNode());
220 rootNode.set("query", queryNode);
222 return mapper.writeValueAsString(rootNode);
227 * Builds the subsequent scroll context request payload.
229 * @param scrollId the scroll id
230 * @param contextTimeToLiveInMinutes the context time to live in minutes
232 * @throws JsonProcessingException the json processing exception
234 protected String buildSubsequentScrollContextRequestPayload(String scrollId,
235 int contextTimeToLiveInMinutes) throws JsonProcessingException {
237 ObjectNode rootNode = mapper.createObjectNode();
239 rootNode.put("scroll", contextTimeToLiveInMinutes + "m");
240 rootNode.put("scroll_id", scrollId);
242 return mapper.writeValueAsString(rootNode);
247 * Parses the elastic search result.
249 * @param jsonResult the json result
250 * @return the json node
251 * @throws JsonProcessingException the json processing exception
252 * @throws IOException Signals that an I/O exception has occurred.
254 protected JsonNode parseElasticSearchResult(String jsonResult)
255 throws JsonProcessingException, IOException {
256 ObjectMapper mapper = new ObjectMapper();
257 return mapper.readTree(jsonResult);
264 * @param docs the docs
265 * @return the array list
267 protected ArrayList<SearchableEntity> lookupIndexDoc(ArrayList<String> ids,
268 List<SearchableEntity> docs) {
269 ArrayList<SearchableEntity> objs = new ArrayList<SearchableEntity>();
271 if (ids != null && docs != null) {
272 for (SearchableEntity d : docs) {
273 if (ids.contains(d.getId())) {
 * Builds the delete data object used as one action line of an Elasticsearch _bulk request.
 *
 * @param index the index name
 * @param type the document type
 * @param id the document id
 * @return the object node
290 protected ObjectNode buildDeleteDataObject(String index, String type, String id) {
292 ObjectNode indexDocProperties = mapper.createObjectNode();
294 indexDocProperties.put("_index", index);
295 indexDocProperties.put("_type", type);
296 indexDocProperties.put("_id", id);
298 ObjectNode rootNode = mapper.createObjectNode();
299 rootNode.set("delete", indexDocProperties);
305 * This method might appear to be a little strange, and is simply an optimization to take an
 * elided JsonNode key path and retrieve the node at the end of the path, if it exists.
308 * @param startNode the start node
309 * @param fieldPath the field path
310 * @return the node path
312 protected JsonNode getNodePath(JsonNode startNode, String... fieldPath) {
314 JsonNode jsonNode = null;
316 for (String field : fieldPath) {
317 if (jsonNode == null) {
318 jsonNode = startNode.get(field);
320 jsonNode = jsonNode.get(field);
324 * This is our safety net in case any intermediate path returns a null
327 if (jsonNode == null) {
339 * @param resourceUrl the resource url
340 * @return the full url
342 private String getFullUrl(String resourceUrl) {
343 return String.format("http://%s:%s%s", endpointConfig.getEsIpAddress(),
344 endpointConfig.getEsServerPort(), resourceUrl);
348 * Retrieve all document identifiers.
350 * @return the object id collection
351 * @throws IOException Signals that an I/O exception has occurred.
  public ObjectIdCollection retrieveAllDocumentIdentifiers() throws IOException {

    ObjectIdCollection currentDocumentIds = new ObjectIdCollection();

    long opStartTimeInMs = System.currentTimeMillis();

    // Empty field list: we only need the ids, not the document sources.
    List<String> fields = new ArrayList<String>();
    // fields.add("entityType");

    String scrollRequestPayload =
        buildInitialScrollRequestPayload(endpointConfig.getScrollContextBatchRequestSize(), fields);

    // Open a scroll context against <index>/<type>/_search so the full id set
    // can be paged through in batches.
    final String fullUrlStr = getFullUrl("/" + schemaConfig.getIndexName()+ "/" + schemaConfig.getIndexDocType() + "/_search?scroll="
        + endpointConfig.getScrollContextTimeToLiveInMinutes() + "m");

    OperationResult result =
        esAdapter.doPost(fullUrlStr, scrollRequestPayload, MediaType.APPLICATION_JSON_TYPE);

    if (result.wasSuccessful()) {

      JsonNode rootNode = parseElasticSearchResult(result.getResult());

      /*
       * Check the result for success / failure, and enumerate all the index ids that resulted in
       * success, and ignore the ones that failed or log them so we have a record of the failure.
       */
      int totalRecordsAvailable = 0;
      String scrollId = null;
      int numRecordsFetched = 0;

      if (rootNode != null) {

        scrollId = getFieldValue(rootNode, "_scroll_id");
        final String tookStr = getFieldValue(rootNode, "took");
        int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
        boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));

          LOG.error(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "all document Identifiers",
              String.valueOf(tookInMs));

          LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "all document Identifiers",
              String.valueOf(tookInMs));

        // "hits.total" = total matching docs; "hits.hits" = this page's batch.
        JsonNode hitsNode = rootNode.get("hits");
        totalRecordsAvailable = Integer.parseInt(hitsNode.get("total").asText());

        LOG.info(AaiUiMsgs.COLLECT_TOTAL, "all document Identifiers",
            String.valueOf(totalRecordsAvailable));

        /*
         * Collect all object ids
         */

        ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");

        Iterator<JsonNode> nodeIterator = hitsArray.iterator();

        JsonNode jsonNode = null;

        while (nodeIterator.hasNext()) {

          jsonNode = nodeIterator.next();

          key = getFieldValue(jsonNode, "_id");

            currentDocumentIds.addObjectId(key);

        // Page through the remainder of the scroll context; one extra fetch
        // covers a final partial batch.
        int totalRecordsRemainingToFetch = (totalRecordsAvailable - numRecordsFetched);

        int numRequiredAdditionalFetches =
            (totalRecordsRemainingToFetch / endpointConfig.getScrollContextBatchRequestSize());

        /*
         * Do an additional fetch for the remaining items (if needed)
         */

        if (totalRecordsRemainingToFetch % endpointConfig.getScrollContextBatchRequestSize() != 0) {
          numRequiredAdditionalFetches += 1;

        if (LOG.isDebugEnabled()) {
          LOG.debug(AaiUiMsgs.SYNC_NUMBER_REQ_FETCHES,
              String.valueOf(numRequiredAdditionalFetches));

        for (int x = 0; x < numRequiredAdditionalFetches; x++) {

          if (collectItemsFromScrollContext(scrollId, currentDocumentIds) != OperationState.OK) {
            // abort the whole thing because now we can't reliably cleanup the orphans.
            throw new IOException(
                "Failed to collect pre-sync doc collection from index. Aborting operation");

          if (LOG.isDebugEnabled()) {
            LOG.debug(AaiUiMsgs.SYNC_NUMBER_TOTAL_FETCHES,
                String.valueOf(currentDocumentIds.getSize()),
                String.valueOf(totalRecordsAvailable));

      // scroll context get failed, nothing else to do
      LOG.error(AaiUiMsgs.ERROR_GENERIC, result.toString());

    LOG.info(AaiUiMsgs.COLLECT_TOTAL_TIME, "all document Identifiers",
        String.valueOf((System.currentTimeMillis() - opStartTimeInMs)));

    return currentDocumentIds;
478 * Collect items from scroll context.
480 * @param scrollId the scroll id
481 * @param objectIds the object ids
482 * @return the operation state
483 * @throws IOException Signals that an I/O exception has occurred.
  private OperationState collectItemsFromScrollContext(String scrollId,
      ObjectIdCollection objectIds) throws IOException {

    // Ask the existing scroll context for its next page of hits.
    String requestPayload = buildSubsequentScrollContextRequestPayload(scrollId,
        endpointConfig.getScrollContextTimeToLiveInMinutes());

    final String fullUrlStr = getFullUrl("/_search/scroll");

    OperationResult opResult =
        esAdapter.doPost(fullUrlStr, requestPayload, MediaType.APPLICATION_JSON_TYPE);

    if (opResult.getResultCode() >= 300) {
      LOG.warn(AaiUiMsgs.ES_SCROLL_CONTEXT_ERROR, opResult.getResult());
      return OperationState.ERROR;

    // NOTE(review): rootNode is dereferenced here (getFieldValue / rootNode.get)
    // BEFORE the "rootNode != null" check below — if parse ever returns null this
    // NPEs first, making the later null check dead. The guard should be hoisted
    // above these reads.
    JsonNode rootNode = parseElasticSearchResult(opResult.getResult());
    boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
    final String tookStr = getFieldValue(rootNode, "took");
    int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);

    JsonNode hitsNode = rootNode.get("hits");

    /*
     * Check the result for success / failure, and enumerate all the index ids that resulted in
     * success, and ignore the ones that failed or log them so we have a record of the failure.
     */

    if (rootNode != null) {

        LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "Scroll Context", String.valueOf(tookInMs));

        LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "Scroll Context", String.valueOf(tookInMs));

      /*
       * Collect all object ids
       */

      ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");

      JsonNode jsonNode = null;

      Iterator<JsonNode> nodeIterator = hitsArray.iterator();

      while (nodeIterator.hasNext()) {

        jsonNode = nodeIterator.next();

        key = getFieldValue(jsonNode, "_id");

          objectIds.addObjectId(key);

    return OperationState.OK;
550 * Gets the field value.
552 * @param node the node
553 * @param fieldName the field name
554 * @return the field value
556 protected String getFieldValue(JsonNode node, String fieldName) {
558 JsonNode field = node.get(fieldName);
561 return field.asText();
571 * @param docIds the doc ids
572 * @return the operation result
573 * @throws IOException Signals that an I/O exception has occurred.
575 public OperationResult bulkDelete(Collection<String> docIds) throws IOException {
577 if (docIds == null || docIds.size() == 0) {
578 LOG.info(AaiUiMsgs.ES_BULK_DELETE_SKIP);
579 return new OperationResult(500,
580 "Skipping bulkDelete(); operation because docs to delete list is empty");
583 LOG.info(AaiUiMsgs.ES_BULK_DELETE_START, String.valueOf(docIds.size()));
585 StringBuilder sb = new StringBuilder(128);
587 for (String id : docIds) {
588 sb.append(String.format(BULK_OP_LINE_TEMPLATE,
589 buildDeleteDataObject(schemaConfig.getIndexName(), schemaConfig.getIndexDocType(), id)));
594 final String fullUrlStr = getFullUrl("/_bulk");
596 return esAdapter.doPost(fullUrlStr, sb.toString(), MediaType.APPLICATION_FORM_URLENCODED_TYPE);