*
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
*/
-package org.onap.aai.sparky.synchronizer;
+package org.onap.aai.sparky.viewinspect.sync;
import static java.util.concurrent.CompletableFuture.supplyAsync;
-import org.onap.aai.cl.mdc.MdcContext;
-
-import org.onap.aai.cl.mdc.MdcContext;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectReader;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.cl.mdc.MdcContext;
+import org.onap.aai.restclient.client.OperationResult;
import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
+import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
+import org.onap.aai.sparky.config.oxm.SearchableEntityLookup;
+import org.onap.aai.sparky.config.oxm.SearchableOxmEntityDescriptor;
import org.onap.aai.sparky.dal.NetworkTransaction;
import org.onap.aai.sparky.dal.aai.config.ActiveInventoryConfig;
import org.onap.aai.sparky.dal.elasticsearch.config.ElasticSearchConfig;
import org.onap.aai.sparky.dal.rest.HttpMethod;
-import org.onap.aai.sparky.dal.rest.OperationResult;
import org.onap.aai.sparky.logging.AaiUiMsgs;
-import org.onap.aai.sparky.synchronizer.config.SynchronizerConfiguration;
-import org.onap.aai.sparky.synchronizer.entity.MergableEntity;
-import org.onap.aai.sparky.synchronizer.entity.SearchableEntity;
-import org.onap.aai.sparky.synchronizer.entity.SelfLinkDescriptor;
-import org.onap.aai.sparky.synchronizer.enumeration.OperationState;
-import org.onap.aai.sparky.synchronizer.enumeration.SynchronizerState;
-import org.onap.aai.sparky.synchronizer.task.PerformActiveInventoryRetrieval;
-import org.onap.aai.sparky.synchronizer.task.PerformElasticSearchPut;
-import org.onap.aai.sparky.synchronizer.task.PerformElasticSearchRetrieval;
-import org.onap.aai.sparky.synchronizer.task.PerformElasticSearchUpdate;
+import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
+import org.onap.aai.sparky.sync.IndexSynchronizer;
+import org.onap.aai.sparky.sync.SynchronizerConstants;
+import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
+import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
+import org.onap.aai.sparky.sync.entity.MergableEntity;
+import org.onap.aai.sparky.sync.entity.SearchableEntity;
+import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
+import org.onap.aai.sparky.sync.enumeration.OperationState;
+import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
+import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
+import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
+import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
+import org.onap.aai.sparky.sync.task.PerformElasticSearchUpdate;
import org.onap.aai.sparky.util.NodeUtils;
-import org.onap.aai.cl.api.Logger;
-import org.onap.aai.cl.eelf.LoggerFactory;
import org.slf4j.MDC;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+
/**
* The Class SearchableEntitySynchronizer.
*/
-public class SearchableEntitySynchronizer extends AbstractEntitySynchronizer
+public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer
implements IndexSynchronizer {
/**
}
private static final Logger LOG =
- LoggerFactory.getInstance().getLogger(SearchableEntitySynchronizer.class);
+ LoggerFactory.getInstance().getLogger(ViewInspectEntitySynchronizer.class);
private boolean allWorkEnumerated;
private Deque<SelfLinkDescriptor> selflinks;
* @param indexName the index name
* @throws Exception the exception
*/
- public SearchableEntitySynchronizer(String indexName) throws Exception {
- super(LOG, "SES", 2, 5, 5, indexName);
+ public ViewInspectEntitySynchronizer(ElasticSearchSchemaConfig schemaConfig,
+ int internalSyncWorkers, int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
+ NetworkStatisticsConfig esStatConfig) throws Exception {
+ super(LOG, "SES", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(),
+ aaiStatConfig, esStatConfig);
this.allWorkEnumerated = false;
this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
this.retryQueue = new ConcurrentLinkedDeque<RetrySearchableEntitySyncContainer>();
this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
this.synchronizerName = "Searchable Entity Synchronizer";
this.esPutExecutor = NodeUtils.createNamedExecutor("SES-ES-PUT", 5, LOG);
- this.aaiEntityStats.initializeCountersFromOxmEntityDescriptors(
- oxmModelLoader.getSearchableEntityDescriptors());
- this.esEntityStats.initializeCountersFromOxmEntityDescriptors(
- oxmModelLoader.getSearchableEntityDescriptors());
+ this.aaiEntityStats.intializeEntityCounters(
+ SearchableEntityLookup.getInstance().getSearchableEntityDescriptors().keySet());
+ this.esEntityStats.intializeEntityCounters(
+ SearchableEntityLookup.getInstance().getSearchableEntityDescriptors().keySet());
this.syncDurationInMs = -1;
}
*/
private OperationState collectAllTheWork() {
final Map<String, String> contextMap = MDC.getCopyOfContextMap();
- Map<String, OxmEntityDescriptor> descriptorMap =
- oxmModelLoader.getSearchableEntityDescriptors();
+ Map<String, SearchableOxmEntityDescriptor> descriptorMap =
+ SearchableEntityLookup.getInstance().getSearchableEntityDescriptors();
if (descriptorMap.isEmpty()) {
LOG.error(AaiUiMsgs.ERROR_LOADING_OXM_SEARCHABLE_ENTITIES);
MDC.setContextMap(contextMap);
OperationResult typeLinksResult = null;
try {
- typeLinksResult = aaiDataProvider.getSelfLinksByEntityType(key);
+ typeLinksResult = aaiAdapter.getSelfLinksByEntityType(key);
aaiWorkOnHand.decrementAndGet();
processEntityTypeSelfLinks(typeLinksResult);
} catch (Exception exc) {
/*
* (non-Javadoc)
*
- * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#doSync()
+ * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
*/
@Override
public OperationState doSync() {
final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
- OxmEntityDescriptor descriptor = null;
+ SearchableOxmEntityDescriptor descriptor = null;
if (resourceType != null && resourceLink != null) {
- descriptor = oxmModelLoader.getEntityDescriptor(resourceType);
+ descriptor = SearchableEntityLookup.getInstance().getSearchableEntityDescriptors()
+ .get(resourceType);
if (descriptor == null) {
LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
if (descriptor.hasSearchableAttributes()) {
selflinks.add(new SelfLinkDescriptor(resourceLink,
- SynchronizerConfiguration.NODES_ONLY_MODIFIER, resourceType));
+ SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
}
}
if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
- descriptor = oxmModelLoader.getEntityDescriptor(linkDescriptor.getEntityType());
+ descriptor = OxmEntityLookup.getInstance().getEntityDescriptors()
+ .get(linkDescriptor.getEntityType());
if (descriptor == null) {
LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
aaiWorkOnHand.incrementAndGet();
- supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiDataProvider), aaiExecutor)
+ supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
.whenComplete((result, error) -> {
aaiWorkOnHand.decrementAndGet();
String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
ObjectReader updater = mapper.readerForUpdating(me);
- MergableEntity merged = updater.readValue(se.getIndexDocumentJson());
+ MergableEntity merged = updater.readValue(NodeUtils.convertObjectToJson(se, false));
jsonPayload = mapper.writeValueAsString(merged);
}
} catch (IOException exc) {
return;
}
} else {
- jsonPayload = se.getIndexDocumentJson();
+ jsonPayload = se.getAsJson();
}
if (wasEntryDiscovered) {
if (versionNumber != null && jsonPayload != null) {
- String requestPayload = esDataProvider.buildBulkImportOperationRequest(getIndexName(),
- ElasticSearchConfig.getConfig().getType(), se.getId(), versionNumber, jsonPayload);
+ String requestPayload = elasticSearchAdapter.buildBulkImportOperationRequest(
+ getIndexName(), ElasticSearchConfig.getConfig().getType(), se.getId(), versionNumber,
+ jsonPayload);
NetworkTransaction transactionTracker = new NetworkTransaction();
transactionTracker.setEntityType(esGetTxn.getEntityType());
esWorkOnHand.incrementAndGet();
supplyAsync(new PerformElasticSearchUpdate(ElasticSearchConfig.getConfig().getBulkUrl(),
- requestPayload, esDataProvider, transactionTracker), esPutExecutor)
+ requestPayload, elasticSearchAdapter, transactionTracker), esPutExecutor)
.whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
}
} else {
+
if (link != null && jsonPayload != null) {
NetworkTransaction updateElasticTxn = new NetworkTransaction();
updateElasticTxn.setOperationType(HttpMethod.PUT);
esWorkOnHand.incrementAndGet();
- supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, esDataProvider),
+ supplyAsync(
+ new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
esPutExecutor).whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
List<String> primaryKeyValues = new ArrayList<String>();
String pkeyValue = null;
- for (String keyName : resultDescriptor.getPrimaryKeyAttributeName()) {
+ SearchableOxmEntityDescriptor searchableDescriptor = SearchableEntityLookup.getInstance()
+ .getSearchableEntityDescriptors().get(resultDescriptor.getEntityName());
+
+ for (String keyName : searchableDescriptor.getPrimaryKeyAttributeNames()) {
pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
if (pkeyValue != null) {
primaryKeyValues.add(pkeyValue);
final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
doc.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
- final List<String> searchTagFields = resultDescriptor.getSearchableAttributes();
+ final List<String> searchTagFields = searchableDescriptor.getSearchableAttributes();
/*
* Based on configuration, use the configured field names for this entity-Type to build a
return;
}
+ SearchableOxmEntityDescriptor searchableDescriptor = SearchableEntityLookup.getInstance()
+ .getSearchableEntityDescriptors().get(txn.getDescriptor().getEntityName());
+
try {
- if (txn.getDescriptor().hasSearchableAttributes()) {
+ if (searchableDescriptor.hasSearchableAttributes()) {
final String jsonResult = txn.getOperationResult().getResult();
if (jsonResult != null && jsonResult.length() > 0) {
- SearchableEntity se = new SearchableEntity(oxmModelLoader);
+ SearchableEntity se = new SearchableEntity();
se.setLink(ActiveInventoryConfig.extractResourcePath(txn.getLink()));
populateSearchableEntityDocument(se, jsonResult, txn.getDescriptor());
se.deriveFields();
esWorkOnHand.incrementAndGet();
- supplyAsync(new PerformElasticSearchRetrieval(n2, esDataProvider), esExecutor)
+ supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
.whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
* called incrementAndGet when queuing the failed PUT!
*/
- supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, esDataProvider),
+ supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
esExecutor).whenComplete((result, error) -> {
esWorkOnHand.decrementAndGet();
/*
* (non-Javadoc)
*
- * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
+ * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
*/
@Override
public String getStatReport(boolean showFinalReport) {
/*
* (non-Javadoc)
*
- * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#shutdown()
+ * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
*/
@Override
public void shutdown() {