Handle missing search indexes 15/6115/1
authorgfraboni <gino.fraboni@amdocs.com>
Thu, 20 Jul 2017 18:17:12 +0000 (14:17 -0400)
committergfraboni <gino.fraboni@amdocs.com>
Thu, 20 Jul 2017 18:17:28 +0000 (14:17 -0400)
[AAI-63] Data Router must handle Search Service document create failures
if index does not exit.

Change-Id: Ic4412a6295ec9f84b223c80c0326c5ef2face99d
Signed-off-by: gfraboni <gino.fraboni@amdocs.com>
src/main/java/org/openecomp/datarouter/logging/DataRouterMsgs.java
src/main/java/org/openecomp/datarouter/policy/EntityEventPolicy.java
src/main/java/org/openecomp/datarouter/util/SearchServiceAgent.java [new file with mode: 0644]
src/main/resources/logging/DataRouterMsgs.properties
src/main/resources/logging/EntityEventPolicyMsgs.properties

index 8304c96..71a5d5d 100644 (file)
@@ -126,6 +126,11 @@ public enum DataRouterMsgs implements LogMessageEnum {
    */
   PROCESS_REST_REQUEST,
 
+  /**
+   * Index {0} may not exist in the search data store.  Attempting to create it now.
+   */
+  CREATE_MISSING_INDEX,
+  
   /**
    * Processed event {0}. Result: {1}
    * Arguments: {0} = event topic {1} = result
@@ -150,7 +155,16 @@ public enum DataRouterMsgs implements LogMessageEnum {
   
   INVALID_OXM_FILE,
   
-  INVALID_OXM_DIR;
+  INVALID_OXM_DIR,
+  
+  /**
+   * Failed to create or update document in index {0}.  Cause: {1}
+   * 
+   * Arguments:
+   *    {0} = Index name
+   *    {1} = Failure cause
+   */
+  FAIL_TO_CREATE_UPDATE_DOC;
 
   /**
    * Static initializer to ensure the resource bundles for this class are loaded...
index afddad2..3c3990e 100644 (file)
@@ -73,6 +73,7 @@ import org.openecomp.datarouter.util.EntityOxmReferenceHelper;
 import org.openecomp.datarouter.util.ExternalOxmModelProcessor;
 import org.openecomp.datarouter.util.OxmModelLoader;
 import org.openecomp.datarouter.util.RouterServiceUtil;
+import org.openecomp.datarouter.util.SearchServiceAgent;
 import org.openecomp.datarouter.util.SearchSuggestionPermutation;
 import org.openecomp.datarouter.util.Version;
 import org.openecomp.datarouter.util.VersionedOxmEntities;
@@ -89,7 +90,6 @@ public class EntityEventPolicy implements Processor {
   private static final String entitySearchSchema = "entitysearch_schema.json";
   private static final String topographicalSearchSchema   = "topographysearch_schema.json";
   private Collection<ExternalOxmModelProcessor> externalOxmModelProcessors;
-  RestClient searchClient = null;
 
   private final String EVENT_HEADER = "event-header";
   private final String ENTITY_HEADER = "entity";
@@ -106,9 +106,19 @@ public class EntityEventPolicy implements Processor {
   Map<String, DynamicJAXBContext> oxmVersionContextMap = new HashMap<>();
   private String oxmVersion = null;
 
-  private String entityIndexTarget = null;
+  /** Agent for communicating with the Search Service. */
+  private SearchServiceAgent searchAgent = null;
+  
+  /** Search index name for storing AAI event entities. */
+  private String entitySearchIndex;
+
+  /** Search index name for storing topographical search data. */
+  private String topographicalSearchIndex;
+  
+  /** Search index name for suggestive search data. */
+  private String aggregateGenericVnfIndex;
+  
   private String entitySearchTarget = null;
-  private String topographicalIndexTarget = null;
   private String topographicalSearchTarget = null;
   private String autoSuggestSearchTarget = null;
   private String aggregationSearchVnfTarget = null;
@@ -131,27 +141,27 @@ public class EntityEventPolicy implements Processor {
 
     srcDomain = config.getSourceDomain();
 
-    entityIndexTarget =
-      EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(), config.getSearchEndpoint(),
-        config.getSearchEntitySearchIndex());
+    // Populate the index names.
+    entitySearchIndex        = config.getSearchEntitySearchIndex();
+    topographicalSearchIndex = config.getSearchTopographySearchIndex();
+    aggregateGenericVnfIndex = config.getSearchAggregationVnfIndex();
+       
+    // Instantiate the agent that we will use for interacting with the Search Service.
+    searchAgent = new SearchServiceAgent(config.getSearchCertName(),
+                                         config.getSearchKeystore(),
+                                         config.getSearchKeystorePwd(),
+                                         EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(),
+                                                                        config.getSearchEndpoint()),
+                                         config.getSearchEndpointDocuments(),
+                                         logger);
 
     entitySearchTarget =
         EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(), config.getSearchEndpoint(),
             config.getSearchEntitySearchIndex(), config.getSearchEndpointDocuments());
 
-    topographicalIndexTarget =
-      EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(), config.getSearchEndpoint(),
-        config.getSearchTopographySearchIndex());
-
     topographicalSearchTarget = EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(),
         config.getSearchEndpoint(), config.getSearchTopographySearchIndex());
 
-    // Create REST client for search service
-    searchClient = new RestClient().validateServerHostname(false).validateServerCertChain(true)
-        .clientCertFile(DataRouterConstants.DR_HOME_AUTH + config.getSearchCertName())
-        .clientCertPassword(Password.deobfuscate(config.getSearchKeystorePwd()))
-        .trustStore(DataRouterConstants.DR_HOME_AUTH + config.getSearchKeystore());
-
     autoSuggestSearchTarget =
         EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(), config.getSearchEndpoint(),
             config.getSearchEntityAutoSuggestIndex(), config.getSearchEndpointDocuments());
@@ -193,73 +203,12 @@ public class EntityEventPolicy implements Processor {
   public void startup() {
     
     // Create the indexes in the search service if they do not already exist.
-    createSearchIndex(entityIndexTarget, entitySearchSchema);
-    createSearchIndex(topographicalIndexTarget, topographicalSearchSchema);
+    searchAgent.createSearchIndex(entitySearchIndex, entitySearchSchema);
+    searchAgent.createSearchIndex(topographicalSearchIndex, topographicalSearchSchema);
     
     logger.info(EntityEventPolicyMsgs.ENTITY_EVENT_POLICY_REGISTERED);
   }
 
-  /**
-   * Creates an index through the search db abstraction
-   * 
-   * @param searchRESTClient
-   *            the REST client configured to contact the search db
-   *            abstraction
-   * @param searchTarget
-   *            the URL to attempt to create the search index
-   * @param schemaLocation
-   *            the location of the mappings file for the index
-   */
-  private void createSearchIndex(String searchTarget, String schemaLocation) {
-     
-    logger.debug("Creating search index, searchTarget = " + searchTarget + ", schemaLocation = " + schemaLocation);
-           
-    MultivaluedMap<String, String> headers = new MultivaluedMapImpl();
-    headers.put("Accept", Arrays.asList("application/json"));
-    headers.put(Headers.FROM_APP_ID, Arrays.asList("DL"));
-    headers.put(Headers.TRANSACTION_ID, Arrays.asList(UUID.randomUUID().toString()));
-      
-    try {
-
-      OperationResult result = searchClient.put(searchTarget, loadFileData(schemaLocation), headers,
-                                                MediaType.APPLICATION_JSON_TYPE, null);
-
-      if (!HttpUtil.isHttpResponseClassSuccess(result.getResultCode())) {
-        logger.error(EntityEventPolicyMsgs.FAIL_TO_CREATE_SEARCH_INDEX, searchTarget, result.getFailureCause());
-      } else {
-        logger.info(EntityEventPolicyMsgs.SEARCH_INDEX_CREATE_SUCCESS, searchTarget);
-      }
-
-    } catch (Exception e) {
-      logger.error(EntityEventPolicyMsgs.FAIL_TO_CREATE_SEARCH_INDEX, searchTarget, e.getLocalizedMessage());
-    }
-  }
-
-  /**
-   * Convenience method to load up all the data from a file into a string
-   * 
-   * @param filename the filename to read from disk
-   * @return the data contained within the file
-   * @throws Exception
-   */
-  protected String loadFileData(String filename) throws Exception {
-    StringBuilder data = new StringBuilder();
-    try {
-      BufferedReader in = new BufferedReader(new InputStreamReader(
-          EntityEventPolicy.class.getClassLoader().getResourceAsStream("/" + filename),
-          StandardCharsets.UTF_8));
-      String line;
-
-      while ((line = in.readLine()) != null) {
-        data.append(line);
-      }
-    } catch (Exception e) {
-      throw new Exception("Failed to read from file = " + filename + ".", e);
-    }
-
-    return data.toString();
-  }
-
 
   /**
    * Convert object to json.
@@ -456,7 +405,7 @@ public class EntityEventPolicy implements Processor {
       return;
     }
 
-    handleSearchServiceOperation(aaiEventEntity, action, this.entitySearchTarget);
+    handleSearchServiceOperation(aaiEventEntity, action, entitySearchIndex);
 
     handleTopographicalData(uebPayload, action, entityType, oxmEntityType, oxmJaxbContext,
         entityPrimaryKeyFieldName, entityPrimaryKeyFieldValue);
@@ -955,8 +904,7 @@ public class EntityEventPolicy implements Processor {
       String jsonPayload = aaiEventEntity.getAsJson();
 
       // Run the GET to retrieve the ETAG from the search service
-      OperationResult storedEntity =
-          searchClient.get(entitySearchTarget+entityId, headers, MediaType.APPLICATION_JSON_TYPE);
+      OperationResult storedEntity = searchAgent.getDocument(entitySearchIndex, entityId);
 
       if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
         /*
@@ -997,16 +945,14 @@ public class EntityEventPolicy implements Processor {
             // Do the PUT with new CER
             ((ObjectNode)node).put("crossEntityReferenceValues", newCer);
             jsonPayload = NodeUtils.convertObjectToJson(node, false);
-            searchClient.put(entitySearchTarget + entityId, jsonPayload, headers,
-                MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE);
+            searchAgent.putDocument(entitySearchIndex, entityId, jsonPayload, headers);
           }
          }
       } else {
 
         if (storedEntity.getResultCode() == 404) {
           // entity not found, so attempt to do a PUT
-          searchClient.put(entitySearchTarget + entityId, aaiEventEntity.getAsJson(), headers,
-              MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE);
+          searchAgent.putDocument(entitySearchIndex, entityId, aaiEventEntity.getAsJson(), headers);
         } else {
           logger.error(EntityEventPolicyMsgs.FAILED_TO_UPDATE_ENTITY_IN_DOCSTORE,
               aaiEventEntity.getId(), "SYNC_ENTITY");
@@ -1039,8 +985,7 @@ public class EntityEventPolicy implements Processor {
       if ((action.equalsIgnoreCase(ACTION_CREATE) && entityId != null)
           || action.equalsIgnoreCase(ACTION_UPDATE)) {
         // Run the GET to retrieve the ETAG from the search service
-        OperationResult storedEntity =
-            searchClient.get(target + entityId, headers, MediaType.APPLICATION_JSON_TYPE);
+        OperationResult storedEntity = searchAgent.getDocument(aggregateGenericVnfIndex, entityId);
 
         if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
           List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
@@ -1056,20 +1001,23 @@ public class EntityEventPolicy implements Processor {
         String eventEntityStr = eventEntity.getAsJson();
 
         if (eventEntityStr != null) {
-          searchClient.put(target + entityId, eventEntity.getAsJson(), headers,
-              MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE);
+          List<String> createIndex = new ArrayList<String>();
+          createIndex.add("true");
+          headers.put("X-CreateIndex", createIndex);
+          searchAgent.putDocument(aggregateGenericVnfIndex, entityId, eventEntity.getAsJson(), headers);
         }
       } else if (action.equalsIgnoreCase(ACTION_CREATE)) {
         String eventEntityStr = eventEntity.getAsJson();
 
         if (eventEntityStr != null) {
-          searchClient.post(target, eventEntityStr, headers, MediaType.APPLICATION_JSON_TYPE,
-              MediaType.APPLICATION_JSON_TYPE);
+          List<String> createIndex = new ArrayList<String>();
+          createIndex.add("true");
+          headers.put("X-CreateIndex", createIndex);
+          searchAgent.postDocument(aggregateGenericVnfIndex, eventEntityStr, headers);
         }
       } else if (action.equalsIgnoreCase(ACTION_DELETE)) {
         // Run the GET to retrieve the ETAG from the search service
-        OperationResult storedEntity =
-            searchClient.get(target + entityId, headers, MediaType.APPLICATION_JSON_TYPE);
+        OperationResult storedEntity = searchAgent.getDocument(aggregateGenericVnfIndex, entityId);
 
         if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
           List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
@@ -1081,7 +1029,7 @@ public class EntityEventPolicy implements Processor {
                 entityId);
           }
 
-          searchClient.delete(target + eventEntity.getId(), headers, null);
+          searchAgent.deleteDocument(aggregateGenericVnfIndex, eventEntity.getId(), headers);
         } else {
           logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, target + entityId,
               entityId);
@@ -1095,8 +1043,9 @@ public class EntityEventPolicy implements Processor {
     }
   }
 
-  private void handleSearchServiceOperation(DocumentStoreDataEntity eventEntity, String action,
-      String target) {
+  private void handleSearchServiceOperation(DocumentStoreDataEntity eventEntity, 
+                                            String                  action,
+                                            String                  index) {
     try {
 
       Map<String, List<String>> headers = new HashMap<>();
@@ -1111,8 +1060,7 @@ public class EntityEventPolicy implements Processor {
           || action.equalsIgnoreCase(ACTION_UPDATE)) {
 
         // Run the GET to retrieve the ETAG from the search service
-        OperationResult storedEntity =
-            searchClient.get(target + entityId, headers, MediaType.APPLICATION_JSON_TYPE);
+        OperationResult storedEntity = searchAgent.getDocument(index, entityId);
 
         if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
           List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
@@ -1120,20 +1068,22 @@ public class EntityEventPolicy implements Processor {
           if (etag != null && etag.size() > 0) {
             headers.put(Headers.IF_MATCH, etag);
           } else {
-            logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, target + entityId,
+            logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index,
                 entityId);
           }
         }
 
-        searchClient.put(target + entityId, eventEntity.getAsJson(), headers,
-            MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE);
+        // Write the entity to the search service.
+        // PUT
+        searchAgent.putDocument(index, entityId, eventEntity.getAsJson(), headers);
+        
       } else if (action.equalsIgnoreCase(ACTION_CREATE)) {
-        searchClient.post(target, eventEntity.getAsJson(), headers, MediaType.APPLICATION_JSON_TYPE,
-            MediaType.APPLICATION_JSON_TYPE);
+        // Write the entry to the search service.
+        searchAgent.postDocument(index, eventEntity.getAsJson(), headers);
+        
       } else if (action.equalsIgnoreCase(ACTION_DELETE)) {
         // Run the GET to retrieve the ETAG from the search service
-        OperationResult storedEntity =
-            searchClient.get(target + entityId, headers, MediaType.APPLICATION_JSON_TYPE);
+        OperationResult storedEntity = searchAgent.getDocument(index, entityId);
 
         if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
           List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
@@ -1141,13 +1091,13 @@ public class EntityEventPolicy implements Processor {
           if (etag != null && etag.size() > 0) {
             headers.put(Headers.IF_MATCH, etag);
           } else {
-            logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, target + entityId,
+            logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index,
                 entityId);
           }
 
-          searchClient.delete(target + eventEntity.getId(), headers, null);
+          searchAgent.deleteDocument(index, eventEntity.getId(), headers);
         } else {
-          logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, target + entityId,
+          logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index,
               entityId);
         }
       } else {
diff --git a/src/main/java/org/openecomp/datarouter/util/SearchServiceAgent.java b/src/main/java/org/openecomp/datarouter/util/SearchServiceAgent.java
new file mode 100644 (file)
index 0000000..3d27425
--- /dev/null
@@ -0,0 +1,368 @@
+/**\r
+ * ============LICENSE_START=======================================================\r
+ * DataRouter\r
+ * ================================================================================\r
+ * Copyright © 2017 AT&T Intellectual Property.\r
+ * Copyright © 2017 Amdocs\r
+ * All rights reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *    http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=========================================================\r
+ *\r
+ * ECOMP and OpenECOMP are trademarks\r
+ * and service marks of AT&T Intellectual Property.\r
+ */\r
+package org.openecomp.datarouter.util;\r
+\r
+import java.io.BufferedReader;\r
+import java.io.InputStreamReader;\r
+import java.nio.charset.StandardCharsets;\r
+import java.util.Arrays;\r
+import java.util.HashMap;\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.UUID;\r
+\r
+import javax.ws.rs.core.MediaType;\r
+import javax.ws.rs.core.MultivaluedMap;\r
+import javax.ws.rs.core.Response.Status;\r
+\r
+import org.eclipse.jetty.util.security.Password;\r
+import org.openecomp.cl.api.Logger;\r
+import org.openecomp.cl.mdc.MdcContext;\r
+import org.openecomp.datarouter.logging.DataRouterMsgs;\r
+import org.openecomp.datarouter.policy.EntityEventPolicy;\r
+import org.openecomp.restclient.client.Headers;\r
+import org.openecomp.restclient.client.OperationResult;\r
+import org.openecomp.restclient.client.RestClient;\r
+import org.openecomp.restclient.enums.RestAuthenticationMode;\r
+import org.openecomp.restclient.rest.HttpUtil;\r
+import org.slf4j.MDC;\r
+\r
+import com.sun.jersey.core.util.MultivaluedMapImpl;\r
+\r
+public class SearchServiceAgent {\r
+\r
+  private Logger logger;\r
+  \r
+  private RestClient searchClient = null;\r
+  private Map<String, String> indexSchemaMapping = new HashMap<String, String>();\r
+  \r
+  private String searchUrl = null;\r
+  private String documentEndpoint = null;\r
+  \r
+  \r
+  /**\r
+   * Creates a new instance of the search service agent.\r
+   * \r
+   * @param certName         - Certificate to use for talking to the Search Service.\r
+   * @param keystore         - Keystore to use for talking to the Search Service.\r
+   * @param keystorePwd      - Keystore password for talking to the Search Service.\r
+   * @param searchUrl        - URL at which the Search Service can be reached.\r
+   * @param documentEndpoint - Endpoint for accessing document resources on the Search Service.\r
+   * @param logger           - Logger to use for system logs.\r
+   */\r
+  public SearchServiceAgent(String certName, \r
+                            String keystore, \r
+                            String keystorePwd,\r
+                            String searchUrl,\r
+                            String documentEndpoint,\r
+                            Logger logger) {\r
+    \r
+    initialize(certName, keystore, keystorePwd, searchUrl, documentEndpoint, logger);\r
+  }\r
+  \r
+  \r
+  /**\r
+   * Performs all one-time initialization required for the search agent.\r
+   * \r
+   * @param certName         - Certificate to use for talking to the Search Service.\r
+   * @param keystore         - Keystore to use for talking to the Search Service.\r
+   * @param keystorePwd      - Keystore password for talking to the Search Service.\r
+   * @param searchUrl        - URL at which the Search Service can be reached.\r
+   * @param documentEndpoint - Endpoint for accessing document resources on the Search Service.\r
+   * @param logger           - Logger to use for system logs.\r
+   */\r
+  private void initialize(String certName, \r
+                          String keystore, \r
+                          String keystorePwd, \r
+                          String searchUrl, \r
+                          String documentEndpoint, \r
+                          Logger logger) {\r
+    \r
+    // Create REST client for search service\r
+    searchClient = new RestClient()\r
+                    .authenticationMode(RestAuthenticationMode.SSL_CERT)\r
+                    .validateServerHostname(false)\r
+                    .validateServerCertChain(true)\r
+                    .clientCertFile(DataRouterConstants.DR_HOME_AUTH + certName)\r
+                    .clientCertPassword(Password.deobfuscate(keystorePwd))\r
+                    .trustStore(DataRouterConstants.DR_HOME_AUTH + keystore);\r
+    \r
+    this.searchUrl        = searchUrl;\r
+    this.documentEndpoint = documentEndpoint;\r
+    \r
+    this.logger           = logger;\r
+  }\r
+  \r
+  \r
+  /**\r
+   * Creates an index through the search db abstraction\r
+   * \r
+   * @param index          - The name of the index to be created.\r
+   * @param schemaLocation - The name of the schema file for the index.\r
+   */\r
+  public void createSearchIndex(String index, String schemaLocation) {\r
+     \r
+    // Create a mapping of the index name to schema location \r
+    indexSchemaMapping.put(index, schemaLocation);\r
+    \r
+    // Now, create the index.\r
+    createIndex(index, schemaLocation);\r
+  }\r
+  \r
+  \r
+  /**\r
+   * This method performs the actual work of creating a search index.\r
+   * \r
+   * @param index          - The name of the index to be created.\r
+   * @param schemaLocation - The name of the schema file for the index.\r
+   */\r
+  private void createIndex(String index, String schemaLocation) {\r
+    \r
+    logger.debug("Creating search index, index name: = " + index + ", schemaLocation = " + schemaLocation);\r
+    \r
+    MultivaluedMap<String, String> headers = new MultivaluedMapImpl();\r
+    headers.put("Accept", Arrays.asList("application/json"));\r
+    headers.put(Headers.FROM_APP_ID, Arrays.asList("DL"));\r
+    headers.put(Headers.TRANSACTION_ID, Arrays.asList(UUID.randomUUID().toString()));\r
+      \r
+    String url = concatSubUri(searchUrl, index);\r
+    try {\r
+\r
+      OperationResult result = searchClient.put(url, loadFileData(schemaLocation), headers,\r
+                                                MediaType.APPLICATION_JSON_TYPE, null);\r
+\r
+      if (!HttpUtil.isHttpResponseClassSuccess(result.getResultCode())) {\r
+        logger.error(DataRouterMsgs.FAIL_TO_CREATE_SEARCH_INDEX, index, result.getFailureCause());\r
+      } else {\r
+        logger.info(DataRouterMsgs.SEARCH_INDEX_CREATE_SUCCESS, index);\r
+      }\r
+\r
+    } catch (Exception e) {\r
+      logger.error(DataRouterMsgs.FAIL_TO_CREATE_SEARCH_INDEX, index, e.getLocalizedMessage());\r
+    }\r
+  }\r
+  \r
+  \r
+  /**\r
+   * Retrieves a document from the search service.\r
+   * \r
+   * @param index - The index to retrieve the document from.\r
+   * @param id    - The unique identifier for the document.\r
+   * \r
+   * @return - The REST response returned from the Search Service.\r
+   */\r
+  public OperationResult getDocument(String index, String id) {\r
+    \r
+    Map<String, List<String>> headers = new HashMap<>();\r
+    headers.put(Headers.FROM_APP_ID, Arrays.asList("Data Router"));\r
+    headers.put(Headers.TRANSACTION_ID, Arrays.asList(MDC.get(MdcContext.MDC_REQUEST_ID)));\r
+    \r
+    String url = concatSubUri(searchUrl, index, documentEndpoint, id);\r
+    return searchClient.get(url, headers, MediaType.APPLICATION_JSON_TYPE);    \r
+  }\r
+  \r
+  \r
+  /**\r
+   * Creates or updates a document in the Search Service.\r
+   * \r
+   * @param index   - The index to create or update the document in.\r
+   * @param id      - The identifier for the document.\r
+   * @param payload - The document contents.\r
+   * @param headers - HTTP headers.\r
+   */\r
+  public void putDocument(String index, String id, String payload, Map<String, List<String>> headers) {\r
+        \r
+    // Try to post the document to the search service.\r
+    OperationResult result = doDocumentPut(index, id, payload, headers);\r
+    \r
+    // A 404 response from the Search Service may indicate that the index we are writing\r
+    // to does not actually exist.  We will try creating it now.\r
+    if(result.getResultCode() == Status.NOT_FOUND.getStatusCode()) {\r
+            \r
+      // Lookup the location of the schema that we want to create.\r
+      String indexSchemaLocation = indexSchemaMapping.get(index);\r
+      if(indexSchemaLocation != null) {\r
+        \r
+        // Try creating the index now...\r
+        logger.info(DataRouterMsgs.CREATE_MISSING_INDEX, index);\r
+        createIndex(index, indexSchemaLocation);\r
+        \r
+        // ...and retry the document post.\r
+        result = doDocumentPut(index, id, payload, headers);\r
+      }\r
+    }\r
+    \r
+    if(!resultSuccessful(result)) {\r
+      logger.error(DataRouterMsgs.FAIL_TO_CREATE_UPDATE_DOC, index, result.getFailureCause());\r
+    }\r
+  }\r
+  \r
+  \r
+  /**\r
+   * This method does the actual work of submitting a document PUT request to the Search Service.\r
+   * \r
+   * @param index   - The index to create or update the document in.\r
+   * @param id      - The identifier for the document.\r
+   * @param payload - The document contents.\r
+   * @param headers - HTTP headers.\r
+   * \r
+   * @return - The HTTP response returned by the Search Service.\r
+   */\r
+  private OperationResult doDocumentPut(String index, String id, String payload, Map<String, List<String>> headers) {\r
+    \r
+    String url = concatSubUri(searchUrl, index, documentEndpoint, id);\r
+    return searchClient.put(url, payload, headers, MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE);\r
+  }\r
+  \r
+  \r
+  /**\r
+   * Creates a document in the Search Service.\r
+   * \r
+   * @param index   - The index to create the document in.\r
+   * @param payload - The document contents.\r
+   * @param headers - HTTP headers.\r
+   */\r
+  public void postDocument(String index, String payload, Map<String, List<String>> headers) {\r
+    \r
+    // Try to post the document to the search service.\r
+    OperationResult result = doDocumentPost(index, payload, headers);\r
+    \r
+    // A 404 response from the Search Service may indicate that the index we are writing\r
+    // to does not actually exist.  We will try creating it now.\r
+    if(result.getResultCode() == Status.NOT_FOUND.getStatusCode()) {\r
+      \r
+      // Lookup the location of the schema that we want to create.\r
+      String indexSchemaLocation = indexSchemaMapping.get(index);\r
+      if(indexSchemaLocation != null) {\r
+        \r
+        // Try creating the index now...\r
+        logger.info(DataRouterMsgs.CREATE_MISSING_INDEX, index);\r
+        createIndex(index, indexSchemaLocation);\r
+        \r
+        // ...and retry the document post.\r
+        result = doDocumentPost(index, payload, headers);\r
+      }\r
+    }\r
+    \r
+    if(!resultSuccessful(result)) {\r
+      logger.error(DataRouterMsgs.FAIL_TO_CREATE_UPDATE_DOC, index, result.getFailureCause());\r
+    }\r
+  }\r
+  \r
+  \r
+  /**\r
+   * This method does the actual work of submitting a document PUT request to the Search Service.\r
+   * \r
+   * @param index   - The index to create or update the document in.\r
+   * @param payload - The document contents.\r
+   * @param headers - HTTP headers.\r
+   * \r
+   * @return - The HTTP response returned by the Search Service.\r
+   */\r
+  private OperationResult doDocumentPost(String index, String payload, Map<String, List<String>> headers) {\r
+    \r
+    String url = concatSubUri(searchUrl, index, documentEndpoint);\r
+    return searchClient.post(url, payload, headers, MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE);\r
+  }\r
+  \r
+  \r
+  /**\r
+   * Removes a document from the Search Service.\r
+   * \r
+   * @param index   - The index to create the document in.\r
+   * @param id      - The identifier for the document.\r
+   * @param payload - The document contents.\r
+   * @param headers - HTTP headers.\r
+   */\r
+  public void deleteDocument(String index, String documentId, Map<String, List<String>> headers) {\r
+    \r
+    String url = concatSubUri(searchUrl, index, documentEndpoint, documentId);\r
+    searchClient.delete(url, headers, null);\r
+  }\r
+  \r
+  \r
+  /**\r
+   * Convenience method to load up all the data from a file into a string\r
+   * \r
+   * @param filename the filename to read from disk\r
+   * @return the data contained within the file\r
+   * @throws Exception\r
+   */\r
+  protected String loadFileData(String filename) throws Exception {\r
+    StringBuilder data = new StringBuilder();\r
+    try {\r
+      BufferedReader in = new BufferedReader(new InputStreamReader(\r
+          EntityEventPolicy.class.getClassLoader().getResourceAsStream("/" + filename),\r
+          StandardCharsets.UTF_8));\r
+      String line;\r
+\r
+      while ((line = in.readLine()) != null) {\r
+        data.append(line);\r
+      }\r
+    } catch (Exception e) {\r
+      throw new Exception("Failed to read from file = " + filename + ".", e);\r
+    }\r
+\r
+    return data.toString();\r
+  }\r
+  \r
+  \r
+  /**\r
+   * Helper utility to concatenate substrings of a URI together to form a proper URI.\r
+   * \r
+   * @param suburis the list of substrings to concatenate together\r
+   * @return the concatenated list of substrings\r
+   */\r
+  public static String concatSubUri(String... suburis) {\r
+    String finalUri = "";\r
+\r
+    for (String suburi : suburis) {\r
+\r
+      if (suburi != null) {\r
+        // Remove any leading / since we only want to append /\r
+        suburi = suburi.replaceFirst("^/*", "");\r
+\r
+        // Add a trailing / if one isn't already there\r
+        finalUri += suburi.endsWith("/") ? suburi : suburi + "/";\r
+      }\r
+    }\r
+\r
+    return finalUri;\r
+  }\r
+  \r
+  \r
+  /**\r
+   * Helper utility to check the response code of an HTTP response.\r
+   * \r
+   * @param aResult - The response that we want to check.\r
+   * \r
+   * @return - true if the response contains a success code,\r
+   *           false otherwise.\r
+   */\r
+  private boolean resultSuccessful(OperationResult aResult) {\r
+    \r
+    return (aResult.getResultCode() >= 200) && (aResult.getResultCode() < 300);\r
+  }\r
+}\r
index 352f239..9650b22 100644 (file)
@@ -77,6 +77,10 @@ PROCESS_REST_REQUEST=\
             DR0009I|\
             Received request {0} {1} from {2}.  Sending response: {3}
             
+CREATE_MISSING_INDEX=\
+                       DR0013I|\
+                       Index {0} may not exist in the search data store.  Attempting to create it now.
+
 PROCESS_EVENT=\
             DR0010I|\
             Processed event {0}.  Result: {1}
@@ -133,3 +137,6 @@ SYSTEM_ERROR=\
             DR3011E|\
             System Error: {0}\ 
             
+FAIL_TO_CREATE_UPDATE_DOC=\
+                   DR3015E|\
+                   Failed to create or update document in index {0}.  Cause: {1}
index 3fac391..85444c9 100644 (file)
@@ -39,7 +39,7 @@ FAILED_TO_PARSE_UEB_PAYLOAD=\
 
 NO_ETAG_AVAILABLE_FAILURE=\
          EEP0005E|\
-         Unable to retrieve etag at {0} for entity with id {1}
+         Unable to retrieve etag at index {0} for entity with id {1}
 
 ENTITY_OPERATION_NOT_SUPPORTED=\
          EEP0006E|\