update sparky with configurable features
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / crossentityreference / sync / CrossEntityReferenceSynchronizer.java
index d95e507..9481173 100644 (file)
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 import org.onap.aai.cl.api.Logger;
@@ -44,7 +45,6 @@ import org.onap.aai.sparky.config.oxm.CrossEntityReferenceLookup;
 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
 import org.onap.aai.sparky.config.oxm.SearchableEntityLookup;
-import org.onap.aai.sparky.config.oxm.SearchableOxmEntityDescriptor;
 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
 import org.onap.aai.sparky.dal.NetworkTransaction;
 import org.onap.aai.sparky.dal.rest.HttpMethod;
@@ -60,9 +60,9 @@ import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
 import org.onap.aai.sparky.sync.enumeration.OperationState;
 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
-import org.onap.aai.sparky.sync.task.PerformElasticSearchUpdate;
+import org.onap.aai.sparky.sync.task.PerformSearchServicePut;
+import org.onap.aai.sparky.sync.task.PerformSearchServiceRetrieval;
+import org.onap.aai.sparky.sync.task.PerformSearchServiceUpdate;
 import org.onap.aai.sparky.util.NodeUtils;
 import org.slf4j.MDC;
 
@@ -123,7 +123,6 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
   /**
    * Instantiates a new cross entity reference synchronizer.
    *
-   * @param indexName the index name
    * @throws Exception the exception
    */
   public CrossEntityReferenceSynchronizer(ElasticSearchSchemaConfig schemaConfig,
@@ -321,7 +320,7 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
 
           aaiWorkOnHand.incrementAndGet();
 
-          supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
+          supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter,"sync"), aaiExecutor)
               .whenComplete((result, error) -> {
 
                 aaiWorkOnHand.decrementAndGet();
@@ -349,49 +348,42 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
    */
   private void processEntityTypeSelfLinks(OperationResult operationResult) {
 
-    JsonNode rootNode = null;
-
     final String jsonResult = operationResult.getResult();
 
     if (jsonResult != null && jsonResult.length() > 0) {
 
       try {
-        rootNode = mapper.readTree(jsonResult);
-      } catch (IOException exc) {
-        // TODO // TODO -> LOG, waht should be logged here?
-      }
+        JsonNode rootNode = mapper.readTree(jsonResult);
+        JsonNode resultData = rootNode.get("result-data");
 
-      JsonNode resultData = rootNode.get("result-data");
-      ArrayNode resultDataArrayNode = null;
+        if (resultData.isArray()) {
+          ArrayNode resultDataArrayNode = (ArrayNode) resultData;
 
-      if (resultData.isArray()) {
-        resultDataArrayNode = (ArrayNode) resultData;
+          Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
 
-        Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
-        JsonNode element = null;
+          while (elementIterator.hasNext()) {
+            JsonNode element = elementIterator.next();
 
-        while (elementIterator.hasNext()) {
-          element = elementIterator.next();
+            final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
+            final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
 
-          final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
-          final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
+            if (resourceType != null && resourceLink != null) {
+              CrossEntityReferenceDescriptor descriptor = crossEntityReferenceLookup.getCrossReferenceEntityDescriptors().get(resourceType);
 
-          CrossEntityReferenceDescriptor descriptor = null;
-
-          if (resourceType != null && resourceLink != null) {
-            descriptor = crossEntityReferenceLookup.getCrossReferenceEntityDescriptors().get(resourceType);
-
-            if (descriptor == null) {
-              LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
-              // go to next element in iterator
-              continue;
-            }
-            if (descriptor.hasCrossEntityReferences()) {
-              selflinks.add(new SelfLinkDescriptor(
-                  resourceLink,SynchronizerConstants.DEPTH_ALL_MODIFIER, resourceType));
+              if (descriptor == null) {
+                LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
+                // go to next element in iterator
+                continue;
+              }
+              if (descriptor.hasCrossEntityReferences()) {
+                selflinks.add(new SelfLinkDescriptor(
+                        resourceLink, SynchronizerConstants.DEPTH_ALL_MODIFIER, resourceType));
+              }
             }
           }
         }
+      } catch (IOException exc) {
+        // TODO // TODO -> LOG, waht should be logged here?
       }
     }
   }
@@ -542,7 +534,7 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
 
                       OperationResult aaiQueryResult = aaiAdapter.queryActiveInventoryWithRetries(
                           genericQueryStr, "application/json",
-                          aaiAdapter.getEndpointConfig().getNumRequestRetries());
+                          aaiAdapter.getEndpointConfig().getNumRequestRetries(),"sync");
 
                       aaiWorkOnHand.decrementAndGet();
 
@@ -583,8 +575,8 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
 
                             String link = null;
                             try {
-                              link = elasticSearchAdapter
-                                  .buildElasticSearchGetDocUrl(getIndexName(), icer.getId());
+                              link = searchServiceAdapter
+                                  .buildSearchServiceDocUrl(getIndexName(), icer.getId());
                             } catch (Exception exc) {
                               LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY,
                                   exc.getLocalizedMessage());
@@ -600,8 +592,8 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
                               esWorkOnHand.incrementAndGet();
 
                               supplyAsync(
-                                  new PerformElasticSearchRetrieval(n2, elasticSearchAdapter),
-                                  esExecutor).whenComplete((result, error) -> {
+                                      new PerformSearchServiceRetrieval(n2, searchServiceAdapter),
+                                      esExecutor).whenComplete((result, error) -> {
 
                                     esWorkOnHand.decrementAndGet();
 
@@ -625,7 +617,8 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
                         }
                         
                       } else {
-                        String message = "Entity sync failed because AAI query failed with error " + aaiQueryResult.getResult(); 
+                        String result = aaiQueryResult != null ? aaiQueryResult.getResult() : "unknown";
+                        String message = "Entity sync failed because AAI query failed with error " + result;
                         LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_QUERY_ERROR, message);
                       }
                       
@@ -676,7 +669,7 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
      */
     String link = null;
     try {
-      link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), icer.getId());
+      link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), icer.getId());
     } catch (Exception exc) {
       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
       return;
@@ -735,8 +728,8 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
       if (wasEntryDiscovered) {
         if (versionNumber != null && jsonPayload != null) {
 
-          String requestPayload = elasticSearchAdapter.buildBulkImportOperationRequest(getIndexName(),
-              "default", icer.getId(), versionNumber, jsonPayload);
+               String requestPayload = searchServiceAdapter.buildBulkImportOperationRequest(getIndexName(),
+                    icer.getId(), jsonPayload);
 
           NetworkTransaction transactionTracker = new NetworkTransaction();
           transactionTracker.setEntityType(esGetResult.getEntityType());
@@ -744,9 +737,9 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
           transactionTracker.setOperationType(HttpMethod.PUT);
 
           esWorkOnHand.incrementAndGet();
-          supplyAsync(new PerformElasticSearchUpdate(elasticSearchAdapter.getBulkUrl(),
-              requestPayload, elasticSearchAdapter, transactionTracker), esPutExecutor)
-                  .whenComplete((result, error) -> {
+          supplyAsync(new PerformSearchServiceUpdate(searchServiceAdapter.buildSearchServiceBulkUrl(),
+                  requestPayload, searchServiceAdapter, transactionTracker), esPutExecutor)
+                      .whenComplete((result, error) -> {
 
                     esWorkOnHand.decrementAndGet();
 
@@ -769,8 +762,8 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
           updateElasticTxn.setOperationType(HttpMethod.PUT);
 
           esWorkOnHand.incrementAndGet();
-          supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
-              esPutExecutor).whenComplete((result, error) -> {
+          supplyAsync(new PerformSearchServicePut(jsonPayload, updateElasticTxn, searchServiceAdapter),
+                  esPutExecutor).whenComplete((result, error) -> {
 
                 esWorkOnHand.decrementAndGet();
 
@@ -831,40 +824,8 @@ public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
         IndexableCrossEntityReference icer = rsc.getIndexableCrossEntityReference();
         NetworkTransaction txn = rsc.getNetworkTransaction();
 
-        String link = null;
-        try {
-          // In this retry flow the icer object has already
-          // derived its fields
-          link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), icer.getId());
-        } catch (Exception exc) {
-          LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
-        }
-
-        if (link != null) {
-          NetworkTransaction retryTransaction = new NetworkTransaction();
-          retryTransaction.setLink(link);
-          retryTransaction.setEntityType(txn.getEntityType());
-          retryTransaction.setDescriptor(txn.getDescriptor());
-          retryTransaction.setOperationType(HttpMethod.GET);
-
-          /*
-           * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow and we did
-           * that for this request already when queuing the failed PUT!
-           */
-
-          supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
-              esExecutor).whenComplete((result, error) -> {
-
-                esWorkOnHand.decrementAndGet();
-
-                if (error != null) {
-                  LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
-                } else {
-                  updateElasticSearchCounters(result);
-                  performDocumentUpsert(result, icer);
-                }
-              });
-        }
+        final Consumer<NetworkTransaction> networkTransactionConsumer = (result) -> performDocumentUpsert(result,icer);
+        performRetrySync(icer.getId(), networkTransactionConsumer, txn);
 
       }
     }