2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright © 2017 AT&T Intellectual Property.
 
   6  * Copyright © 2017 Amdocs
 
   8  * ================================================================================
 
   9  * Licensed under the Apache License, Version 2.0 (the "License");
 
  10  * you may not use this file except in compliance with the License.
 
  11  * You may obtain a copy of the License ati
 
  13  *    http://www.apache.org/licenses/LICENSE-2.0
 
  15  * Unless required by applicable law or agreed to in writing, software
 
  16  * distributed under the License is distributed on an "AS IS" BASIS,
 
  17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  18  * See the License for the specific language governing permissions and
 
  19  * limitations under the License.
 
  20  * ============LICENSE_END=========================================================
 
  22  * ECOMP and OpenECOMP are trademarks
 
  23  * and service marks of AT&T Intellectual Property.
 
  25 package org.openecomp.sa.searchdbabstraction.elasticsearch.dao;
 
  27 import com.att.aft.dme2.internal.google.common.base.Throwables;
 
  28 import com.fasterxml.jackson.annotation.JsonInclude.Include;
 
  29 import com.fasterxml.jackson.core.JsonParseException;
 
  30 import com.fasterxml.jackson.core.JsonProcessingException;
 
  31 import com.fasterxml.jackson.databind.JsonMappingException;
 
  32 import com.fasterxml.jackson.databind.ObjectMapper;
 
  33 import edu.emory.mathcs.backport.java.util.Arrays;
 
  34 import org.json.simple.JSONArray;
 
  35 import org.json.simple.JSONObject;
 
  36 import org.json.simple.parser.JSONParser;
 
  37 import org.json.simple.parser.ParseException;
 
  38 import org.openecomp.cl.api.LogFields;
 
  39 import org.openecomp.cl.api.LogLine;
 
  40 import org.openecomp.cl.api.Logger;
 
  41 import org.openecomp.cl.eelf.LoggerFactory;
 
  42 import org.openecomp.cl.mdc.MdcContext;
 
  43 import org.openecomp.cl.mdc.MdcOverride;
 
  44 import org.openecomp.sa.rest.AnalysisConfiguration;
 
  45 import org.openecomp.sa.rest.ApiUtils;
 
  46 import org.openecomp.sa.rest.BulkRequest;
 
  47 import org.openecomp.sa.rest.BulkRequest.OperationType;
 
  48 import org.openecomp.sa.rest.DocumentSchema;
 
  49 import org.openecomp.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
 
  50 import org.openecomp.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
 
  51 import org.openecomp.sa.searchdbabstraction.entity.AggregationResult;
 
  52 import org.openecomp.sa.searchdbabstraction.entity.AggregationResults;
 
  53 import org.openecomp.sa.searchdbabstraction.entity.Document;
 
  54 import org.openecomp.sa.searchdbabstraction.entity.DocumentOperationResult;
 
  55 import org.openecomp.sa.searchdbabstraction.entity.ErrorResult;
 
  56 import org.openecomp.sa.searchdbabstraction.entity.OperationResult;
 
  57 import org.openecomp.sa.searchdbabstraction.entity.SearchHit;
 
  58 import org.openecomp.sa.searchdbabstraction.entity.SearchHits;
 
  59 import org.openecomp.sa.searchdbabstraction.entity.SearchOperationResult;
 
  60 import org.openecomp.sa.searchdbabstraction.logging.SearchDbMsgs;
 
  61 import org.openecomp.sa.searchdbabstraction.util.AggregationParsingUtil;
 
  62 import org.openecomp.sa.searchdbabstraction.util.DocumentSchemaUtil;
 
  63 import org.openecomp.sa.searchdbabstraction.util.SearchDbConstants;
 
  65 import java.io.BufferedReader;
 
  67 import java.io.FileInputStream;
 
  68 import java.io.IOException;
 
  69 import java.io.InputStream;
 
  70 import java.io.InputStreamReader;
 
  71 import java.io.OutputStream;
 
  72 import java.io.OutputStreamWriter;
 
  73 import java.io.PrintWriter;
 
  74 import java.io.StringWriter;
 
  75 import java.net.HttpURLConnection;
 
  76 import java.net.MalformedURLException;
 
  77 import java.net.ProtocolException;
 
  79 import java.text.SimpleDateFormat;
 
  80 import java.util.ArrayList;
 
  81 import java.util.List;
 
  82 import java.util.Properties;
 
  83 import java.util.concurrent.atomic.AtomicBoolean;
 
  84 import javax.ws.rs.core.Response.Status;
 
  88  * This class has the Elasticsearch implementation of the
 
  89  * DB operations defined in DocumentStoreInterface.
 
  91 public class ElasticSearchHttpController implements DocumentStoreInterface {
 
  93   private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
 
  94       "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
 
  95   private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
 
  96       "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
 
  97   private static final String BULK_IMPORT_INDEX_TEMPLATE =
 
  98       "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
 
  99   private static final String BULK_DELETE_TEMPLATE =
 
 100       "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
 
 102   private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
 
 103       "Internal Error: ElasticSearch operation fault occurred";
 
 104   private static final Logger logger = LoggerFactory.getInstance()
 
 105       .getLogger(ElasticSearchHttpController.class.getName());
 
 106   private static final Logger metricsLogger = LoggerFactory.getInstance()
 
 107       .getMetricsLogger(ElasticSearchHttpController.class.getName());
 
 108   private final ElasticSearchConfig config;
 
 110   private static final String DEFAULT_TYPE = "default";
 
 112   private static ElasticSearchHttpController instance = null;
 
 114   protected AnalysisConfiguration analysisConfig;
 
 116   public static ElasticSearchHttpController getInstance() {
 
 118     synchronized (ElasticSearchHttpController.class) {
 
 120       if (instance == null) {
 
 122         Properties properties = new Properties();
 
 123         File file = new File(SearchDbConstants.ES_CONFIG_FILE);
 
 125           properties.load(new FileInputStream(file));
 
 126         } catch (Exception e) {
 
 127           logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL,
 
 128             "ElasticSearchHTTPController.getInstance",
 
 129             e.getLocalizedMessage());
 
 132         ElasticSearchConfig config = new ElasticSearchConfig(properties);
 
 133         instance = new ElasticSearchHttpController(config);
 
 140   public ElasticSearchHttpController(ElasticSearchConfig config) {
 
 141     this.config = config;
 
 142     analysisConfig = new AnalysisConfiguration();
 
 145       logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
 
 147       logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
 
 148     } catch (Exception e) {
 
 149       logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e,
 
 150           getFullUrl("", false), e.getMessage());
 
 155   public AnalysisConfiguration getAnalysisConfig() {
 
 156     return analysisConfig;
 
 160   public OperationResult createIndex(String index, DocumentSchema documentSchema) {
 
 162     OperationResult result = new OperationResult();
 
 163     result.setResultCode(500);
 
 167       // Submit the request to ElasticSearch to create the index using a
 
 168       // default document type.
 
 169       result = createTable(index,
 
 171           analysisConfig.getEsIndexSettings(),
 
 172           DocumentSchemaUtil.generateDocumentMappings(documentSchema));
 
 174       // ElasticSearch will return us a 200 code on success when we
 
 175       // want to report a 201, so translate the result here.
 
 176       result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
 
 177       if (isSuccess(result)) {
 
 178         result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
 
 179         //result.setResult("{\"index\": \"" + index + ", \"type\": \"" + DEFAULT_TYPE + "\"}");
 
 182     } catch (DocumentStoreOperationException e) {
 
 184       result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
 
 192   public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
 
 194     //Initialize operation result with a failure codes / fault string
 
 195     OperationResult opResult = new OperationResult();
 
 196     opResult.setResultCode(500);
 
 197     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
 
 199     // Grab the current time so we can use it to generate a metrics log.
 
 200     MdcOverride override = getStartTime(new MdcOverride());
 
 202     String fullUrl = getFullUrl("/" + indexName + "/", false);
 
 203     HttpURLConnection conn = initializeConnection(fullUrl);
 
 205     logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
 
 208       conn.setRequestMethod("DELETE");
 
 209     } catch (ProtocolException e) {
 
 210       shutdownConnection(conn);
 
 211       throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
 
 214     handleResponse(conn, opResult);
 
 216     // Generate a metrics log so we can track how long the operation took.
 
 217     metricsLogger.info(SearchDbMsgs.DELETE_INDEX_TIME,
 
 219             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
 
 220             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
 
 224     shutdownConnection(conn);
 
 230   private OperationResult checkConnection() throws Exception {
 
 232     String fullUrl = getFullUrl("/_cluster/health", false);
 
 234     HttpURLConnection conn = null;
 
 236     url = new URL(fullUrl);
 
 237     conn = (HttpURLConnection) url.openConnection();
 
 238     conn.setRequestMethod("GET");
 
 239     conn.setDoOutput(true);
 
 240     logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
 
 242     int resultCode = conn.getResponseCode();
 
 243     logger.debug("getClusterHealth() response Code : " + resultCode);
 
 244     OperationResult opResult = new OperationResult();
 
 245     opResult.setResultCode(resultCode);
 
 247     shutdownConnection(conn);
 
 252   private String getFullUrl(String resourceUrl, boolean isSecure) {
 
 254     final String host = config.getIpAddress();
 
 255     final String port = config.getHttpPort();
 
 258       return String.format("https://%s:%s%s", host, port, resourceUrl);
 
 260       return String.format("http://%s:%s%s", host, port, resourceUrl);
 
 264   private void shutdownConnection(HttpURLConnection connection) {
 
 265     if (connection == null) {
 
 269     InputStream inputstream = null;
 
 270     OutputStream outputstream = null;
 
 273       inputstream = connection.getInputStream();
 
 274     } catch (IOException e) {
 
 275       logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
 
 277       if (inputstream != null) {
 
 280         } catch (IOException e) {
 
 281           logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
 
 282               e.getLocalizedMessage());
 
 288       outputstream = connection.getOutputStream();
 
 289     } catch (IOException e) {
 
 290       logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
 
 292       if (outputstream != null) {
 
 294           outputstream.close();
 
 295         } catch (IOException e) {
 
 296           logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
 
 297               e.getLocalizedMessage());
 
 302     connection.disconnect();
 
 306   protected OperationResult createTable(String indexName, String typeName,
 
 307                                         String indexSettings, String indexMappings)
 
 308       throws DocumentStoreOperationException {
 
 310     if (indexSettings == null) {
 
 311       logger.debug("No settings provided.");
 
 314     if (indexMappings == null) {
 
 315       logger.debug("No mappings provided.");
 
 318     OperationResult opResult = new OperationResult();
 
 320     // Initialize operation result with a failure codes / fault string
 
 321     opResult.setResultCode(500);
 
 322     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
 
 324     // Grab the current time so we can use it to generate a metrics log.
 
 325     MdcOverride override = getStartTime(new MdcOverride());
 
 327     String fullUrl = getFullUrl("/" + indexName + "/", false);
 
 328     HttpURLConnection conn = initializeConnection(fullUrl);
 
 331       conn.setRequestMethod("PUT");
 
 332     } catch (ProtocolException e) {
 
 333       shutdownConnection(conn);
 
 334       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
 
 337     StringBuilder sb = new StringBuilder(128);
 
 338     sb.append("{ \"settings\" : ");
 
 339     sb.append(indexSettings);
 
 342     sb.append("\"mappings\" : {");
 
 343     sb.append("\"" + typeName + "\" :");
 
 344     sb.append(indexMappings);
 
 347     attachContent(conn, sb.toString());
 
 349     logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
 
 350     logger.debug("Request content: " + sb.toString());
 
 352     handleResponse(conn, opResult);
 
 354     shutdownConnection(conn);
 
 356     // Generate a metrics log so we can track how long the operation took.
 
 357     metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
 
 359             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
 
 360             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
 
 368   public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document)
 
 369       throws DocumentStoreOperationException {
 
 370     if (document.getId() == null || document.getId().isEmpty()) {
 
 371       return createDocumentWithoutId(indexName, document);
 
 373       return createDocumentWithId(indexName, document);
 
 377   private DocumentOperationResult createDocumentWithId(String indexName,
 
 378                                                        DocumentStoreDataEntity document)
 
 379       throws DocumentStoreOperationException {
 
 380     // check if the document already exists
 
 381     DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
 
 384     if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
 
 385       if (opResult.getResultCode() == Status.OK.getStatusCode()) {
 
 386         opResult.setFailureCause("A document with the same id already exists.");
 
 388         opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
 
 390       opResult.setResultCode(Status.CONFLICT.getStatusCode());
 
 394     opResult = new DocumentOperationResult();
 
 395     // Initialize operation result with a failure codes / fault string
 
 396     opResult.setResultCode(500);
 
 397     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
 
 399     // Grab the current time so we can use it to generate a metrics log.
 
 400     MdcOverride override = getStartTime(new MdcOverride());
 
 402     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE
 
 403         + "/" + document.getId(), false);
 
 404     HttpURLConnection conn = initializeConnection(fullUrl);
 
 407       conn.setRequestMethod("PUT");
 
 408     } catch (ProtocolException e) {
 
 409       shutdownConnection(conn);
 
 410       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
 
 413     attachDocument(conn, document);
 
 415     logger.debug("Sending 'PUT' request to: " + conn.getURL());
 
 417     handleResponse(conn, opResult);
 
 418     buildDocumentResult(opResult, indexName);
 
 420     // Generate a metrics log so we can track how long the operation took.
 
 421     metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
 
 423             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
 
 424             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
 
 428     shutdownConnection(conn);
 
 434   private DocumentOperationResult createDocumentWithoutId(String indexName,
 
 435                                                           DocumentStoreDataEntity document)
 
 436       throws DocumentStoreOperationException {
 
 438     DocumentOperationResult response = new DocumentOperationResult();
 
 439     // Initialize operation result with a failure codes / fault string
 
 440     response.setResultCode(500);
 
 441     response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
 
 443     // Grab the current time so we can use it to generate a metrics log.
 
 444     MdcOverride override = getStartTime(new MdcOverride());
 
 446     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
 
 447     HttpURLConnection conn = initializeConnection(fullUrl);
 
 450       conn.setRequestMethod("POST");
 
 451     } catch (ProtocolException e) {
 
 452       shutdownConnection(conn);
 
 453       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
 
 456     attachDocument(conn, document);
 
 458     logger.debug("Sending 'POST' request to: " + conn.getURL());
 
 460     handleResponse(conn, response);
 
 461     buildDocumentResult(response, indexName);
 
 463     // Generate a metrics log so we can track how long the operation took.
 
 464     metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
 
 466             .setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
 
 467             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
 
 471     shutdownConnection(conn);
 
 476   private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
 
 477       throws DocumentStoreOperationException {
 
 478     conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
 
 479     conn.setRequestProperty("Connection", "Close");
 
 481     attachContent(conn, doc.getContentInJson());
 
 484   private DocumentOperationResult checkDocumentExistence(String indexName,
 
 486       throws DocumentStoreOperationException {
 
 487     DocumentOperationResult opResult = new DocumentOperationResult();
 
 489     // Initialize operation result with a failure codes / fault string
 
 490     opResult.setResultCode(500);
 
 492     // Grab the current time so we can use it to generate a metrics log.
 
 493     MdcOverride override = getStartTime(new MdcOverride());
 
 495     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
 
 496     HttpURLConnection conn = initializeConnection(fullUrl);
 
 499       conn.setRequestMethod("HEAD");
 
 500     } catch (ProtocolException e) {
 
 501       shutdownConnection(conn);
 
 502       throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
 
 505     logger.debug("Sending 'HEAD' request to: " + conn.getURL());
 
 509       resultCode = conn.getResponseCode();
 
 510     } catch (IOException e) {
 
 511       shutdownConnection(conn);
 
 512       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
 
 515     logger.debug("Response Code : " + resultCode);
 
 517     opResult.setResultCode(resultCode);
 
 519     // Generate a metrics log so we can track how long the operation took.
 
 520     metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
 
 522             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
 
 523             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
 
 528     shutdownConnection(conn);
 
 534   public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document)
 
 535       throws DocumentStoreOperationException {
 
 536     DocumentOperationResult opResult = new DocumentOperationResult();
 
 538     // Initialize operation result with a failure codes / fault string
 
 539     opResult.setResultCode(500);
 
 540     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
 
 542     // Grab the current time so we can use it to generate a metrics log.
 
 543     MdcOverride override = getStartTime(new MdcOverride());
 
 545     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
 
 546         + "?version=" + document.getVersion(), false);
 
 547     HttpURLConnection conn = initializeConnection(fullUrl);
 
 550       conn.setRequestMethod("PUT");
 
 551     } catch (ProtocolException e) {
 
 552       shutdownConnection(conn);
 
 553       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
 
 556     attachDocument(conn, document);
 
 558     logger.debug("Sending 'PUT' request to: " + conn.getURL());
 
 560     handleResponse(conn, opResult);
 
 561     buildDocumentResult(opResult, indexName);
 
 563     // Generate a metrics log so we can track how long the operation took.
 
 564     metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
 
 566             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
 
 567             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
 
 572     shutdownConnection(conn);
 
 578   public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
 
 579       throws DocumentStoreOperationException {
 
 580     DocumentOperationResult opResult = new DocumentOperationResult();
 
 582     // Initialize operation result with a failure codes / fault string
 
 583     opResult.setResultCode(500);
 
 584     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
 
 586     // Grab the current time so we can use it to generate a metrics log.
 
 587     MdcOverride override = getStartTime(new MdcOverride());
 
 589     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
 
 590         + "?version=" + document.getVersion(), false);
 
 591     HttpURLConnection conn = initializeConnection(fullUrl);
 
 594       conn.setRequestMethod("DELETE");
 
 595     } catch (ProtocolException e) {
 
 596       shutdownConnection(conn);
 
 597       throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
 
 600     logger.debug("\nSending 'DELETE' request to " + conn.getURL());
 
 602     handleResponse(conn, opResult);
 
 603     buildDocumentResult(opResult, indexName);
 
 604     //supress the etag and url in response for delete as they are not required
 
 605     if (opResult.getDocument() != null) {
 
 606       opResult.getDocument().setEtag(null);
 
 607       opResult.getDocument().setUrl(null);
 
 610     // Generate a metrics log so we can track how long the operation took.
 
 611     metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
 
 613             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
 
 614             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
 
 619     shutdownConnection(conn);
 
 625   public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
 
 626       throws DocumentStoreOperationException {
 
 627     DocumentOperationResult opResult = new DocumentOperationResult();
 
 629     // Initialize operation result with a failure codes / fault string
 
 630     opResult.setResultCode(500);
 
 631     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
 
 633     // Grab the current time so we can use it to generate a metrics log.
 
 634     MdcOverride override = getStartTime(new MdcOverride());
 
 636     String fullUrl = null;
 
 637     if (document.getVersion() == null) {
 
 638       fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
 
 640       fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
 
 641           + "?version=" + document.getVersion(), false);
 
 643     HttpURLConnection conn = initializeConnection(fullUrl);
 
 645     logger.debug("\nSending 'GET' request to: " + conn.getURL());
 
 647     handleResponse(conn, opResult);
 
 648     buildDocumentResult(opResult, indexName);
 
 650     // Generate a metrics log so we can track how long the operation took.
 
 651     metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
 
 653             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
 
 654             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
 
 659     shutdownConnection(conn);
 
 664   public SearchOperationResult search(String indexName, String queryString)
 
 665       throws DocumentStoreOperationException {
 
 666     SearchOperationResult opResult = new SearchOperationResult();
 
 668     // Initialize operation result with a failure codes / fault string
 
 669     opResult.setResultCode(500);
 
 670     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
 
 672     String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
 
 674     // Grab the current time so we can use it to generate a metrics log.
 
 675     MdcOverride override = getStartTime(new MdcOverride());
 
 677     HttpURLConnection conn = initializeConnection(fullUrl);
 
 680       conn.setRequestMethod("GET");
 
 681     } catch (ProtocolException e) {
 
 682       shutdownConnection(conn);
 
 683       throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
 
 686     logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
 
 688     handleResponse(conn, opResult);
 
 689     buildSearchResult(opResult, indexName);
 
 692     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
 
 694             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
 
 695             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
 
 703   public SearchOperationResult searchWithPayload(String indexName, String query)
 
 704       throws DocumentStoreOperationException {
 
 705     SearchOperationResult opResult = new SearchOperationResult();
 
 707     if (logger.isDebugEnabled()) {
 
 708       logger.debug("Querying index: " + indexName + " with query string: " + query);
 
 711     // Initialize operation result with a failure codes / fault string
 
 712     opResult.setResultCode(500);
 
 713     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
 
 715     String fullUrl = getFullUrl("/" + indexName + "/_search", false);
 
 717     // Grab the current time so we can use it to generate a metrics log.
 
 718     MdcOverride override = getStartTime(new MdcOverride());
 
 720     HttpURLConnection conn = initializeConnection(fullUrl);
 
 723       conn.setRequestMethod("POST");
 
 724     } catch (ProtocolException e) {
 
 725       shutdownConnection(conn);
 
 726       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
 
 729     attachContent(conn, query);
 
 731     logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
 
 732     logger.debug("Request body =  Elasticsearch query = " + query);
 
 734     handleResponse(conn, opResult);
 
 735     buildSearchResult(opResult, indexName);
 
 737     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
 
 739             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
 
 740             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
 
 745     shutdownConnection(conn);
 
 750   private void attachContent(HttpURLConnection conn, String content)
 
 751       throws DocumentStoreOperationException {
 
 752     OutputStream outputStream = null;
 
 753     OutputStreamWriter out = null;
 
 756       outputStream = conn.getOutputStream();
 
 757     } catch (IOException e) {
 
 758       shutdownConnection(conn);
 
 759       throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
 
 762     out = new OutputStreamWriter(outputStream);
 
 767     } catch (IOException e) {
 
 768       shutdownConnection(conn);
 
 769       throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
 
 773   private HttpURLConnection initializeConnection(String fullUrl)
 
 774       throws DocumentStoreOperationException {
 
 776     HttpURLConnection conn = null;
 
 779       url = new URL(fullUrl);
 
 780     } catch (MalformedURLException e) {
 
 781       throw new DocumentStoreOperationException("Error building a URL with " + url, e);
 
 785       conn = (HttpURLConnection) url.openConnection();
 
 786       conn.setDoOutput(true);
 
 787     } catch (IOException e) {
 
 788       shutdownConnection(conn);
 
 789       throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
 
 795   private void handleResponse(HttpURLConnection conn, OperationResult opResult)
 
 796       throws DocumentStoreOperationException {
 
 797     int resultCode = 200;
 
 800       resultCode = conn.getResponseCode();
 
 801     } catch (IOException e) {
 
 802       shutdownConnection(conn);
 
 803       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
 
 806     logger.debug("Response Code : " + resultCode);
 
 808     InputStream inputStream = null;
 
 810     if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
 
 811       inputStream = conn.getErrorStream();
 
 814         inputStream = conn.getInputStream();
 
 815       } catch (IOException e) {
 
 816         shutdownConnection(conn);
 
 817         throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
 
 821     InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
 
 822     BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
 
 824     StringBuilder result = new StringBuilder(128);
 
 825     String string = null;
 
 828       while ((string = bufferedreader.readLine()) != null) {
 
 829         result.append(string).append("\n");
 
 831     } catch (IOException e) {
 
 832       shutdownConnection(conn);
 
 833       throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
 
 836     if (resultCode == Status.CONFLICT.getStatusCode()) {
 
 837       opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode());
 
 839       opResult.setResultCode(resultCode);
 
 841     if (logger.isDebugEnabled()) {
 
 842       logger.debug("Raw result string from ElasticSearch = " + result.toString());
 
 844     opResult.setResult(result.toString());
 
 845     opResult.setResultVersion(extractVersion(result.toString()));
 
 848   private String extractVersion(String result) throws DocumentStoreOperationException {
 
 850     JSONParser parser = new JSONParser();
 
 851     String version = null;
 
 853       JSONObject root = (JSONObject) parser.parse(result);
 
 854       if (root.get("_version") != null) {
 
 855         version = root.get("_version").toString();
 
 858     } catch (ParseException e) {
 
 860       // Not all responses from ElasticSearch include a version, so
 
 861       // if we don't get one back, just return an empty string rather
 
 862       // than trigger a false failure.
 
 869    * This convenience method gets the current system time and stores
 
 870    * it in an attribute in the supplied {@link MdcOverride} object so
 
 871    * that it can be used later by the metrics logger.
 
 873    * @param override - The {@link MdcOverride} object to update.
 
 874    * @return - The supplied {@link MdcOverride} object.
 
 876   private MdcOverride getStartTime(MdcOverride override) {
 
 878     // Grab the current time...
 
 879     long startTimeInMs = System.currentTimeMillis();
 
 881     // ...and add it as an attribute to the supplied MDC Override
 
 883     SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
 
 884     override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
 
 886     // Return the MdcOverride object that we were passed.
 
 887     // This looks odd, but it allows us to do stuff like:
 
 889     //    MdcOverride ov = getStartTime(new MdcOverride())
 
 891     // which is quite handy, but also allows us to pass in an existing
 
 892     // MdcOverride object which already has some attributes set.
 
 896   private boolean isSuccess(OperationResult result) {
 
 898     return isSuccessCode(result.getResultCode());
 
 902   private boolean isSuccessCode(int statusCode) {
 
 903     return ((statusCode >= 200) && (statusCode < 300));
 
 908   public OperationResult performBulkOperations(BulkRequest[] requests)
 
 909       throws DocumentStoreOperationException {
 
 911     if (logger.isDebugEnabled()) {
 
 912       String dbgString = "ESController: performBulkOperations - Operations: ";
 
 914       for (BulkRequest request : requests) {
 
 915         dbgString += "[" + request.toString() + "] ";
 
 918       logger.debug(dbgString);
 
 921     // Grab the current time so we can use it to generate a metrics log.
 
 922     MdcOverride override = getStartTime(new MdcOverride());
 
 924     // Parse the supplied set of operations.
 
 925     // Iterate over the list of operations which we were provided and
 
 926     // translate them into a format that ElasticSearh understands.
 
 928     StringBuilder esOperationSet = new StringBuilder(128);
 
 929     List<ElasticSearchResultItem> rejected = new ArrayList<ElasticSearchResultItem>();
 
 930     for (BulkRequest request : requests) {
 
 932       // Convert the request to the syntax ElasticSearch likes.
 
 933       if (buildEsOperation(request, esOperationSet, rejected)) {
 
 938     ElasticSearchBulkOperationResult opResult = null;
 
 941       // Open an HTTP connection to the ElasticSearch back end.
 
 942       String fullUrl = getFullUrl("/_bulk", false);
 
 944       HttpURLConnection conn;
 
 947         url = new URL(fullUrl);
 
 948         conn = (HttpURLConnection) url.openConnection();
 
 949         conn.setRequestMethod("PUT");
 
 950         conn.setDoOutput(true);
 
 951         conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
 
 952         conn.setRequestProperty("Connection", "Close");
 
 954       } catch (IOException e) {
 
 956         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
 
 957         if (logger.isDebugEnabled()) {
 
 958           logger.debug(Throwables.getStackTraceAsString(e));
 
 961         throw new DocumentStoreOperationException("Failed to open connection to document store.  Cause: "
 
 962             + e.getMessage(), e);
 
 965       StringBuilder bulkResult = new StringBuilder(128);
 
 967         // Create an output stream to write our request to.
 
 968         OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
 
 971         if (logger.isDebugEnabled()) {
 
 972           logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
 
 973           logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n",
 
 977         // Write the resulting request string to our output stream. (this sends the request to ES?)
 
 978         out.write(esOperationSet.toString());
 
 981         // Open an input stream on our connection in order to read back the results.
 
 982         InputStream is = conn.getInputStream();
 
 983         InputStreamReader inputstreamreader = new InputStreamReader(is);
 
 984         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
 
 986         // Read the contents of the input stream into our result string...
 
 987         String esResponseString = null;
 
 989         while ((esResponseString = bufferedreader.readLine()) != null) {
 
 990           bulkResult.append(esResponseString).append("\n");
 
 993       } catch (IOException e) {
 
 995         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
 
 996         if (logger.isDebugEnabled()) {
 
 997           StringWriter sw = new StringWriter();
 
 998           e.printStackTrace(new PrintWriter(sw));
 
 999           logger.debug(sw.toString());
 
1002         throw new DocumentStoreOperationException("Failure interacting with document store.  Cause: "
 
1003             + e.getMessage(), e);
 
1006       if (logger.isDebugEnabled()) {
 
1007         logger.debug("ESController: Received result string from ElasticSearch: = "
 
1008             + bulkResult.toString());
 
1011       // ...and marshal the resulting string into a Java object.
 
1013         opResult = marshallEsBulkResult(bulkResult.toString());
 
1015       } catch (IOException e) {
 
1017         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
 
1018         if (logger.isDebugEnabled()) {
 
1019           logger.debug(Throwables.getStackTraceAsString(e));
 
1022         throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: "
 
1023             + e.getMessage(), e);
 
1027     // Finally, build the operation result and return it to the caller.
 
1028     OperationResult result = new OperationResult();
 
1029     result.setResultCode(207);
 
1030     result.setResult(buildGenericBulkResultSet(opResult, rejected));
 
1032     // In the success case we don't want the entire result string to be
 
1033     // dumped into the metrics log, so concatenate it.
 
1034     String resultStringForMetricsLog = result.getResult();
 
1035     if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
 
1036       resultStringForMetricsLog = resultStringForMetricsLog.substring(0,
 
1037           Math.max(resultStringForMetricsLog.length(), 85)) + "...";
 
1040     metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
 
1042             .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
 
1043             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
 
1051    * This method converts a {@link BulkRequest} object into a json structure
 
1052    * which can be understood by ElasticSearch.
 
1054    * @param request - The request to be performed.
 
1055    * @param sb      - The string builder to append the json data to
 
1056    * @throws DocumentStoreOperationException
 
1058   private boolean buildEsOperation(BulkRequest request, StringBuilder sb,
 
1059                                    List<ElasticSearchResultItem> fails)
 
1060       throws DocumentStoreOperationException {
 
1062     boolean retVal = true;
 
1063     OperationResult indexExistsResult = null;
 
1065     // What kind of operation are we performing?
 
1066     switch (request.getOperationType()) {
 
1068       // Create a new document.
 
1071         // Make sure that we were supplied a document payload.
 
1072         if (request.getOperation().getDocument() == null) {
 
1074           fails.add(generateRejectionEntry(request.getOperationType(),
 
1075               "Missing document payload",
 
1079               request.getOperation().getMetaData().getUrl()));
 
1083         // Make sure that the supplied document URL is formatted
 
1085         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
 
1086           fails.add(generateRejectionEntry(request.getOperationType(),
 
1087               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
 
1091               request.getOperation().getMetaData().getUrl()));
 
1095         // Validate that the specified index actually exists before we
 
1096         // try to perform the create.
 
1097         if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
 
1099           fails.add(generateRejectionEntry(request.getOperationType(),
 
1100               "Specified resource does not exist: "
 
1101                   + request.getOperation().getMetaData().getUrl(),
 
1105               request.getOperation().getMetaData().getUrl()));
 
1109         // If we were supplied an id for the new document, then
 
1110         // include it in the bulk operation to Elastic Search
 
1111         if (request.getId() == null) {
 
1113           sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE,
 
1117           // Otherwise, we just leave that parameter off and ElasticSearch
 
1118           // will generate one for us.
 
1120           sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE,
 
1127           // Append the document that we want to create.
 
1128           sb.append(request.getOperation().getDocument().toJson()).append("\n");
 
1129         } catch (JsonProcessingException e) {
 
1130           throw new DocumentStoreOperationException("Failure parsing document to json", e);
 
1135       // Update an existing document.
 
1138         // Make sure that we were supplied a document payload.
 
1139         if (request.getOperation().getDocument() == null) {
 
1141           fails.add(generateRejectionEntry(request.getOperationType(),
 
1142               "Missing document payload",
 
1146               request.getOperation().getMetaData().getUrl()));
 
1150         // Make sure that the supplied document URL is formatted
 
1152         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
 
1153           fails.add(generateRejectionEntry(request.getOperationType(),
 
1154               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
 
1158               request.getOperation().getMetaData().getUrl()));
 
1162         // Validate that the specified index actually exists before we
 
1163         // try to perform the update.
 
1164         if (!indexExists(request.getIndex())) {
 
1166           fails.add(generateRejectionEntry(request.getOperationType(),
 
1167               "Specified resource does not exist: "
 
1168                   + request.getOperation().getMetaData().getUrl(),
 
1172               request.getOperation().getMetaData().getUrl()));
 
1176         // Validate that the document we are trying to update actually
 
1177         // exists before we try to perform the update.
 
1178         if (!documentExists(request.getIndex(), request.getId())) {
 
1180           fails.add(generateRejectionEntry(request.getOperationType(),
 
1181               "Specified resource does not exist: "
 
1182                   + request.getOperation().getMetaData().getUrl(),
 
1186               request.getOperation().getMetaData().getUrl()));
 
1190         // It is mandatory that a version be supplied for an update operation,
 
1191         // so validate that now.
 
1192         if (request.getOperation().getMetaData().getEtag() == null) {
 
1194           fails.add(generateRejectionEntry(request.getOperationType(),
 
1195               "Missing mandatory ETag field",
 
1199               request.getOperation().getMetaData().getUrl()));
 
1203         // Generate the update request...
 
1204         sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE,
 
1208             request.getOperation().getMetaData().getEtag()));
 
1210         // ...and append the document that we want to update.
 
1212           sb.append(request.getOperation().getDocument().toJson()).append("\n");
 
1213         } catch (JsonProcessingException e) {
 
1214           throw new DocumentStoreOperationException("Failure parsing document to json", e);
 
1218       // Delete an existing document.
 
1221         // Make sure that the supplied document URL is formatted
 
1223         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
 
1224           fails.add(generateRejectionEntry(request.getOperationType(),
 
1225               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
 
1229               request.getOperation().getMetaData().getUrl()));
 
1233         // Validate that the specified index actually exists before we
 
1234         // try to perform the delete.
 
1235         if (!indexExists(request.getIndex())) {
 
1237           fails.add(generateRejectionEntry(request.getOperationType(),
 
1238               "Specified resource does not exist: "
 
1239                   + request.getOperation().getMetaData().getUrl(),
 
1243               request.getOperation().getMetaData().getUrl()));
 
1247         // Validate that the document we are trying to update actually
 
1248         // exists before we try to perform the delete.
 
1249         if (!documentExists(request.getIndex(), request.getId())) {
 
1251           fails.add(generateRejectionEntry(request.getOperationType(),
 
1252               "Specified resource does not exist: "
 
1253                   + request.getOperation().getMetaData().getUrl(),
 
1257               request.getOperation().getMetaData().getUrl()));
 
1261         // It is mandatory that a version be supplied for a delete operation,
 
1262         // so validate that now.
 
1263         if (request.getOperation().getMetaData().getEtag() == null) {
 
1265           fails.add(generateRejectionEntry(request.getOperationType(),
 
1266               "Missing mandatory ETag field",
 
1270               request.getOperation().getMetaData().getUrl()));
 
1274         // Generate the delete request.
 
1275         sb.append(String.format(BULK_DELETE_TEMPLATE,
 
1279             request.getOperation().getMetaData().getEtag()));
 
1287   private boolean indexExists(String index) throws DocumentStoreOperationException {
 
1289     OperationResult indexExistsResult = checkIndexExistence(index);
 
1291     return ((indexExistsResult.getResultCode() >= 200)
 
1292         && (indexExistsResult.getResultCode() < 300));
 
1295   private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
 
1297     OperationResult docExistsResult = checkDocumentExistence(index, id);
 
1299     return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
 
1303    * This method constructs a status entry for a bulk operation which has
 
1304    * been rejected before even sending it to the document store.
 
1306    * @param rejectReason - A message describing why the operation was rejected.
 
1307    * @param anId         - The identifier associated with the document being
 
1309    * @param statusCode  - An HTTP status code.
 
1310    * @return - A result set item.
 
1312   private ElasticSearchResultItem generateRejectionEntry(OperationType opType,
 
1313                                                          String rejectReason,
 
1317                                                          String originalUrl) {
 
1319     ElasticSearchError err = new ElasticSearchError();
 
1320     err.setReason(rejectReason);
 
1322     ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
 
1325     op.setStatus(statusCode);
 
1327     op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
 
1329     ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
 
1333         rejectionResult.setCreate(op);
 
1336         rejectionResult.setIndex(op);
 
1339         rejectionResult.setDelete(op);
 
1344     return rejectionResult;
 
1349    * This method takes the json structure returned from ElasticSearch in
 
1350    * response to a bulk operations request and marshals it into a Java
 
1353    * @param jsonResult - The bulk operations response returned from
 
1355    * @return - The marshalled response.
 
1356    * @throws JsonParseException
 
1357    * @throws JsonMappingException
 
1358    * @throws IOException
 
1360   private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult)
 
1361       throws JsonParseException, JsonMappingException, IOException {
 
1363     if (jsonResult != null) {
 
1364       if (logger.isDebugEnabled()) {
 
1365         logger.debug("ESController: Marshalling ES result set from json: "
 
1366             + jsonResult.replaceAll("\n", ""));
 
1369       ObjectMapper mapper = new ObjectMapper();
 
1370       mapper.setSerializationInclusion(Include.NON_EMPTY);
 
1372       return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
 
1380    * This method takes the marshalled ElasticSearch bulk response and
 
1381    * converts it into a generic response payload.
 
1383    * @param esResult - ElasticSearch bulk operations response.
 
1384    * @return - A generic result set.
 
1386   private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
 
1387                                            List<ElasticSearchResultItem> rejectedOps) {
 
1390     int totalSuccess = 0;
 
1393     if (logger.isDebugEnabled()) {
 
1395       logger.debug("ESController: Build generic result set.  ES Results: "
 
1396           + ((esResult != null) ? esResult.toString() : "[]")
 
1397           + " Rejected Ops: " + rejectedOps.toString());
 
1400     // Build a combined list of result items from the results returned
 
1401     // from ElasticSearch and the list of operations that we rejected
 
1402     // without sending to ElasticSearch.
 
1403     List<ElasticSearchResultItem> combinedResults = new ArrayList<ElasticSearchResultItem>();
 
1404     if (esResult != null) {
 
1405       combinedResults.addAll(Arrays.asList(esResult.getItems()));
 
1407     combinedResults.addAll(rejectedOps);
 
1409     // Iterate over the individual results in the resulting result set.
 
1410     StringBuilder resultsBuilder = new StringBuilder();
 
1411     AtomicBoolean firstItem = new AtomicBoolean(true);
 
1412     for (ElasticSearchResultItem item : combinedResults) {
 
1414       // Increment the operation counts.
 
1416       if (isSuccessCode(item.operationStatus().getStatus())) {
 
1422       // Prepend a comma to our response string unless this it the
 
1423       // first result in the set.
 
1424       if (!firstItem.compareAndSet(true, false)) {
 
1425         resultsBuilder.append(", ");
 
1428       // Append the current result as a generic json structure.
 
1429       resultsBuilder.append(item.toJson());
 
1432     // Now, build the result string and return it.
 
1433     String responseBody = "{ \"total_operations\": " + totalOps + ", "
 
1434         + "\"total_success\": " + totalSuccess + ", "
 
1435         + "\"total_fails\": " + totalFails + ", "
 
1437         + resultsBuilder.toString()
 
1440     return responseBody;
 
1445    * This method queryies ElasticSearch to determine if the supplied
 
1446    * index is present in the document store.
 
1448    * @param indexName - The index to look for.
 
1449    * @return - An operation result indicating the success or failure of
 
1451    * @throws DocumentStoreOperationException
 
1453   public OperationResult checkIndexExistence(String indexName)
 
1454       throws DocumentStoreOperationException {
 
1456     // Initialize operation result with a failure codes / fault string
 
1457     OperationResult opResult = new OperationResult();
 
1458     opResult.setResultCode(500);
 
1460     // Grab the current time so we can use it to generate a metrics log.
 
1461     MdcOverride override = getStartTime(new MdcOverride());
 
1463     String fullUrl = getFullUrl("/" + indexName, false);
 
1464     HttpURLConnection conn = initializeConnection(fullUrl);
 
1467       conn.setRequestMethod("HEAD");
 
1469     } catch (ProtocolException e) {
 
1470       shutdownConnection(conn);
 
1471       throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
 
1474     logger.debug("Sending 'HEAD' request to: " + conn.getURL());
 
1478       resultCode = conn.getResponseCode();
 
1479     } catch (IOException e) {
 
1480       shutdownConnection(conn);
 
1481       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
 
1483     logger.debug("Response Code : " + resultCode);
 
1485     opResult.setResultCode(resultCode);
 
1487     // Generate a metrics log so we can track how long the operation took.
 
1488     metricsLogger.info(SearchDbMsgs.CHECK_INDEX_TIME,
 
1490             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
 
1491             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
 
1495     shutdownConnection(conn);
 
1501   private void buildDocumentResult(DocumentOperationResult result, String index)
 
1502       throws DocumentStoreOperationException {
 
1504     JSONParser parser = new JSONParser();
 
1507       root = (JSONObject) parser.parse(result.getResult());
 
1509       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
 
1510         // Success response object
 
1511         Document doc = new Document();
 
1512         doc.setEtag(result.getResultVersion());
 
1513         doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
 
1515         doc.setContent((JSONObject) root.get("_source"));
 
1516         result.setDocument(doc);
 
1519         // Error response object
 
1520         JSONObject error = (JSONObject) root.get("error");
 
1521         if (error != null) {
 
1522           result.setError(new ErrorResult(error.get("type").toString(),
 
1523               error.get("reason").toString()));
 
1527     } catch (Exception e) {
 
1528       throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
 
1529           + result.getResult());
 
1535   private String buildDocumentResponseUrl(String index, String id) {
 
1536     return ApiUtils.buildDocumentUri(index, id);
 
1539   private void buildSearchResult(SearchOperationResult result, String index)
 
1540       throws DocumentStoreOperationException {
 
1542     JSONParser parser = new JSONParser();
 
1546       root = (JSONObject) parser.parse(result.getResult());
 
1547       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
 
1548         JSONObject hits = (JSONObject) root.get("hits");
 
1549         JSONArray hitArray = (JSONArray) hits.get("hits");
 
1550         SearchHits searchHits = new SearchHits();
 
1551         searchHits.setTotalHits(hits.get("total").toString());
 
1552         ArrayList<SearchHit> searchHitArray = new ArrayList<SearchHit>();
 
1554         for (int i = 0; i < hitArray.size(); i++) {
 
1555           JSONObject hit = (JSONObject) hitArray.get(i);
 
1556           SearchHit searchHit = new SearchHit();
 
1557           searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
 
1558           Document doc = new Document();
 
1559           if (hit.get("_version") != null) {
 
1560             doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
 
1563           doc.setUrl(buildDocumentResponseUrl(index, (hit.get("_id") != null)
 
1564               ? hit.get("_id").toString() : ""));
 
1565           doc.setContent((JSONObject) hit.get("_source"));
 
1566           searchHit.setDocument(doc);
 
1567           searchHitArray.add(searchHit);
 
1569         searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
 
1570         result.setSearchResult(searchHits);
 
1572         JSONObject aggregations = (JSONObject) root.get("aggregations");
 
1573         if (aggregations != null) {
 
1574           AggregationResult[] aggResults =
 
1575               AggregationParsingUtil.parseAggregationResults(aggregations);
 
1576           AggregationResults aggs = new AggregationResults();
 
1577           aggs.setAggregations(aggResults);
 
1578           result.setAggregationResult(aggs);
 
1583         JSONObject error = (JSONObject) root.get("error");
 
1584         if (error != null) {
 
1585           result.setError(new ErrorResult(error.get("type").toString(),
 
1586               error.get("reason").toString()));
 
1589     } catch (Exception e) {
 
1590       throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
 
1591           + result.getResult());