*
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
*/
-package org.onap.aai.sparky.synchronizer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
+package org.onap.aai.sparky.sync;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import org.onap.aai.sparky.dal.rest.OperationResult;
-import org.onap.aai.sparky.dal.rest.RestDataProvider;
-import org.onap.aai.sparky.logging.AaiUiMsgs;
-import org.onap.aai.sparky.synchronizer.entity.ObjectIdCollection;
-import org.onap.aai.sparky.synchronizer.entity.SearchableEntity;
-import org.onap.aai.sparky.synchronizer.enumeration.OperationState;
+import javax.ws.rs.core.MediaType;
+
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.restclient.client.OperationResult;
+import org.onap.aai.sparky.dal.ElasticSearchAdapter;
+import org.onap.aai.sparky.logging.AaiUiMsgs;
+import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig;
+import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
+import org.onap.aai.sparky.sync.entity.ObjectIdCollection;
+import org.onap.aai.sparky.sync.entity.SearchableEntity;
+import org.onap.aai.sparky.sync.enumeration.OperationState;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* The Class ElasticSearchIndexCleaner.
private ObjectIdCollection before;
private ObjectIdCollection after;
- private String host;
- private String port;
-
- private String indexName;
- private String indexType;
- private int scrollContextTimeToLiveInMinutes;
- private int numItemsToGetBulkRequest;
-
- private RestDataProvider restDataProvider;
private ObjectMapper mapper;
+ private ElasticSearchAdapter esAdapter;
+ private ElasticSearchEndpointConfig endpointConfig;
+ private ElasticSearchSchemaConfig schemaConfig;
/**
* Instantiates a new elastic search index cleaner.
* @param scrollContextTimeToLiveInMinutes the scroll context time to live in minutes
* @param numItemsToGetBulkRequest the num items to get bulk request
*/
- protected ElasticSearchIndexCleaner(RestDataProvider restDataProvider, String indexName,
- String indexType, String host, String port, int scrollContextTimeToLiveInMinutes,
- int numItemsToGetBulkRequest) {
- this.restDataProvider = restDataProvider;
+ public ElasticSearchIndexCleaner(ElasticSearchAdapter esAdapter,
+ ElasticSearchEndpointConfig endpointConfig, ElasticSearchSchemaConfig schemaConfig) {
+ this.esAdapter = esAdapter;
this.before = null;
this.after = null;
- this.indexName = indexName;
- this.indexType = indexType;
+ this.endpointConfig = endpointConfig;
+ this.schemaConfig = schemaConfig;
this.mapper = new ObjectMapper();
- this.host = host;
- this.port = port;
- this.scrollContextTimeToLiveInMinutes = scrollContextTimeToLiveInMinutes;
- this.numItemsToGetBulkRequest = numItemsToGetBulkRequest;
}
/*
* (non-Javadoc)
*
- * @see org.onap.aai.sparky.synchronizer.IndexCleaner#populatePreOperationCollection()
+ * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePreOperationCollection()
*/
@Override
public OperationState populatePreOperationCollection() {
before = retrieveAllDocumentIdentifiers();
return OperationState.OK;
} catch (Exception exc) {
- LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, indexName, exc.getMessage());
+ LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage());
return OperationState.ERROR;
}
/*
* (non-Javadoc)
*
- * @see org.onap.aai.sparky.synchronizer.IndexCleaner#populatePostOperationCollection()
+ * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePostOperationCollection()
*/
@Override
public OperationState populatePostOperationCollection() {
after = retrieveAllDocumentIdentifiers();
return OperationState.OK;
} catch (Exception exc) {
- LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, indexName, exc.getMessage());
+ LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage());
return OperationState.ERROR;
}
}
/*
* (non-Javadoc)
*
- * @see org.onap.aai.sparky.synchronizer.IndexCleaner#performCleanup()
+ * @see org.openecomp.sparky.synchronizer.IndexCleaner#performCleanup()
*/
@Override
public OperationState performCleanup() {
// TODO Auto-generated method stub
- LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP, indexName);
+ LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP, schemaConfig.getIndexName());
int sizeBefore = before.getSize();
int sizeAfter = after.getSize();
if (sizeAfter > 0) {
- Collection<String> presyncIds = before.getImportedObjectIdsAsValues();
- presyncIds.removeAll(after.getImportedObjectIdsAsValues());
+ Collection<String> presyncIds = before.getImportedObjectIds();
+ presyncIds.removeAll(after.getImportedObjectIds());
try {
- LOG.info(AaiUiMsgs.ES_SYNC_SELECTIVE_DELETE, indexName, indexType,
- String.valueOf(presyncIds.size()));
+ LOG.info(AaiUiMsgs.ES_SYNC_SELECTIVE_DELETE, schemaConfig.getIndexName(),
+ schemaConfig.getIndexDocType(), String.valueOf(presyncIds.size()));
ObjectIdCollection bulkIds = new ObjectIdCollection();
bulkIds.addObjectId(it.next());
numItemsInBulkRequest++;
- if (numItemsInBulkRequest >= this.numItemsToGetBulkRequest) {
- LOG.info(AaiUiMsgs.ES_BULK_DELETE, indexName, String.valueOf(bulkIds.getSize()));
- OperationResult bulkDeleteResult = bulkDelete(bulkIds.getImportedObjectIdsAsValues());
- // pegCountersForElasticBulkDelete(bulkDeleteResult);
+ if (numItemsInBulkRequest >= endpointConfig.getScrollContextBatchRequestSize()) {
+ LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(),
+ String.valueOf(bulkIds.getSize()));
+ bulkDelete(bulkIds.getImportedObjectIds());
numItemsRemainingToBeDeleted -= numItemsInBulkRequest;
numItemsInBulkRequest = 0;
bulkIds.clear();
}
if (numItemsRemainingToBeDeleted > 0) {
- LOG.info(AaiUiMsgs.ES_BULK_DELETE, indexName, String.valueOf(bulkIds.getSize()));
- OperationResult bulkDeleteResult = bulkDelete(bulkIds.getImportedObjectIdsAsValues());
- // pegCountersForElasticBulkDelete(bulkDeleteResult);
+ LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(),
+ String.valueOf(bulkIds.getSize()));
+ bulkDelete(bulkIds.getImportedObjectIds());
}
} catch (Exception exc) {
- LOG.error(AaiUiMsgs.ES_BULK_DELETE_ERROR, indexName, exc.getLocalizedMessage());
+ LOG.error(AaiUiMsgs.ES_BULK_DELETE_ERROR, schemaConfig.getIndexName(),
+ exc.getLocalizedMessage());
}
}
@Override
public String getIndexName() {
- return indexName;
- }
-
- public void setIndexName(String indexName) {
- this.indexName = indexName;
+ return schemaConfig.getIndexName();
}
/**
* @return the full url
*/
private String getFullUrl(String resourceUrl) {
- return String.format("http://%s:%s%s", host, port, resourceUrl);
+ return String.format("http://%s:%s%s", endpointConfig.getEsIpAddress(),
+ endpointConfig.getEsServerPort(), resourceUrl);
}
/**
// fields.add("entityType");
String scrollRequestPayload =
- buildInitialScrollRequestPayload(this.numItemsToGetBulkRequest, fields);
+ buildInitialScrollRequestPayload(endpointConfig.getScrollContextBatchRequestSize(), fields);
- final String fullUrlStr = getFullUrl("/" + indexName + "/" + indexType + "/_search?scroll="
- + this.scrollContextTimeToLiveInMinutes + "m");
+ final String fullUrlStr =
+ getFullUrl("/" + schemaConfig.getIndexName() + "/" + schemaConfig.getIndexDocType()
+ + "/_search?scroll=" + endpointConfig.getScrollContextTimeToLiveInMinutes() + "m");
OperationResult result =
- restDataProvider.doPost(fullUrlStr, scrollRequestPayload, "application/json");
+ esAdapter.doPost(fullUrlStr, scrollRequestPayload, MediaType.APPLICATION_JSON_TYPE);
if (result.wasSuccessful()) {
currentDocumentIds.addObjectId(key);
}
- /*
- * if (key != null) {
- *
- * JsonNode fieldsNode = jNode.get("fields");
- *
- * if (fieldsNode != null) {
- *
- * JsonNode entityTypeNode = fieldsNode.get("entityType");
- *
- * if (entityTypeNode != null) { ArrayNode aNode = (ArrayNode) entityTypeNode;
- *
- * if (aNode.size() > 0) { value = aNode.get(0).asText(); objAndtTypesMap.put(key, value);
- * numRecordsFetched++; } } } }
- */
-
}
int totalRecordsRemainingToFetch = (totalRecordsAvailable - numRecordsFetched);
int numRequiredAdditionalFetches =
- (totalRecordsRemainingToFetch / this.numItemsToGetBulkRequest);
+ (totalRecordsRemainingToFetch / endpointConfig.getScrollContextBatchRequestSize());
/*
* Do an additional fetch for the remaining items (if needed)
*/
- if (totalRecordsRemainingToFetch % numItemsToGetBulkRequest != 0) {
+ if (totalRecordsRemainingToFetch % endpointConfig.getScrollContextBatchRequestSize() != 0) {
numRequiredAdditionalFetches += 1;
}
private OperationState collectItemsFromScrollContext(String scrollId,
ObjectIdCollection objectIds) throws IOException {
- // ObjectIdCollection documentIdCollection = new ObjectIdCollection();
-
- String requestPayload =
- buildSubsequentScrollContextRequestPayload(scrollId, scrollContextTimeToLiveInMinutes);
+ String requestPayload = buildSubsequentScrollContextRequestPayload(scrollId,
+ endpointConfig.getScrollContextTimeToLiveInMinutes());
final String fullUrlStr = getFullUrl("/_search/scroll");
OperationResult opResult =
- restDataProvider.doPost(fullUrlStr, requestPayload, "application/json");
+ esAdapter.doPost(fullUrlStr, requestPayload, MediaType.APPLICATION_JSON_TYPE);
if (opResult.getResultCode() >= 300) {
LOG.warn(AaiUiMsgs.ES_SCROLL_CONTEXT_ERROR, opResult.getResult());
}
JsonNode rootNode = parseElasticSearchResult(opResult.getResult());
+ boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
+ final String tookStr = getFieldValue(rootNode, "took");
+ int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
+
+ JsonNode hitsNode = rootNode.get("hits");
/*
* Check the result for success / failure, and enumerate all the index ids that resulted in
*/
if (rootNode != null) {
- boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
- final String tookStr = getFieldValue(rootNode, "took");
- int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
-
- JsonNode hitsNode = rootNode.get("hits");
if (timedOut) {
LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "Scroll Context", String.valueOf(tookInMs));
if (key != null) {
objectIds.addObjectId(key);
- /*
- * JsonNode fieldsNode = jNode.get("fields");
- *
- * if (fieldsNode != null) {
- *
- * JsonNode entityTypeNode = fieldsNode.get("entityType");
- *
- * if (entityTypeNode != null) { ArrayNode aNode = (ArrayNode) entityTypeNode;
- *
- * if (aNode.size() > 0) { value = aNode.get(0).asText(); objectIdsAndTypes.put(key,
- * value); } } } }
- */
-
}
}
- } else {
- // scroll context get failed, nothing else to do
- LOG.error(AaiUiMsgs.ERROR_GENERIC, opResult.toString());
}
return OperationState.OK;
StringBuilder sb = new StringBuilder(128);
for (String id : docIds) {
- sb.append(
- String.format(BULK_OP_LINE_TEMPLATE, buildDeleteDataObject(indexName, indexType, id)));
+ sb.append(String.format(BULK_OP_LINE_TEMPLATE,
+ buildDeleteDataObject(schemaConfig.getIndexName(), schemaConfig.getIndexDocType(), id)));
}
sb.append("\n");
final String fullUrlStr = getFullUrl("/_bulk");
- return restDataProvider.doPost(fullUrlStr, sb.toString(), "application/x-www-form-urlencoded");
-
- }
-
- /**
- * @return the before
- */
- public ObjectIdCollection getBefore() {
- return before;
- }
-
- /**
- * @param before the before to set
- */
- public void setBefore(ObjectIdCollection before) {
- this.before = before;
- }
-
- /**
- * @return the after
- */
- public ObjectIdCollection getAfter() {
- return after;
- }
-
- /**
- * @param after the after to set
- */
- public void setAfter(ObjectIdCollection after) {
- this.after = after;
- }
-
- /**
- * @return the host
- */
- public String getHost() {
- return host;
- }
-
- /**
- * @param host the host to set
- */
- public void setHost(String host) {
- this.host = host;
- }
-
- /**
- * @return the port
- */
- public String getPort() {
- return port;
- }
-
- /**
- * @param port the port to set
- */
- public void setPort(String port) {
- this.port = port;
- }
-
- /**
- * @return the indexType
- */
- public String getIndexType() {
- return indexType;
- }
-
- /**
- * @param indexType the indexType to set
- */
- public void setIndexType(String indexType) {
- this.indexType = indexType;
- }
-
- /**
- * @return the scrollContextTimeToLiveInMinutes
- */
- public int getScrollContextTimeToLiveInMinutes() {
- return scrollContextTimeToLiveInMinutes;
- }
-
- /**
- * @param scrollContextTimeToLiveInMinutes the scrollContextTimeToLiveInMinutes to set
- */
- public void setScrollContextTimeToLiveInMinutes(int scrollContextTimeToLiveInMinutes) {
- this.scrollContextTimeToLiveInMinutes = scrollContextTimeToLiveInMinutes;
- }
+ return esAdapter.doPost(fullUrlStr, sb.toString(), MediaType.APPLICATION_FORM_URLENCODED_TYPE);
- /**
- * @return the numItemsToGetBulkRequest
- */
- public int getNumItemsToGetBulkRequest() {
- return numItemsToGetBulkRequest;
- }
-
- /**
- * @param numItemsToGetBulkRequest the numItemsToGetBulkRequest to set
- */
- public void setNumItemsToGetBulkRequest(int numItemsToGetBulkRequest) {
- this.numItemsToGetBulkRequest = numItemsToGetBulkRequest;
- }
-
- /**
- * @return the restDataProvider
- */
- public RestDataProvider getRestDataProvider() {
- return restDataProvider;
- }
-
- /**
- * @param restDataProvider the restDataProvider to set
- */
- public void setRestDataProvider(RestDataProvider restDataProvider) {
- this.restDataProvider = restDataProvider;
- }
-
- /**
- * @return the mapper
- */
- public ObjectMapper getMapper() {
- return mapper;
- }
-
- /**
- * @param mapper the mapper to set
- */
- public void setMapper(ObjectMapper mapper) {
- this.mapper = mapper;
- }
-
- /**
- * @return the log
- */
- public static Logger getLog() {
- return LOG;
- }
-
- /**
- * @return the bulkOpLineTemplate
- */
- public static String getBulkOpLineTemplate() {
- return BULK_OP_LINE_TEMPLATE;
- }
-
- /**
- * @return the timestampFormat
- */
- public static String getTimestampFormat() {
- return TIMESTAMP_FORMAT;
}
/*