X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Faai%2Fsparky%2Fsync%2FElasticSearchIndexCleaner.java;fp=src%2Fmain%2Fjava%2Forg%2Fonap%2Faai%2Fsparky%2Fsynchronizer%2FElasticSearchIndexCleaner.java;h=e1785d465cdf9f0f3c830601688036242a4528a9;hb=7b42006c51d4f014f72ae39170544a85d2f09fb5;hp=59942dc0ce7d6a521f9e43203cb015eaebfdb313;hpb=1249ee87c69c9c4566f2c52c5cbb56e4089044c4;p=aai%2Fsparky-be.git diff --git a/src/main/java/org/onap/aai/sparky/synchronizer/ElasticSearchIndexCleaner.java b/src/main/java/org/onap/aai/sparky/sync/ElasticSearchIndexCleaner.java similarity index 65% rename from src/main/java/org/onap/aai/sparky/synchronizer/ElasticSearchIndexCleaner.java rename to src/main/java/org/onap/aai/sparky/sync/ElasticSearchIndexCleaner.java index 59942dc..e1785d4 100644 --- a/src/main/java/org/onap/aai/sparky/synchronizer/ElasticSearchIndexCleaner.java +++ b/src/main/java/org/onap/aai/sparky/sync/ElasticSearchIndexCleaner.java @@ -20,13 +20,7 @@ * * ECOMP is a trademark and service mark of AT&T Intellectual Property. */ -package org.onap.aai.sparky.synchronizer; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; +package org.onap.aai.sparky.sync; import java.io.IOException; import java.util.ArrayList; @@ -34,14 +28,24 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; -import org.onap.aai.sparky.dal.rest.OperationResult; -import org.onap.aai.sparky.dal.rest.RestDataProvider; -import org.onap.aai.sparky.logging.AaiUiMsgs; -import org.onap.aai.sparky.synchronizer.entity.ObjectIdCollection; -import org.onap.aai.sparky.synchronizer.entity.SearchableEntity; -import org.onap.aai.sparky.synchronizer.enumeration.OperationState; +import javax.ws.rs.core.MediaType; + import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.restclient.client.OperationResult; +import org.onap.aai.sparky.dal.ElasticSearchAdapter; +import org.onap.aai.sparky.logging.AaiUiMsgs; +import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig; +import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig; +import org.onap.aai.sparky.sync.entity.ObjectIdCollection; +import org.onap.aai.sparky.sync.entity.SearchableEntity; +import org.onap.aai.sparky.sync.enumeration.OperationState; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; /** * The Class ElasticSearchIndexCleaner. @@ -57,16 +61,10 @@ public class ElasticSearchIndexCleaner implements IndexCleaner { private ObjectIdCollection before; private ObjectIdCollection after; - private String host; - private String port; - - private String indexName; - private String indexType; - private int scrollContextTimeToLiveInMinutes; - private int numItemsToGetBulkRequest; - - private RestDataProvider restDataProvider; private ObjectMapper mapper; + private ElasticSearchAdapter esAdapter; + private ElasticSearchEndpointConfig endpointConfig; + private ElasticSearchSchemaConfig schemaConfig; /** * Instantiates a new elastic search index cleaner. @@ -79,25 +77,20 @@ public class ElasticSearchIndexCleaner implements IndexCleaner { * @param scrollContextTimeToLiveInMinutes the scroll context time to live in minutes * @param numItemsToGetBulkRequest the num items to get bulk request */ - protected ElasticSearchIndexCleaner(RestDataProvider restDataProvider, String indexName, - String indexType, String host, String port, int scrollContextTimeToLiveInMinutes, - int numItemsToGetBulkRequest) { - this.restDataProvider = restDataProvider; + public ElasticSearchIndexCleaner(ElasticSearchAdapter esAdapter, + ElasticSearchEndpointConfig endpointConfig, ElasticSearchSchemaConfig schemaConfig) { + this.esAdapter = esAdapter; this.before = null; this.after = null; - this.indexName = indexName; - this.indexType = indexType; + this.endpointConfig = endpointConfig; + this.schemaConfig = schemaConfig; this.mapper = new ObjectMapper(); - this.host = host; - this.port = port; - this.scrollContextTimeToLiveInMinutes = scrollContextTimeToLiveInMinutes; - this.numItemsToGetBulkRequest = numItemsToGetBulkRequest; } /* * (non-Javadoc) * - * @see org.onap.aai.sparky.synchronizer.IndexCleaner#populatePreOperationCollection() + * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePreOperationCollection() */ @Override public OperationState populatePreOperationCollection() { @@ -106,7 +99,7 @@ public class ElasticSearchIndexCleaner implements IndexCleaner { before = retrieveAllDocumentIdentifiers(); return OperationState.OK; } catch (Exception exc) { - LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, indexName, exc.getMessage()); + LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage()); return OperationState.ERROR; } @@ -115,7 +108,7 @@ public class ElasticSearchIndexCleaner implements IndexCleaner { /* * (non-Javadoc) * - * @see org.onap.aai.sparky.synchronizer.IndexCleaner#populatePostOperationCollection() + * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePostOperationCollection() */ @Override public OperationState populatePostOperationCollection() { @@ -123,7 +116,7 @@ public class ElasticSearchIndexCleaner implements IndexCleaner { after = retrieveAllDocumentIdentifiers(); return OperationState.OK; } catch (Exception exc) { - LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, indexName, exc.getMessage()); + LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage()); return OperationState.ERROR; } } @@ -131,12 +124,12 @@ public class ElasticSearchIndexCleaner implements IndexCleaner { /* * (non-Javadoc) * - * @see org.onap.aai.sparky.synchronizer.IndexCleaner#performCleanup() + * @see org.openecomp.sparky.synchronizer.IndexCleaner#performCleanup() */ @Override public OperationState performCleanup() { // TODO Auto-generated method stub - LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP, indexName); + LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP, schemaConfig.getIndexName()); int sizeBefore = before.getSize(); int sizeAfter = after.getSize(); @@ -151,12 +144,12 @@ public class ElasticSearchIndexCleaner implements IndexCleaner { if (sizeAfter > 0) { - Collection presyncIds = before.getImportedObjectIdsAsValues(); - presyncIds.removeAll(after.getImportedObjectIdsAsValues()); + Collection presyncIds = before.getImportedObjectIds(); + presyncIds.removeAll(after.getImportedObjectIds()); try { - LOG.info(AaiUiMsgs.ES_SYNC_SELECTIVE_DELETE, indexName, indexType, - String.valueOf(presyncIds.size())); + LOG.info(AaiUiMsgs.ES_SYNC_SELECTIVE_DELETE, schemaConfig.getIndexName(), + schemaConfig.getIndexDocType(), String.valueOf(presyncIds.size())); ObjectIdCollection bulkIds = new ObjectIdCollection(); @@ -169,10 +162,10 @@ public class ElasticSearchIndexCleaner implements IndexCleaner { bulkIds.addObjectId(it.next()); numItemsInBulkRequest++; - if (numItemsInBulkRequest >= this.numItemsToGetBulkRequest) { - LOG.info(AaiUiMsgs.ES_BULK_DELETE, indexName, String.valueOf(bulkIds.getSize())); - OperationResult bulkDeleteResult = bulkDelete(bulkIds.getImportedObjectIdsAsValues()); - // pegCountersForElasticBulkDelete(bulkDeleteResult); + if (numItemsInBulkRequest >= endpointConfig.getScrollContextBatchRequestSize()) { + LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(), + String.valueOf(bulkIds.getSize())); + bulkDelete(bulkIds.getImportedObjectIds()); numItemsRemainingToBeDeleted -= numItemsInBulkRequest; numItemsInBulkRequest = 0; bulkIds.clear(); @@ -180,14 +173,15 @@ public class ElasticSearchIndexCleaner implements IndexCleaner { } if (numItemsRemainingToBeDeleted > 0) { - LOG.info(AaiUiMsgs.ES_BULK_DELETE, indexName, String.valueOf(bulkIds.getSize())); - OperationResult bulkDeleteResult = bulkDelete(bulkIds.getImportedObjectIdsAsValues()); - // pegCountersForElasticBulkDelete(bulkDeleteResult); + LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(), + String.valueOf(bulkIds.getSize())); + bulkDelete(bulkIds.getImportedObjectIds()); } } catch (Exception exc) { - LOG.error(AaiUiMsgs.ES_BULK_DELETE_ERROR, indexName, exc.getLocalizedMessage()); + LOG.error(AaiUiMsgs.ES_BULK_DELETE_ERROR, schemaConfig.getIndexName(), + exc.getLocalizedMessage()); } } @@ -197,11 +191,7 @@ public class ElasticSearchIndexCleaner implements IndexCleaner { @Override public String getIndexName() { - return indexName; - } - - public void setIndexName(String indexName) { - this.indexName = indexName; + return schemaConfig.getIndexName(); } /** @@ -352,7 +342,8 @@ public class ElasticSearchIndexCleaner implements IndexCleaner { * @return the full url */ private String getFullUrl(String resourceUrl) { - return String.format("http://%s:%s%s", host, port, resourceUrl); + return String.format("http://%s:%s%s", endpointConfig.getEsIpAddress(), + endpointConfig.getEsServerPort(), resourceUrl); } /** @@ -372,13 +363,14 @@ public class ElasticSearchIndexCleaner implements IndexCleaner { // fields.add("entityType"); String scrollRequestPayload = - buildInitialScrollRequestPayload(this.numItemsToGetBulkRequest, fields); + buildInitialScrollRequestPayload(endpointConfig.getScrollContextBatchRequestSize(), fields); - final String fullUrlStr = getFullUrl("/" + indexName + "/" + indexType + "/_search?scroll=" - + this.scrollContextTimeToLiveInMinutes + "m"); + final String fullUrlStr = + getFullUrl("/" + schemaConfig.getIndexName() + "/" + schemaConfig.getIndexDocType() + + "/_search?scroll=" + endpointConfig.getScrollContextTimeToLiveInMinutes() + "m"); OperationResult result = - restDataProvider.doPost(fullUrlStr, scrollRequestPayload, "application/json"); + esAdapter.doPost(fullUrlStr, scrollRequestPayload, MediaType.APPLICATION_JSON_TYPE); if (result.wasSuccessful()) { @@ -435,33 +427,18 @@ public class ElasticSearchIndexCleaner implements IndexCleaner { currentDocumentIds.addObjectId(key); } - /* - * if (key != null) { - * - * JsonNode fieldsNode = jNode.get("fields"); - * - * if (fieldsNode != null) { - * - * JsonNode entityTypeNode = fieldsNode.get("entityType"); - * - * if (entityTypeNode != null) { ArrayNode aNode = (ArrayNode) entityTypeNode; - * - * if (aNode.size() > 0) { value = aNode.get(0).asText(); objAndtTypesMap.put(key, value); - * numRecordsFetched++; } } } } - */ - } int totalRecordsRemainingToFetch = (totalRecordsAvailable - numRecordsFetched); int numRequiredAdditionalFetches = - (totalRecordsRemainingToFetch / this.numItemsToGetBulkRequest); + (totalRecordsRemainingToFetch / endpointConfig.getScrollContextBatchRequestSize()); /* * Do an additional fetch for the remaining items (if needed) */ - if (totalRecordsRemainingToFetch % numItemsToGetBulkRequest != 0) { + if (totalRecordsRemainingToFetch % endpointConfig.getScrollContextBatchRequestSize() != 0) { numRequiredAdditionalFetches += 1; } @@ -511,15 +488,13 @@ public class ElasticSearchIndexCleaner implements IndexCleaner { private OperationState collectItemsFromScrollContext(String scrollId, ObjectIdCollection objectIds) throws IOException { - // ObjectIdCollection documentIdCollection = new ObjectIdCollection(); - - String requestPayload = - buildSubsequentScrollContextRequestPayload(scrollId, scrollContextTimeToLiveInMinutes); + String requestPayload = buildSubsequentScrollContextRequestPayload(scrollId, + endpointConfig.getScrollContextTimeToLiveInMinutes()); final String fullUrlStr = getFullUrl("/_search/scroll"); OperationResult opResult = - restDataProvider.doPost(fullUrlStr, requestPayload, "application/json"); + esAdapter.doPost(fullUrlStr, requestPayload, MediaType.APPLICATION_JSON_TYPE); if (opResult.getResultCode() >= 300) { LOG.warn(AaiUiMsgs.ES_SCROLL_CONTEXT_ERROR, opResult.getResult()); @@ -527,6 +502,11 @@ public class ElasticSearchIndexCleaner implements IndexCleaner { } JsonNode rootNode = parseElasticSearchResult(opResult.getResult()); + boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out")); + final String tookStr = getFieldValue(rootNode, "took"); + int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr); + + JsonNode hitsNode = rootNode.get("hits"); /* * Check the result for success / failure, and enumerate all the index ids that resulted in @@ -534,11 +514,6 @@ public class ElasticSearchIndexCleaner implements IndexCleaner { */ if (rootNode != null) { - boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out")); - final String tookStr = getFieldValue(rootNode, "took"); - int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr); - - JsonNode hitsNode = rootNode.get("hits"); if (timedOut) { LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "Scroll Context", String.valueOf(tookInMs)); @@ -566,25 +541,9 @@ public class ElasticSearchIndexCleaner implements IndexCleaner { if (key != null) { objectIds.addObjectId(key); - /* - * JsonNode fieldsNode = jNode.get("fields"); - * - * if (fieldsNode != null) { - * - * JsonNode entityTypeNode = fieldsNode.get("entityType"); - * - * if (entityTypeNode != null) { ArrayNode aNode = (ArrayNode) entityTypeNode; - * - * if (aNode.size() > 0) { value = aNode.get(0).asText(); objectIdsAndTypes.put(key, - * value); } } } } - */ - } } - } else { - // scroll context get failed, nothing else to do - LOG.error(AaiUiMsgs.ERROR_GENERIC, opResult.toString()); } return OperationState.OK; @@ -629,163 +588,16 @@ public class ElasticSearchIndexCleaner implements IndexCleaner { StringBuilder sb = new StringBuilder(128); for (String id : docIds) { - sb.append( - String.format(BULK_OP_LINE_TEMPLATE, buildDeleteDataObject(indexName, indexType, id))); + sb.append(String.format(BULK_OP_LINE_TEMPLATE, + buildDeleteDataObject(schemaConfig.getIndexName(), schemaConfig.getIndexDocType(), id))); } sb.append("\n"); final String fullUrlStr = getFullUrl("/_bulk"); - return restDataProvider.doPost(fullUrlStr, sb.toString(), "application/x-www-form-urlencoded"); - - } - - /** - * @return the before - */ - public ObjectIdCollection getBefore() { - return before; - } - - /** - * @param before the before to set - */ - public void setBefore(ObjectIdCollection before) { - this.before = before; - } - - /** - * @return the after - */ - public ObjectIdCollection getAfter() { - return after; - } - - /** - * @param after the after to set - */ - public void setAfter(ObjectIdCollection after) { - this.after = after; - } - - /** - * @return the host - */ - public String getHost() { - return host; - } - - /** - * @param host the host to set - */ - public void setHost(String host) { - this.host = host; - } - - /** - * @return the port - */ - public String getPort() { - return port; - } - - /** - * @param port the port to set - */ - public void setPort(String port) { - this.port = port; - } - - /** - * @return the indexType - */ - public String getIndexType() { - return indexType; - } - - /** - * @param indexType the indexType to set - */ - public void setIndexType(String indexType) { - this.indexType = indexType; - } - - /** - * @return the scrollContextTimeToLiveInMinutes - */ - public int getScrollContextTimeToLiveInMinutes() { - return scrollContextTimeToLiveInMinutes; - } - - /** - * @param scrollContextTimeToLiveInMinutes the scrollContextTimeToLiveInMinutes to set - */ - public void setScrollContextTimeToLiveInMinutes(int scrollContextTimeToLiveInMinutes) { - this.scrollContextTimeToLiveInMinutes = scrollContextTimeToLiveInMinutes; - } + return esAdapter.doPost(fullUrlStr, sb.toString(), MediaType.APPLICATION_FORM_URLENCODED_TYPE); - /** - * @return the numItemsToGetBulkRequest - */ - public int getNumItemsToGetBulkRequest() { - return numItemsToGetBulkRequest; - } - - /** - * @param numItemsToGetBulkRequest the numItemsToGetBulkRequest to set - */ - public void setNumItemsToGetBulkRequest(int numItemsToGetBulkRequest) { - this.numItemsToGetBulkRequest = numItemsToGetBulkRequest; - } - - /** - * @return the restDataProvider - */ - public RestDataProvider getRestDataProvider() { - return restDataProvider; - } - - /** - * @param restDataProvider the restDataProvider to set - */ - public void setRestDataProvider(RestDataProvider restDataProvider) { - this.restDataProvider = restDataProvider; - } - - /** - * @return the mapper - */ - public ObjectMapper getMapper() { - return mapper; - } - - /** - * @param mapper the mapper to set - */ - public void setMapper(ObjectMapper mapper) { - this.mapper = mapper; - } - - /** - * @return the log - */ - public static Logger getLog() { - return LOG; - } - - /** - * @return the bulkOpLineTemplate - */ - public static String getBulkOpLineTemplate() { - return BULK_OP_LINE_TEMPLATE; - } - - /** - * @return the timestampFormat - */ - public static String getTimestampFormat() { - return TIMESTAMP_FORMAT; } /*