import org.onap.aai.restclient.client.OperationResult;
import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
+import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
import org.onap.aai.sparky.dal.NetworkTransaction;
-import org.onap.aai.sparky.dal.aai.config.ActiveInventoryConfig;
-import org.onap.aai.sparky.dal.elasticsearch.config.ElasticSearchConfig;
import org.onap.aai.sparky.dal.rest.HttpMethod;
import org.onap.aai.sparky.logging.AaiUiMsgs;
import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
return ae;
}
}
-
+
private static final Logger LOG =
LoggerFactory.getInstance().getLogger(AggregationSynchronizer.class);
private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
private Map<String, String> contextMap;
private String entityType;
private ElasticSearchSchemaConfig schemaConfig;
-
+ private OxmEntityLookup oxmEntityLookup;
+
/**
* Instantiates a new entity aggregation synchronizer.
*
*/
public AggregationSynchronizer(String entityType, ElasticSearchSchemaConfig schemaConfig,
int numSyncWorkers, int numActiveInventoryWorkers, int numElasticWorkers,
- NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig)
- throws Exception {
-
+ NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig,
+ OxmEntityLookup oxmEntityLookup) throws Exception {
+
super(LOG, "AGGES-" + schemaConfig.getIndexName().toUpperCase(), numSyncWorkers,
- numActiveInventoryWorkers, numElasticWorkers, schemaConfig.getIndexName(), aaiStatConfig,
- esStatConfig); // multiple
- // Autosuggestion
- // Entity Synchronizer will
- // run for different indices
-
+ numActiveInventoryWorkers, numElasticWorkers, schemaConfig.getIndexName(),aaiStatConfig, esStatConfig);
+
+ this.oxmEntityLookup = oxmEntityLookup;
+
this.schemaConfig = schemaConfig;
this.entityType = entityType;
this.allWorkEnumerated = false;
this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
this.retryQueue = new ConcurrentLinkedDeque<RetryAggregationEntitySyncContainer>();
this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
-
+
this.esPutExecutor = NodeUtils.createNamedExecutor("AGGES-ES-PUT", 1, LOG);
-
+
this.aaiEntityStats.intializeEntityCounters(entityType);
this.esEntityStats.intializeEntityCounters(entityType);
-
+
this.contextMap = MDC.getCopyOfContextMap();
}
aaiWorkOnHand.decrementAndGet();
processEntityTypeSelfLinks(typeLinksResult);
} catch (Exception exc) {
- // TODO -> LOG, what should be logged here?
-
- exc.printStackTrace();
+ LOG.error(AaiUiMsgs.ERROR_GENERIC, "Processing execption while building working set. Error:"
+ + exc.getMessage());
}
return null;
return OperationState.OK;
}
-
-
+
+
/**
* Perform retry sync.
*/
/*
* In this retry flow the se object has already derived its fields
*/
- link = getElasticFullUrl("/" + ae.getId(), getIndexName());
+ link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId());
} catch (Exception exc) {
LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
}
}
}
}
-
+
/**
* Perform document upsert.
*
*/
String link = null;
try {
- link = getElasticFullUrl("/" + ae.getId(), getIndexName());
+ link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId());
} catch (Exception exc) {
LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
return;
transactionTracker.setOperationType(HttpMethod.PUT);
esWorkOnHand.incrementAndGet();
- supplyAsync(new PerformElasticSearchUpdate(ElasticSearchConfig.getConfig().getBulkUrl(),
+ supplyAsync(new PerformElasticSearchUpdate(elasticSearchAdapter.getBulkUrl(),
requestPayload, elasticSearchAdapter, transactionTracker), esPutExecutor)
.whenComplete((result, error) -> {
updateElasticTxn.setOperationType(HttpMethod.PUT);
esWorkOnHand.incrementAndGet();
- supplyAsync(
- new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
+ supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
esPutExecutor).whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
}
}
-
+
/**
* Should allow retry.
*
return isRetryAllowed;
}
-
+
/**
* Process store document result.
*
}
}
}
-
+
/**
* Sync entity types.
*/
if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
- descriptor = OxmEntityLookup.getInstance().getEntityDescriptors()
- .get(linkDescriptor.getEntityType());
+ descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
if (descriptor == null) {
LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
}
}
-
+
/**
* Fetch document for upsert.
*
}
try {
- final String jsonResult = txn.getOperationResult().getResult();
- if (jsonResult != null && jsonResult.length() > 0) {
+ final String jsonResult = txn.getOperationResult().getResult();
+ if (jsonResult != null && jsonResult.length() > 0) {
- AggregationEntity ae = new AggregationEntity();
- ae.setLink(ActiveInventoryConfig.extractResourcePath(txn.getLink()));
- populateAggregationEntityDocument(ae, jsonResult, txn.getDescriptor());
- ae.deriveFields();
+ AggregationEntity ae = new AggregationEntity();
+ ae.setLink(ActiveInventoryAdapter.extractResourcePath(txn.getLink()));
+ populateAggregationEntityDocument(ae, jsonResult, txn.getDescriptor());
+ ae.deriveFields();
- String link = null;
- try {
- link = getElasticFullUrl("/" + ae.getId(), getIndexName());
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
- }
+ String link = null;
+ try {
+ link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId());
+ } catch (Exception exc) {
+ LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
+ }
- if (link != null) {
- NetworkTransaction n2 = new NetworkTransaction();
- n2.setLink(link);
- n2.setEntityType(txn.getEntityType());
- n2.setDescriptor(txn.getDescriptor());
- n2.setOperationType(HttpMethod.GET);
+ if (link != null) {
+ NetworkTransaction n2 = new NetworkTransaction();
+ n2.setLink(link);
+ n2.setEntityType(txn.getEntityType());
+ n2.setDescriptor(txn.getDescriptor());
+ n2.setOperationType(HttpMethod.GET);
- esWorkOnHand.incrementAndGet();
+ esWorkOnHand.incrementAndGet();
- supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
- .whenComplete((result, error) -> {
+ supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
+ .whenComplete((result, error) -> {
- esWorkOnHand.decrementAndGet();
+ esWorkOnHand.decrementAndGet();
- if (error != null) {
- LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
- } else {
- updateElasticSearchCounters(result);
- performDocumentUpsert(result, ae);
- }
- });
+ if (error != null) {
+ LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
+ } else {
+ updateElasticSearchCounters(result);
+ performDocumentUpsert(result, ae);
+ }
+ });
+ }
}
- }
-
- } catch (JsonProcessingException exc) {
- // TODO -> LOG, waht should be logged here?
- } catch (IOException exc) {
- // TODO -> LOG, waht should be logged here?
- }
- }
-
+ } catch (JsonProcessingException exc) {
+ LOG.error(AaiUiMsgs.ERROR_GENERIC,
+ "There was a JSON processing error fetching the elastic document for upsert. Error: "
+ + exc.getMessage());
+ } catch (IOException exc) {
+ LOG.error(AaiUiMsgs.ERROR_GENERIC,
+ "There was an IO error fetching the elastic document for upsert. Error: " + exc.getMessage());
+ }
+ }
+
+
/**
* Populate aggregation entity document.
*
Map<String, Object> map = mapper.convertValue(entityNode, Map.class);
doc.copyAttributeKeyValuePair(map);
}
-
+
/**
* Process entity type self links.
*
private void processEntityTypeSelfLinks(OperationResult operationResult) {
JsonNode rootNode = null;
+
+ if ( operationResult == null ) {
+ return;
+ }
final String jsonResult = operationResult.getResult();
try {
rootNode = mapper.readTree(jsonResult);
} catch (IOException exc) {
- String message = "Could not deserialize JSON (representing operation result) as node tree. "
- + "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
+ String message =
+ "Could not deserialize JSON (representing operation result) as node tree. " +
+ "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
}
if (resourceType != null && resourceLink != null) {
- descriptor = OxmEntityLookup.getInstance().getEntityDescriptors().get(resourceType);
+ descriptor = oxmEntityLookup.getEntityDescriptors().get(resourceType);
if (descriptor == null) {
LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
continue;
}
- selflinks.add(new SelfLinkDescriptor(resourceLink,
- SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
-
+ selflinks.add(new SelfLinkDescriptor(resourceLink, SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
+
}
}
syncStartedTimeStampInMs = System.currentTimeMillis();
String txnID = NodeUtils.getRandomTxnId();
MdcContext.initialize(txnID, "AggregationSynchronizer", "", "Sync", "");
-
+
return collectAllTheWork();
}