Adding UI extensibility
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / aggregation / sync / AggregationSynchronizer.java
  *
  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
  */
-package org.onap.aai.sparky.synchronizer;
+package org.onap.aai.sparky.aggregation.sync;
 
 import static java.util.concurrent.CompletableFuture.supplyAsync;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Deque;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
-import javax.json.Json;
-
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.cl.mdc.MdcContext;
+import org.onap.aai.restclient.client.OperationResult;
 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
+import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
 import org.onap.aai.sparky.dal.NetworkTransaction;
 import org.onap.aai.sparky.dal.aai.config.ActiveInventoryConfig;
 import org.onap.aai.sparky.dal.elasticsearch.config.ElasticSearchConfig;
 import org.onap.aai.sparky.dal.rest.HttpMethod;
-import org.onap.aai.sparky.dal.rest.OperationResult;
 import org.onap.aai.sparky.logging.AaiUiMsgs;
-import org.onap.aai.sparky.synchronizer.config.SynchronizerConfiguration;
-import org.onap.aai.sparky.synchronizer.entity.AggregationEntity;
-import org.onap.aai.sparky.synchronizer.entity.MergableEntity;
-import org.onap.aai.sparky.synchronizer.entity.SelfLinkDescriptor;
-import org.onap.aai.sparky.synchronizer.enumeration.OperationState;
-import org.onap.aai.sparky.synchronizer.enumeration.SynchronizerState;
-import org.onap.aai.sparky.synchronizer.task.PerformActiveInventoryRetrieval;
-import org.onap.aai.sparky.synchronizer.task.PerformElasticSearchPut;
-import org.onap.aai.sparky.synchronizer.task.PerformElasticSearchRetrieval;
-import org.onap.aai.sparky.synchronizer.task.PerformElasticSearchUpdate;
+import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
+import org.onap.aai.sparky.sync.IndexSynchronizer;
+import org.onap.aai.sparky.sync.SynchronizerConstants;
+import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
+import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
+import org.onap.aai.sparky.sync.entity.AggregationEntity;
+import org.onap.aai.sparky.sync.entity.MergableEntity;
+import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
+import org.onap.aai.sparky.sync.enumeration.OperationState;
+import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
+import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
+import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
+import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
+import org.onap.aai.sparky.sync.task.PerformElasticSearchUpdate;
 import org.onap.aai.sparky.util.NodeUtils;
-import org.onap.aai.cl.api.Logger;
-import org.onap.aai.cl.eelf.LoggerFactory;
 import org.slf4j.MDC;
 
-import org.onap.aai.cl.mdc.MdcContext;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectReader;
@@ -121,6 +115,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
   private boolean syncInProgress;
   private Map<String, String> contextMap;
   private String entityType;
+  private ElasticSearchSchemaConfig schemaConfig;
 
   /**
    * Instantiates a new entity aggregation synchronizer.
@@ -128,11 +123,19 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
    * @param indexName the index name
    * @throws Exception the exception
    */
-  public AggregationSynchronizer(String entityType, String indexName) throws Exception {
-    super(LOG, "AGGES-" + indexName.toUpperCase(), 2, 5, 5, indexName); // multiple Autosuggestion
-                                                                        // Entity Synchronizer will
-                                                                        // run for different indices
-
+  public AggregationSynchronizer(String entityType, ElasticSearchSchemaConfig schemaConfig,
+      int numSyncWorkers, int numActiveInventoryWorkers, int numElasticWorkers,
+      NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig)
+      throws Exception {
+
+    super(LOG, "AGGES-" + schemaConfig.getIndexName().toUpperCase(), numSyncWorkers,
+        numActiveInventoryWorkers, numElasticWorkers, schemaConfig.getIndexName(), aaiStatConfig,
+        esStatConfig); // multiple
+    // Autosuggestion
+    // Entity Synchronizer will
+    // run for different indices
+
+    this.schemaConfig = schemaConfig;
     this.entityType = entityType;
     this.allWorkEnumerated = false;
     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
@@ -145,10 +148,10 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
 
     this.esPutExecutor = NodeUtils.createNamedExecutor("AGGES-ES-PUT", 1, LOG);
-    Map<String, OxmEntityDescriptor> descriptor = new HashMap<String, OxmEntityDescriptor>();
-    descriptor.put(entityType, oxmModelLoader.getEntityDescriptors().get(entityType));
-    this.aaiEntityStats.initializeCountersFromOxmEntityDescriptors(descriptor);
-    this.esEntityStats.initializeCountersFromOxmEntityDescriptors(descriptor);
+
+    this.aaiEntityStats.intializeEntityCounters(entityType);
+    this.esEntityStats.intializeEntityCounters(entityType);
+
     this.contextMap = MDC.getCopyOfContextMap();
   }
 
@@ -171,11 +174,13 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
           MDC.setContextMap(contextMap);
           OperationResult typeLinksResult = null;
           try {
-            typeLinksResult = aaiDataProvider.getSelfLinksByEntityType(entity);
+            typeLinksResult = aaiAdapter.getSelfLinksByEntityType(entity);
             aaiWorkOnHand.decrementAndGet();
             processEntityTypeSelfLinks(typeLinksResult);
           } catch (Exception exc) {
             // TODO -> LOG, what should be logged here?
+
+            exc.printStackTrace();
           }
 
           return null;
@@ -255,7 +260,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
            * called incrementAndGet when queuing the failed PUT!
            */
 
-          supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, esDataProvider),
+          supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
               esExecutor).whenComplete((result, error) -> {
 
                 esWorkOnHand.decrementAndGet();
@@ -341,7 +346,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
             String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
             MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
             ObjectReader updater = mapper.readerForUpdating(me);
-            MergableEntity merged = updater.readValue(ae.getIndexDocumentJson());
+            MergableEntity merged = updater.readValue(ae.getAsJson());
             jsonPayload = mapper.writeValueAsString(merged);
           }
         } catch (IOException exc) {
@@ -352,14 +357,15 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
           return;
         }
       } else {
-        jsonPayload = ae.getIndexDocumentJson();
+        jsonPayload = ae.getAsJson();
       }
 
       if (wasEntryDiscovered) {
         if (versionNumber != null && jsonPayload != null) {
 
-          String requestPayload = esDataProvider.buildBulkImportOperationRequest(getIndexName(),
-              ElasticSearchConfig.getConfig().getType(), ae.getId(), versionNumber, jsonPayload);
+          String requestPayload =
+              elasticSearchAdapter.buildBulkImportOperationRequest(schemaConfig.getIndexName(),
+                  schemaConfig.getIndexDocType(), ae.getId(), versionNumber, jsonPayload);
 
           NetworkTransaction transactionTracker = new NetworkTransaction();
           transactionTracker.setEntityType(esGetTxn.getEntityType());
@@ -368,7 +374,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
 
           esWorkOnHand.incrementAndGet();
           supplyAsync(new PerformElasticSearchUpdate(ElasticSearchConfig.getConfig().getBulkUrl(),
-              requestPayload, esDataProvider, transactionTracker), esPutExecutor)
+              requestPayload, elasticSearchAdapter, transactionTracker), esPutExecutor)
                   .whenComplete((result, error) -> {
 
                     esWorkOnHand.decrementAndGet();
@@ -394,7 +400,8 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
           updateElasticTxn.setOperationType(HttpMethod.PUT);
 
           esWorkOnHand.incrementAndGet();
-          supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, esDataProvider),
+          supplyAsync(
+              new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
               esPutExecutor).whenComplete((result, error) -> {
 
                 esWorkOnHand.decrementAndGet();
@@ -493,7 +500,8 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
 
       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
 
-        descriptor = oxmModelLoader.getEntityDescriptor(linkDescriptor.getEntityType());
+        descriptor = OxmEntityLookup.getInstance().getEntityDescriptors()
+            .get(linkDescriptor.getEntityType());
 
         if (descriptor == null) {
           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
@@ -509,7 +517,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
 
         aaiWorkOnHand.incrementAndGet();
 
-        supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiDataProvider), aaiExecutor)
+        supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
             .whenComplete((result, error) -> {
 
               aaiWorkOnHand.decrementAndGet();
@@ -549,7 +557,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
       final String jsonResult = txn.getOperationResult().getResult();
       if (jsonResult != null && jsonResult.length() > 0) {
 
-        AggregationEntity ae = new AggregationEntity(oxmModelLoader);
+        AggregationEntity ae = new AggregationEntity();
         ae.setLink(ActiveInventoryConfig.extractResourcePath(txn.getLink()));
         populateAggregationEntityDocument(ae, jsonResult, txn.getDescriptor());
         ae.deriveFields();
@@ -570,7 +578,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
 
           esWorkOnHand.incrementAndGet();
 
-          supplyAsync(new PerformElasticSearchRetrieval(n2, esDataProvider), esExecutor)
+          supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
               .whenComplete((result, error) -> {
 
                 esWorkOnHand.decrementAndGet();
@@ -629,7 +637,6 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
         String message = "Could not deserialize JSON (representing operation result) as node tree. "
             + "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
-        return;
       }
 
       JsonNode resultData = rootNode.get("result-data");
@@ -651,7 +658,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
 
           if (resourceType != null && resourceLink != null) {
 
-            descriptor = oxmModelLoader.getEntityDescriptor(resourceType);
+            descriptor = OxmEntityLookup.getInstance().getEntityDescriptors().get(resourceType);
 
             if (descriptor == null) {
               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
@@ -660,7 +667,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
             }
 
             selflinks.add(new SelfLinkDescriptor(resourceLink,
-                SynchronizerConfiguration.NODES_ONLY_MODIFIER, resourceType));
+                SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
 
 
           }
@@ -673,7 +680,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
   /*
    * (non-Javadoc)
    * 
-   * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#doSync()
+   * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
    */
   @Override
   public OperationState doSync() {
@@ -699,7 +706,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
   /*
    * (non-Javadoc)
    * 
-   * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
+   * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
    */
   @Override
   public String getStatReport(boolean showFinalReport) {
@@ -718,7 +725,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
   /*
    * (non-Javadoc)
    * 
-   * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#shutdown()
+   * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
    */
   @Override
   public void shutdown() {
@@ -747,7 +754,7 @@ public class AggregationSynchronizer extends AbstractEntitySynchronizer
   /*
    * (non-Javadoc)
    * 
-   * @see org.onap.aai.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
+   * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
    */
   @Override
   public void clearCache() {