2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
 
   6  * Copyright © 2017-2018 Amdocs
 
   7  * ================================================================================
 
   8  * Licensed under the Apache License, Version 2.0 (the "License");
 
   9  * you may not use this file except in compliance with the License.
 
  10  * You may obtain a copy of the License at
 
  12  *       http://www.apache.org/licenses/LICENSE-2.0
 
  14  * Unless required by applicable law or agreed to in writing, software
 
  15  * distributed under the License is distributed on an "AS IS" BASIS,
 
  16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  17  * See the License for the specific language governing permissions and
 
  18  * limitations under the License.
 
  19  * ============LICENSE_END=========================================================
 
  22 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
 
  24 import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
 
  25 import static javax.ws.rs.core.MediaType.APPLICATION_FORM_URLENCODED;
 
  26 import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
 
  28 import com.fasterxml.jackson.annotation.JsonInclude.Include;
 
  29 import com.fasterxml.jackson.core.JsonParseException;
 
  30 import com.fasterxml.jackson.core.JsonProcessingException;
 
  31 import com.fasterxml.jackson.databind.JsonMappingException;
 
  32 import com.fasterxml.jackson.databind.ObjectMapper;
 
  33 import com.google.common.base.Throwables;
 
  34 import java.io.BufferedReader;
 
  36 import java.io.FileInputStream;
 
  37 import java.io.IOException;
 
  38 import java.io.InputStream;
 
  39 import java.io.InputStreamReader;
 
  40 import java.io.OutputStream;
 
  41 import java.io.OutputStreamWriter;
 
  42 import java.io.PrintWriter;
 
  43 import java.io.StringWriter;
 
  44 import java.net.HttpURLConnection;
 
  45 import java.net.MalformedURLException;
 
  46 import java.net.ProtocolException;
 
  48 import java.text.SimpleDateFormat;
 
  49 import java.util.ArrayList;
 
  50 import java.util.Arrays;
 
  51 import java.util.List;
 
  52 import java.util.Properties;
 
  53 import java.util.concurrent.atomic.AtomicBoolean;
 
  54 import javax.ws.rs.core.Response.Status;
 
  55 import javax.ws.rs.core.Response.Status.Family;
 
  56 import org.eclipse.jetty.http.HttpStatus;
 
  57 import org.json.simple.JSONArray;
 
  58 import org.json.simple.JSONObject;
 
  59 import org.json.simple.parser.JSONParser;
 
  60 import org.json.simple.parser.ParseException;
 
  61 import org.onap.aai.cl.api.LogFields;
 
  62 import org.onap.aai.cl.api.LogLine;
 
  63 import org.onap.aai.cl.api.Logger;
 
  64 import org.onap.aai.cl.eelf.LoggerFactory;
 
  65 import org.onap.aai.cl.mdc.MdcContext;
 
  66 import org.onap.aai.cl.mdc.MdcOverride;
 
  67 import org.onap.aai.sa.rest.AnalysisConfiguration;
 
  68 import org.onap.aai.sa.rest.ApiUtils;
 
  69 import org.onap.aai.sa.rest.BulkRequest;
 
  70 import org.onap.aai.sa.rest.BulkRequest.OperationType;
 
  71 import org.onap.aai.sa.rest.DocumentSchema;
 
  72 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
 
  73 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
 
  74 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
 
  75 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
 
  76 import org.onap.aai.sa.searchdbabstraction.entity.Document;
 
  77 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
 
  78 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
 
  79 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
 
  80 import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder;
 
  81 import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder.Type;
 
  82 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
 
  83 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
 
  84 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
 
  85 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
 
  86 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
 
  87 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
 
  88 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
 
  89 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
 
  90 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
 
  91 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
 
  94  * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface.
 
  96 public class ElasticSearchHttpController implements DocumentStoreInterface {
 
    /** Shared instance handed out by {@code getInstance()}; {@code null} until first requested. */
    private static ElasticSearchHttpController instance = null;

    /** General-purpose logger for this class. */
    private static final Logger logger =
            LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
    /** Metrics logger for this class. */
    private static final Logger metricsLogger =
            LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());

    // JSON attribute names.
    private static final String JSON_ATTR_VERSION = "_version";
    private static final String JSON_ATTR_ERROR = "error";
    private static final String JSON_ATTR_REASON = "reason";

    /** Document type segment used in the document URLs built by this class. */
    private static final String DEFAULT_TYPE = "default";
    /** Query-string prefix appended to a document URL in front of a version value. */
    private static final String QUERY_PARAM_VERSION = "?version=";

    // Message fragments shared by several log statements and exceptions.
    private static final String MSG_RESOURCE_MISSING = "Specified resource does not exist: ";
    private static final String MSG_RESPONSE_CODE = "Response Code : ";
    private static final String MSG_INVALID_DOCUMENT_URL = "Invalid document URL: ";
    private static final String MSG_HTTP_PUT_FAILED = "Failed to set HTTP request method to PUT.";
    private static final String MSG_HTTP_POST_FAILED = "Failed to set HTTP request method to POST.";
    private static final String FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION =
            "Failed to get the response code from the connection.";
    private static final String FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE = "Failed to parse Elastic Search response.";

    // String.format templates for bulk action lines. Placeholders, in order:
    // index, type, then (where present) id and version.
    private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
            "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
    private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
            "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
    private static final String BULK_IMPORT_INDEX_TEMPLATE =
            "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
    private static final String BULK_DELETE_TEMPLATE =
            "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";

    /** Connection settings (host and HTTP port) used when building request URLs. */
    private final ElasticSearchConfig config;

    /** Analysis configuration; created in the constructor and exposed through {@code getAnalysisConfig()}. */
    protected AnalysisConfiguration analysisConfig;
 
 135     public ElasticSearchHttpController(ElasticSearchConfig config) {
 
 136         this.config = config;
 
 137         analysisConfig = new AnalysisConfiguration();
 
 140             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
 
 142             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
 
 143         } catch (Exception e) {
 
 144             logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, getFullUrl("", false),
 
 149     public static ElasticSearchHttpController getInstance() {
 
 150         synchronized (ElasticSearchHttpController.class) {
 
 151             if (instance == null) {
 
 152                 Properties properties = new Properties();
 
 153                 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
 
 155                     properties.load(new FileInputStream(file));
 
 156                 } catch (Exception e) {
 
 157                     logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance",
 
 158                             e.getLocalizedMessage());
 
 161                 ElasticSearchConfig config = new ElasticSearchConfig(properties);
 
 162                 instance = new ElasticSearchHttpController(config);
 
 169     public AnalysisConfiguration getAnalysisConfig() {
 
 170         return analysisConfig;
 
 174     public OperationResult createIndex(String index, DocumentSchema documentSchema) {
 
 176             // Submit the request to ElasticSearch to create the index using a default document type.
 
 177             OperationResult result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
 
 178                     DocumentSchemaUtil.generateDocumentMappings(documentSchema));
 
 180             // ElasticSearch will return us a 200 code on success when we
 
 181             // want to report a 201, so translate the result here.
 
 182             if (result.getResultCode() == Status.OK.getStatusCode()) {
 
 183                 result.setResultCode(Status.CREATED.getStatusCode());
 
 186             if (isSuccess(result)) {
 
 187                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
 
 190         } catch (DocumentStoreOperationException | IOException e) {
 
 191             return new OperationResultBuilder().useDefaults()
 
 192                     .failureCause("Document store operation failure.  Cause: " + e.getMessage()).build();
 
 198     public OperationResult createDynamicIndex(String index, String dynamicSchema) {
 
 200             OperationResult result = createTable(index, dynamicSchema);
 
 202             // ElasticSearch will return us a 200 code on success when we
 
 203             // want to report a 201, so translate the result here.
 
 204             if (result.getResultCode() == Status.OK.getStatusCode()) {
 
 205                 result.setResultCode(Status.CREATED.getStatusCode());
 
 207             if (isSuccess(result)) {
 
 208                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
 
 211         } catch (DocumentStoreOperationException e) {
 
 212             return new OperationResultBuilder().useDefaults()
 
 213                     .failureCause("Document store operation failure.  Cause: " + e.getMessage()).build();
 
 218     public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
 
 219         // Grab the current time so we can use it to generate a metrics log.
 
 220         MdcOverride override = getStartTime(new MdcOverride());
 
 222         String fullUrl = getFullUrl("/" + indexName + "/", false);
 
 223         HttpURLConnection conn = initializeConnection(fullUrl);
 
 225         logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
 
 228             conn.setRequestMethod("DELETE");
 
 229         } catch (ProtocolException e) {
 
 230             shutdownConnection(conn);
 
 231             throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
 
 234         OperationResult opResult = handleResponse(conn);
 
 235         logMetricsInfo(override, SearchDbMsgs.DELETE_INDEX_TIME, opResult, indexName);
 
 236         shutdownConnection(conn);
 
 241     private OperationResult checkConnection() throws IOException {
 
 242         String fullUrl = getFullUrl("/_cluster/health", false);
 
 244         HttpURLConnection conn = null;
 
 246         url = new URL(fullUrl);
 
 247         conn = (HttpURLConnection) url.openConnection();
 
 248         conn.setRequestMethod("GET");
 
 249         conn.setDoOutput(true);
 
 250         logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
 
 252         int resultCode = conn.getResponseCode();
 
 253         logger.debug("getClusterHealth() response Code : " + resultCode);
 
 255         shutdownConnection(conn);
 
 257         return new OperationResultBuilder().resultCode(resultCode).build();
 
 260     private String getFullUrl(String resourceUrl, boolean isSecure) {
 
 262         final String host = config.getIpAddress();
 
 263         final String port = config.getHttpPort();
 
 266             return String.format("https://%s:%s%s", host, port, resourceUrl);
 
 268             return String.format("http://%s:%s%s", host, port, resourceUrl);
 
 272     private void shutdownConnection(HttpURLConnection connection) {
 
 273         if (connection == null) {
 
 277         final String methodName = "shutdownConnection";
 
 278         InputStream inputstream = null;
 
 279         OutputStream outputstream = null;
 
 282             inputstream = connection.getInputStream();
 
 283         } catch (IOException e) {
 
 284             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
 
 286             if (inputstream != null) {
 
 289                 } catch (IOException e) {
 
 290                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
 
 296             outputstream = connection.getOutputStream();
 
 297         } catch (IOException e) {
 
 298             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
 
 300             if (outputstream != null) {
 
 302                     outputstream.close();
 
 303                 } catch (IOException e) {
 
 304                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
 
 309         connection.disconnect();
 
 313     protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings)
 
 314             throws DocumentStoreOperationException {
 
 315         if (indexSettings == null) {
 
 316             logger.debug("No settings provided.");
 
 319         if (indexMappings == null) {
 
 320             logger.debug("No mappings provided.");
 
 323         // Grab the current time so we can use it to generate a metrics log.
 
 324         MdcOverride override = getStartTime(new MdcOverride());
 
 326         String fullUrl = getFullUrl("/" + indexName + "/", false);
 
 327         HttpURLConnection conn = initializeConnection(fullUrl);
 
 330             conn.setRequestMethod("PUT");
 
 331         } catch (ProtocolException e) {
 
 332             shutdownConnection(conn);
 
 333             throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
 
 336         StringBuilder sb = new StringBuilder(128);
 
 337         sb.append("{ \"settings\" : ");
 
 338         sb.append(indexSettings);
 
 341         sb.append("\"mappings\" : {");
 
 342         sb.append("\"" + typeName + "\" :");
 
 343         sb.append(indexMappings);
 
 347             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
 
 348         } catch (IOException e) {
 
 349             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
 
 350             throw new DocumentStoreOperationException(e.getMessage(), e);
 
 353         logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
 
 354         logger.debug("Request content: " + sb);
 
 356         OperationResult opResult = handleResponse(conn);
 
 357         shutdownConnection(conn);
 
 358         logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, opResult, indexName);
 
 364      * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the
 
 367      * @param indexName - The name of the index to be created
 
 368      * @param settingsAndMappings - The actual JSON object that will define the index
 
 369      * @return - The operation result of writing into Elasticsearch
 
 370      * @throws DocumentStoreOperationException
 
 372     protected OperationResult createTable(String indexName, String settingsAndMappings)
 
 373             throws DocumentStoreOperationException {
 
 374         // Grab the current time so we can use it to generate a metrics log.
 
 375         MdcOverride override = getStartTime(new MdcOverride());
 
 377         String fullUrl = getFullUrl("/" + indexName + "/", false);
 
 378         HttpURLConnection conn = initializeConnection(fullUrl);
 
 381             conn.setRequestMethod("PUT");
 
 382         } catch (ProtocolException e) {
 
 383             shutdownConnection(conn);
 
 384             throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
 
 388             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
 
 389         } catch (IOException e) {
 
 390             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
 
 391             throw new DocumentStoreOperationException(e.getMessage());
 
 394         OperationResult result = handleResponse(conn);
 
 395         logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, result, indexName);
 
 401     public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
 
 402             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
 
 404         if (!allowImplicitIndexCreation) {
 
 405             // Before we do anything, make sure that the specified index actually exists in the
 
 406             // document store - we don't want to rely on ElasticSearch to fail the document
 
 407             // create because it could be configured to implicitly create a non-existent index,
 
 408             // which can lead to hard-to-debug behaviour with queries down the road.
 
 409             OperationResult indexExistsResult = checkIndexExistence(indexName);
 
 410             if (!isSuccess(indexExistsResult)) {
 
 411                 String resultMsg = "Document Index '" + indexName + "' does not exist.";
 
 412                 return (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT).status(Status.NOT_FOUND)
 
 413                         .result(resultMsg).failureCause(resultMsg).build();
 
 417         if (document.getId() == null || document.getId().isEmpty()) {
 
 418             return createDocumentWithoutId(indexName, document);
 
 420             return createDocumentWithId(indexName, document);
 
 424     private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
 
 425             throws DocumentStoreOperationException {
 
 426         // check if the document already exists
 
 427         DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
 
 429         if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
 
 430             if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) {
 
 431                 opResult.setFailureCause("A document with the same id already exists.");
 
 433                 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
 
 435             opResult.setResultCode(Status.CONFLICT.getStatusCode());
 
 439         // Grab the current time so we can use it to generate a metrics log.
 
 440         MdcOverride override = getStartTime(new MdcOverride());
 
 442         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
 
 443         HttpURLConnection conn = initializeConnection(fullUrl);
 
 446             conn.setRequestMethod("PUT");
 
 447         } catch (ProtocolException e) {
 
 448             shutdownConnection(conn);
 
 449             throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
 
 452         attachDocument(conn, document);
 
 454         logger.debug("Sending 'PUT' request to: " + conn.getURL());
 
 456         opResult = getOperationResult(conn);
 
 457         buildDocumentResult(opResult, indexName);
 
 459         logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, opResult, indexName);
 
 461         shutdownConnection(conn);
 
 466     private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
 
 467             throws DocumentStoreOperationException {
 
 468         // Grab the current time so we can use it to generate a metrics log.
 
 469         MdcOverride override = getStartTime(new MdcOverride());
 
 471         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
 
 472         HttpURLConnection conn = initializeConnection(fullUrl);
 
 475             conn.setRequestMethod("POST");
 
 476         } catch (ProtocolException e) {
 
 477             shutdownConnection(conn);
 
 478             throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
 
 481         attachDocument(conn, document);
 
 483         logger.debug("Sending 'POST' request to: " + conn.getURL());
 
 485         DocumentOperationResult response = getOperationResult(conn);
 
 486         buildDocumentResult(response, indexName);
 
 488         logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, response, indexName);
 
 490         shutdownConnection(conn);
 
 495     private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
 
 496             throws DocumentStoreOperationException {
 
 497         conn.setRequestProperty("Connection", "Close");
 
 498         attachContent(conn, doc.getContentInJson());
 
 501     private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
 
 502             throws DocumentStoreOperationException {
 
 503         // Grab the current time so we can use it to generate a metrics log.
 
 504         MdcOverride override = getStartTime(new MdcOverride());
 
 506         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
 
 507         HttpURLConnection conn = initializeConnection(fullUrl);
 
 510             conn.setRequestMethod("HEAD");
 
 511         } catch (ProtocolException e) {
 
 512             shutdownConnection(conn);
 
 513             throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
 
 516         logger.debug("Sending 'HEAD' request to: " + conn.getURL());
 
 520             resultCode = conn.getResponseCode();
 
 521         } catch (IOException e) {
 
 522             shutdownConnection(conn);
 
 523             throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
 
 526         logger.debug(MSG_RESPONSE_CODE + resultCode);
 
 528         DocumentOperationResult opResult = (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT)
 
 529                 .useDefaults().resultCode(resultCode).build();
 
 531         logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, docId);
 
 532         shutdownConnection(conn);
 
 538     public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
 
 539             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
 
 541         if (!allowImplicitIndexCreation) {
 
 542             // Before we do anything, make sure that the specified index actually exists in the
 
 543             // document store - we don't want to rely on ElasticSearch to fail the document
 
 544             // create because it could be configured to implicitly create a non-existent index,
 
 545             // which can lead to hard-to-debug behaviour with queries down the road.
 
 546             OperationResult indexExistsResult = checkIndexExistence(indexName);
 
 547             if (!isSuccess(indexExistsResult)) {
 
 548                 DocumentOperationResult opResult = new DocumentOperationResult();
 
 549                 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
 
 550                 String resultMsg = "Document Index '" + indexName + "' does not exist.";
 
 551                 opResult.setResult(resultMsg);
 
 552                 opResult.setFailureCause(resultMsg);
 
 557         // Grab the current time so we can use it to generate a metrics log.
 
 558         MdcOverride override = getStartTime(new MdcOverride());
 
 560         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
 
 561                 + document.getVersion(), false);
 
 562         HttpURLConnection conn = initializeConnection(fullUrl);
 
 565             conn.setRequestMethod("PUT");
 
 566         } catch (ProtocolException e) {
 
 567             shutdownConnection(conn);
 
 568             throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
 
 571         attachDocument(conn, document);
 
 573         logger.debug("Sending 'PUT' request to: " + conn.getURL());
 
 575         DocumentOperationResult opResult = getOperationResult(conn);
 
 576         buildDocumentResult(opResult, indexName);
 
 578         logMetricsInfo(override, SearchDbMsgs.UPDATE_DOCUMENT_TIME, opResult, indexName, document.getId());
 
 580         shutdownConnection(conn);
 
 586     public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
 
 587             throws DocumentStoreOperationException {
 
 588         // Grab the current time so we can use it to generate a metrics log.
 
 589         MdcOverride override = getStartTime(new MdcOverride());
 
 591         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
 
 592                 + document.getVersion(), false);
 
 593         HttpURLConnection conn = initializeConnection(fullUrl);
 
 596             conn.setRequestMethod("DELETE");
 
 597         } catch (ProtocolException e) {
 
 598             shutdownConnection(conn);
 
 599             throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
 
 602         logger.debug("\nSending 'DELETE' request to " + conn.getURL());
 
 604         DocumentOperationResult opResult = getOperationResult(conn);
 
 605         buildDocumentResult(opResult, indexName);
 
 606         // supress the etag and url in response for delete as they are not required
 
 607         if (opResult.getDocument() != null) {
 
 608             opResult.getDocument().setEtag(null);
 
 609             opResult.getDocument().setUrl(null);
 
 612         logMetricsInfo(override, SearchDbMsgs.DELETE_DOCUMENT_TIME, opResult, indexName, document.getId());
 
 614         shutdownConnection(conn);
 
 620     public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
 
 621             throws DocumentStoreOperationException {
 
 622         // Grab the current time so we can use it to generate a metrics log.
 
 623         MdcOverride override = getStartTime(new MdcOverride());
 
 625         String fullUrl = null;
 
 626         if (document.getVersion() == null) {
 
 627             fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
 
 629             fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
 
 630                     + document.getVersion(), false);
 
 632         HttpURLConnection conn = initializeConnection(fullUrl);
 
 634         logger.debug("\nSending 'GET' request to: " + conn.getURL());
 
 636         DocumentOperationResult opResult = getOperationResult(conn);
 
 637         buildDocumentResult(opResult, indexName);
 
 639         logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, document.getId());
 
 641         shutdownConnection(conn);
 
 647     public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
 
 649         String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
 
 651         // Grab the current time so we can use it to generate a metrics log.
 
 652         MdcOverride override = getStartTime(new MdcOverride());
 
 654         HttpURLConnection conn = initializeConnection(fullUrl);
 
 657             conn.setRequestMethod("GET");
 
 658         } catch (ProtocolException e) {
 
 659             shutdownConnection(conn);
 
 660             throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
 
 663         logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
 
 665         SearchOperationResult opResult = getSearchOperationResult(conn);
 
 666         buildSearchResult(opResult, indexName);
 
 668         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, queryString);
 
 674     public SearchOperationResult searchWithPayload(String indexName, String query)
 
 675             throws DocumentStoreOperationException {
 
 676         if (logger.isDebugEnabled()) {
 
 677             logger.debug("Querying index: " + indexName + " with query string: " + query);
 
 680         String fullUrl = getFullUrl("/" + indexName + "/_search", false);
 
 682         // Grab the current time so we can use it to generate a metrics log.
 
 683         MdcOverride override = getStartTime(new MdcOverride());
 
 685         HttpURLConnection conn = initializeConnection(fullUrl);
 
 688             conn.setRequestMethod("POST");
 
 689         } catch (ProtocolException e) {
 
 690             shutdownConnection(conn);
 
 691             throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
 
 694         attachContent(conn, query);
 
 696         logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
 
 697         logger.debug("Request body =  Elasticsearch query = " + query);
 
 699         SearchOperationResult opResult = getSearchOperationResult(conn);
 
 700         buildSearchResult(opResult, indexName);
 
 702         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
 
 704         shutdownConnection(conn);
 
 711     public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
 
 712             throws DocumentStoreOperationException {
 
 713         if (logger.isDebugEnabled()) {
 
 714             logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
 
 717         String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
 
 719         // Grab the current time so we can use it to generate a metrics log.
 
 720         MdcOverride override = getStartTime(new MdcOverride());
 
 722         HttpURLConnection conn = initializeConnection(fullUrl);
 
 725             conn.setRequestMethod("POST");
 
 726         } catch (ProtocolException e) {
 
 727             shutdownConnection(conn);
 
 728             throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
 
 731         attachContent(conn, query);
 
 733         logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
 
 734         logger.debug("Request body =  Elasticsearch query = " + query);
 
 736         SearchOperationResult opResult = getSearchOperationResult(conn);
 
 737         buildSuggestResult(opResult, indexName);
 
 739         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
 
 741         shutdownConnection(conn);
 
 746     private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException {
 
 747         OutputStream outputStream = null;
 
 748         OutputStreamWriter out = null;
 
 751             outputStream = conn.getOutputStream();
 
 752         } catch (IOException e) {
 
 753             shutdownConnection(conn);
 
 754             throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
 
 757         out = new OutputStreamWriter(outputStream);
 
 762         } catch (IOException e) {
 
 763             shutdownConnection(conn);
 
 764             throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
 
 768     private HttpURLConnection initializeConnection(String fullUrl) throws DocumentStoreOperationException {
 
 770         HttpURLConnection conn = null;
 
 773             url = new URL(fullUrl);
 
 774         } catch (MalformedURLException e) {
 
 775             throw new DocumentStoreOperationException("Error building a URL with " + url, e);
 
 779             conn = (HttpURLConnection) url.openConnection();
 
 780             conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON);
 
 781             conn.setDoOutput(true);
 
 782         } catch (IOException e) {
 
 783             shutdownConnection(conn);
 
 784             throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
 
 790     private OperationResult handleResponse(HttpURLConnection conn) throws DocumentStoreOperationException {
 
 791         return handleResponse(conn, new OperationResultBuilder().useDefaults());
 
 794     private OperationResult handleResponse(HttpURLConnection conn, OperationResultBuilder rb)
 
 795             throws DocumentStoreOperationException {
 
 799             resultCode = conn.getResponseCode();
 
 800         } catch (IOException e) {
 
 801             shutdownConnection(conn);
 
 802             throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
 
 805         logger.debug(MSG_RESPONSE_CODE + resultCode);
 
 807         InputStream inputStream = null;
 
 809         if (!isSuccessCode(resultCode)) {
 
 810             inputStream = conn.getErrorStream();
 
 813                 inputStream = conn.getInputStream();
 
 814             } catch (IOException e) {
 
 815                 shutdownConnection(conn);
 
 816                 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
 
 820         InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
 
 821         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
 
 823         StringBuilder result = new StringBuilder(128);
 
 824         String string = null;
 
 827             while ((string = bufferedreader.readLine()) != null) {
 
 828                 result.append(string).append("\n");
 
 830         } catch (IOException e) {
 
 831             shutdownConnection(conn);
 
 832             throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
 
 835         if (resultCode == Status.CONFLICT.getStatusCode()) {
 
 836             rb.resultCode(Status.PRECONDITION_FAILED.getStatusCode());
 
 838             rb.resultCode(resultCode);
 
 840         if (logger.isDebugEnabled()) {
 
 841             logger.debug("Raw result string from ElasticSearch = " + result.toString());
 
 843         rb.result(result.toString());
 
 844         rb.resultVersion(extractVersion(result.toString()));
 
 848     private String extractVersion(String result) {
 
 849         JSONParser parser = new JSONParser();
 
 850         String version = null;
 
 852             JSONObject root = (JSONObject) parser.parse(result);
 
 853             if (root.get(JSON_ATTR_VERSION) != null) {
 
 854                 version = root.get(JSON_ATTR_VERSION).toString();
 
 856         } catch (ParseException e) {
 
 857             // Not all responses from ElasticSearch include a version, so
 
 858             // if we don't get one back, just return an empty string rather
 
 859             // than trigger a false failure.
 
 866      * This convenience method gets the current system time and stores it in an attribute in the supplied
 
 867      * {@link MdcOverride} object so that it can be used later by the metrics logger.
 
 869      * @param override - The {@link MdcOverride} object to update.
 
 870      * @return - The supplied {@link MdcOverride} object.
 
 872     private MdcOverride getStartTime(MdcOverride override) {
 
 874         // Grab the current time...
 
 875         long startTimeInMs = System.currentTimeMillis();
 
 877         // ...and add it as an attribute to the supplied MDC Override
 
 879         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
 
 880         override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
 
 882         // Return the MdcOverride object that we were passed.
 
 883         // This looks odd, but it allows us to do stuff like:
 
 885         // MdcOverride ov = getStartTime(new MdcOverride())
 
 887         // which is quite handy, but also allows us to pass in an existing
 
 888         // MdcOverride object which already has some attributes set.
 
 892     private boolean isSuccess(OperationResult result) {
 
 893         return isSuccessCode(result.getResultCode());
 
 896     private boolean isSuccessCode(int statusCode) {
 
 897         return Family.familyOf(statusCode).equals(Family.SUCCESSFUL);
 
 901     public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
 
 902         if (logger.isDebugEnabled()) {
 
 903             StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: ");
 
 905             for (BulkRequest request : requests) {
 
 906                 dbgString.append("[").append(request).append("] ");
 
 909             logger.debug(dbgString.toString());
 
 912         // Grab the current time so we can use it to generate a metrics log.
 
 913         MdcOverride override = getStartTime(new MdcOverride());
 
 915         // Parse the supplied set of operations.
 
 916         // Iterate over the list of operations which we were provided and
 
 917         // translate them into a format that ElasticSearh understands.
 
 919         StringBuilder esOperationSet = new StringBuilder(128);
 
 920         List<ElasticSearchResultItem> rejected = new ArrayList<>();
 
 921         for (BulkRequest request : requests) {
 
 923             // Convert the request to the syntax ElasticSearch likes.
 
 924             if (buildEsOperation(request, esOperationSet, rejected)) {
 
 929         ElasticSearchBulkOperationResult opResult = null;
 
 932             // Open an HTTP connection to the ElasticSearch back end.
 
 933             String fullUrl = getFullUrl("/_bulk", false);
 
 935             HttpURLConnection conn;
 
 938                 url = new URL(fullUrl);
 
 939                 conn = (HttpURLConnection) url.openConnection();
 
 940                 conn.setRequestMethod("PUT");
 
 941                 conn.setDoOutput(true);
 
 942                 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_FORM_URLENCODED);
 
 943                 conn.setRequestProperty("Connection", "Close");
 
 945             } catch (IOException e) {
 
 947                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
 
 948                 if (logger.isDebugEnabled()) {
 
 949                     logger.debug(Throwables.getStackTraceAsString(e));
 
 952                 throw new DocumentStoreOperationException(
 
 953                         "Failed to open connection to document store.  Cause: " + e.getMessage(), e);
 
 956             StringBuilder bulkResult = new StringBuilder(128);
 
 958                 // Create an output stream to write our request to.
 
 959                 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
 
 961                 if (logger.isDebugEnabled()) {
 
 962                     logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
 
 963                     logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
 
 966                 // Write the resulting request string to our output stream. (this sends the request to ES?)
 
 967                 out.write(esOperationSet.toString());
 
 970                 // Open an input stream on our connection in order to read back the results.
 
 971                 InputStream is = conn.getInputStream();
 
 972                 InputStreamReader inputstreamreader = new InputStreamReader(is);
 
 973                 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
 
 975                 // Read the contents of the input stream into our result string...
 
 976                 String esResponseString = null;
 
 978                 while ((esResponseString = bufferedreader.readLine()) != null) {
 
 979                     bulkResult.append(esResponseString).append("\n");
 
 982             } catch (IOException e) {
 
 984                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
 
 985                 if (logger.isDebugEnabled()) {
 
 986                     StringWriter sw = new StringWriter();
 
 987                     e.printStackTrace(new PrintWriter(sw));
 
 988                     logger.debug(sw.toString());
 
 991                 throw new DocumentStoreOperationException(
 
 992                         "Failure interacting with document store.  Cause: " + e.getMessage(), e);
 
 995             if (logger.isDebugEnabled()) {
 
 996                 logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
 
 999             // ...and marshal the resulting string into a Java object.
 
1001                 opResult = marshallEsBulkResult(bulkResult.toString());
 
1003             } catch (IOException e) {
 
1005                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
 
1006                 if (logger.isDebugEnabled()) {
 
1007                     logger.debug(Throwables.getStackTraceAsString(e));
 
1010                 throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: " + e.getMessage(),
 
1015         OperationResult result = new OperationResultBuilder() //
 
1016                 .resultCode(HttpStatus.MULTI_STATUS_207) //
 
1017                 .result(buildGenericBulkResultSet(opResult, rejected)) //
 
1020         // In the success case we don't want the entire result string to be dumped into the metrics log, so concatenate
 
1022         String resultStringForMetricsLog = result.getResult();
 
1023         if (isSuccess(result)) {
 
1024             resultStringForMetricsLog =
 
1025                     resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
 
1028         metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
 
1030                         .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
 
1031                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
 
1038      * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch.
 
1040      * @param request - The request to be performed.
 
1041      * @param sb - The string builder to append the json data to
 
1042      * @throws DocumentStoreOperationException
 
1044     private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails)
 
1045             throws DocumentStoreOperationException {
 
1047         boolean retVal = true;
 
1048         // What kind of operation are we performing?
 
1049         switch (request.getOperationType()) {
 
1051             // Create a new document.
 
1054                 // Make sure that we were supplied a document payload.
 
1055                 if (request.getOperation().getDocument() == null) {
 
1057                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
 
1058                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
 
1062                 // Make sure that the supplied document URL is formatted
 
1064                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
 
1065                     fails.add(generateRejectionEntry(request.getOperationType(),
 
1066                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
 
1067                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
 
1071                 // Validate that the specified index actually exists before we
 
1072                 // try to perform the create.
 
1073                 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
 
1075                     fails.add(generateRejectionEntry(request.getOperationType(),
 
1076                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
 
1077                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
 
1081                 // If we were supplied an id for the new document, then
 
1082                 // include it in the bulk operation to Elastic Search
 
1083                 if (request.getId() == null) {
 
1085                     sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
 
1087                     // Otherwise, we just leave that parameter off and ElasticSearch
 
1088                     // will generate one for us.
 
1090                     sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
 
1095                     // Append the document that we want to create.
 
1096                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
 
1097                 } catch (JsonProcessingException e) {
 
1098                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
 
1103             // Update an existing document.
 
1106                 // Make sure that we were supplied a document payload.
 
1107                 if (request.getOperation().getDocument() == null) {
 
1109                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
 
1110                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
 
1114                 // Make sure that the supplied document URL is formatted
 
1116                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
 
1117                     fails.add(generateRejectionEntry(request.getOperationType(),
 
1118                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
 
1119                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
 
1123                 // Validate that the specified index actually exists before we
 
1124                 // try to perform the update.
 
1125                 if (!indexExists(request.getIndex())) {
 
1127                     fails.add(generateRejectionEntry(request.getOperationType(),
 
1128                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
 
1129                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
 
1133                 // Validate that the document we are trying to update actually
 
1134                 // exists before we try to perform the update.
 
1135                 if (!documentExists(request.getIndex(), request.getId())) {
 
1137                     fails.add(generateRejectionEntry(request.getOperationType(),
 
1138                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
 
1139                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
 
1143                 // It is mandatory that a version be supplied for an update operation,
 
1144                 // so validate that now.
 
1145                 if (request.getOperation().getMetaData().getEtag() == null) {
 
1147                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
 
1148                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
 
1152                 // Generate the update request...
 
1153                 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
 
1154                         request.getOperation().getMetaData().getEtag()));
 
1156                 // ...and append the document that we want to update.
 
1158                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
 
1159                 } catch (JsonProcessingException e) {
 
1160                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
 
1164             // Delete an existing document.
 
1167                 // Make sure that the supplied document URL is formatted
 
1169                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
 
1170                     fails.add(generateRejectionEntry(request.getOperationType(),
 
1171                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
 
1172                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
 
1176                 // Validate that the specified index actually exists before we
 
1177                 // try to perform the delete.
 
1178                 if (!indexExists(request.getIndex())) {
 
1180                     fails.add(generateRejectionEntry(request.getOperationType(),
 
1181                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
 
1182                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
 
1186                 // Validate that the document we are trying to update actually
 
1187                 // exists before we try to perform the delete.
 
1188                 if (!documentExists(request.getIndex(), request.getId())) {
 
1190                     fails.add(generateRejectionEntry(request.getOperationType(),
 
1191                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
 
1192                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
 
1196                 // It is mandatory that a version be supplied for a delete operation,
 
1197                 // so validate that now.
 
1198                 if (request.getOperation().getMetaData().getEtag() == null) {
 
1200                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
 
1201                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
 
1205                 // Generate the delete request.
 
1206                 sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
 
1207                         request.getOperation().getMetaData().getEtag()));
 
1215     private boolean indexExists(String index) throws DocumentStoreOperationException {
 
1216         return isSuccess(checkIndexExistence(index));
 
1219     private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
 
1220         return isSuccess(checkDocumentExistence(index, id));
 
1224      * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the
 
1227      * @param rejectReason - A message describing why the operation was rejected.
 
1228      * @param anId - The identifier associated with the document being acted on.
 
1229      * @param statusCode - An HTTP status code.
 
1230      * @return - A result set item.
 
1232     private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index,
 
1233             String anId, int statusCode, String originalUrl) {
 
1235         ElasticSearchError err = new ElasticSearchError();
 
1236         err.setReason(rejectReason);
 
1238         ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
 
1241         op.setStatus(statusCode);
 
1243         op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
 
1245         ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
 
1249                 rejectionResult.setCreate(op);
 
1252                 rejectionResult.setIndex(op);
 
1255                 rejectionResult.setDelete(op);
 
1260         return rejectionResult;
 
1264      * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and
 
1265      * marshals it into a Java object.
 
1267      * @param jsonResult - The bulk operations response returned from ElasticSearch.
 
1268      * @return - The marshalled response.
 
1269      * @throws JsonParseException
 
1270      * @throws JsonMappingException
 
1271      * @throws IOException
 
1273     private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException {
 
1274         if (jsonResult != null) {
 
1275             if (logger.isDebugEnabled()) {
 
1276                 logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", ""));
 
1279             ObjectMapper mapper = new ObjectMapper();
 
1280             mapper.setSerializationInclusion(Include.NON_EMPTY);
 
1282             return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
 
1289      * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload.
 
1291      * @param esResult - ElasticSearch bulk operations response.
 
1292      * @return - A generic result set.
 
1294     private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
 
1295             List<ElasticSearchResultItem> rejectedOps) {
 
1297         int totalSuccess = 0;
 
1300         if (logger.isDebugEnabled()) {
 
1302             logger.debug("ESController: Build generic result set.  ES Results: "
 
1303                     + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString());
 
1306         // Build a combined list of result items from the results returned
 
1307         // from ElasticSearch and the list of operations that we rejected
 
1308         // without sending to ElasticSearch.
 
1309         List<ElasticSearchResultItem> combinedResults = new ArrayList<>();
 
1310         if (esResult != null) {
 
1311             combinedResults.addAll(Arrays.asList(esResult.getItems()));
 
1313         combinedResults.addAll(rejectedOps);
 
1315         // Iterate over the individual results in the resulting result set.
 
1316         StringBuilder resultsBuilder = new StringBuilder();
 
1317         AtomicBoolean firstItem = new AtomicBoolean(true);
 
1318         for (ElasticSearchResultItem item : combinedResults) {
 
1320             // Increment the operation counts.
 
1322             if (isSuccessCode(item.operationStatus().getStatus())) {
 
1328             // Prepend a comma to our response string unless this it the
 
1329             // first result in the set.
 
1330             if (!firstItem.compareAndSet(true, false)) {
 
1331                 resultsBuilder.append(", ");
 
1334             // Append the current result as a generic json structure.
 
1335             resultsBuilder.append(item.toJson());
 
1338         return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", "
 
1339                 + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}";
 
1344      * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
 
1346      * @param indexName - The index to look for.
 
1347      * @return - An operation result indicating the success or failure of the check.
 
1348      * @throws DocumentStoreOperationException
 
1350     public OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
 
1351         // Grab the current time so we can use it to generate a metrics log.
 
1352         MdcOverride override = getStartTime(new MdcOverride());
 
1354         String fullUrl = getFullUrl("/" + indexName, false);
 
1355         HttpURLConnection conn = initializeConnection(fullUrl);
 
1358             conn.setRequestMethod("HEAD");
 
1360         } catch (ProtocolException e) {
 
1361             shutdownConnection(conn);
 
1362             throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
 
1365         logger.debug("Sending 'HEAD' request to: " + conn.getURL());
 
1369             resultCode = conn.getResponseCode();
 
1370         } catch (IOException e) {
 
1371             shutdownConnection(conn);
 
1372             throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
 
1374         logger.debug(MSG_RESPONSE_CODE + resultCode);
 
1376         OperationResult opResult = new OperationResultBuilder().useDefaults().resultCode(resultCode).build();
 
1377         logMetricsInfo(override, SearchDbMsgs.CHECK_INDEX_TIME, opResult, indexName);
 
1378         shutdownConnection(conn);
 
1383     private DocumentOperationResult getOperationResult(HttpURLConnection conn) throws DocumentStoreOperationException {
 
1384         return (DocumentOperationResult) handleResponse(conn, new OperationResultBuilder(Type.DOCUMENT).useDefaults());
 
1387     private SearchOperationResult getSearchOperationResult(HttpURLConnection conn)
 
1388             throws DocumentStoreOperationException {
 
1389         return (SearchOperationResult) handleResponse(conn, new OperationResultBuilder(Type.SEARCH).useDefaults());
 
1392     private void buildDocumentResult(DocumentOperationResult result, String index)
 
1393             throws DocumentStoreOperationException {
 
1395         JSONParser parser = new JSONParser();
 
1398             root = (JSONObject) parser.parse(result.getResult());
 
1399             if (isSuccess(result)) {
 
1400                 // Success response object
 
1401                 Document doc = new Document();
 
1402                 doc.setEtag(result.getResultVersion());
 
1403                 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
 
1405                 doc.setContent((JSONObject) root.get("_source"));
 
1406                 result.setDocument(doc);
 
1409                 // Error response object
 
1410                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
 
1411                 if (error != null) {
 
1413                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
 
1417         } catch (Exception e) {
 
1418             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
 
1422     private String buildDocumentResponseUrl(String index, String id) {
 
1423         return ApiUtils.buildDocumentUri(index, id);
 
1426     private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
 
1427         JSONParser parser = new JSONParser();
 
1431             root = (JSONObject) parser.parse(result.getResult());
 
1432             if (isSuccess(result)) {
 
1433                 JSONObject hits = (JSONObject) root.get("hits");
 
1434                 JSONArray hitArray = (JSONArray) hits.get("hits");
 
1435                 SearchHits searchHits = new SearchHits();
 
1436                 searchHits.setTotalHits(hits.get("total").toString());
 
1437                 ArrayList<SearchHit> searchHitArray = new ArrayList<>();
 
1439                 for (int i = 0; i < hitArray.size(); i++) {
 
1440                     JSONObject hit = (JSONObject) hitArray.get(i);
 
1441                     SearchHit searchHit = new SearchHit();
 
1442                     searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
 
1443                     Document doc = new Document();
 
1444                     if (hit.get(JSON_ATTR_VERSION) != null) {
 
1445                         doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
 
1449                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
 
1450                     doc.setContent((JSONObject) hit.get("_source"));
 
1451                     searchHit.setDocument(doc);
 
1452                     searchHitArray.add(searchHit);
 
1454                 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
 
1455                 result.setSearchResult(searchHits);
 
1457                 JSONObject aggregations = (JSONObject) root.get("aggregations");
 
1458                 if (aggregations != null) {
 
1459                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
 
1460                     AggregationResults aggs = new AggregationResults();
 
1461                     aggs.setAggregations(aggResults);
 
1462                     result.setAggregationResult(aggs);
 
1467                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
 
1468                 if (error != null) {
 
1470                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
 
1473         } catch (Exception e) {
 
1474             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
 
1478     private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
 
1479         JSONParser parser = new JSONParser();
 
1482             root = (JSONObject) parser.parse(result.getResult());
 
1483             if (isSuccess(result)) {
 
1484                 JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
 
1485                 JSONObject hitdata = (JSONObject) hitArray.get(0);
 
1486                 JSONArray optionsArray = (JSONArray) hitdata.get("options");
 
1487                 SuggestHits suggestHits = new SuggestHits();
 
1488                 suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
 
1490                 ArrayList<SuggestHit> suggestHitArray = new ArrayList<>();
 
1492                 for (int i = 0; i < optionsArray.size(); i++) {
 
1493                     JSONObject hit = (JSONObject) optionsArray.get(i);
 
1495                     SuggestHit suggestHit = new SuggestHit();
 
1496                     suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
 
1497                     suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
 
1498                     Document doc = new Document();
 
1499                     if (hit.get(JSON_ATTR_VERSION) != null) {
 
1500                         doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
 
1503                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
 
1505                     doc.setContent((JSONObject) hit.get("payload"));
 
1506                     suggestHit.setDocument(doc);
 
1507                     suggestHitArray.add(suggestHit);
 
1509                 suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
 
1510                 result.setSuggestResult(suggestHits);
 
1512                 JSONObject aggregations = (JSONObject) root.get("aggregations");
 
1513                 if (aggregations != null) {
 
1514                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
 
1515                     AggregationResults aggs = new AggregationResults();
 
1516                     aggs.setAggregations(aggResults);
 
1517                     result.setAggregationResult(aggs);
 
1522                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
 
1523                 if (error != null) {
 
1525                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
 
1528         } catch (Exception e) {
 
1529             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
 
1534      * Record the timing of the operation in the metrics log.
 
1537     private void logMetricsInfo(MdcOverride override, SearchDbMsgs message, OperationResult operationResult,
 
1539         metricsLogger.info(message,
 
1541                         .setField(LogLine.DefinedFields.RESPONSE_CODE, operationResult.getResultCode())
 
1542                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, operationResult.getResult()),