update sparky with configurable features
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / viewinspect / sync / ViewInspectEntitySynchronizer.java
index 285a76b..4877ad0 100644 (file)
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 import org.onap.aai.cl.api.Logger;
@@ -57,9 +58,9 @@ import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
 import org.onap.aai.sparky.sync.enumeration.OperationState;
 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchUpdate;
+import org.onap.aai.sparky.sync.task.PerformSearchServicePut;
+import org.onap.aai.sparky.sync.task.PerformSearchServiceRetrieval;
+import org.onap.aai.sparky.sync.task.PerformSearchServiceUpdate;
 import org.onap.aai.sparky.util.NodeUtils;
 import org.slf4j.MDC;
 
@@ -332,7 +333,7 @@ public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer
 
         aaiWorkOnHand.incrementAndGet();
 
-        supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
+        supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter,"sync"), aaiExecutor)
             .whenComplete((result, error) -> {
 
               aaiWorkOnHand.decrementAndGet();
@@ -377,7 +378,7 @@ public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer
      */
     String link = null;
     try {
-      link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), se.getId());
+      link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), se.getId());
     } catch (Exception exc) {
       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
       return;
@@ -440,8 +441,8 @@ public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer
       if (wasEntryDiscovered) {
         if (versionNumber != null && jsonPayload != null) {
 
-          String requestPayload = elasticSearchAdapter.buildBulkImportOperationRequest(getIndexName(),
-              "default", se.getId(), versionNumber, jsonPayload);
+               String requestPayload = searchServiceAdapter.buildBulkImportOperationRequest(getIndexName(),
+                    se.getId(), jsonPayload);
 
           NetworkTransaction transactionTracker = new NetworkTransaction();
           transactionTracker.setEntityType(esGetTxn.getEntityType());
@@ -449,9 +450,9 @@ public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer
           transactionTracker.setOperationType(HttpMethod.PUT);
 
           esWorkOnHand.incrementAndGet();
-          supplyAsync(new PerformElasticSearchUpdate(elasticSearchAdapter.getBulkUrl(),
-              requestPayload, elasticSearchAdapter, transactionTracker), esPutExecutor)
-                  .whenComplete((result, error) -> {
+          supplyAsync(new PerformSearchServiceUpdate(searchServiceAdapter.buildSearchServiceBulkUrl(),
+                  requestPayload, searchServiceAdapter, transactionTracker), esPutExecutor)
+                      .whenComplete((result, error) -> {
 
                     esWorkOnHand.decrementAndGet();
 
@@ -477,8 +478,8 @@ public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer
           updateElasticTxn.setOperationType(HttpMethod.PUT);
 
           esWorkOnHand.incrementAndGet();
-          supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
-              esPutExecutor).whenComplete((result, error) -> {
+          supplyAsync(new PerformSearchServicePut(jsonPayload, updateElasticTxn, searchServiceAdapter),
+                  esPutExecutor).whenComplete((result, error) -> {
 
                 esWorkOnHand.decrementAndGet();
 
@@ -577,7 +578,7 @@ public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer
 
           String link = null;
           try {
-            link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), se.getId());
+            link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), se.getId());
           } catch (Exception exc) {
             LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
           }
@@ -591,8 +592,8 @@ public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer
 
             esWorkOnHand.incrementAndGet();
 
-            supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
-                .whenComplete((result, error) -> {
+            supplyAsync(new PerformSearchServiceRetrieval(n2, searchServiceAdapter), esExecutor)
+            .whenComplete((result, error) -> {
 
                   esWorkOnHand.decrementAndGet();
 
@@ -661,41 +662,8 @@ public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer
         SearchableEntity se = rsc.getSearchableEntity();
         NetworkTransaction txn = rsc.getNetworkTransaction();
 
-        String link = null;
-        try {
-          /*
-           * In this retry flow the se object has already derived its fields
-           */
-          link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), se.getId());
-        } catch (Exception exc) {
-          LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
-        }
-
-        if (link != null) {
-          NetworkTransaction retryTransaction = new NetworkTransaction();
-          retryTransaction.setLink(link);
-          retryTransaction.setEntityType(txn.getEntityType());
-          retryTransaction.setDescriptor(txn.getDescriptor());
-          retryTransaction.setOperationType(HttpMethod.GET);
-
-          /*
-           * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
-           * called incrementAndGet when queuing the failed PUT!
-           */
-
-          supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
-              esExecutor).whenComplete((result, error) -> {
-
-                esWorkOnHand.decrementAndGet();
-
-                if (error != null) {
-                  LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
-                } else {
-                  updateElasticSearchCounters(result);
-                  performDocumentUpsert(result, se);
-                }
-              });
-        }
+        final Consumer<NetworkTransaction> networkTransactionConsumer = (result) ->  performDocumentUpsert(result, se);
+        performRetrySync(se.getId(), networkTransactionConsumer, txn);
 
       }
     }