*
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
*/
-package org.onap.aai.sparky.synchronizer;
+package org.onap.aai.sparky.aggregation.sync;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Deque;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
-import javax.json.Json;
-
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.cl.mdc.MdcContext;
+import org.onap.aai.restclient.client.OperationResult;
import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
+import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
import org.onap.aai.sparky.dal.NetworkTransaction;
import org.onap.aai.sparky.dal.aai.config.ActiveInventoryConfig;
import org.onap.aai.sparky.dal.elasticsearch.config.ElasticSearchConfig;
import org.onap.aai.sparky.dal.rest.HttpMethod;
-import org.onap.aai.sparky.dal.rest.OperationResult;
import org.onap.aai.sparky.logging.AaiUiMsgs;
-import org.onap.aai.sparky.synchronizer.config.SynchronizerConfiguration;
-import org.onap.aai.sparky.synchronizer.entity.AggregationEntity;
-import org.onap.aai.sparky.synchronizer.entity.MergableEntity;
-import org.onap.aai.sparky.synchronizer.entity.SelfLinkDescriptor;
-import org.onap.aai.sparky.synchronizer.enumeration.OperationState;
-import org.onap.aai.sparky.synchronizer.enumeration.SynchronizerState;
-import org.onap.aai.sparky.synchronizer.task.PerformActiveInventoryRetrieval;
-import org.onap.aai.sparky.synchronizer.task.PerformElasticSearchPut;
-import org.onap.aai.sparky.synchronizer.task.PerformElasticSearchRetrieval;
-import org.onap.aai.sparky.synchronizer.task.PerformElasticSearchUpdate;
+import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
+import org.onap.aai.sparky.sync.IndexSynchronizer;
+import org.onap.aai.sparky.sync.SynchronizerConstants;
+import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
+import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
+import org.onap.aai.sparky.sync.entity.AggregationEntity;
+import org.onap.aai.sparky.sync.entity.MergableEntity;
+import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
+import org.onap.aai.sparky.sync.enumeration.OperationState;
+import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
+import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
+import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
+import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
+import org.onap.aai.sparky.sync.task.PerformElasticSearchUpdate;
import org.onap.aai.sparky.util.NodeUtils;
-import org.onap.aai.cl.api.Logger;
-import org.onap.aai.cl.eelf.LoggerFactory;
import org.slf4j.MDC;
-import org.onap.aai.cl.mdc.MdcContext;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectReader;
private boolean syncInProgress;
private Map<String, String> contextMap;
private String entityType;
+ private ElasticSearchSchemaConfig schemaConfig;
/**
* Instantiates a new entity aggregation synchronizer.
* @param indexName the index name
* @throws Exception the exception
*/
- public AggregationSynchronizer(String entityType, String indexName) throws Exception {
- super(LOG, "AGGES-" + indexName.toUpperCase(), 2, 5, 5, indexName); // multiple Autosuggestion
- // Entity Synchronizer will
- // run for different indices
-
+ public AggregationSynchronizer(String entityType, ElasticSearchSchemaConfig schemaConfig,
+ int numSyncWorkers, int numActiveInventoryWorkers, int numElasticWorkers,
+ NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig)
+ throws Exception {
+
+ super(LOG, "AGGES-" + schemaConfig.getIndexName().toUpperCase(), numSyncWorkers,
+ numActiveInventoryWorkers, numElasticWorkers, schemaConfig.getIndexName(), aaiStatConfig,
+ esStatConfig); // multiple
+ // Autosuggestion
+ // Entity Synchronizer will
+ // run for different indices
+
+ this.schemaConfig = schemaConfig;
this.entityType = entityType;
this.allWorkEnumerated = false;
this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
this.esPutExecutor = NodeUtils.createNamedExecutor("AGGES-ES-PUT", 1, LOG);
- Map<String, OxmEntityDescriptor> descriptor = new HashMap<String, OxmEntityDescriptor>();
- descriptor.put(entityType, oxmModelLoader.getEntityDescriptors().get(entityType));
- this.aaiEntityStats.initializeCountersFromOxmEntityDescriptors(descriptor);
- this.esEntityStats.initializeCountersFromOxmEntityDescriptors(descriptor);
+
+ this.aaiEntityStats.intializeEntityCounters(entityType);
+ this.esEntityStats.intializeEntityCounters(entityType);
+
this.contextMap = MDC.getCopyOfContextMap();
}
MDC.setContextMap(contextMap);
OperationResult typeLinksResult = null;
try {
- typeLinksResult = aaiDataProvider.getSelfLinksByEntityType(entity);
+ typeLinksResult = aaiAdapter.getSelfLinksByEntityType(entity);
aaiWorkOnHand.decrementAndGet();
processEntityTypeSelfLinks(typeLinksResult);
} catch (Exception exc) {
// TODO -> LOG, what should be logged here?
+
+ exc.printStackTrace();
}
return null;
* called incrementAndGet when queuing the failed PUT!
*/
- supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, esDataProvider),
+ supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
esExecutor).whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
ObjectReader updater = mapper.readerForUpdating(me);
- MergableEntity merged = updater.readValue(ae.getIndexDocumentJson());
+ MergableEntity merged = updater.readValue(ae.getAsJson());
jsonPayload = mapper.writeValueAsString(merged);
}
} catch (IOException exc) {
return;
}
} else {
- jsonPayload = ae.getIndexDocumentJson();
+ jsonPayload = ae.getAsJson();
}
if (wasEntryDiscovered) {
if (versionNumber != null && jsonPayload != null) {
- String requestPayload = esDataProvider.buildBulkImportOperationRequest(getIndexName(),
- ElasticSearchConfig.getConfig().getType(), ae.getId(), versionNumber, jsonPayload);
+ String requestPayload =
+ elasticSearchAdapter.buildBulkImportOperationRequest(schemaConfig.getIndexName(),
+ schemaConfig.getIndexDocType(), ae.getId(), versionNumber, jsonPayload);
NetworkTransaction transactionTracker = new NetworkTransaction();
transactionTracker.setEntityType(esGetTxn.getEntityType());
esWorkOnHand.incrementAndGet();
supplyAsync(new PerformElasticSearchUpdate(ElasticSearchConfig.getConfig().getBulkUrl(),
- requestPayload, esDataProvider, transactionTracker), esPutExecutor)
+ requestPayload, elasticSearchAdapter, transactionTracker), esPutExecutor)
.whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
updateElasticTxn.setOperationType(HttpMethod.PUT);
esWorkOnHand.incrementAndGet();
- supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, esDataProvider),
+ supplyAsync(
+ new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
esPutExecutor).whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
- descriptor = oxmModelLoader.getEntityDescriptor(linkDescriptor.getEntityType());
+ descriptor = OxmEntityLookup.getInstance().getEntityDescriptors()
+ .get(linkDescriptor.getEntityType());
if (descriptor == null) {
LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
aaiWorkOnHand.incrementAndGet();
- supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiDataProvider), aaiExecutor)
+ supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
.whenComplete((result, error) -> {
aaiWorkOnHand.decrementAndGet();
final String jsonResult = txn.getOperationResult().getResult();
if (jsonResult != null && jsonResult.length() > 0) {
- AggregationEntity ae = new AggregationEntity(oxmModelLoader);
+ AggregationEntity ae = new AggregationEntity();
ae.setLink(ActiveInventoryConfig.extractResourcePath(txn.getLink()));
populateAggregationEntityDocument(ae, jsonResult, txn.getDescriptor());
ae.deriveFields();
esWorkOnHand.incrementAndGet();
- supplyAsync(new PerformElasticSearchRetrieval(n2, esDataProvider), esExecutor)
+ supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
.whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
String message = "Could not deserialize JSON (representing operation result) as node tree. "
+ "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
- return;
}
JsonNode resultData = rootNode.get("result-data");
if (resourceType != null && resourceLink != null) {
- descriptor = oxmModelLoader.getEntityDescriptor(resourceType);
+ descriptor = OxmEntityLookup.getInstance().getEntityDescriptors().get(resourceType);
if (descriptor == null) {
LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
}
selflinks.add(new SelfLinkDescriptor(resourceLink,
- SynchronizerConfiguration.NODES_ONLY_MODIFIER, resourceType));
+ SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
}
/*
* (non-Javadoc)
*
- * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#doSync()
+ * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
*/
@Override
public OperationState doSync() {
/*
* (non-Javadoc)
*
- * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
+ * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
*/
@Override
public String getStatReport(boolean showFinalReport) {
/*
* (non-Javadoc)
*
- * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#shutdown()
+ * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
*/
@Override
public void shutdown() {
/*
* (non-Javadoc)
*
- * @see org.onap.aai.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
+ * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
*/
@Override
public void clearCache() {