Merge "Update Sparky README files"
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / aggregation / sync / AggregationSynchronizer.java
index 2a115db..3b02ff0 100644 (file)
@@ -42,9 +42,8 @@ import org.onap.aai.cl.mdc.MdcContext;
 import org.onap.aai.restclient.client.OperationResult;
 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
+import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
 import org.onap.aai.sparky.dal.NetworkTransaction;
-import org.onap.aai.sparky.dal.aai.config.ActiveInventoryConfig;
-import org.onap.aai.sparky.dal.elasticsearch.config.ElasticSearchConfig;
 import org.onap.aai.sparky.dal.rest.HttpMethod;
 import org.onap.aai.sparky.logging.AaiUiMsgs;
 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
@@ -101,7 +100,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
       return ae;
     }
   }
-
+  
   private static final Logger LOG =
       LoggerFactory.getInstance().getLogger(AggregationSynchronizer.class);
   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
@@ -116,7 +115,8 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
   private Map<String, String> contextMap;
   private String entityType;
   private ElasticSearchSchemaConfig schemaConfig;
-
+  private OxmEntityLookup oxmEntityLookup;
+  
   /**
    * Instantiates a new entity aggregation synchronizer.
    *
@@ -125,16 +125,14 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
    */
   public AggregationSynchronizer(String entityType, ElasticSearchSchemaConfig schemaConfig,
       int numSyncWorkers, int numActiveInventoryWorkers, int numElasticWorkers,
-      NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig)
-      throws Exception {
-
+      NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig,
+      OxmEntityLookup oxmEntityLookup) throws Exception {
+    
     super(LOG, "AGGES-" + schemaConfig.getIndexName().toUpperCase(), numSyncWorkers,
-        numActiveInventoryWorkers, numElasticWorkers, schemaConfig.getIndexName(), aaiStatConfig,
-        esStatConfig); // multiple
-    // Autosuggestion
-    // Entity Synchronizer will
-    // run for different indices
-
+        numActiveInventoryWorkers, numElasticWorkers, schemaConfig.getIndexName(),aaiStatConfig, esStatConfig); 
+    
+    this.oxmEntityLookup = oxmEntityLookup;
+    
     this.schemaConfig = schemaConfig;
     this.entityType = entityType;
     this.allWorkEnumerated = false;
@@ -146,12 +144,12 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
     this.retryQueue = new ConcurrentLinkedDeque<RetryAggregationEntitySyncContainer>();
     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
-
+    
     this.esPutExecutor = NodeUtils.createNamedExecutor("AGGES-ES-PUT", 1, LOG);
-
+    
     this.aaiEntityStats.intializeEntityCounters(entityType);
     this.esEntityStats.intializeEntityCounters(entityType);
-
+        
     this.contextMap = MDC.getCopyOfContextMap();
   }
 
@@ -178,9 +176,8 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
             aaiWorkOnHand.decrementAndGet();
             processEntityTypeSelfLinks(typeLinksResult);
           } catch (Exception exc) {
-            // TODO -> LOG, what should be logged here?
-
-            exc.printStackTrace();
+                 LOG.error(AaiUiMsgs.ERROR_GENERIC, "Processing execption while building working set.  Error:" 
+                       + exc.getMessage());
           }
 
           return null;
@@ -224,8 +221,8 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
 
     return OperationState.OK;
   }
-
-
+  
+  
   /**
    * Perform retry sync.
    */
@@ -243,7 +240,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
           /*
            * In this retry flow the se object has already derived its fields
            */
-          link = getElasticFullUrl("/" + ae.getId(), getIndexName());
+          link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId());
         } catch (Exception exc) {
           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
         }
@@ -277,7 +274,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
       }
     }
   }
-
+  
   /**
    * Perform document upsert.
    *
@@ -300,7 +297,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
      */
     String link = null;
     try {
-      link = getElasticFullUrl("/" + ae.getId(), getIndexName());
+      link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId()); 
     } catch (Exception exc) {
       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
       return;
@@ -373,7 +370,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
           transactionTracker.setOperationType(HttpMethod.PUT);
 
           esWorkOnHand.incrementAndGet();
-          supplyAsync(new PerformElasticSearchUpdate(ElasticSearchConfig.getConfig().getBulkUrl(),
+          supplyAsync(new PerformElasticSearchUpdate(elasticSearchAdapter.getBulkUrl(),
               requestPayload, elasticSearchAdapter, transactionTracker), esPutExecutor)
                   .whenComplete((result, error) -> {
 
@@ -400,8 +397,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
           updateElasticTxn.setOperationType(HttpMethod.PUT);
 
           esWorkOnHand.incrementAndGet();
-          supplyAsync(
-              new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
+          supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
               esPutExecutor).whenComplete((result, error) -> {
 
                 esWorkOnHand.decrementAndGet();
@@ -423,7 +419,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
     }
   }
-
+  
   /**
    * Should allow retry.
    *
@@ -450,7 +446,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
 
     return isRetryAllowed;
   }
-
+  
   /**
    * Process store document result.
    *
@@ -485,7 +481,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
       }
     }
   }
-
+  
   /**
    * Sync entity types.
    */
@@ -500,8 +496,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
 
       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
 
-        descriptor = OxmEntityLookup.getInstance().getEntityDescriptors()
-            .get(linkDescriptor.getEntityType());
+        descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
 
         if (descriptor == null) {
           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
@@ -539,7 +534,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
     }
 
   }
-
+  
   /**
    * Fetch document for upsert.
    *
@@ -554,53 +549,56 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
     }
 
     try {
-      final String jsonResult = txn.getOperationResult().getResult();
-      if (jsonResult != null && jsonResult.length() > 0) {
+        final String jsonResult = txn.getOperationResult().getResult();
+        if (jsonResult != null && jsonResult.length() > 0) {
 
-        AggregationEntity ae = new AggregationEntity();
-        ae.setLink(ActiveInventoryConfig.extractResourcePath(txn.getLink()));
-        populateAggregationEntityDocument(ae, jsonResult, txn.getDescriptor());
-        ae.deriveFields();
+          AggregationEntity ae = new AggregationEntity();
+          ae.setLink(ActiveInventoryAdapter.extractResourcePath(txn.getLink()));
+          populateAggregationEntityDocument(ae, jsonResult, txn.getDescriptor());
+          ae.deriveFields();
 
-        String link = null;
-        try {
-          link = getElasticFullUrl("/" + ae.getId(), getIndexName());
-        } catch (Exception exc) {
-          LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
-        }
+          String link = null;
+          try {
+            link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId());
+          } catch (Exception exc) {
+            LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
+          }
 
-        if (link != null) {
-          NetworkTransaction n2 = new NetworkTransaction();
-          n2.setLink(link);
-          n2.setEntityType(txn.getEntityType());
-          n2.setDescriptor(txn.getDescriptor());
-          n2.setOperationType(HttpMethod.GET);
+          if (link != null) {
+            NetworkTransaction n2 = new NetworkTransaction();
+            n2.setLink(link);
+            n2.setEntityType(txn.getEntityType());
+            n2.setDescriptor(txn.getDescriptor());
+            n2.setOperationType(HttpMethod.GET);
 
-          esWorkOnHand.incrementAndGet();
+            esWorkOnHand.incrementAndGet();
 
-          supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
-              .whenComplete((result, error) -> {
+            supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
+                .whenComplete((result, error) -> {
 
-                esWorkOnHand.decrementAndGet();
+                  esWorkOnHand.decrementAndGet();
 
-                if (error != null) {
-                  LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
-                } else {
-                  updateElasticSearchCounters(result);
-                  performDocumentUpsert(result, ae);
-                }
-              });
+                  if (error != null) {
+                    LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
+                  } else {
+                    updateElasticSearchCounters(result);
+                    performDocumentUpsert(result, ae);
+                  }
+                });
+          }
         }
-      }
-
-    } catch (JsonProcessingException exc) {
-      // TODO -> LOG, waht should be logged here?
-    } catch (IOException exc) {
-      // TODO -> LOG, waht should be logged here?
-    }
-  }
-
 
+               } catch (JsonProcessingException exc) {
+                       LOG.error(AaiUiMsgs.ERROR_GENERIC,
+                                       "There was a JSON processing error fetching the elastic document for upsert.  Error: "
+                                                       + exc.getMessage());
+               } catch (IOException exc) {
+                       LOG.error(AaiUiMsgs.ERROR_GENERIC,
+                                       "There was an IO error fetching the elastic document for upsert.  Error: " + exc.getMessage());
+               }
+         }
+  
+  
   /**
    * Populate aggregation entity document.
    *
@@ -617,7 +615,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
     Map<String, Object> map = mapper.convertValue(entityNode, Map.class);
     doc.copyAttributeKeyValuePair(map);
   }
-
+  
   /**
    * Process entity type self links.
    *
@@ -626,6 +624,10 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
   private void processEntityTypeSelfLinks(OperationResult operationResult) {
 
     JsonNode rootNode = null;
+    
+    if ( operationResult == null ) {
+       return;
+    }
 
     final String jsonResult = operationResult.getResult();
 
@@ -634,8 +636,9 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
       try {
         rootNode = mapper.readTree(jsonResult);
       } catch (IOException exc) {
-        String message = "Could not deserialize JSON (representing operation result) as node tree. "
-            + "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
+        String message =
+            "Could not deserialize JSON (representing operation result) as node tree. " +
+            "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
       }
 
@@ -658,7 +661,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
 
           if (resourceType != null && resourceLink != null) {
 
-            descriptor = OxmEntityLookup.getInstance().getEntityDescriptors().get(resourceType);
+            descriptor = oxmEntityLookup.getEntityDescriptors().get(resourceType);
 
             if (descriptor == null) {
               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
@@ -666,9 +669,8 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
               continue;
             }
 
-            selflinks.add(new SelfLinkDescriptor(resourceLink,
-                SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
-
+            selflinks.add(new SelfLinkDescriptor(resourceLink, SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
+            
 
           }
         }
@@ -688,7 +690,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
     syncStartedTimeStampInMs = System.currentTimeMillis();
     String txnID = NodeUtils.getRandomTxnId();
     MdcContext.initialize(txnID, "AggregationSynchronizer", "", "Sync", "");
-
+    
     return collectAllTheWork();
   }