/**
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
 * Copyright © 2017-2018 Amdocs
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.aai.sparky.sync;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

import javax.ws.rs.core.MediaType;

import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.restclient.client.OperationResult;
import org.onap.aai.sparky.dal.ElasticSearchAdapter;
import org.onap.aai.sparky.logging.AaiUiMsgs;
import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig;
import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
import org.onap.aai.sparky.sync.entity.ObjectIdCollection;
import org.onap.aai.sparky.sync.entity.SearchableEntity;
import org.onap.aai.sparky.sync.enumeration.OperationState;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

/**
 * The Class ElasticSearchIndexCleaner.
 *
 * <p>Compares the set of document ids present in an index before and after a synchronization
 * pass and bulk-deletes the orphans: ids that were present before the sync but were not
 * re-imported by it.
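 *
 * <p>A minimal usage sketch (hypothetical wiring; the adapter and the two config objects are
 * assumed to be supplied by the surrounding synchronizer framework):
 *
 * <pre>{@code
 * IndexCleaner cleaner = new ElasticSearchIndexCleaner(esAdapter, endpointConfig, schemaConfig);
 * cleaner.populatePreOperationCollection();  // snapshot ids before the sync
 * // ... run the index synchronization ...
 * cleaner.populatePostOperationCollection(); // snapshot ids after the sync
 * cleaner.performCleanup();                  // bulk-delete ids seen before but not after
 * }</pre>
 */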
public class ElasticSearchIndexCleaner implements IndexCleaner {

  private static final Logger LOG =
      LoggerFactory.getInstance().getLogger(ElasticSearchIndexCleaner.class);

  private static final String BULK_OP_LINE_TEMPLATE = "%s\n";
  private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";

  private ObjectIdCollection before;
  private ObjectIdCollection after;

  private ObjectMapper mapper;
  private ElasticSearchAdapter esAdapter;
  private ElasticSearchEndpointConfig endpointConfig;
  private ElasticSearchSchemaConfig schemaConfig;

  /**
   * Instantiates a new elastic search index cleaner.
   *
   * @param esAdapter the adapter used to issue REST calls against Elasticsearch
   * @param endpointConfig the Elasticsearch endpoint (address, port, scroll context) configuration
   * @param schemaConfig the index name and document type configuration
   */
  public ElasticSearchIndexCleaner(ElasticSearchAdapter esAdapter,
      ElasticSearchEndpointConfig endpointConfig, ElasticSearchSchemaConfig schemaConfig) {
    this.esAdapter = esAdapter;
    this.endpointConfig = endpointConfig;
    this.schemaConfig = schemaConfig;
    this.mapper = new ObjectMapper();
  }

  /*
   * (non-Javadoc)
   *
   * @see org.onap.aai.sparky.sync.IndexCleaner#populatePreOperationCollection()
   */
  @Override
  public OperationState populatePreOperationCollection() {

    try {
      before = retrieveAllDocumentIdentifiers();
      return OperationState.OK;
    } catch (Exception exc) {
      LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage());
      return OperationState.ERROR;
    }
  }

  /*
   * (non-Javadoc)
   *
   * @see org.onap.aai.sparky.sync.IndexCleaner#populatePostOperationCollection()
   */
  @Override
  public OperationState populatePostOperationCollection() {

    try {
      after = retrieveAllDocumentIdentifiers();
      return OperationState.OK;
    } catch (Exception exc) {
      // the pre-sync failure message id is reused here for the post-sync snapshot failure
      LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage());
      return OperationState.ERROR;
    }
  }

  /*
   * (non-Javadoc)
   *
   * @see org.onap.aai.sparky.sync.IndexCleaner#performCleanup()
   */
  @Override
  public OperationState performCleanup() {

    LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP, schemaConfig.getIndexName());

    int sizeBefore = before.getSize();
    int sizeAfter = after.getSize();

    LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP_SIZE, String.valueOf(sizeBefore),
        String.valueOf(sizeAfter));

    /*
     * If the post-operation collection size is zero, then something has failed in the sync
     * operation and we shouldn't do the selective delete right now.
     */
    if (sizeAfter > 0) {

      Collection<String> presyncIds = before.getImportedObjectIds();
      presyncIds.removeAll(after.getImportedObjectIds());

      try {
        LOG.info(AaiUiMsgs.ES_SYNC_SELECTIVE_DELETE, schemaConfig.getIndexName(),
            schemaConfig.getIndexDocType(), String.valueOf(presyncIds.size()));

        ObjectIdCollection bulkIds = new ObjectIdCollection();

        Iterator<String> it = presyncIds.iterator();
        int numItemsInBulkRequest = 0;
        int numItemsRemainingToBeDeleted = presyncIds.size();
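
        // Deletes are issued in batches of the configured scroll-context batch size. As an
        // illustrative example, 12500 orphaned ids with a batch size of 5000 would produce
        // three bulk requests: 5000, 5000, and a final flush of 2500.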

        while (it.hasNext()) {

          bulkIds.addObjectId(it.next());
          numItemsInBulkRequest++;

          if (numItemsInBulkRequest >= endpointConfig.getScrollContextBatchRequestSize()) {
            LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(),
                String.valueOf(bulkIds.getSize()));
            bulkDelete(bulkIds.getImportedObjectIds());
            numItemsRemainingToBeDeleted -= numItemsInBulkRequest;
            numItemsInBulkRequest = 0;
            // reset the batch so already-deleted ids are not re-sent
            bulkIds.clear();
          }
        }

        if (numItemsRemainingToBeDeleted > 0) {
          LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(),
              String.valueOf(bulkIds.getSize()));
          bulkDelete(bulkIds.getImportedObjectIds());
        }

      } catch (Exception exc) {
        LOG.error(AaiUiMsgs.ES_BULK_DELETE_ERROR, schemaConfig.getIndexName(),
            exc.getLocalizedMessage());
      }
    }

    return OperationState.OK;
  }

  @Override
  public String getIndexName() {
    return schemaConfig.getIndexName();
  }

  /**
   * Builds the initial scroll request payload.
   *
   * @param numItemsToGetPerRequest the number of items to return per scroll request
   * @param fieldList the list of stored fields to return with each hit (may be empty)
   * @return the scroll request payload as a JSON string
   * @throws JsonProcessingException the json processing exception
   */
  protected String buildInitialScrollRequestPayload(int numItemsToGetPerRequest,
      List<String> fieldList) throws JsonProcessingException {
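
    // Sketch of the payload this produces for a batch size of 5000 and an empty field list
    // (values are illustrative):
    //
    //   {"size":5000,"fields":[],"query":{"match_all":{}}}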

    ObjectNode rootNode = mapper.createObjectNode();
    rootNode.put("size", numItemsToGetPerRequest);

    ArrayNode fields = mapper.createArrayNode();

    for (String f : fieldList) {
      fields.add(f);
    }

    rootNode.set("fields", fields);

    ObjectNode queryNode = mapper.createObjectNode();
    queryNode.set("match_all", mapper.createObjectNode());

    rootNode.set("query", queryNode);

    return mapper.writeValueAsString(rootNode);
  }

  /**
   * Builds the subsequent scroll context request payload.
   *
   * @param scrollId the scroll id returned by the previous scroll request
   * @param contextTimeToLiveInMinutes the scroll context time to live, in minutes
   * @return the scroll request payload as a JSON string
   * @throws JsonProcessingException the json processing exception
   */
  protected String buildSubsequentScrollContextRequestPayload(String scrollId,
      int contextTimeToLiveInMinutes) throws JsonProcessingException {
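
    // Sketch of the payload this produces (TTL and scroll id are illustrative; the id is the
    // opaque token returned by the previous scroll response):
    //
    //   {"scroll":"5m","scroll_id":"c2Nhbjs2OzM0NDg1..."}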

    ObjectNode rootNode = mapper.createObjectNode();

    rootNode.put("scroll", contextTimeToLiveInMinutes + "m");
    rootNode.put("scroll_id", scrollId);

    return mapper.writeValueAsString(rootNode);
  }

  /**
   * Parses the elastic search result.
   *
   * @param jsonResult the json result
   * @return the json node
   * @throws JsonProcessingException the json processing exception
   * @throws IOException Signals that an I/O exception has occurred.
   */
  protected JsonNode parseElasticSearchResult(String jsonResult)
      throws JsonProcessingException, IOException {
    ObjectMapper mapper = new ObjectMapper();
    return mapper.readTree(jsonResult);
  }

  /**
   * Returns the index documents whose ids appear in the supplied id list.
   *
   * @param ids the ids to match
   * @param docs the docs to filter
   * @return the matching documents
   */
  protected ArrayList<SearchableEntity> lookupIndexDoc(ArrayList<String> ids,
      List<SearchableEntity> docs) {
    ArrayList<SearchableEntity> objs = new ArrayList<SearchableEntity>();

    if (ids != null && docs != null) {
      for (SearchableEntity d : docs) {
        if (ids.contains(d.getId())) {
          objs.add(d);
        }
      }
    }

    return objs;
  }

  /**
   * Builds the delete data object used as the action line of a bulk delete request.
   *
   * @param index the index name
   * @param type the document type
   * @param id the document id
   * @return the object node representing the delete action
   */
  protected ObjectNode buildDeleteDataObject(String index, String type, String id) {
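
    // Sketch of the node this produces (index, type, and id values are illustrative):
    //
    //   {"delete":{"_index":"entity-search-index","_type":"default","_id":"abc123"}}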

    ObjectNode indexDocProperties = mapper.createObjectNode();

    indexDocProperties.put("_index", index);
    indexDocProperties.put("_type", type);
    indexDocProperties.put("_id", id);

    ObjectNode rootNode = mapper.createObjectNode();
    rootNode.set("delete", indexDocProperties);

    return rootNode;
  }

  /**
   * This method might appear to be a little strange; it is simply an optimization that takes a
   * JsonNode key path, expressed as varargs, and retrieves the node at the end of the path, if
   * it exists.
   *
   * @param startNode the start node
   * @param fieldPath the field path
   * @return the node at the end of the path, or null if any intermediate node is missing
   */
  protected JsonNode getNodePath(JsonNode startNode, String... fieldPath) {
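
    // For example, getNodePath(root, "hits", "total") returns the node at root.hits.total,
    // or null if either "hits" or "total" is absent.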

    JsonNode jsonNode = null;

    for (String field : fieldPath) {
      if (jsonNode == null) {
        jsonNode = startNode.get(field);
      } else {
        jsonNode = jsonNode.get(field);
      }

      /*
       * This is our safety net in case any intermediate path returns a null
       */
      if (jsonNode == null) {
        return null;
      }
    }

    return jsonNode;
  }

  /**
   * Gets the full url for the given resource path.
   *
   * @param resourceUrl the resource url
   * @return the full url
   */
  private String getFullUrl(String resourceUrl) {
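    // e.g. getFullUrl("/_bulk") -> "http://10.0.0.1:9200/_bulk"; the address and port come
    // from the endpoint config, and the values shown here are illustrative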
    return String.format("http://%s:%s%s", endpointConfig.getEsIpAddress(),
        endpointConfig.getEsServerPort(), resourceUrl);
  }

  /**
   * Retrieves all document identifiers currently present in the index, paging through the
   * results with the Elasticsearch scroll API.
   *
   * @return the object id collection
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public ObjectIdCollection retrieveAllDocumentIdentifiers() throws IOException {

    ObjectIdCollection currentDocumentIds = new ObjectIdCollection();

    long opStartTimeInMs = System.currentTimeMillis();

    List<String> fields = new ArrayList<String>();
    // fields.add("entityType");

    String scrollRequestPayload =
        buildInitialScrollRequestPayload(endpointConfig.getScrollContextBatchRequestSize(), fields);

    final String fullUrlStr =
        getFullUrl("/" + schemaConfig.getIndexName() + "/" + schemaConfig.getIndexDocType()
            + "/_search?scroll=" + endpointConfig.getScrollContextTimeToLiveInMinutes() + "m");
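
    // e.g. http://10.0.0.1:9200/entity-search-index/default/_search?scroll=5m
    // (address, index name, doc type, and TTL are illustrative; actual values come from config)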

    OperationResult result =
        esAdapter.doPost(fullUrlStr, scrollRequestPayload, MediaType.APPLICATION_JSON_TYPE);

    if (result.wasSuccessful()) {

      JsonNode rootNode = parseElasticSearchResult(result.getResult());

      /*
       * Check the result for success / failure, and enumerate all the index ids that resulted in
       * success, and ignore the ones that failed or log them so we have a record of the failure.
       */
      int totalRecordsAvailable = 0;
      String scrollId = null;
      int numRecordsFetched = 0;

      if (rootNode != null) {

        scrollId = getFieldValue(rootNode, "_scroll_id");
        final String tookStr = getFieldValue(rootNode, "took");
        int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
        boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));

        if (timedOut) {
          LOG.error(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "all document Identifiers",
              String.valueOf(tookInMs));
        } else {
          LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "all document Identifiers",
              String.valueOf(tookInMs));
        }

        JsonNode hitsNode = rootNode.get("hits");
        totalRecordsAvailable = Integer.parseInt(hitsNode.get("total").asText());

        LOG.info(AaiUiMsgs.COLLECT_TOTAL, "all document Identifiers",
            String.valueOf(totalRecordsAvailable));

        /*
         * Collect all object ids
         */
        ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");

        Iterator<JsonNode> nodeIterator = hitsArray.iterator();

        String key = null;
        JsonNode jsonNode = null;

        while (nodeIterator.hasNext()) {

          jsonNode = nodeIterator.next();

          key = getFieldValue(jsonNode, "_id");

          if (key != null) {
            currentDocumentIds.addObjectId(key);
          }
        }

        numRecordsFetched = currentDocumentIds.getSize();

        int totalRecordsRemainingToFetch = (totalRecordsAvailable - numRecordsFetched);

        int numRequiredAdditionalFetches =
            (totalRecordsRemainingToFetch / endpointConfig.getScrollContextBatchRequestSize());

        /*
         * Do an additional fetch for the remaining items (if needed)
         */
        if (totalRecordsRemainingToFetch % endpointConfig.getScrollContextBatchRequestSize() != 0) {
          numRequiredAdditionalFetches += 1;
        }
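
        // Worked example for the arithmetic above (illustrative numbers): with 12000 records
        // remaining and a batch size of 5000, 12000 / 5000 = 2 full fetches, and the non-zero
        // remainder (2000) adds one more, so numRequiredAdditionalFetches = 3.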

        if (LOG.isDebugEnabled()) {
          LOG.debug(AaiUiMsgs.SYNC_NUMBER_REQ_FETCHES,
              String.valueOf(numRequiredAdditionalFetches));
        }

        for (int x = 0; x < numRequiredAdditionalFetches; x++) {

          if (collectItemsFromScrollContext(scrollId, currentDocumentIds) != OperationState.OK) {
            // abort the whole thing because now we can't reliably clean up the orphans
            throw new IOException(
                "Failed to collect pre-sync doc collection from index. Aborting operation");
          }

          if (LOG.isDebugEnabled()) {
            LOG.debug(AaiUiMsgs.SYNC_NUMBER_TOTAL_FETCHES,
                String.valueOf(currentDocumentIds.getSize()),
                String.valueOf(totalRecordsAvailable));
          }
        }
      }

    } else {
      // scroll context get failed, nothing else to do
      LOG.error(AaiUiMsgs.ERROR_GENERIC, result.toString());
    }

    LOG.info(AaiUiMsgs.COLLECT_TOTAL_TIME, "all document Identifiers",
        String.valueOf((System.currentTimeMillis() - opStartTimeInMs)));

    return currentDocumentIds;
  }

  /**
   * Collects one batch of items from the scroll context.
   *
   * @param scrollId the scroll id returned by the previous request
   * @param objectIds the collection that harvested object ids are added to
   * @return the operation state
   * @throws IOException Signals that an I/O exception has occurred.
   */
  private OperationState collectItemsFromScrollContext(String scrollId,
      ObjectIdCollection objectIds) throws IOException {

    String requestPayload = buildSubsequentScrollContextRequestPayload(scrollId,
        endpointConfig.getScrollContextTimeToLiveInMinutes());

    final String fullUrlStr = getFullUrl("/_search/scroll");

    OperationResult opResult =
        esAdapter.doPost(fullUrlStr, requestPayload, MediaType.APPLICATION_JSON_TYPE);

    if (opResult.getResultCode() >= 300) {
      LOG.warn(AaiUiMsgs.ES_SCROLL_CONTEXT_ERROR, opResult.getResult());
      return OperationState.ERROR;
    }

    JsonNode rootNode = parseElasticSearchResult(opResult.getResult());

    /*
     * Check the result for success / failure, and enumerate all the index ids that resulted in
     * success, and ignore the ones that failed or log them so we have a record of the failure.
     */
    if (rootNode != null) {

      boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
      final String tookStr = getFieldValue(rootNode, "took");
      int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);

      JsonNode hitsNode = rootNode.get("hits");

      if (timedOut) {
        LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "Scroll Context", String.valueOf(tookInMs));
      } else {
        LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "Scroll Context", String.valueOf(tookInMs));
      }

      /*
       * Collect all object ids
       */
      ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");

      String key = null;
      JsonNode jsonNode = null;

      Iterator<JsonNode> nodeIterator = hitsArray.iterator();

      while (nodeIterator.hasNext()) {

        jsonNode = nodeIterator.next();

        key = getFieldValue(jsonNode, "_id");

        if (key != null) {
          objectIds.addObjectId(key);
        }
      }
    }

    return OperationState.OK;
  }

  /**
   * Gets the field value.
   *
   * @param node the node
   * @param fieldName the field name
   * @return the field value as text, or null if the field is absent
   */
  protected String getFieldValue(JsonNode node, String fieldName) {

    JsonNode field = node.get(fieldName);

    if (field != null) {
      return field.asText();
    }

    return null;
  }

  /**
   * Issues a bulk delete request to Elasticsearch for the given document ids.
   *
   * @param docIds the doc ids to delete
   * @return the operation result
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public OperationResult bulkDelete(Collection<String> docIds) throws IOException {
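
    // The request body sent to /_bulk is newline-delimited JSON, one delete action per line,
    // e.g. (index, type, and ids illustrative):
    //
    //   {"delete":{"_index":"entity-search-index","_type":"default","_id":"id-1"}}
    //   {"delete":{"_index":"entity-search-index","_type":"default","_id":"id-2"}}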

    if (docIds == null || docIds.size() == 0) {
      LOG.info(AaiUiMsgs.ES_BULK_DELETE_SKIP);
      return new OperationResult(500,
          "Skipping bulkDelete() operation because the list of docs to delete is empty");
    }

    LOG.info(AaiUiMsgs.ES_BULK_DELETE_START, String.valueOf(docIds.size()));

    StringBuilder sb = new StringBuilder(128);

    for (String id : docIds) {
      sb.append(String.format(BULK_OP_LINE_TEMPLATE,
          buildDeleteDataObject(schemaConfig.getIndexName(), schemaConfig.getIndexDocType(), id)));
    }

    final String fullUrlStr = getFullUrl("/_bulk");

    return esAdapter.doPost(fullUrlStr, sb.toString(), MediaType.APPLICATION_FORM_URLENCODED_TYPE);
  }
}