X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Faai%2Fsa%2Fsearchdbabstraction%2Felasticsearch%2Fdao%2FElasticSearchHttpController.java;h=6d08c1dcd9d2460685c11685f454af6c61cfb5c3;hb=ecd35f117eb0118f9cbd96fe132a1c359214bfc7;hp=de38d28c1d1363d23f5461fd9eb165160bc86781;hpb=0ed6c8a6bc6a2b067d11b14bb0a5be8e8680dd4f;p=aai%2Fsearch-data-service.git diff --git a/src/main/java/org/onap/aai/sa/searchdbabstraction/elasticsearch/dao/ElasticSearchHttpController.java b/src/main/java/org/onap/aai/sa/searchdbabstraction/elasticsearch/dao/ElasticSearchHttpController.java index de38d28..6d08c1d 100644 --- a/src/main/java/org/onap/aai/sa/searchdbabstraction/elasticsearch/dao/ElasticSearchHttpController.java +++ b/src/main/java/org/onap/aai/sa/searchdbabstraction/elasticsearch/dao/ElasticSearchHttpController.java @@ -49,10 +49,12 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; +import javax.ws.rs.HttpMethod; import javax.ws.rs.core.Response.Status; -import javax.ws.rs.core.Response.Status.Family; +import javax.ws.rs.core.UriBuilder; import org.eclipse.jetty.http.HttpStatus; import org.json.simple.JSONArray; import org.json.simple.JSONObject; @@ -71,6 +73,7 @@ import org.onap.aai.sa.rest.BulkRequest.OperationType; import org.onap.aai.sa.rest.DocumentSchema; import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig; import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException; +import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException.ErrorMessage; import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult; import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults; import org.onap.aai.sa.searchdbabstraction.entity.Document; @@ -102,20 +105,18 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { private static final Logger metricsLogger = LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName()); + private static final String URL_QUERY_VERSION = "version="; + private static final String JSON_ATTR_VERSION = "_version"; private static final String JSON_ATTR_ERROR = "error"; private static final String JSON_ATTR_REASON = "reason"; private static final String DEFAULT_TYPE = "default"; - private static final String QUERY_PARAM_VERSION = "?version="; private static final String MSG_RESOURCE_MISSING = "Specified resource does not exist: "; private static final String MSG_RESPONSE_CODE = "Response Code : "; private static final String MSG_INVALID_DOCUMENT_URL = "Invalid document URL: "; - private static final String MSG_HTTP_PUT_FAILED = "Failed to set HTTP request method to PUT."; - private static final String MSG_HTTP_POST_FAILED = "Failed to set HTTP request method to POST."; - private static final String FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION = - "Failed to get the response code from the connection."; + private static final String FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE = "Failed to parse Elastic Search response."; private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE = @@ -136,13 +137,14 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { this.config = config; analysisConfig = new AnalysisConfiguration(); + String rootUrl = null; try { - logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false)); + rootUrl = buildUrl(createUriBuilder("")).toString(); + logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, rootUrl); checkConnection(); - logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false)); + logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, rootUrl); } catch (Exception e) { - logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, getFullUrl("", false), - e.getMessage()); + logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, rootUrl, e.getMessage()); } } @@ -191,7 +193,6 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { return new OperationResultBuilder().useDefaults() .failureCause("Document store operation failure. Cause: " + e.getMessage()).build(); } - } @Override @@ -216,21 +217,9 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { @Override public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException { - // Grab the current time so we can use it to generate a metrics log. MdcOverride override = getStartTime(new MdcOverride()); - String fullUrl = getFullUrl("/" + indexName + "/", false); - HttpURLConnection conn = initializeConnection(fullUrl); - - logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL()); - - try { - conn.setRequestMethod("DELETE"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e); - } - + HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.DELETE); OperationResult opResult = handleResponse(conn); logMetricsInfo(override, SearchDbMsgs.DELETE_INDEX_TIME, opResult, indexName); shutdownConnection(conn); @@ -238,77 +227,6 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { return opResult; } - private OperationResult checkConnection() throws IOException { - String fullUrl = getFullUrl("/_cluster/health", false); - URL url = null; - HttpURLConnection conn = null; - - url = new URL(fullUrl); - conn = (HttpURLConnection) url.openConnection(); - conn.setRequestMethod("GET"); - conn.setDoOutput(true); - logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url); - - int resultCode = conn.getResponseCode(); - logger.debug("getClusterHealth() response Code : " + resultCode); - - shutdownConnection(conn); - - return new OperationResultBuilder().resultCode(resultCode).build(); - } - - private String getFullUrl(String resourceUrl, boolean isSecure) { - - final String host = config.getIpAddress(); - final String port = config.getHttpPort(); - - if (isSecure) { - return String.format("https://%s:%s%s", host, port, resourceUrl); - } else { - return String.format("http://%s:%s%s", host, port, resourceUrl); - } - } - - private void shutdownConnection(HttpURLConnection connection) { - if (connection == null) { - return; - } - - final String methodName = "shutdownConnection"; - InputStream inputstream = null; - OutputStream outputstream = null; - - try { - inputstream = connection.getInputStream(); - } catch (IOException e) { - logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage()); - } finally { - if (inputstream != null) { - try { - inputstream.close(); - } catch (IOException e) { - logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage()); - } - } - } - - try { - outputstream = connection.getOutputStream(); - } catch (IOException e) { - logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage()); - } finally { - if (outputstream != null) { - try { - outputstream.close(); - } catch (IOException e) { - logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage()); - } - } - } - - connection.disconnect(); - } - // @Override protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings) throws DocumentStoreOperationException { @@ -320,18 +238,9 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { logger.debug("No mappings provided."); } - // Grab the current time so we can use it to generate a metrics log. MdcOverride override = getStartTime(new MdcOverride()); - String fullUrl = getFullUrl("/" + indexName + "/", false); - HttpURLConnection conn = initializeConnection(fullUrl); - - try { - conn.setRequestMethod("PUT"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e); - } + HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.PUT); StringBuilder sb = new StringBuilder(128); sb.append("{ \"settings\" : "); @@ -350,7 +259,6 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { throw new DocumentStoreOperationException(e.getMessage(), e); } - logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL()); logger.debug("Request content: " + sb); OperationResult opResult = handleResponse(conn); @@ -371,19 +279,9 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { */ protected OperationResult createTable(String indexName, String settingsAndMappings) throws DocumentStoreOperationException { - // Grab the current time so we can use it to generate a metrics log. MdcOverride override = getStartTime(new MdcOverride()); - String fullUrl = getFullUrl("/" + indexName + "/", false); - HttpURLConnection conn = initializeConnection(fullUrl); - - try { - conn.setRequestMethod("PUT"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e); - } - + HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.PUT); try { attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings)); } catch (IOException e) { @@ -421,323 +319,382 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { } } - private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document) - throws DocumentStoreOperationException { - // check if the document already exists - DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId()); - - if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) { - if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) { - opResult.setFailureCause("A document with the same id already exists."); - } else { - opResult.setFailureCause("Failed to verify a document with the specified id does not already exist."); + @Override + public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document, + boolean allowImplicitIndexCreation) throws DocumentStoreOperationException { + if (!allowImplicitIndexCreation) { + // Before we do anything, make sure that the specified index actually exists in the + // document store - we don't want to rely on ElasticSearch to fail the document + // create because it could be configured to implicitly create a non-existent index, + // which can lead to hard-to-debug behaviour with queries down the road. + OperationResult indexExistsResult = checkIndexExistence(indexName); + if (!isSuccess(indexExistsResult)) { + DocumentOperationResult opResult = new DocumentOperationResult(); + opResult.setResultCode(Status.NOT_FOUND.getStatusCode()); + String resultMsg = "Document Index '" + indexName + "' does not exist."; + opResult.setResult(resultMsg); + opResult.setFailureCause(resultMsg); + return opResult; } - opResult.setResultCode(Status.CONFLICT.getStatusCode()); - return opResult; } - // Grab the current time so we can use it to generate a metrics log. MdcOverride override = getStartTime(new MdcOverride()); - String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false); - HttpURLConnection conn = initializeConnection(fullUrl); - - try { - conn.setRequestMethod("PUT"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e); - } + final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId()) + .replaceQuery(URL_QUERY_VERSION + document.getVersion())); + HttpURLConnection conn = createConnection(url, HttpMethod.PUT); attachDocument(conn, document); - logger.debug("Sending 'PUT' request to: " + conn.getURL()); - - opResult = getOperationResult(conn); + DocumentOperationResult opResult = getOperationResult(conn); buildDocumentResult(opResult, indexName); - logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, opResult, indexName); + logMetricsInfo(override, SearchDbMsgs.UPDATE_DOCUMENT_TIME, opResult, indexName, document.getId()); shutdownConnection(conn); return opResult; } - private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document) + @Override + public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document) throws DocumentStoreOperationException { - // Grab the current time so we can use it to generate a metrics log. + final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId()) + .replaceQuery(URL_QUERY_VERSION + document.getVersion())); + MdcOverride override = getStartTime(new MdcOverride()); - String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false); - HttpURLConnection conn = initializeConnection(fullUrl); + HttpURLConnection conn = createConnection(url, HttpMethod.DELETE); - try { - conn.setRequestMethod("POST"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e); + DocumentOperationResult opResult = getOperationResult(conn); + buildDocumentResult(opResult, indexName); + // supress the etag and url in response for delete as they are not required + if (opResult.getDocument() != null) { + opResult.getDocument().setEtag(null); + opResult.getDocument().setUrl(null); } - attachDocument(conn, document); + logMetricsInfo(override, SearchDbMsgs.DELETE_DOCUMENT_TIME, opResult, indexName, document.getId()); - logger.debug("Sending 'POST' request to: " + conn.getURL()); + shutdownConnection(conn); - DocumentOperationResult response = getOperationResult(conn); - buildDocumentResult(response, indexName); + return opResult; + } - logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, response, indexName); + @Override + public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document) + throws DocumentStoreOperationException { + final UriBuilder uriBuilder = createUriBuilder(indexName, DEFAULT_TYPE, document.getId()); + if (document.getVersion() != null) { + uriBuilder.replaceQuery(URL_QUERY_VERSION + document.getVersion()); + } + + MdcOverride override = getStartTime(new MdcOverride()); + HttpURLConnection conn = createConnection(buildUrl(uriBuilder), "GET"); + + DocumentOperationResult opResult = getOperationResult(conn); + buildDocumentResult(opResult, indexName); + + logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, document.getId()); shutdownConnection(conn); - return response; + return opResult; } - private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc) - throws DocumentStoreOperationException { - conn.setRequestProperty("Connection", "Close"); - attachContent(conn, doc.getContentInJson()); + @Override + public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException { + final URL url = buildUrl(createUriBuilder(indexName, "_search").replaceQuery(queryString)); + + MdcOverride override = getStartTime(new MdcOverride()); + + HttpURLConnection conn = createConnection(url, "GET"); + SearchOperationResult opResult = getSearchOperationResult(conn); + buildSearchResult(opResult, indexName); + + logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, queryString); + + return opResult; } - private DocumentOperationResult checkDocumentExistence(String indexName, String docId) + @Override + public SearchOperationResult searchWithPayload(String indexName, String query) throws DocumentStoreOperationException { - // Grab the current time so we can use it to generate a metrics log. + if (logger.isDebugEnabled()) { + logger.debug("Querying index: " + indexName + " with query string: " + query); + } + final URL url = buildUrl(createUriBuilder(indexName, "_search")); + MdcOverride override = getStartTime(new MdcOverride()); - String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false); - HttpURLConnection conn = initializeConnection(fullUrl); + HttpURLConnection conn = createConnection(url, HttpMethod.POST); + attachContent(conn, query); + logger.debug("Request body = Elasticsearch query = " + query); - try { - conn.setRequestMethod("HEAD"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e); - } + SearchOperationResult opResult = getSearchOperationResult(conn); + buildSearchResult(opResult, indexName); - logger.debug("Sending 'HEAD' request to: " + conn.getURL()); + logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query); - int resultCode; - try { - resultCode = conn.getResponseCode(); - } catch (IOException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e); + shutdownConnection(conn); + + return opResult; + } + + @Override + public SearchOperationResult suggestionQueryWithPayload(String indexName, String query) + throws DocumentStoreOperationException { + if (logger.isDebugEnabled()) { + logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query); } - logger.debug(MSG_RESPONSE_CODE + resultCode); + MdcOverride override = getStartTime(new MdcOverride()); + HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName, "_suggest")), HttpMethod.POST); + attachContent(conn, query); - DocumentOperationResult opResult = (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT) - .useDefaults().resultCode(resultCode).build(); + logger.debug("Request body = Elasticsearch query = " + query); + + SearchOperationResult opResult = getSearchOperationResult(conn); + buildSuggestResult(opResult, indexName); + + logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query); - logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, docId); shutdownConnection(conn); return opResult; } @Override - public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document, - boolean allowImplicitIndexCreation) throws DocumentStoreOperationException { + public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException { + if (logger.isDebugEnabled()) { + StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: "); - if (!allowImplicitIndexCreation) { - // Before we do anything, make sure that the specified index actually exists in the - // document store - we don't want to rely on ElasticSearch to fail the document - // create because it could be configured to implicitly create a non-existent index, - // which can lead to hard-to-debug behaviour with queries down the road. - OperationResult indexExistsResult = checkIndexExistence(indexName); - if (!isSuccess(indexExistsResult)) { - DocumentOperationResult opResult = new DocumentOperationResult(); - opResult.setResultCode(Status.NOT_FOUND.getStatusCode()); - String resultMsg = "Document Index '" + indexName + "' does not exist."; - opResult.setResult(resultMsg); - opResult.setFailureCause(resultMsg); - return opResult; + for (BulkRequest request : requests) { + dbgString.append("[").append(request).append("] "); } + + logger.debug(dbgString.toString()); } - // Grab the current time so we can use it to generate a metrics log. MdcOverride override = getStartTime(new MdcOverride()); - String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION - + document.getVersion(), false); - HttpURLConnection conn = initializeConnection(fullUrl); + // Parse the supplied set of operations. + // Iterate over the list of operations which we were provided and + // translate them into a format that ElasticSearh understands. + int opCount = 0; + StringBuilder esOperationSet = new StringBuilder(128); + List rejected = new ArrayList<>(); + for (BulkRequest request : requests) { - try { - conn.setRequestMethod("PUT"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e); + // Convert the request to the syntax ElasticSearch likes. + if (buildEsOperation(request, esOperationSet, rejected)) { + opCount++; + } } - attachDocument(conn, document); + ElasticSearchBulkOperationResult opResult = null; + if (opCount > 0) { + HttpURLConnection conn; + try { + conn = (HttpURLConnection) buildUrl(createUriBuilder("_bulk")).openConnection(); + conn.setRequestMethod(HttpMethod.PUT); + conn.setDoOutput(true); + conn.setRequestProperty(CONTENT_TYPE, APPLICATION_FORM_URLENCODED); + conn.setRequestProperty("Connection", "Close"); - logger.debug("Sending 'PUT' request to: " + conn.getURL()); + } catch (IOException e) { - DocumentOperationResult opResult = getOperationResult(conn); - buildDocumentResult(opResult, indexName); + logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage()); + if (logger.isDebugEnabled()) { + logger.debug(Throwables.getStackTraceAsString(e)); + } - logMetricsInfo(override, SearchDbMsgs.UPDATE_DOCUMENT_TIME, opResult, indexName, document.getId()); + throw new DocumentStoreOperationException( + "Failed to open connection to document store. Cause: " + e.getMessage(), e); + } - shutdownConnection(conn); + StringBuilder bulkResult = new StringBuilder(128); + try { + // Create an output stream to write our request to. + OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream()); - return opResult; - } + if (logger.isDebugEnabled()) { + logger.debug("ESController: Sending 'BULK' request to " + conn.getURL()); + logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n")); + } - @Override - public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document) - throws DocumentStoreOperationException { - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); + // Write the resulting request string to our output stream. (this sends the request to ES?) + out.write(esOperationSet.toString()); + out.close(); - String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION - + document.getVersion(), false); - HttpURLConnection conn = initializeConnection(fullUrl); + // Open an input stream on our connection in order to read back the results. + InputStream is = conn.getInputStream(); + InputStreamReader inputstreamreader = new InputStreamReader(is); + BufferedReader bufferedreader = new BufferedReader(inputstreamreader); - try { - conn.setRequestMethod("DELETE"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e); - } + // Read the contents of the input stream into our result string... + String esResponseString = null; - logger.debug("\nSending 'DELETE' request to " + conn.getURL()); + while ((esResponseString = bufferedreader.readLine()) != null) { + bulkResult.append(esResponseString).append("\n"); + } - DocumentOperationResult opResult = getOperationResult(conn); - buildDocumentResult(opResult, indexName); - // supress the etag and url in response for delete as they are not required - if (opResult.getDocument() != null) { - opResult.getDocument().setEtag(null); - opResult.getDocument().setUrl(null); - } + } catch (IOException e) { - logMetricsInfo(override, SearchDbMsgs.DELETE_DOCUMENT_TIME, opResult, indexName, document.getId()); + logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage()); + if (logger.isDebugEnabled()) { + StringWriter sw = new StringWriter(); + e.printStackTrace(new PrintWriter(sw)); + logger.debug(sw.toString()); + } - shutdownConnection(conn); + throw new DocumentStoreOperationException( + "Failure interacting with document store. Cause: " + e.getMessage(), e); + } - return opResult; - } + if (logger.isDebugEnabled()) { + logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString()); + } - @Override - public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document) - throws DocumentStoreOperationException { - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); + // ...and marshal the resulting string into a Java object. + try { + opResult = marshallEsBulkResult(bulkResult.toString()); - String fullUrl = null; - if (document.getVersion() == null) { - fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false); - } else { - fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION - + document.getVersion(), false); - } - HttpURLConnection conn = initializeConnection(fullUrl); + } catch (IOException e) { + logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage()); + if (logger.isDebugEnabled()) { + logger.debug(Throwables.getStackTraceAsString(e)); + } - logger.debug("\nSending 'GET' request to: " + conn.getURL()); + throw new DocumentStoreOperationException("Failed to marshal response body. Cause: " + e.getMessage(), + e); + } + } - DocumentOperationResult opResult = getOperationResult(conn); - buildDocumentResult(opResult, indexName); + OperationResult result = new OperationResultBuilder() // + .resultCode(HttpStatus.MULTI_STATUS_207) // + .result(buildGenericBulkResultSet(opResult, rejected)) // + .build(); - logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, document.getId()); + // In the success case we don't want the entire result string to be dumped into the metrics log, so concatenate + // it. + String resultStringForMetricsLog = result.getResult(); + if (isSuccess(result)) { + resultStringForMetricsLog = + resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "..."; + } - shutdownConnection(conn); + metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME, + new LogFields() // + .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode()) + .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog), + override); - return opResult; + return result; } - @Override - public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException { - - String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false); - // Grab the current time so we can use it to generate a metrics log. + /** + * This method queryies ElasticSearch to determine if the supplied index is present in the document store. + * + * @param indexName - The index to look for. + * @return - An operation result indicating the success or failure of the check. + * @throws DocumentStoreOperationException + */ + private OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException { MdcOverride override = getStartTime(new MdcOverride()); - HttpURLConnection conn = initializeConnection(fullUrl); - + HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.HEAD); + int resultCode; try { - conn.setRequestMethod("GET"); - } catch (ProtocolException e) { + resultCode = conn.getResponseCode(); + } catch (IOException ex) { shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e); + throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex); } + logger.debug(MSG_RESPONSE_CODE + resultCode); - logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL()); + OperationResult opResult = new OperationResultBuilder().useDefaults().resultCode(resultCode).build(); + logMetricsInfo(override, SearchDbMsgs.CHECK_INDEX_TIME, opResult, indexName); + shutdownConnection(conn); - SearchOperationResult opResult = getSearchOperationResult(conn); - buildSearchResult(opResult, indexName); + return opResult; + } + + private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document) + throws DocumentStoreOperationException { + // check if the document already exists + DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId()); + + if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) { + if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) { + opResult.setFailureCause("A document with the same id already exists."); + } else { + opResult.setFailureCause("Failed to verify a document with the specified id does not already exist."); + } + opResult.setResultCode(Status.CONFLICT.getStatusCode()); + return opResult; + } + + final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId())); + MdcOverride override = getStartTime(new MdcOverride()); + + HttpURLConnection conn = createConnection(url, HttpMethod.PUT); + attachDocument(conn, document); + + opResult = getOperationResult(conn); + buildDocumentResult(opResult, indexName); - logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, queryString); + logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, opResult, indexName); + + shutdownConnection(conn); return opResult; } - @Override - public SearchOperationResult searchWithPayload(String indexName, String query) + private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document) throws DocumentStoreOperationException { - if (logger.isDebugEnabled()) { - logger.debug("Querying index: " + indexName + " with query string: " + query); - } + final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE)); - String fullUrl = getFullUrl("/" + indexName + "/_search", false); - - // Grab the current time so we can use it to generate a metrics log. MdcOverride override = getStartTime(new MdcOverride()); + HttpURLConnection conn = createConnection(url, HttpMethod.POST); + attachDocument(conn, document); - HttpURLConnection conn = initializeConnection(fullUrl); - - try { - conn.setRequestMethod("POST"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e); - } - - attachContent(conn, query); - - logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL()); - logger.debug("Request body = Elasticsearch query = " + query); - - SearchOperationResult opResult = getSearchOperationResult(conn); - buildSearchResult(opResult, indexName); + DocumentOperationResult response = getOperationResult(conn); + buildDocumentResult(response, indexName); - logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query); + logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, response, indexName); shutdownConnection(conn); - return opResult; + return response; } - - @Override - public SearchOperationResult suggestionQueryWithPayload(String indexName, String query) + private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc) throws DocumentStoreOperationException { - if (logger.isDebugEnabled()) { - logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query); - } - - String fullUrl = getFullUrl("/" + indexName + "/_suggest", false); + conn.setRequestProperty("Connection", "Close"); + attachContent(conn, doc.getContentInJson()); + } - // Grab the current time so we can use it to generate a metrics log. + private DocumentOperationResult checkDocumentExistence(String indexName, String docId) + throws DocumentStoreOperationException { MdcOverride override = getStartTime(new MdcOverride()); - - HttpURLConnection conn = initializeConnection(fullUrl); - + HttpURLConnection conn = + createConnection(buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, docId)), HttpMethod.HEAD); + int resultCode; try { - conn.setRequestMethod("POST"); - } catch (ProtocolException e) { + resultCode = conn.getResponseCode(); + } catch (IOException ex) { shutdownConnection(conn); - throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e); + throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex); } - attachContent(conn, query); - - logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL()); - logger.debug("Request body = Elasticsearch query = " + query); - - SearchOperationResult opResult = getSearchOperationResult(conn); - buildSuggestResult(opResult, indexName); + logger.debug(MSG_RESPONSE_CODE + resultCode); - logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query); + DocumentOperationResult opResult = (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT) + .useDefaults().resultCode(resultCode).build(); + logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, docId); shutdownConnection(conn); return opResult; @@ -765,16 +722,8 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { } } - private HttpURLConnection initializeConnection(String fullUrl) throws DocumentStoreOperationException { - URL url = null; + private HttpURLConnection initializeConnection(URL url) throws DocumentStoreOperationException { HttpURLConnection conn = null; - - try { - url = new URL(fullUrl); - } catch (MalformedURLException e) { - throw new DocumentStoreOperationException("Error building a URL with " + url, e); - } - try { conn = (HttpURLConnection) url.openConnection(); conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON); @@ -797,16 +746,16 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { try { resultCode = conn.getResponseCode(); - } catch (IOException e) { + } catch (IOException ex) { shutdownConnection(conn); - throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e); + throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex); } logger.debug(MSG_RESPONSE_CODE + resultCode); InputStream inputStream = null; - if (!isSuccessCode(resultCode)) { + if (!ApiUtils.isSuccessStatusCode(resultCode)) { inputStream = conn.getErrorStream(); } else { try { @@ -874,8 +823,7 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { // Grab the current time... long startTimeInMs = System.currentTimeMillis(); - // ...and add it as an attribute to the supplied MDC Override - // object. + // ...and add it as an attribute to the supplied MDC Override object. SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs)); @@ -890,148 +838,89 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { } private boolean isSuccess(OperationResult result) { - return isSuccessCode(result.getResultCode()); + return ApiUtils.isSuccessStatusCode(result.getResultCode()); } - private boolean isSuccessCode(int statusCode) { - return Family.familyOf(statusCode).equals(Family.SUCCESSFUL); + private UriBuilder createUriBuilder(String path, String... paths) { + UriBuilder builder = UriBuilder.fromPath(path); + for (String other : paths) { + builder.path(other); + } + builder.host(config.getIpAddress()); + String port = Optional.ofNullable(config.getHttpPort()).orElse("0"); + builder.port(Integer.valueOf(port)); + builder.scheme("http"); + return builder; } - @Override - public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException { - if (logger.isDebugEnabled()) { - StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: "); - - for (BulkRequest request : requests) { - dbgString.append("[").append(request).append("] "); - } - - logger.debug(dbgString.toString()); + private URL buildUrl(UriBuilder builder) throws DocumentStoreOperationException { + try { + return builder.build().toURL(); + } catch (MalformedURLException e) { + logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "buildUrl", e.getLocalizedMessage()); + throw new DocumentStoreOperationException("Error building a URL with " + builder.toString(), e); } + } - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); - - // Parse the supplied set of operations. - // Iterate over the list of operations which we were provided and - // translate them into a format that ElasticSearh understands. - int opCount = 0; - StringBuilder esOperationSet = new StringBuilder(128); - List rejected = new ArrayList<>(); - for (BulkRequest request : requests) { - - // Convert the request to the syntax ElasticSearch likes. - if (buildEsOperation(request, esOperationSet, rejected)) { - opCount++; - } + private HttpURLConnection createConnection(final URL url, final String method) + throws DocumentStoreOperationException { + HttpURLConnection conn = initializeConnection(url); + try { + logger.debug("\nSending '" + method + "' request to URL : " + conn.getURL()); + conn.setRequestMethod(method); + } catch (ProtocolException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException(ErrorMessage.SET_REQUEST_METHOD_FAILED, e, method); } + return conn; + } - ElasticSearchBulkOperationResult opResult = null; - if (opCount > 0) { - - // Open an HTTP connection to the ElasticSearch back end. - String fullUrl = getFullUrl("/_bulk", false); - URL url; - HttpURLConnection conn; - try { - - url = new URL(fullUrl); - conn = (HttpURLConnection) url.openConnection(); - conn.setRequestMethod("PUT"); - conn.setDoOutput(true); - conn.setRequestProperty(CONTENT_TYPE, APPLICATION_FORM_URLENCODED); - conn.setRequestProperty("Connection", "Close"); - - } catch (IOException e) { - - logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage()); - if (logger.isDebugEnabled()) { - logger.debug(Throwables.getStackTraceAsString(e)); - } - - throw new DocumentStoreOperationException( - "Failed to open connection to document store. Cause: " + e.getMessage(), e); - } - - StringBuilder bulkResult = new StringBuilder(128); - try { - // Create an output stream to write our request to. - OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream()); - - if (logger.isDebugEnabled()) { - logger.debug("ESController: Sending 'BULK' request to " + conn.getURL()); - logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n")); - } - - // Write the resulting request string to our output stream. (this sends the request to ES?) - out.write(esOperationSet.toString()); - out.close(); - - // Open an input stream on our connection in order to read back the results. - InputStream is = conn.getInputStream(); - InputStreamReader inputstreamreader = new InputStreamReader(is); - BufferedReader bufferedreader = new BufferedReader(inputstreamreader); - - // Read the contents of the input stream into our result string... - String esResponseString = null; + private OperationResult checkConnection() throws IOException, DocumentStoreOperationException { + HttpURLConnection conn = createConnection(buildUrl(createUriBuilder("_cluster/health")), HttpMethod.GET); + int resultCode = conn.getResponseCode(); + logger.debug("getClusterHealth() response Code : " + resultCode); + shutdownConnection(conn); + return new OperationResultBuilder().resultCode(resultCode).build(); + } - while ((esResponseString = bufferedreader.readLine()) != null) { - bulkResult.append(esResponseString).append("\n"); - } + private void shutdownConnection(HttpURLConnection connection) { + if (connection == null) { + return; + } - } catch (IOException e) { + final String methodName = "shutdownConnection"; + InputStream inputstream = null; + OutputStream outputstream = null; - logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage()); - if (logger.isDebugEnabled()) { - StringWriter sw = new StringWriter(); - e.printStackTrace(new PrintWriter(sw)); - logger.debug(sw.toString()); + try { + inputstream = connection.getInputStream(); + } catch (IOException e) { + logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage()); + } finally { + if (inputstream != null) { + try { + inputstream.close(); + } catch (IOException e) { + logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage()); } - - throw new DocumentStoreOperationException( - "Failure interacting with document store. Cause: " + e.getMessage(), e); - } - - if (logger.isDebugEnabled()) { - logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString()); } + } - // ...and marshal the resulting string into a Java object. - try { - opResult = marshallEsBulkResult(bulkResult.toString()); - - } catch (IOException e) { - - logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage()); - if (logger.isDebugEnabled()) { - logger.debug(Throwables.getStackTraceAsString(e)); + try { + outputstream = connection.getOutputStream(); + } catch (IOException e) { + logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage()); + } finally { + if (outputstream != null) { + try { + outputstream.close(); + } catch (IOException e) { + logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage()); } - - throw new DocumentStoreOperationException("Failed to marshal response body. Cause: " + e.getMessage(), - e); } } - OperationResult result = new OperationResultBuilder() // - .resultCode(HttpStatus.MULTI_STATUS_207) // - .result(buildGenericBulkResultSet(opResult, rejected)) // - .build(); - - // In the success case we don't want the entire result string to be dumped into the metrics log, so concatenate - // it. - String resultStringForMetricsLog = result.getResult(); - if (isSuccess(result)) { - resultStringForMetricsLog = - resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "..."; - } - - metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME, - new LogFields() // - .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog), - override); - - return result; + connection.disconnect(); } /** @@ -1319,7 +1208,7 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { // Increment the operation counts. totalOps++; - if (isSuccessCode(item.operationStatus().getStatus())) { + if (ApiUtils.isSuccessStatusCode(item.operationStatus().getStatus())) { totalSuccess++; } else { totalFails++; @@ -1339,47 +1228,6 @@ public class ElasticSearchHttpController implements DocumentStoreInterface { + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}"; } - - /** - * This method queryies ElasticSearch to determine if the supplied index is present in the document store. - * - * @param indexName - The index to look for. - * @return - An operation result indicating the success or failure of the check. - * @throws DocumentStoreOperationException - */ - public OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException { - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); - - String fullUrl = getFullUrl("/" + indexName, false); - HttpURLConnection conn = initializeConnection(fullUrl); - - try { - conn.setRequestMethod("HEAD"); - - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e); - } - - logger.debug("Sending 'HEAD' request to: " + conn.getURL()); - - int resultCode; - try { - resultCode = conn.getResponseCode(); - } catch (IOException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e); - } - logger.debug(MSG_RESPONSE_CODE + resultCode); - - OperationResult opResult = new OperationResultBuilder().useDefaults().resultCode(resultCode).build(); - logMetricsInfo(override, SearchDbMsgs.CHECK_INDEX_TIME, opResult, indexName); - shutdownConnection(conn); - - return opResult; - } - private DocumentOperationResult getOperationResult(HttpURLConnection conn) throws DocumentStoreOperationException { return (DocumentOperationResult) handleResponse(conn, new OperationResultBuilder(Type.DOCUMENT).useDefaults()); }