X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Faai%2Fsa%2Fsearchdbabstraction%2Felasticsearch%2Fdao%2FElasticSearchHttpController.java;h=c4a52b44199a0134420212ecc934f4814631e873;hb=7ca6c2a19ee1971a402a6b7eaad4324d83fb236a;hp=5fc1df0fa1a3268d052ced6027c03d734bf063f1;hpb=d5922af3f5edbb5ec083ca9b90c74c20a7a9328d;p=aai%2Fsearch-data-service.git diff --git a/src/main/java/org/onap/aai/sa/searchdbabstraction/elasticsearch/dao/ElasticSearchHttpController.java b/src/main/java/org/onap/aai/sa/searchdbabstraction/elasticsearch/dao/ElasticSearchHttpController.java index 5fc1df0..c4a52b4 100644 --- a/src/main/java/org/onap/aai/sa/searchdbabstraction/elasticsearch/dao/ElasticSearchHttpController.java +++ b/src/main/java/org/onap/aai/sa/searchdbabstraction/elasticsearch/dao/ElasticSearchHttpController.java @@ -49,10 +49,14 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response.Status; -import javax.ws.rs.core.Response.Status.Family; +import javax.ws.rs.core.UriBuilder; +import org.eclipse.jetty.http.HttpStatus; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; @@ -70,12 +74,15 @@ import org.onap.aai.sa.rest.BulkRequest.OperationType; import org.onap.aai.sa.rest.DocumentSchema; import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig; import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException; +import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException.ErrorMessage; import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult; import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults; import org.onap.aai.sa.searchdbabstraction.entity.Document; import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult; import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult; import org.onap.aai.sa.searchdbabstraction.entity.OperationResult; +import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder; +import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder.Type; import org.onap.aai.sa.searchdbabstraction.entity.SearchHit; import org.onap.aai.sa.searchdbabstraction.entity.SearchHits; import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult; @@ -99,21 +106,18 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { private static final Logger metricsLogger = LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName()); + private static final String URL_QUERY_VERSION = "version="; + private static final String JSON_ATTR_VERSION = "_version"; private static final String JSON_ATTR_ERROR = "error"; private static final String JSON_ATTR_REASON = "reason"; - private static final String QUERY_PARAM_VERSION = "?version="; + private static final String DEFAULT_TYPE = "default"; private static final String MSG_RESOURCE_MISSING = "Specified resource does not exist: "; private static final String MSG_RESPONSE_CODE = "Response Code : "; private static final String MSG_INVALID_DOCUMENT_URL = "Invalid document URL: "; - private static final String MSG_HTTP_PUT_FAILED = "Failed to set HTTP request method to PUT."; - private static final String MSG_HTTP_POST_FAILED = "Failed to set HTTP request method to POST."; - private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT = - "Internal Error: ElasticSearch operation fault occurred"; - private static final String FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION = - "Failed to get the response code from the connection."; + private static final String FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE = "Failed to parse Elastic Search response."; private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE = @@ -124,13 +128,30 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n"; private static final String BULK_DELETE_TEMPLATE = "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n"; + public final static String APPLICATION_XND_JSON_TYPE = new MediaType("application", "x-ndjson").toString(); private final ElasticSearchConfig config; - private static final String DEFAULT_TYPE = "default"; - protected AnalysisConfiguration analysisConfig; + public ElasticSearchHttpController(ElasticSearchConfig config) { + this.config = config; + analysisConfig = new AnalysisConfiguration(); + + String rootUrl = null; + try { + if ("https".equals(config.getUriScheme())) { + new ElasticSearchHttpsController(config); + } + rootUrl = buildUrl(createUriBuilder("")).toString(); + logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, rootUrl); + checkConnection(); + logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, rootUrl); + } catch (Exception e) { + logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, rootUrl, e.getMessage()); + } + } + public static ElasticSearchHttpController getInstance() { synchronized (ElasticSearchHttpController.class) { if (instance == null) { @@ -151,34 +172,19 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { return instance; } - public ElasticSearchHttpController(ElasticSearchConfig config) { - this.config = config; - analysisConfig = new AnalysisConfiguration(); - - try { - logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false)); - checkConnection(); - logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false)); - } catch (Exception e) { - logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, getFullUrl("", false), - e.getMessage()); - } - } - public AnalysisConfiguration getAnalysisConfig() { return analysisConfig; } + public ElasticSearchConfig getElasticSearchConfig() { + return config; + } + @Override public OperationResult createIndex(String index, DocumentSchema documentSchema) { - OperationResult result = new OperationResult(); - result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode()); - try { - - // Submit the request to ElasticSearch to create the index using a - // default document type. - result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(), + // Submit the request to ElasticSearch to create the index using a default document type. + OperationResult result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(), DocumentSchemaUtil.generateDocumentMappings(documentSchema)); // ElasticSearch will return us a 200 code on success when we @@ -190,20 +196,17 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { if (isSuccess(result)) { result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}"); } + return result; } catch (DocumentStoreOperationException | IOException e) { - result.setFailureCause("Document store operation failure. Cause: " + e.getMessage()); + return new OperationResultBuilder().useDefaults() + .failureCause("Document store operation failure. Cause: " + e.getMessage()).build(); } - - return result; } @Override public OperationResult createDynamicIndex(String index, String dynamicSchema) { - OperationResult result = new OperationResult(); - result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode()); - try { - result = createTable(index, dynamicSchema); + OperationResult result = createTable(index, dynamicSchema); // ElasticSearch will return us a 200 code on success when we // want to report a 201, so translate the result here. @@ -213,127 +216,28 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { if (isSuccess(result)) { result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}"); } + return result; } catch (DocumentStoreOperationException e) { - result.setFailureCause("Document store operation failure. Cause: " + e.getMessage()); + return new OperationResultBuilder().useDefaults() + .failureCause("Document store operation failure. Cause: " + e.getMessage()).build(); } - - return result; } @Override public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException { - - // Initialize operation result with a failure codes / fault string - OperationResult opResult = new OperationResult(); - opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode()); - opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); - - // Grab the current time so we can use it to generate a metrics log. MdcOverride override = getStartTime(new MdcOverride()); - String fullUrl = getFullUrl("/" + indexName + "/", false); - HttpURLConnection conn = initializeConnection(fullUrl); - - logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL()); - - try { - conn.setRequestMethod("DELETE"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e); - } - - handleResponse(conn, opResult); - - // Generate a metrics log so we can track how long the operation took. - metricsLogger - .info(SearchDbMsgs.DELETE_INDEX_TIME, - new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), - override, indexName); - - shutdownConnection(conn); - - return opResult; - } - - private OperationResult checkConnection() throws IOException { - String fullUrl = getFullUrl("/_cluster/health", false); - URL url = null; - HttpURLConnection conn = null; - - url = new URL(fullUrl); - conn = (HttpURLConnection) url.openConnection(); - conn.setRequestMethod("GET"); - conn.setDoOutput(true); - logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url); - - int resultCode = conn.getResponseCode(); - logger.debug("getClusterHealth() response Code : " + resultCode); - OperationResult opResult = new OperationResult(); - opResult.setResultCode(resultCode); - + HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.DELETE); + OperationResult opResult = handleResponse(conn); + logMetricsInfo(override, SearchDbMsgs.DELETE_INDEX_TIME, opResult, indexName); shutdownConnection(conn); return opResult; } - private String getFullUrl(String resourceUrl, boolean isSecure) { - - final String host = config.getIpAddress(); - final String port = config.getHttpPort(); - - if (isSecure) { - return String.format("https://%s:%s%s", host, port, resourceUrl); - } else { - return String.format("http://%s:%s%s", host, port, resourceUrl); - } - } - - private void shutdownConnection(HttpURLConnection connection) { - if (connection == null) { - return; - } - - final String methodName = "shutdownConnection"; - InputStream inputstream = null; - OutputStream outputstream = null; - - try { - inputstream = connection.getInputStream(); - } catch (IOException e) { - logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage()); - } finally { - if (inputstream != null) { - try { - inputstream.close(); - } catch (IOException e) { - logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage()); - } - } - } - - try { - outputstream = connection.getOutputStream(); - } catch (IOException e) { - logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage()); - } finally { - if (outputstream != null) { - try { - outputstream.close(); - } catch (IOException e) { - logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage()); - } - } - } - - connection.disconnect(); - } - // @Override protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings) throws DocumentStoreOperationException { - if (indexSettings == null) { logger.debug("No settings provided."); } @@ -342,24 +246,9 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { logger.debug("No mappings provided."); } - OperationResult opResult = new OperationResult(); - - // Initialize operation result with a failure codes / fault string - opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode()); - opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); - - // Grab the current time so we can use it to generate a metrics log. MdcOverride override = getStartTime(new MdcOverride()); - String fullUrl = getFullUrl("/" + indexName + "/", false); - HttpURLConnection conn = initializeConnection(fullUrl); - - try { - conn.setRequestMethod("PUT"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e); - } + HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.PUT); StringBuilder sb = new StringBuilder(128); sb.append("{ \"settings\" : "); @@ -378,19 +267,11 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { throw new DocumentStoreOperationException(e.getMessage(), e); } - logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL()); - logger.debug("Request content: " + sb.toString()); - - handleResponse(conn, opResult); + logger.debug("Request content: " + sb); + OperationResult opResult = handleResponse(conn); shutdownConnection(conn); - - // Generate a metrics log so we can track how long the operation took. - metricsLogger - .info(SearchDbMsgs.CREATE_INDEX_TIME, - new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()), - override, indexName); + logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, opResult, indexName); return opResult; } @@ -406,37 +287,18 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { */ protected OperationResult createTable(String indexName, String settingsAndMappings) throws DocumentStoreOperationException { - OperationResult result = new OperationResult(); - result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode()); - result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); - - // Grab the current time so we can use it to generate a metrics log. MdcOverride override = getStartTime(new MdcOverride()); - String fullUrl = getFullUrl("/" + indexName + "/", false); - HttpURLConnection conn = initializeConnection(fullUrl); - - try { - conn.setRequestMethod("PUT"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e); - } - + HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.PUT); try { attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings)); } catch (IOException e) { logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e); throw new DocumentStoreOperationException(e.getMessage()); } - handleResponse(conn, result); - // Generate a metrics log so we can track how long the operation took. - metricsLogger - .info(SearchDbMsgs.CREATE_INDEX_TIME, - new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()), - override, indexName); + OperationResult result = handleResponse(conn); + logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, result, indexName); return result; } @@ -452,12 +314,9 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { // which can lead to hard-to-debug behaviour with queries down the road. OperationResult indexExistsResult = checkIndexExistence(indexName); if (!isSuccess(indexExistsResult)) { - DocumentOperationResult opResult = new DocumentOperationResult(); - opResult.setResultCode(Status.NOT_FOUND.getStatusCode()); String resultMsg = "Document Index '" + indexName + "' does not exist."; - opResult.setResult(resultMsg); - opResult.setFailureCause(resultMsg); - return opResult; + return (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT).status(Status.NOT_FOUND) + .result(resultMsg).failureCause(resultMsg).build(); } } @@ -468,144 +327,83 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { } } - private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document) - throws DocumentStoreOperationException { - // check if the document already exists - DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId()); - - if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) { - if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) { - opResult.setFailureCause("A document with the same id already exists."); - } else { - opResult.setFailureCause("Failed to verify a document with the specified id does not already exist."); + @Override + public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document, + boolean allowImplicitIndexCreation) throws DocumentStoreOperationException { + if (!allowImplicitIndexCreation) { + // Before we do anything, make sure that the specified index actually exists in the + // document store - we don't want to rely on ElasticSearch to fail the document + // create because it could be configured to implicitly create a non-existent index, + // which can lead to hard-to-debug behaviour with queries down the road. + OperationResult indexExistsResult = checkIndexExistence(indexName); + if (!isSuccess(indexExistsResult)) { + DocumentOperationResult opResult = new DocumentOperationResult(); + opResult.setResultCode(Status.NOT_FOUND.getStatusCode()); + String resultMsg = "Document Index '" + indexName + "' does not exist."; + opResult.setResult(resultMsg); + opResult.setFailureCause(resultMsg); + return opResult; } - opResult.setResultCode(Status.CONFLICT.getStatusCode()); - return opResult; } - opResult = new DocumentOperationResult(); - // Initialize operation result with a failure codes / fault string - opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode()); - opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); - - // Grab the current time so we can use it to generate a metrics log. MdcOverride override = getStartTime(new MdcOverride()); - String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false); - HttpURLConnection conn = initializeConnection(fullUrl); - - try { - conn.setRequestMethod("PUT"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e); - } + final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId()) + .replaceQuery(URL_QUERY_VERSION + document.getVersion())); + HttpURLConnection conn = createConnection(url, HttpMethod.PUT); attachDocument(conn, document); - logger.debug("Sending 'PUT' request to: " + conn.getURL()); - - handleResponse(conn, opResult); + DocumentOperationResult opResult = getOperationResult(conn); buildDocumentResult(opResult, indexName); - // Generate a metrics log so we can track how long the operation took. - metricsLogger - .info(SearchDbMsgs.CREATE_DOCUMENT_TIME, - new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), - override, indexName); + logMetricsInfo(override, SearchDbMsgs.UPDATE_DOCUMENT_TIME, opResult, indexName, document.getId()); shutdownConnection(conn); return opResult; } - private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document) + @Override + public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document) throws DocumentStoreOperationException { + final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId()) + .replaceQuery(URL_QUERY_VERSION + document.getVersion())); - DocumentOperationResult response = new DocumentOperationResult(); - // Initialize operation result with a failure codes / fault string - response.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode()); - response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); - - // Grab the current time so we can use it to generate a metrics log. MdcOverride override = getStartTime(new MdcOverride()); - String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false); - HttpURLConnection conn = initializeConnection(fullUrl); + HttpURLConnection conn = createConnection(url, HttpMethod.DELETE); - try { - conn.setRequestMethod("POST"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e); + DocumentOperationResult opResult = getOperationResult(conn); + buildDocumentResult(opResult, indexName); + // supress the etag and url in response for delete as they are not required + if (opResult.getDocument() != null) { + opResult.getDocument().setEtag(null); + opResult.getDocument().setUrl(null); } - attachDocument(conn, document); - - logger.debug("Sending 'POST' request to: " + conn.getURL()); - - handleResponse(conn, response); - buildDocumentResult(response, indexName); - - // Generate a metrics log so we can track how long the operation took. - metricsLogger - .info(SearchDbMsgs.CREATE_DOCUMENT_TIME, - new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()), - override, indexName); + logMetricsInfo(override, SearchDbMsgs.DELETE_DOCUMENT_TIME, opResult, indexName, document.getId()); shutdownConnection(conn); - return response; - } - - private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc) - throws DocumentStoreOperationException { - conn.setRequestProperty("Connection", "Close"); - attachContent(conn, doc.getContentInJson()); + return opResult; } - private DocumentOperationResult checkDocumentExistence(String indexName, String docId) + @Override + public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document) throws DocumentStoreOperationException { - DocumentOperationResult opResult = new DocumentOperationResult(); - - // Initialize operation result with a failure codes / fault string - opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode()); - - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); - - String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false); - HttpURLConnection conn = initializeConnection(fullUrl); - - try { - conn.setRequestMethod("HEAD"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e); - } - - logger.debug("Sending 'HEAD' request to: " + conn.getURL()); - - int resultCode; - try { - resultCode = conn.getResponseCode(); - } catch (IOException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e); + final UriBuilder uriBuilder = createUriBuilder(indexName, DEFAULT_TYPE, document.getId()); + if (document.getVersion() != null) { + uriBuilder.replaceQuery(URL_QUERY_VERSION + document.getVersion()); } - logger.debug(MSG_RESPONSE_CODE + resultCode); + MdcOverride override = getStartTime(new MdcOverride()); + HttpURLConnection conn = createConnection(buildUrl(uriBuilder), "GET"); - opResult.setResultCode(resultCode); + DocumentOperationResult opResult = getOperationResult(conn); + buildDocumentResult(opResult, indexName); - // Generate a metrics log so we can track how long the operation took. - metricsLogger - .info(SearchDbMsgs.GET_DOCUMENT_TIME, - new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), - override, indexName, docId); + logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, document.getId()); shutdownConnection(conn); @@ -613,57 +411,38 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { } @Override - public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document, - boolean allowImplicitIndexCreation) throws DocumentStoreOperationException { - - if (!allowImplicitIndexCreation) { - // Before we do anything, make sure that the specified index actually exists in the - // document store - we don't want to rely on ElasticSearch to fail the document - // create because it could be configured to implicitly create a non-existent index, - // which can lead to hard-to-debug behaviour with queries down the road. - OperationResult indexExistsResult = checkIndexExistence(indexName); - if (!isSuccess(indexExistsResult)) { - DocumentOperationResult opResult = new DocumentOperationResult(); - opResult.setResultCode(Status.NOT_FOUND.getStatusCode()); - String resultMsg = "Document Index '" + indexName + "' does not exist."; - opResult.setResult(resultMsg); - opResult.setFailureCause(resultMsg); - return opResult; - } - } + public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException { + final URL url = buildUrl(createUriBuilder(indexName, "_search").replaceQuery(queryString)); - DocumentOperationResult opResult = new DocumentOperationResult(); + MdcOverride override = getStartTime(new MdcOverride()); - // Initialize operation result with a failure codes / fault string - opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode()); - opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); + HttpURLConnection conn = createConnection(url, "GET"); + SearchOperationResult opResult = getSearchOperationResult(conn); + buildSearchResult(opResult, indexName); - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); + logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, queryString); - String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION - + document.getVersion(), false); - HttpURLConnection conn = initializeConnection(fullUrl); + return opResult; + } - try { - conn.setRequestMethod("PUT"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e); + @Override + public SearchOperationResult searchWithPayload(String indexName, String query) + throws DocumentStoreOperationException { + if (logger.isDebugEnabled()) { + logger.debug("Querying index: " + indexName + " with query string: " + query); } + final URL url = buildUrl(createUriBuilder(indexName, "_search")); - attachDocument(conn, document); + MdcOverride override = getStartTime(new MdcOverride()); - logger.debug("Sending 'PUT' request to: " + conn.getURL()); + HttpURLConnection conn = createConnection(url, HttpMethod.POST); + attachContent(conn, query); + logger.debug("Request body = Elasticsearch query = " + query); - handleResponse(conn, opResult); - buildDocumentResult(opResult, indexName); + SearchOperationResult opResult = getSearchOperationResult(conn); + buildSearchResult(opResult, indexName); - // Generate a metrics log so we can track how long the operation took. - metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME, - new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), - override, indexName, document.getId()); + logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query); shutdownConnection(conn); @@ -671,43 +450,22 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { } @Override - public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document) + public SearchOperationResult suggestionQueryWithPayload(String indexName, String query) throws DocumentStoreOperationException { - DocumentOperationResult opResult = new DocumentOperationResult(); - - // Initialize operation result with a failure codes / fault string - opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode()); - opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); + if (logger.isDebugEnabled()) { + logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query); + } - // Grab the current time so we can use it to generate a metrics log. MdcOverride override = getStartTime(new MdcOverride()); + HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName, "_suggest")), HttpMethod.POST); + attachContent(conn, query); - String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION - + document.getVersion(), false); - HttpURLConnection conn = initializeConnection(fullUrl); - - try { - conn.setRequestMethod("DELETE"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e); - } - - logger.debug("\nSending 'DELETE' request to " + conn.getURL()); + logger.debug("Request body = Elasticsearch query = " + query); - handleResponse(conn, opResult); - buildDocumentResult(opResult, indexName); - // supress the etag and url in response for delete as they are not required - if (opResult.getDocument() != null) { - opResult.getDocument().setEtag(null); - opResult.getDocument().setUrl(null); - } + SearchOperationResult opResult = getSearchOperationResult(conn); + buildSuggestResult(opResult, indexName); - // Generate a metrics log so we can track how long the operation took. - metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME, - new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()), - override, indexName, document.getId()); + logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query); shutdownConnection(conn); @@ -715,167 +473,239 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { } @Override - public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document) - throws DocumentStoreOperationException { - DocumentOperationResult opResult = new DocumentOperationResult(); + public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException { + if (logger.isDebugEnabled()) { + StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: "); + + for (BulkRequest request : requests) { + dbgString.append("[").append(request).append("] "); + } - // Initialize operation result with a failure codes / fault string - opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode()); - opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); + logger.debug(dbgString.toString()); + } - // Grab the current time so we can use it to generate a metrics log. MdcOverride override = getStartTime(new MdcOverride()); - String fullUrl = null; - if (document.getVersion() == null) { - fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false); - } else { - fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION - + document.getVersion(), false); + // Parse the supplied set of operations. + // Iterate over the list of operations which we were provided and + // translate them into a format that ElasticSearh understands. + int opCount = 0; + StringBuilder esOperationSet = new StringBuilder(128); + List rejected = new ArrayList<>(); + for (BulkRequest request : requests) { + + // Convert the request to the syntax ElasticSearch likes. + if (buildEsOperation(request, esOperationSet, rejected)) { + opCount++; + } } - HttpURLConnection conn = initializeConnection(fullUrl); - logger.debug("\nSending 'GET' request to: " + conn.getURL()); + ElasticSearchBulkOperationResult opResult = null; + if (opCount > 0) { + HttpURLConnection conn; + try { + conn = (HttpURLConnection) buildUrl(createUriBuilder("_bulk")).openConnection(); + conn.setRequestMethod(HttpMethod.PUT); + conn.setDoOutput(true); + conn.setRequestProperty(CONTENT_TYPE, APPLICATION_XND_JSON_TYPE); + if(config.useAuth()){ + conn.setRequestProperty("Authorization", config.getAuthValue()); + } + conn.setRequestProperty("Connection", "Close"); - handleResponse(conn, opResult); - buildDocumentResult(opResult, indexName); + } catch (IOException e) { - // Generate a metrics log so we can track how long the operation took. - metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME, - new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), - override, indexName, document.getId()); + logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage()); + if (logger.isDebugEnabled()) { + logger.debug(Throwables.getStackTraceAsString(e)); + } - shutdownConnection(conn); + throw new DocumentStoreOperationException( + "Failed to open connection to document store. Cause: " + e.getMessage(), e); + } - return opResult; - } + StringBuilder bulkResult = new StringBuilder(128); + try { + // Create an output stream to write our request to. + OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream()); - @Override - public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException { - SearchOperationResult opResult = new SearchOperationResult(); + if (logger.isDebugEnabled()) { + logger.debug("ESController: Sending 'BULK' request to " + conn.getURL()); + logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n")); + } - // Initialize operation result with a failure codes / fault string - opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode()); - opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); + // Write the resulting request string to our output stream. (this sends the request to ES?) + out.write(esOperationSet.toString()); + out.close(); - String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false); + // Open an input stream on our connection in order to read back the results. + InputStream is = conn.getInputStream(); + InputStreamReader inputstreamreader = new InputStreamReader(is); + BufferedReader bufferedreader = new BufferedReader(inputstreamreader); - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); + // Read the contents of the input stream into our result string... + String esResponseString = null; - HttpURLConnection conn = initializeConnection(fullUrl); + while ((esResponseString = bufferedreader.readLine()) != null) { + bulkResult.append(esResponseString).append("\n"); + } - try { - conn.setRequestMethod("GET"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e); - } + } catch (IOException e) { - logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL()); + logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage()); + if (logger.isDebugEnabled()) { + StringWriter sw = new StringWriter(); + e.printStackTrace(new PrintWriter(sw)); + logger.debug(sw.toString()); + } - handleResponse(conn, opResult); - buildSearchResult(opResult, indexName); + throw new DocumentStoreOperationException( + "Failure interacting with document store. Cause: " + e.getMessage(), e); + } + if (logger.isDebugEnabled()) { + logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString()); + } - metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME, - new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), - override, indexName, queryString); + // ...and marshal the resulting string into a Java object. + try { + opResult = marshallEsBulkResult(bulkResult.toString()); - return opResult; - } + } catch (IOException e) { + logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage()); + if (logger.isDebugEnabled()) { + logger.debug(Throwables.getStackTraceAsString(e)); + } - @Override - public SearchOperationResult searchWithPayload(String indexName, String query) - throws DocumentStoreOperationException { - SearchOperationResult opResult = new SearchOperationResult(); + throw new DocumentStoreOperationException("Failed to marshal response body. Cause: " + e.getMessage(), + e); + } + } - if (logger.isDebugEnabled()) { - logger.debug("Querying index: " + indexName + " with query string: " + query); + OperationResult result = new OperationResultBuilder() // + .resultCode(HttpStatus.MULTI_STATUS_207) // + .result(buildGenericBulkResultSet(opResult, rejected)) // + .build(); + + // In the success case we don't want the entire result string to be dumped into the metrics log, so concatenate + // it. + String resultStringForMetricsLog = result.getResult(); + if (isSuccess(result)) { + resultStringForMetricsLog = + resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "..."; } - // Initialize operation result with a failure codes / fault string - opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode()); - opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); + metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME, + new LogFields() // + .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode()) + .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog), + override); + + return result; + } + + + /** + * This method queryies ElasticSearch to determine if the supplied index is present in the document store. + * + * @param indexName - The index to look for. + * @return - An operation result indicating the success or failure of the check. + * @throws DocumentStoreOperationException + */ + private OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException { + MdcOverride override = getStartTime(new MdcOverride()); + + HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.HEAD); + int resultCode; + try { + resultCode = conn.getResponseCode(); + } catch (IOException ex) { + shutdownConnection(conn); + throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex); + } + logger.debug(MSG_RESPONSE_CODE + resultCode); - String fullUrl = getFullUrl("/" + indexName + "/_search", false); + OperationResult opResult = new OperationResultBuilder().useDefaults().resultCode(resultCode).build(); + logMetricsInfo(override, SearchDbMsgs.CHECK_INDEX_TIME, opResult, indexName); + shutdownConnection(conn); - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); + return opResult; + } - HttpURLConnection conn = initializeConnection(fullUrl); + private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document) + throws DocumentStoreOperationException { + // check if the document already exists + DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId()); - try { - conn.setRequestMethod("POST"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e); + if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) { + if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) { + opResult.setFailureCause("A document with the same id already exists."); + } else { + opResult.setFailureCause("Failed to verify a document with the specified id does not already exist."); + } + opResult.setResultCode(Status.CONFLICT.getStatusCode()); + return opResult; } - attachContent(conn, query); + final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId())); + MdcOverride override = getStartTime(new MdcOverride()); - logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL()); - logger.debug("Request body = Elasticsearch query = " + query); + HttpURLConnection conn = createConnection(url, HttpMethod.PUT); + attachDocument(conn, document); - handleResponse(conn, opResult); - buildSearchResult(opResult, indexName); + opResult = getOperationResult(conn); + buildDocumentResult(opResult, indexName); - metricsLogger - .info(SearchDbMsgs.QUERY_DOCUMENT_TIME, - new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), - override, indexName, query); + logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, opResult, indexName); shutdownConnection(conn); return opResult; } - - @Override - public SearchOperationResult suggestionQueryWithPayload(String indexName, String query) + private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document) throws DocumentStoreOperationException { + final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE)); - SearchOperationResult opResult = new SearchOperationResult(); + MdcOverride override = getStartTime(new MdcOverride()); + HttpURLConnection conn = createConnection(url, HttpMethod.POST); + attachDocument(conn, document); - if (logger.isDebugEnabled()) { - logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query); - } + DocumentOperationResult response = getOperationResult(conn); + buildDocumentResult(response, indexName); - // Initialize operation result with a failure codes / fault string - opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode()); - opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); + logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, response, indexName); - String fullUrl = getFullUrl("/" + indexName + "/_suggest", false); + shutdownConnection(conn); - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); + return response; + } - HttpURLConnection conn = initializeConnection(fullUrl); + private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc) + throws DocumentStoreOperationException { + conn.setRequestProperty("Connection", "Close"); + attachContent(conn, doc.getContentInJson()); + } + private DocumentOperationResult checkDocumentExistence(String indexName, String docId) + throws DocumentStoreOperationException { + MdcOverride override = getStartTime(new MdcOverride()); + HttpURLConnection conn = + createConnection(buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, docId)), HttpMethod.HEAD); + int resultCode; try { - conn.setRequestMethod("POST"); - } catch (ProtocolException e) { + resultCode = conn.getResponseCode(); + } catch (IOException ex) { shutdownConnection(conn); - throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e); + throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex); } - attachContent(conn, query); - - logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL()); - logger.debug("Request body = Elasticsearch query = " + query); - - handleResponse(conn, opResult); - buildSuggestResult(opResult, indexName); + logger.debug(MSG_RESPONSE_CODE + resultCode); - metricsLogger - .info(SearchDbMsgs.QUERY_DOCUMENT_TIME, - new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), - override, indexName, query); + DocumentOperationResult opResult = (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT) + .useDefaults().resultCode(resultCode).build(); + logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, docId); shutdownConnection(conn); return opResult; @@ -903,20 +733,15 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { } } - private HttpURLConnection initializeConnection(String fullUrl) throws DocumentStoreOperationException { - URL url = null; + private HttpURLConnection initializeConnection(URL url) throws DocumentStoreOperationException { HttpURLConnection conn = null; - - try { - url = new URL(fullUrl); - } catch (MalformedURLException e) { - throw new DocumentStoreOperationException("Error building a URL with " + url, e); - } - try { conn = (HttpURLConnection) url.openConnection(); conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON); conn.setDoOutput(true); + if (config.useAuth()) { + conn.setRequestProperty("Authorization", config.getAuthValue()); + } } catch (IOException e) { shutdownConnection(conn); throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e); @@ -925,22 +750,26 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { return conn; } - private void handleResponse(HttpURLConnection conn, OperationResult opResult) + private OperationResult handleResponse(HttpURLConnection conn) throws DocumentStoreOperationException { + return handleResponse(conn, new OperationResultBuilder().useDefaults()); + } + + private OperationResult handleResponse(HttpURLConnection conn, OperationResultBuilder rb) throws DocumentStoreOperationException { int resultCode; try { resultCode = conn.getResponseCode(); - } catch (IOException e) { + } catch (IOException ex) { shutdownConnection(conn); - throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e); + throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex); } logger.debug(MSG_RESPONSE_CODE + resultCode); InputStream inputStream = null; - if (!isSuccessCode(resultCode)) { + if (!ApiUtils.isSuccessStatusCode(resultCode)) { inputStream = conn.getErrorStream(); } else { try { @@ -967,15 +796,16 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { } if (resultCode == Status.CONFLICT.getStatusCode()) { - opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode()); + rb.resultCode(Status.PRECONDITION_FAILED.getStatusCode()); } else { - opResult.setResultCode(resultCode); + rb.resultCode(resultCode); } if (logger.isDebugEnabled()) { logger.debug("Raw result string from ElasticSearch = " + result.toString()); } - opResult.setResult(result.toString()); - opResult.setResultVersion(extractVersion(result.toString())); + rb.result(result.toString()); + rb.resultVersion(extractVersion(result.toString())); + return rb.build(); } private String extractVersion(String result) { @@ -1007,8 +837,7 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { // Grab the current time... long startTimeInMs = System.currentTimeMillis(); - // ...and add it as an attribute to the supplied MDC Override - // object. + // ...and add it as an attribute to the supplied MDC Override object. SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs)); @@ -1023,151 +852,91 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { } private boolean isSuccess(OperationResult result) { - return isSuccessCode(result.getResultCode()); + return ApiUtils.isSuccessStatusCode(result.getResultCode()); } - private boolean isSuccessCode(int statusCode) { - return Family.familyOf(statusCode).equals(Family.SUCCESSFUL); + private UriBuilder createUriBuilder(String path, String... paths) { + UriBuilder builder = UriBuilder.fromPath(path); + for (String other : paths) { + builder.path(other); + } + builder.host(config.getIpAddress()); + String port = Optional.ofNullable(config.getHttpPort()).orElse("0"); + builder.port(Integer.valueOf(port)); + builder.scheme(config.getUriScheme()); + return builder; } - @Override - public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException { - - if (logger.isDebugEnabled()) { - StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: "); - - for (BulkRequest request : requests) { - dbgString.append("[").append(request).append("] "); - } - - logger.debug(dbgString.toString()); + private URL buildUrl(UriBuilder builder) throws DocumentStoreOperationException { + try { + return builder.build().toURL(); + } catch (MalformedURLException e) { + logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "buildUrl", e.getLocalizedMessage()); + throw new DocumentStoreOperationException("Error building a URL with " + builder.toString(), e); } + } - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); - - // Parse the supplied set of operations. - // Iterate over the list of operations which we were provided and - // translate them into a format that ElasticSearh understands. - int opCount = 0; - StringBuilder esOperationSet = new StringBuilder(128); - List rejected = new ArrayList<>(); - for (BulkRequest request : requests) { - - // Convert the request to the syntax ElasticSearch likes. - if (buildEsOperation(request, esOperationSet, rejected)) { - opCount++; - } + private HttpURLConnection createConnection(final URL url, final String method) + throws DocumentStoreOperationException { + HttpURLConnection conn = initializeConnection(url); + try { + logger.debug("\nSending '" + method + "' request to URL : " + conn.getURL()); + conn.setRequestMethod(method); + } catch (ProtocolException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException(ErrorMessage.SET_REQUEST_METHOD_FAILED, e, method); } + return conn; + } - ElasticSearchBulkOperationResult opResult = null; - if (opCount > 0) { - - // Open an HTTP connection to the ElasticSearch back end. - String fullUrl = getFullUrl("/_bulk", false); - URL url; - HttpURLConnection conn; - try { - - url = new URL(fullUrl); - conn = (HttpURLConnection) url.openConnection(); - conn.setRequestMethod("PUT"); - conn.setDoOutput(true); - conn.setRequestProperty(CONTENT_TYPE, APPLICATION_FORM_URLENCODED); - conn.setRequestProperty("Connection", "Close"); - - } catch (IOException e) { - - logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage()); - if (logger.isDebugEnabled()) { - logger.debug(Throwables.getStackTraceAsString(e)); - } - - throw new DocumentStoreOperationException( - "Failed to open connection to document store. Cause: " + e.getMessage(), e); - } - - StringBuilder bulkResult = new StringBuilder(128); - try { - // Create an output stream to write our request to. - OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream()); - - if (logger.isDebugEnabled()) { - logger.debug("ESController: Sending 'BULK' request to " + conn.getURL()); - logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n")); - } - - // Write the resulting request string to our output stream. (this sends the request to ES?) - out.write(esOperationSet.toString()); - out.close(); - - // Open an input stream on our connection in order to read back the results. - InputStream is = conn.getInputStream(); - InputStreamReader inputstreamreader = new InputStreamReader(is); - BufferedReader bufferedreader = new BufferedReader(inputstreamreader); - - // Read the contents of the input stream into our result string... - String esResponseString = null; + private OperationResult checkConnection() throws IOException, DocumentStoreOperationException { + HttpURLConnection conn = createConnection(buildUrl(createUriBuilder("_cluster/health")), HttpMethod.GET); + int resultCode = conn.getResponseCode(); + logger.debug("getClusterHealth() response Code : " + resultCode); + shutdownConnection(conn); + return new OperationResultBuilder().resultCode(resultCode).build(); + } - while ((esResponseString = bufferedreader.readLine()) != null) { - bulkResult.append(esResponseString).append("\n"); - } + private void shutdownConnection(HttpURLConnection connection) { + if (connection == null) { + return; + } - } catch (IOException e) { + final String methodName = "shutdownConnection"; + InputStream inputstream = null; + OutputStream outputstream = null; - logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage()); - if (logger.isDebugEnabled()) { - StringWriter sw = new StringWriter(); - e.printStackTrace(new PrintWriter(sw)); - logger.debug(sw.toString()); + try { + inputstream = connection.getInputStream(); + } catch (IOException e) { + logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage()); + } finally { + if (inputstream != null) { + try { + inputstream.close(); + } catch (IOException e) { + logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage()); } - - throw new DocumentStoreOperationException( - "Failure interacting with document store. Cause: " + e.getMessage(), e); - } - - if (logger.isDebugEnabled()) { - logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString()); } + } - // ...and marshal the resulting string into a Java object. - try { - opResult = marshallEsBulkResult(bulkResult.toString()); - - } catch (IOException e) { - - logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage()); - if (logger.isDebugEnabled()) { - logger.debug(Throwables.getStackTraceAsString(e)); + try { + outputstream = connection.getOutputStream(); + } catch (IOException e) { + logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage()); + } finally { + if (outputstream != null) { + try { + outputstream.close(); + } catch (IOException e) { + logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage()); } - - throw new DocumentStoreOperationException("Failed to marshal response body. Cause: " + e.getMessage(), - e); } } - // Finally, build the operation result and return it to the caller. - OperationResult result = new OperationResult(); - result.setResultCode(207); - result.setResult(buildGenericBulkResultSet(opResult, rejected)); - - // In the success case we don't want the entire result string to be - // dumped into the metrics log, so concatenate it. - String resultStringForMetricsLog = result.getResult(); - if (isSuccess(result)) { - resultStringForMetricsLog = - resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "..."; - } - - metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME, - new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog), - override); - - return result; + connection.disconnect(); } - /** * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch. * @@ -1453,7 +1222,7 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { // Increment the operation counts. totalOps++; - if (isSuccessCode(item.operationStatus().getStatus())) { + if (ApiUtils.isSuccessStatusCode(item.operationStatus().getStatus())) { totalSuccess++; } else { totalFails++; @@ -1473,59 +1242,14 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}"; } - - /** - * This method queryies ElasticSearch to determine if the supplied index is present in the document store. - * - * @param indexName - The index to look for. - * @return - An operation result indicating the success or failure of the check. - * @throws DocumentStoreOperationException - */ - public OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException { - - // Initialize operation result with a failure codes / fault string - OperationResult opResult = new OperationResult(); - opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode()); - - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); - - String fullUrl = getFullUrl("/" + indexName, false); - HttpURLConnection conn = initializeConnection(fullUrl); - - try { - conn.setRequestMethod("HEAD"); - - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e); - } - - logger.debug("Sending 'HEAD' request to: " + conn.getURL()); - - int resultCode; - try { - resultCode = conn.getResponseCode(); - } catch (IOException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e); - } - logger.debug(MSG_RESPONSE_CODE + resultCode); - - opResult.setResultCode(resultCode); - - // Generate a metrics log so we can track how long the operation took. - metricsLogger - .info(SearchDbMsgs.CHECK_INDEX_TIME, - new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()), - override, indexName); - - shutdownConnection(conn); - - return opResult; + private DocumentOperationResult getOperationResult(HttpURLConnection conn) throws DocumentStoreOperationException { + return (DocumentOperationResult) handleResponse(conn, new OperationResultBuilder(Type.DOCUMENT).useDefaults()); } + private SearchOperationResult getSearchOperationResult(HttpURLConnection conn) + throws DocumentStoreOperationException { + return (SearchOperationResult) handleResponse(conn, new OperationResultBuilder(Type.SEARCH).useDefaults()); + } private void buildDocumentResult(DocumentOperationResult result, String index) throws DocumentStoreOperationException { @@ -1562,7 +1286,6 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { } private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException { - JSONParser parser = new JSONParser(); JSONObject root; @@ -1612,7 +1335,6 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { } catch (Exception e) { throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult()); } - } private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException { @@ -1669,4 +1391,18 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult()); } } + + /** + * Record the timing of the operation in the metrics log. + * + */ + private void logMetricsInfo(MdcOverride override, SearchDbMsgs message, OperationResult operationResult, + String... args) { + metricsLogger.info(message, + new LogFields() // + .setField(LogLine.DefinedFields.RESPONSE_CODE, operationResult.getResultCode()) + .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, operationResult.getResult()) + .setField(LogLine.DefinedFields.SERVER_IP, "ElasticHost-"+config.getIpAddress()), + override, args); + } }