import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import org.onap.aai.cl.api.Logger;
AggregationEntity ae = rsc.getAggregationEntity();
NetworkTransaction txn = rsc.getNetworkTransaction();
- String link = null;
- try {
- /*
- * In this retry flow the se object has already derived its fields
- */
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId());
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
- }
-
- if (link != null) {
- NetworkTransaction retryTransaction = new NetworkTransaction();
- retryTransaction.setLink(link);
- retryTransaction.setEntityType(txn.getEntityType());
- retryTransaction.setDescriptor(txn.getDescriptor());
- retryTransaction.setOperationType(HttpMethod.GET);
-
- /*
- * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
- * called incrementAndGet when queuing the failed PUT!
- */
-
- supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
- esExecutor).whenComplete((result, error) -> {
-
- esWorkOnHand.decrementAndGet();
-
- if (error != null) {
- LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
- } else {
- updateElasticSearchCounters(result);
- performDocumentUpsert(result, ae);
- }
- });
- }
+ final Consumer<NetworkTransaction> networkTransactionConsumer = (result) -> performDocumentUpsert(result, ae);
+ performRetrySync(ae.getId(), networkTransactionConsumer, txn);
}
}
}
-
+
+
/**
* Perform document upsert.
*
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import org.onap.aai.cl.api.Logger;
SuggestionSearchEntity sus = susc.getSuggestionSearchEntity();
NetworkTransaction txn = susc.getNetworkTransaction();
- String link = null;
- try {
- /*
- * In this retry flow the se object has already derived its fields
- */
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sus.getId());
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
- }
-
- if (link != null) {
- NetworkTransaction retryTransaction = new NetworkTransaction();
- retryTransaction.setLink(link);
- retryTransaction.setEntityType(txn.getEntityType());
- retryTransaction.setDescriptor(txn.getDescriptor());
- retryTransaction.setOperationType(HttpMethod.GET);
-
- /*
- * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
- * called incrementAndGet when queuing the failed PUT!
- */
-
- supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
- esExecutor).whenComplete((result, error) -> {
-
- esWorkOnHand.decrementAndGet();
-
- if (error != null) {
- LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
- } else {
- updateElasticSearchCounters(result);
- performDocumentUpsert(result, sus);
- }
- });
- }
+ final Consumer<NetworkTransaction> networkTransactionConsumer = (result) -> performDocumentUpsert(result, sus);
+ performRetrySync(sus.getId(), networkTransactionConsumer, txn);
}
}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
import org.onap.aai.sparky.config.oxm.SearchableEntityLookup;
-import org.onap.aai.sparky.config.oxm.SearchableOxmEntityDescriptor;
import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
import org.onap.aai.sparky.dal.NetworkTransaction;
import org.onap.aai.sparky.dal.rest.HttpMethod;
IndexableCrossEntityReference icer = rsc.getIndexableCrossEntityReference();
NetworkTransaction txn = rsc.getNetworkTransaction();
- String link = null;
- try {
- // In this retry flow the icer object has already
- // derived its fields
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), icer.getId());
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
- }
-
- if (link != null) {
- NetworkTransaction retryTransaction = new NetworkTransaction();
- retryTransaction.setLink(link);
- retryTransaction.setEntityType(txn.getEntityType());
- retryTransaction.setDescriptor(txn.getDescriptor());
- retryTransaction.setOperationType(HttpMethod.GET);
-
- /*
- * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow and we did
- * that for this request already when queuing the failed PUT!
- */
-
- supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
- esExecutor).whenComplete((result, error) -> {
-
- esWorkOnHand.decrementAndGet();
-
- if (error != null) {
- LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
- } else {
- updateElasticSearchCounters(result);
- performDocumentUpsert(result, icer);
- }
- });
- }
+ final Consumer<NetworkTransaction> networkTransactionConsumer = (result) -> performDocumentUpsert(result,icer);
+ performRetrySync(icer.getId(), networkTransactionConsumer, txn);
}
}
import java.util.EnumSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.cl.mdc.MdcContext;
import org.onap.aai.restclient.client.OperationResult;
import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
import org.onap.aai.sparky.dal.rest.RestOperationalStatistics;
import org.onap.aai.sparky.logging.AaiUiMsgs;
import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
+import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
import org.onap.aai.sparky.util.NodeUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+
/**
* The Class AbstractEntitySynchronizer.
*
*/
public abstract class AbstractEntitySynchronizer {
+ protected static final Logger LOG =
+ LoggerFactory.getInstance().getLogger(AbstractEntitySynchronizer.class);
protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409;
protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3);
esEntityStats.reset();
}
+
+ protected void performRetrySync(String id, Consumer<NetworkTransaction> networkTransactionConsumer, NetworkTransaction txn) {
+ String link = null;
+ try {
+ /*
+ * In this retry flow the se object has already derived its fields
+ */
+ link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), id);
+ } catch (Exception exc) {
+ LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
+ }
+
+ if (link != null) {
+ NetworkTransaction retryTransaction = new NetworkTransaction();
+ retryTransaction.setLink(link);
+ retryTransaction.setEntityType(txn.getEntityType());
+ retryTransaction.setDescriptor(txn.getDescriptor());
+ retryTransaction.setOperationType(HttpMethod.GET);
+
+ /*
+ * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
+ * called incrementAndGet when queuing the failed PUT!
+ */
+
+ supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
+ esExecutor).whenComplete((result, error) -> {
+
+ esWorkOnHand.decrementAndGet();
+
+ if (error != null) {
+ LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
+ } else {
+ updateElasticSearchCounters(result);
+ networkTransactionConsumer.accept(result);
+ }
+ });
+ }
+ }
+
}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import org.onap.aai.cl.api.Logger;
SearchableEntity se = rsc.getSearchableEntity();
NetworkTransaction txn = rsc.getNetworkTransaction();
- String link = null;
- try {
- /*
- * In this retry flow the se object has already derived its fields
- */
- link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), se.getId());
- } catch (Exception exc) {
- LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
- }
-
- if (link != null) {
- NetworkTransaction retryTransaction = new NetworkTransaction();
- retryTransaction.setLink(link);
- retryTransaction.setEntityType(txn.getEntityType());
- retryTransaction.setDescriptor(txn.getDescriptor());
- retryTransaction.setOperationType(HttpMethod.GET);
-
- /*
- * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
- * called incrementAndGet when queuing the failed PUT!
- */
-
- supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
- esExecutor).whenComplete((result, error) -> {
-
- esWorkOnHand.decrementAndGet();
-
- if (error != null) {
- LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
- } else {
- updateElasticSearchCounters(result);
- performDocumentUpsert(result, se);
- }
- });
- }
+ final Consumer<NetworkTransaction> networkTransactionConsumer = (result) -> performDocumentUpsert(result, se);
+ performRetrySync(se.getId(), networkTransactionConsumer, txn);
}
}