import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
import org.onap.aai.sparky.config.oxm.SearchableEntityLookup;
-import org.onap.aai.sparky.config.oxm.SearchableOxmEntityDescriptor;
import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
import org.onap.aai.sparky.dal.NetworkTransaction;
import org.onap.aai.sparky.dal.rest.HttpMethod;
import org.onap.aai.sparky.sync.enumeration.OperationState;
import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchUpdate;
+import org.onap.aai.sparky.sync.task.PerformSearchServicePut;
+import org.onap.aai.sparky.sync.task.PerformSearchServiceRetrieval;
+import org.onap.aai.sparky.sync.task.PerformSearchServiceUpdate;
import org.onap.aai.sparky.util.NodeUtils;
import org.slf4j.MDC;
/**
* Instantiates a new cross entity reference synchronizer.
*
- * @param indexName the index name
* @throws Exception the exception
*/
public CrossEntityReferenceSynchronizer(ElasticSearchSchemaConfig schemaConfig,
aaiWorkOnHand.incrementAndGet();
- supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
+ supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter,"sync"), aaiExecutor)
.whenComplete((result, error) -> {
aaiWorkOnHand.decrementAndGet();
*/
private void processEntityTypeSelfLinks(OperationResult operationResult) {
- JsonNode rootNode = null;
-
final String jsonResult = operationResult.getResult();
if (jsonResult != null && jsonResult.length() > 0) {
try {
- rootNode = mapper.readTree(jsonResult);
- } catch (IOException exc) {
- // TODO // TODO -> LOG, waht should be logged here?
- }
+ JsonNode rootNode = mapper.readTree(jsonResult);
+ JsonNode resultData = rootNode.get("result-data");
- JsonNode resultData = rootNode.get("result-data");
- ArrayNode resultDataArrayNode = null;
+ if (resultData.isArray()) {
+ ArrayNode resultDataArrayNode = (ArrayNode) resultData;
- if (resultData.isArray()) {
- resultDataArrayNode = (ArrayNode) resultData;
+ Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
- Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
- JsonNode element = null;
+ while (elementIterator.hasNext()) {
+ JsonNode element = elementIterator.next();
- while (elementIterator.hasNext()) {
- element = elementIterator.next();
+ final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
+ final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
- final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
- final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
+ if (resourceType != null && resourceLink != null) {
+ CrossEntityReferenceDescriptor descriptor = crossEntityReferenceLookup.getCrossReferenceEntityDescriptors().get(resourceType);
- CrossEntityReferenceDescriptor descriptor = null;
-
- if (resourceType != null && resourceLink != null) {
- descriptor = crossEntityReferenceLookup.getCrossReferenceEntityDescriptors().get(resourceType);
-
- if (descriptor == null) {
- LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
- // go to next element in iterator
- continue;
- }
- if (descriptor.hasCrossEntityReferences()) {
- selflinks.add(new SelfLinkDescriptor(
- resourceLink,SynchronizerConstants.DEPTH_ALL_MODIFIER, resourceType));
+ if (descriptor == null) {
+ LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
+ // go to next element in iterator
+ continue;
+ }
+ if (descriptor.hasCrossEntityReferences()) {
+ selflinks.add(new SelfLinkDescriptor(
+ resourceLink, SynchronizerConstants.DEPTH_ALL_MODIFIER, resourceType));
+ }
}
}
}
+ } catch (IOException exc) {
+ // TODO // TODO -> LOG, waht should be logged here?
}
}
}
OperationResult aaiQueryResult = aaiAdapter.queryActiveInventoryWithRetries(
genericQueryStr, "application/json",
- aaiAdapter.getEndpointConfig().getNumRequestRetries());
+ aaiAdapter.getEndpointConfig().getNumRequestRetries(),"sync");
aaiWorkOnHand.decrementAndGet();
String link = null;
try {
- link = elasticSearchAdapter
- .buildElasticSearchGetDocUrl(getIndexName(), icer.getId());
+ link = searchServiceAdapter
+ .buildSearchServiceDocUrl(getIndexName(), icer.getId());
} catch (Exception exc) {
LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY,
exc.getLocalizedMessage());
esWorkOnHand.incrementAndGet();
supplyAsync(
- new PerformElasticSearchRetrieval(n2, elasticSearchAdapter),
- esExecutor).whenComplete((result, error) -> {
+ new PerformSearchServiceRetrieval(n2, searchServiceAdapter),
+ esExecutor).whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
}
} else {
- String message = "Entity sync failed because AAI query failed with error " + aaiQueryResult.getResult();
+ String result = aaiQueryResult != null ? aaiQueryResult.getResult() : "unknown";
+ String message = "Entity sync failed because AAI query failed with error " + result;
LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_QUERY_ERROR, message);
}
*/
String link = null;
try {
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), icer.getId());
+ link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), icer.getId());
} catch (Exception exc) {
LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
return;
if (wasEntryDiscovered) {
if (versionNumber != null && jsonPayload != null) {
- String requestPayload = elasticSearchAdapter.buildBulkImportOperationRequest(getIndexName(),
- "default", icer.getId(), versionNumber, jsonPayload);
+ String requestPayload = searchServiceAdapter.buildBulkImportOperationRequest(getIndexName(),
+ icer.getId(), jsonPayload);
NetworkTransaction transactionTracker = new NetworkTransaction();
transactionTracker.setEntityType(esGetResult.getEntityType());
transactionTracker.setOperationType(HttpMethod.PUT);
esWorkOnHand.incrementAndGet();
- supplyAsync(new PerformElasticSearchUpdate(elasticSearchAdapter.getBulkUrl(),
- requestPayload, elasticSearchAdapter, transactionTracker), esPutExecutor)
- .whenComplete((result, error) -> {
+ supplyAsync(new PerformSearchServiceUpdate(searchServiceAdapter.buildSearchServiceBulkUrl(),
+ requestPayload, searchServiceAdapter, transactionTracker), esPutExecutor)
+ .whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
updateElasticTxn.setOperationType(HttpMethod.PUT);
esWorkOnHand.incrementAndGet();
- supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
- esPutExecutor).whenComplete((result, error) -> {
+ supplyAsync(new PerformSearchServicePut(jsonPayload, updateElasticTxn, searchServiceAdapter),
+ esPutExecutor).whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
IndexableCrossEntityReference icer = rsc.getIndexableCrossEntityReference();
NetworkTransaction txn = rsc.getNetworkTransaction();
- String link = null;
- try {
- // In this retry flow the icer object has already
- // derived its fields
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), icer.getId());
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
- }
-
- if (link != null) {
- NetworkTransaction retryTransaction = new NetworkTransaction();
- retryTransaction.setLink(link);
- retryTransaction.setEntityType(txn.getEntityType());
- retryTransaction.setDescriptor(txn.getDescriptor());
- retryTransaction.setOperationType(HttpMethod.GET);
-
- /*
- * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow and we did
- * that for this request already when queuing the failed PUT!
- */
-
- supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
- esExecutor).whenComplete((result, error) -> {
-
- esWorkOnHand.decrementAndGet();
-
- if (error != null) {
- LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
- } else {
- updateElasticSearchCounters(result);
- performDocumentUpsert(result, icer);
- }
- });
- }
+ final Consumer<NetworkTransaction> networkTransactionConsumer = (result) -> performDocumentUpsert(result,icer);
+ performRetrySync(icer.getId(), networkTransactionConsumer, txn);
}
}