Remove duplicated code
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / sync / AbstractEntitySynchronizer.java
index 52de8e4..444eafb 100644 (file)
@@ -23,8 +23,10 @@ package org.onap.aai.sparky.sync;
 import java.util.EnumSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
 import org.onap.aai.cl.mdc.MdcContext;
 import org.onap.aai.restclient.client.OperationResult;
 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
@@ -37,10 +39,13 @@ import org.onap.aai.sparky.dal.rest.HttpMethod;
 import org.onap.aai.sparky.dal.rest.RestOperationalStatistics;
 import org.onap.aai.sparky.logging.AaiUiMsgs;
 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
+import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
 import org.onap.aai.sparky.util.NodeUtils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+
 /**
  * The Class AbstractEntitySynchronizer.
  *
@@ -48,6 +53,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
  */
 public abstract class AbstractEntitySynchronizer {
 
+  protected static final Logger LOG =
+          LoggerFactory.getInstance().getLogger(AbstractEntitySynchronizer.class);
   protected static final int VERSION_CONFLICT_EXCEPTION_CODE = 409;
   protected static final Integer RETRY_COUNT_PER_ENTITY_LIMIT = new Integer(3);
 
@@ -517,4 +524,43 @@ public abstract class AbstractEntitySynchronizer {
     esEntityStats.reset();
   }
 
+
+  protected void performRetrySync(String id, Consumer<NetworkTransaction> networkTransactionConsumer, NetworkTransaction txn) {
+    String link = null;
+    try {
+      /*
+       * In this retry flow the se object has already derived its fields
+       */
+      link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), id);
+    } catch (Exception exc) {
+      LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
+    }
+
+    if (link != null) {
+      NetworkTransaction retryTransaction = new NetworkTransaction();
+      retryTransaction.setLink(link);
+      retryTransaction.setEntityType(txn.getEntityType());
+      retryTransaction.setDescriptor(txn.getDescriptor());
+      retryTransaction.setOperationType(HttpMethod.GET);
+
+      /*
+       * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
+       * called incrementAndGet when queuing the failed PUT!
+       */
+
+      supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
+              esExecutor).whenComplete((result, error) -> {
+
+        esWorkOnHand.decrementAndGet();
+
+        if (error != null) {
+          LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
+        } else {
+          updateElasticSearchCounters(result);
+          networkTransactionConsumer.accept(result);
+        }
+      });
+    }
+  }
+
 }