Adding UI extensibility
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / sync / ElasticSearchIndexCleaner.java
  *
  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
  */
-package org.onap.aai.sparky.synchronizer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
+package org.onap.aai.sparky.sync;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -34,14 +28,24 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
-import org.onap.aai.sparky.dal.rest.OperationResult;
-import org.onap.aai.sparky.dal.rest.RestDataProvider;
-import org.onap.aai.sparky.logging.AaiUiMsgs;
-import org.onap.aai.sparky.synchronizer.entity.ObjectIdCollection;
-import org.onap.aai.sparky.synchronizer.entity.SearchableEntity;
-import org.onap.aai.sparky.synchronizer.enumeration.OperationState;
+import javax.ws.rs.core.MediaType;
+
 import org.onap.aai.cl.api.Logger;
 import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.restclient.client.OperationResult;
+import org.onap.aai.sparky.dal.ElasticSearchAdapter;
+import org.onap.aai.sparky.logging.AaiUiMsgs;
+import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig;
+import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
+import org.onap.aai.sparky.sync.entity.ObjectIdCollection;
+import org.onap.aai.sparky.sync.entity.SearchableEntity;
+import org.onap.aai.sparky.sync.enumeration.OperationState;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 /**
  * The Class ElasticSearchIndexCleaner.
@@ -57,16 +61,10 @@ public class ElasticSearchIndexCleaner implements IndexCleaner {
   private ObjectIdCollection before;
   private ObjectIdCollection after;
 
-  private String host;
-  private String port;
-
-  private String indexName;
-  private String indexType;
-  private int scrollContextTimeToLiveInMinutes;
-  private int numItemsToGetBulkRequest;
-
-  private RestDataProvider restDataProvider;
   private ObjectMapper mapper;
+  private ElasticSearchAdapter esAdapter;
+  private ElasticSearchEndpointConfig endpointConfig;
+  private ElasticSearchSchemaConfig schemaConfig;
 
   /**
    * Instantiates a new elastic search index cleaner.
@@ -79,25 +77,20 @@ public class ElasticSearchIndexCleaner implements IndexCleaner {
    * @param scrollContextTimeToLiveInMinutes the scroll context time to live in minutes
    * @param numItemsToGetBulkRequest the num items to get bulk request
    */
-  protected ElasticSearchIndexCleaner(RestDataProvider restDataProvider, String indexName,
-      String indexType, String host, String port, int scrollContextTimeToLiveInMinutes,
-      int numItemsToGetBulkRequest) {
-    this.restDataProvider = restDataProvider;
+  public ElasticSearchIndexCleaner(ElasticSearchAdapter esAdapter,
+      ElasticSearchEndpointConfig endpointConfig, ElasticSearchSchemaConfig schemaConfig) {
+    this.esAdapter = esAdapter;
     this.before = null;
     this.after = null;
-    this.indexName = indexName;
-    this.indexType = indexType;
+    this.endpointConfig = endpointConfig;
+    this.schemaConfig = schemaConfig;
     this.mapper = new ObjectMapper();
-    this.host = host;
-    this.port = port;
-    this.scrollContextTimeToLiveInMinutes = scrollContextTimeToLiveInMinutes;
-    this.numItemsToGetBulkRequest = numItemsToGetBulkRequest;
   }
 
   /*
    * (non-Javadoc)
    * 
-   * @see org.onap.aai.sparky.synchronizer.IndexCleaner#populatePreOperationCollection()
+   * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePreOperationCollection()
    */
   @Override
   public OperationState populatePreOperationCollection() {
@@ -106,7 +99,7 @@ public class ElasticSearchIndexCleaner implements IndexCleaner {
       before = retrieveAllDocumentIdentifiers();
       return OperationState.OK;
     } catch (Exception exc) {
-      LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, indexName, exc.getMessage());
+      LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage());
       return OperationState.ERROR;
     }
 
@@ -115,7 +108,7 @@ public class ElasticSearchIndexCleaner implements IndexCleaner {
   /*
    * (non-Javadoc)
    * 
-   * @see org.onap.aai.sparky.synchronizer.IndexCleaner#populatePostOperationCollection()
+   * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePostOperationCollection()
    */
   @Override
   public OperationState populatePostOperationCollection() {
@@ -123,7 +116,7 @@ public class ElasticSearchIndexCleaner implements IndexCleaner {
       after = retrieveAllDocumentIdentifiers();
       return OperationState.OK;
     } catch (Exception exc) {
-      LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, indexName, exc.getMessage());
+      LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage());
       return OperationState.ERROR;
     }
   }
@@ -131,12 +124,12 @@ public class ElasticSearchIndexCleaner implements IndexCleaner {
   /*
    * (non-Javadoc)
    * 
-   * @see org.onap.aai.sparky.synchronizer.IndexCleaner#performCleanup()
+   * @see org.openecomp.sparky.synchronizer.IndexCleaner#performCleanup()
    */
   @Override
   public OperationState performCleanup() {
     // TODO Auto-generated method stub
-    LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP, indexName);
+    LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP, schemaConfig.getIndexName());
 
     int sizeBefore = before.getSize();
     int sizeAfter = after.getSize();
@@ -151,12 +144,12 @@ public class ElasticSearchIndexCleaner implements IndexCleaner {
 
     if (sizeAfter > 0) {
 
-      Collection<String> presyncIds = before.getImportedObjectIdsAsValues();
-      presyncIds.removeAll(after.getImportedObjectIdsAsValues());
+      Collection<String> presyncIds = before.getImportedObjectIds();
+      presyncIds.removeAll(after.getImportedObjectIds());
 
       try {
-        LOG.info(AaiUiMsgs.ES_SYNC_SELECTIVE_DELETE, indexName, indexType,
-            String.valueOf(presyncIds.size()));
+        LOG.info(AaiUiMsgs.ES_SYNC_SELECTIVE_DELETE, schemaConfig.getIndexName(),
+            schemaConfig.getIndexDocType(), String.valueOf(presyncIds.size()));
 
         ObjectIdCollection bulkIds = new ObjectIdCollection();
 
@@ -169,10 +162,10 @@ public class ElasticSearchIndexCleaner implements IndexCleaner {
           bulkIds.addObjectId(it.next());
           numItemsInBulkRequest++;
 
-          if (numItemsInBulkRequest >= this.numItemsToGetBulkRequest) {
-            LOG.info(AaiUiMsgs.ES_BULK_DELETE, indexName, String.valueOf(bulkIds.getSize()));
-            OperationResult bulkDeleteResult = bulkDelete(bulkIds.getImportedObjectIdsAsValues());
-            // pegCountersForElasticBulkDelete(bulkDeleteResult);
+          if (numItemsInBulkRequest >= endpointConfig.getScrollContextBatchRequestSize()) {
+            LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(),
+                String.valueOf(bulkIds.getSize()));
+            bulkDelete(bulkIds.getImportedObjectIds());
             numItemsRemainingToBeDeleted -= numItemsInBulkRequest;
             numItemsInBulkRequest = 0;
             bulkIds.clear();
@@ -180,14 +173,15 @@ public class ElasticSearchIndexCleaner implements IndexCleaner {
         }
 
         if (numItemsRemainingToBeDeleted > 0) {
-          LOG.info(AaiUiMsgs.ES_BULK_DELETE, indexName, String.valueOf(bulkIds.getSize()));
-          OperationResult bulkDeleteResult = bulkDelete(bulkIds.getImportedObjectIdsAsValues());
-          // pegCountersForElasticBulkDelete(bulkDeleteResult);
+          LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(),
+              String.valueOf(bulkIds.getSize()));
+          bulkDelete(bulkIds.getImportedObjectIds());
         }
 
 
       } catch (Exception exc) {
-        LOG.error(AaiUiMsgs.ES_BULK_DELETE_ERROR, indexName, exc.getLocalizedMessage());
+        LOG.error(AaiUiMsgs.ES_BULK_DELETE_ERROR, schemaConfig.getIndexName(),
+            exc.getLocalizedMessage());
 
       }
     }
@@ -197,11 +191,7 @@ public class ElasticSearchIndexCleaner implements IndexCleaner {
 
   @Override
   public String getIndexName() {
-    return indexName;
-  }
-
-  public void setIndexName(String indexName) {
-    this.indexName = indexName;
+    return schemaConfig.getIndexName();
   }
 
   /**
@@ -352,7 +342,8 @@ public class ElasticSearchIndexCleaner implements IndexCleaner {
    * @return the full url
    */
   private String getFullUrl(String resourceUrl) {
-    return String.format("http://%s:%s%s", host, port, resourceUrl);
+    return String.format("http://%s:%s%s", endpointConfig.getEsIpAddress(),
+        endpointConfig.getEsServerPort(), resourceUrl);
   }
 
   /**
@@ -372,13 +363,14 @@ public class ElasticSearchIndexCleaner implements IndexCleaner {
     // fields.add("entityType");
 
     String scrollRequestPayload =
-        buildInitialScrollRequestPayload(this.numItemsToGetBulkRequest, fields);
+        buildInitialScrollRequestPayload(endpointConfig.getScrollContextBatchRequestSize(), fields);
 
-    final String fullUrlStr = getFullUrl("/" + indexName + "/" + indexType + "/_search?scroll="
-        + this.scrollContextTimeToLiveInMinutes + "m");
+    final String fullUrlStr =
+        getFullUrl("/" + schemaConfig.getIndexName() + "/" + schemaConfig.getIndexDocType()
+            + "/_search?scroll=" + endpointConfig.getScrollContextTimeToLiveInMinutes() + "m");
 
     OperationResult result =
-        restDataProvider.doPost(fullUrlStr, scrollRequestPayload, "application/json");
+        esAdapter.doPost(fullUrlStr, scrollRequestPayload, MediaType.APPLICATION_JSON_TYPE);
 
     if (result.wasSuccessful()) {
 
@@ -435,33 +427,18 @@ public class ElasticSearchIndexCleaner implements IndexCleaner {
             currentDocumentIds.addObjectId(key);
           }
 
-          /*
-           * if (key != null) {
-           * 
-           * JsonNode fieldsNode = jNode.get("fields");
-           * 
-           * if (fieldsNode != null) {
-           * 
-           * JsonNode entityTypeNode = fieldsNode.get("entityType");
-           * 
-           * if (entityTypeNode != null) { ArrayNode aNode = (ArrayNode) entityTypeNode;
-           * 
-           * if (aNode.size() > 0) { value = aNode.get(0).asText(); objAndtTypesMap.put(key, value);
-           * numRecordsFetched++; } } } }
-           */
-
         }
 
         int totalRecordsRemainingToFetch = (totalRecordsAvailable - numRecordsFetched);
 
         int numRequiredAdditionalFetches =
-            (totalRecordsRemainingToFetch / this.numItemsToGetBulkRequest);
+            (totalRecordsRemainingToFetch / endpointConfig.getScrollContextBatchRequestSize());
 
         /*
          * Do an additional fetch for the remaining items (if needed)
          */
 
-        if (totalRecordsRemainingToFetch % numItemsToGetBulkRequest != 0) {
+        if (totalRecordsRemainingToFetch % endpointConfig.getScrollContextBatchRequestSize() != 0) {
           numRequiredAdditionalFetches += 1;
         }
 
@@ -511,15 +488,13 @@ public class ElasticSearchIndexCleaner implements IndexCleaner {
   private OperationState collectItemsFromScrollContext(String scrollId,
       ObjectIdCollection objectIds) throws IOException {
 
-    // ObjectIdCollection documentIdCollection = new ObjectIdCollection();
-
-    String requestPayload =
-        buildSubsequentScrollContextRequestPayload(scrollId, scrollContextTimeToLiveInMinutes);
+    String requestPayload = buildSubsequentScrollContextRequestPayload(scrollId,
+        endpointConfig.getScrollContextTimeToLiveInMinutes());
 
     final String fullUrlStr = getFullUrl("/_search/scroll");
 
     OperationResult opResult =
-        restDataProvider.doPost(fullUrlStr, requestPayload, "application/json");
+        esAdapter.doPost(fullUrlStr, requestPayload, MediaType.APPLICATION_JSON_TYPE);
 
     if (opResult.getResultCode() >= 300) {
       LOG.warn(AaiUiMsgs.ES_SCROLL_CONTEXT_ERROR, opResult.getResult());
@@ -527,6 +502,11 @@ public class ElasticSearchIndexCleaner implements IndexCleaner {
     }
 
     JsonNode rootNode = parseElasticSearchResult(opResult.getResult());
+    boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
+    final String tookStr = getFieldValue(rootNode, "took");
+    int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
+
+    JsonNode hitsNode = rootNode.get("hits");
 
     /*
      * Check the result for success / failure, and enumerate all the index ids that resulted in
@@ -534,11 +514,6 @@ public class ElasticSearchIndexCleaner implements IndexCleaner {
      */
 
     if (rootNode != null) {
-      boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
-      final String tookStr = getFieldValue(rootNode, "took");
-      int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
-
-      JsonNode hitsNode = rootNode.get("hits");
 
       if (timedOut) {
         LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "Scroll Context", String.valueOf(tookInMs));
@@ -566,25 +541,9 @@ public class ElasticSearchIndexCleaner implements IndexCleaner {
         if (key != null) {
           objectIds.addObjectId(key);
 
-          /*
-           * JsonNode fieldsNode = jNode.get("fields");
-           * 
-           * if (fieldsNode != null) {
-           * 
-           * JsonNode entityTypeNode = fieldsNode.get("entityType");
-           * 
-           * if (entityTypeNode != null) { ArrayNode aNode = (ArrayNode) entityTypeNode;
-           * 
-           * if (aNode.size() > 0) { value = aNode.get(0).asText(); objectIdsAndTypes.put(key,
-           * value); } } } }
-           */
-
         }
 
       }
-    } else {
-      // scroll context get failed, nothing else to do
-      LOG.error(AaiUiMsgs.ERROR_GENERIC, opResult.toString());
     }
 
     return OperationState.OK;
@@ -629,163 +588,16 @@ public class ElasticSearchIndexCleaner implements IndexCleaner {
     StringBuilder sb = new StringBuilder(128);
 
     for (String id : docIds) {
-      sb.append(
-          String.format(BULK_OP_LINE_TEMPLATE, buildDeleteDataObject(indexName, indexType, id)));
+      sb.append(String.format(BULK_OP_LINE_TEMPLATE,
+          buildDeleteDataObject(schemaConfig.getIndexName(), schemaConfig.getIndexDocType(), id)));
     }
 
     sb.append("\n");
 
     final String fullUrlStr = getFullUrl("/_bulk");
 
-    return restDataProvider.doPost(fullUrlStr, sb.toString(), "application/x-www-form-urlencoded");
-
-  }
-
-  /**
-   * @return the before
-   */
-  public ObjectIdCollection getBefore() {
-    return before;
-  }
-
-  /**
-   * @param before the before to set
-   */
-  public void setBefore(ObjectIdCollection before) {
-    this.before = before;
-  }
-
-  /**
-   * @return the after
-   */
-  public ObjectIdCollection getAfter() {
-    return after;
-  }
-
-  /**
-   * @param after the after to set
-   */
-  public void setAfter(ObjectIdCollection after) {
-    this.after = after;
-  }
-
-  /**
-   * @return the host
-   */
-  public String getHost() {
-    return host;
-  }
-
-  /**
-   * @param host the host to set
-   */
-  public void setHost(String host) {
-    this.host = host;
-  }
-
-  /**
-   * @return the port
-   */
-  public String getPort() {
-    return port;
-  }
-
-  /**
-   * @param port the port to set
-   */
-  public void setPort(String port) {
-    this.port = port;
-  }
-
-  /**
-   * @return the indexType
-   */
-  public String getIndexType() {
-    return indexType;
-  }
-
-  /**
-   * @param indexType the indexType to set
-   */
-  public void setIndexType(String indexType) {
-    this.indexType = indexType;
-  }
-
-  /**
-   * @return the scrollContextTimeToLiveInMinutes
-   */
-  public int getScrollContextTimeToLiveInMinutes() {
-    return scrollContextTimeToLiveInMinutes;
-  }
-
-  /**
-   * @param scrollContextTimeToLiveInMinutes the scrollContextTimeToLiveInMinutes to set
-   */
-  public void setScrollContextTimeToLiveInMinutes(int scrollContextTimeToLiveInMinutes) {
-    this.scrollContextTimeToLiveInMinutes = scrollContextTimeToLiveInMinutes;
-  }
+    return esAdapter.doPost(fullUrlStr, sb.toString(), MediaType.APPLICATION_FORM_URLENCODED_TYPE);
 
-  /**
-   * @return the numItemsToGetBulkRequest
-   */
-  public int getNumItemsToGetBulkRequest() {
-    return numItemsToGetBulkRequest;
-  }
-
-  /**
-   * @param numItemsToGetBulkRequest the numItemsToGetBulkRequest to set
-   */
-  public void setNumItemsToGetBulkRequest(int numItemsToGetBulkRequest) {
-    this.numItemsToGetBulkRequest = numItemsToGetBulkRequest;
-  }
-
-  /**
-   * @return the restDataProvider
-   */
-  public RestDataProvider getRestDataProvider() {
-    return restDataProvider;
-  }
-
-  /**
-   * @param restDataProvider the restDataProvider to set
-   */
-  public void setRestDataProvider(RestDataProvider restDataProvider) {
-    this.restDataProvider = restDataProvider;
-  }
-
-  /**
-   * @return the mapper
-   */
-  public ObjectMapper getMapper() {
-    return mapper;
-  }
-
-  /**
-   * @param mapper the mapper to set
-   */
-  public void setMapper(ObjectMapper mapper) {
-    this.mapper = mapper;
-  }
-
-  /**
-   * @return the log
-   */
-  public static Logger getLog() {
-    return LOG;
-  }
-
-  /**
-   * @return the bulkOpLineTemplate
-   */
-  public static String getBulkOpLineTemplate() {
-    return BULK_OP_LINE_TEMPLATE;
-  }
-
-  /**
-   * @return the timestampFormat
-   */
-  public static String getTimestampFormat() {
-    return TIMESTAMP_FORMAT;
   }
 
   /*