import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.Response.Status.Family;
+import javax.ws.rs.core.UriBuilder;
+import org.eclipse.jetty.http.HttpStatus;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.onap.aai.sa.rest.DocumentSchema;
import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
+import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException.ErrorMessage;
import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
import org.onap.aai.sa.searchdbabstraction.entity.Document;
import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
+import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder;
+import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder.Type;
import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
private static final Logger metricsLogger =
LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
+ private static final String URL_QUERY_VERSION = "version=";
+
private static final String JSON_ATTR_VERSION = "_version";
private static final String JSON_ATTR_ERROR = "error";
private static final String JSON_ATTR_REASON = "reason";
- private static final String QUERY_PARAM_VERSION = "?version=";
+ private static final String DEFAULT_TYPE = "default";
private static final String MSG_RESOURCE_MISSING = "Specified resource does not exist: ";
private static final String MSG_RESPONSE_CODE = "Response Code : ";
private static final String MSG_INVALID_DOCUMENT_URL = "Invalid document URL: ";
- private static final String MSG_HTTP_PUT_FAILED = "Failed to set HTTP request method to PUT.";
- private static final String MSG_HTTP_POST_FAILED = "Failed to set HTTP request method to POST.";
- private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
- "Internal Error: ElasticSearch operation fault occurred";
- private static final String FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION =
- "Failed to get the response code from the connection.";
+
private static final String FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE = "Failed to parse Elastic Search response.";
private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
private final ElasticSearchConfig config;
- private static final String DEFAULT_TYPE = "default";
-
protected AnalysisConfiguration analysisConfig;
+
+ public ElasticSearchHttpController(ElasticSearchConfig config) {
+ this.config = config;
+ analysisConfig = new AnalysisConfiguration();
+
+ String rootUrl = null;
+ try {
+ rootUrl = buildUrl(createUriBuilder("")).toString();
+ logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, rootUrl);
+ checkConnection();
+ logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, rootUrl);
+ } catch (Exception e) {
+ logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, rootUrl, e.getMessage());
+ }
+ }
+
public static ElasticSearchHttpController getInstance() {
synchronized (ElasticSearchHttpController.class) {
if (instance == null) {
return instance;
}
- public ElasticSearchHttpController(ElasticSearchConfig config) {
- this.config = config;
- analysisConfig = new AnalysisConfiguration();
-
- try {
- logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
- checkConnection();
- logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
- } catch (Exception e) {
- logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, getFullUrl("", false),
- e.getMessage());
- }
- }
-
public AnalysisConfiguration getAnalysisConfig() {
return analysisConfig;
}
@Override
public OperationResult createIndex(String index, DocumentSchema documentSchema) {
- OperationResult result = new OperationResult();
- result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
-
try {
-
- // Submit the request to ElasticSearch to create the index using a
- // default document type.
- result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
+ // Submit the request to ElasticSearch to create the index using a default document type.
+ OperationResult result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
DocumentSchemaUtil.generateDocumentMappings(documentSchema));
// ElasticSearch will return us a 200 code on success when we
if (isSuccess(result)) {
result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
}
+ return result;
} catch (DocumentStoreOperationException | IOException e) {
- result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
+ return new OperationResultBuilder().useDefaults()
+ .failureCause("Document store operation failure. Cause: " + e.getMessage()).build();
}
-
- return result;
}
@Override
public OperationResult createDynamicIndex(String index, String dynamicSchema) {
- OperationResult result = new OperationResult();
- result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
-
try {
- result = createTable(index, dynamicSchema);
+ OperationResult result = createTable(index, dynamicSchema);
// ElasticSearch will return us a 200 code on success when we
// want to report a 201, so translate the result here.
if (isSuccess(result)) {
result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
}
+ return result;
} catch (DocumentStoreOperationException e) {
- result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
+ return new OperationResultBuilder().useDefaults()
+ .failureCause("Document store operation failure. Cause: " + e.getMessage()).build();
}
-
- return result;
}
@Override
public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
-
- // Initialize operation result with a failure codes / fault string
- OperationResult opResult = new OperationResult();
- opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
- opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
-
- // Grab the current time so we can use it to generate a metrics log.
MdcOverride override = getStartTime(new MdcOverride());
- String fullUrl = getFullUrl("/" + indexName + "/", false);
- HttpURLConnection conn = initializeConnection(fullUrl);
-
- logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
-
- try {
- conn.setRequestMethod("DELETE");
- } catch (ProtocolException e) {
- shutdownConnection(conn);
- throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
- }
-
- handleResponse(conn, opResult);
-
- // Generate a metrics log so we can track how long the operation took.
- metricsLogger
- .info(SearchDbMsgs.DELETE_INDEX_TIME,
- new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
- .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
- override, indexName);
-
+ HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.DELETE);
+ OperationResult opResult = handleResponse(conn);
+ logMetricsInfo(override, SearchDbMsgs.DELETE_INDEX_TIME, opResult, indexName);
shutdownConnection(conn);
return opResult;
}
- private OperationResult checkConnection() throws IOException {
- String fullUrl = getFullUrl("/_cluster/health", false);
- URL url = null;
- HttpURLConnection conn = null;
-
- url = new URL(fullUrl);
- conn = (HttpURLConnection) url.openConnection();
- conn.setRequestMethod("GET");
- conn.setDoOutput(true);
- logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
-
- int resultCode = conn.getResponseCode();
- logger.debug("getClusterHealth() response Code : " + resultCode);
- OperationResult opResult = new OperationResult();
- opResult.setResultCode(resultCode);
-
- shutdownConnection(conn);
-
- return opResult;
- }
-
- private String getFullUrl(String resourceUrl, boolean isSecure) {
-
- final String host = config.getIpAddress();
- final String port = config.getHttpPort();
-
- if (isSecure) {
- return String.format("https://%s:%s%s", host, port, resourceUrl);
- } else {
- return String.format("http://%s:%s%s", host, port, resourceUrl);
- }
- }
-
- private void shutdownConnection(HttpURLConnection connection) {
- if (connection == null) {
- return;
- }
-
- final String methodName = "shutdownConnection";
- InputStream inputstream = null;
- OutputStream outputstream = null;
-
- try {
- inputstream = connection.getInputStream();
- } catch (IOException e) {
- logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
- } finally {
- if (inputstream != null) {
- try {
- inputstream.close();
- } catch (IOException e) {
- logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
- }
- }
- }
-
- try {
- outputstream = connection.getOutputStream();
- } catch (IOException e) {
- logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
- } finally {
- if (outputstream != null) {
- try {
- outputstream.close();
- } catch (IOException e) {
- logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
- }
- }
- }
-
- connection.disconnect();
- }
-
// @Override
protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings)
throws DocumentStoreOperationException {
-
if (indexSettings == null) {
logger.debug("No settings provided.");
}
logger.debug("No mappings provided.");
}
- OperationResult opResult = new OperationResult();
-
- // Initialize operation result with a failure codes / fault string
- opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
- opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
-
- // Grab the current time so we can use it to generate a metrics log.
MdcOverride override = getStartTime(new MdcOverride());
- String fullUrl = getFullUrl("/" + indexName + "/", false);
- HttpURLConnection conn = initializeConnection(fullUrl);
-
- try {
- conn.setRequestMethod("PUT");
- } catch (ProtocolException e) {
- shutdownConnection(conn);
- throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
- }
+ HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.PUT);
StringBuilder sb = new StringBuilder(128);
sb.append("{ \"settings\" : ");
throw new DocumentStoreOperationException(e.getMessage(), e);
}
- logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
- logger.debug("Request content: " + sb.toString());
-
- handleResponse(conn, opResult);
+ logger.debug("Request content: " + sb);
+ OperationResult opResult = handleResponse(conn);
shutdownConnection(conn);
-
- // Generate a metrics log so we can track how long the operation took.
- metricsLogger
- .info(SearchDbMsgs.CREATE_INDEX_TIME,
- new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
- .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
- override, indexName);
+ logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, opResult, indexName);
return opResult;
}
*/
protected OperationResult createTable(String indexName, String settingsAndMappings)
throws DocumentStoreOperationException {
- OperationResult result = new OperationResult();
- result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
- result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
-
- // Grab the current time so we can use it to generate a metrics log.
MdcOverride override = getStartTime(new MdcOverride());
- String fullUrl = getFullUrl("/" + indexName + "/", false);
- HttpURLConnection conn = initializeConnection(fullUrl);
-
- try {
- conn.setRequestMethod("PUT");
- } catch (ProtocolException e) {
- shutdownConnection(conn);
- throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
- }
-
+ HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.PUT);
try {
attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
} catch (IOException e) {
logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
throw new DocumentStoreOperationException(e.getMessage());
}
- handleResponse(conn, result);
- // Generate a metrics log so we can track how long the operation took.
- metricsLogger
- .info(SearchDbMsgs.CREATE_INDEX_TIME,
- new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
- .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()),
- override, indexName);
+ OperationResult result = handleResponse(conn);
+ logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, result, indexName);
return result;
}
// which can lead to hard-to-debug behaviour with queries down the road.
OperationResult indexExistsResult = checkIndexExistence(indexName);
if (!isSuccess(indexExistsResult)) {
- DocumentOperationResult opResult = new DocumentOperationResult();
- opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
String resultMsg = "Document Index '" + indexName + "' does not exist.";
- opResult.setResult(resultMsg);
- opResult.setFailureCause(resultMsg);
- return opResult;
+ return (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT).status(Status.NOT_FOUND)
+ .result(resultMsg).failureCause(resultMsg).build();
}
}
}
}
- private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
- throws DocumentStoreOperationException {
- // check if the document already exists
- DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
-
- if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
- if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) {
- opResult.setFailureCause("A document with the same id already exists.");
- } else {
- opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
+ @Override
+ public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
+ boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
+ if (!allowImplicitIndexCreation) {
+ // Before we do anything, make sure that the specified index actually exists in the
+ // document store - we don't want to rely on ElasticSearch to fail the document
+ // create because it could be configured to implicitly create a non-existent index,
+ // which can lead to hard-to-debug behaviour with queries down the road.
+ OperationResult indexExistsResult = checkIndexExistence(indexName);
+ if (!isSuccess(indexExistsResult)) {
+ DocumentOperationResult opResult = new DocumentOperationResult();
+ opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
+ String resultMsg = "Document Index '" + indexName + "' does not exist.";
+ opResult.setResult(resultMsg);
+ opResult.setFailureCause(resultMsg);
+ return opResult;
}
- opResult.setResultCode(Status.CONFLICT.getStatusCode());
- return opResult;
}
- opResult = new DocumentOperationResult();
- // Initialize operation result with a failure codes / fault string
- opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
- opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
-
- // Grab the current time so we can use it to generate a metrics log.
MdcOverride override = getStartTime(new MdcOverride());
- String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
- HttpURLConnection conn = initializeConnection(fullUrl);
-
- try {
- conn.setRequestMethod("PUT");
- } catch (ProtocolException e) {
- shutdownConnection(conn);
- throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
- }
+ final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId())
+ .replaceQuery(URL_QUERY_VERSION + document.getVersion()));
+ HttpURLConnection conn = createConnection(url, HttpMethod.PUT);
attachDocument(conn, document);
- logger.debug("Sending 'PUT' request to: " + conn.getURL());
-
- handleResponse(conn, opResult);
+ DocumentOperationResult opResult = getOperationResult(conn);
buildDocumentResult(opResult, indexName);
- // Generate a metrics log so we can track how long the operation took.
- metricsLogger
- .info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
- new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
- .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
- override, indexName);
+ logMetricsInfo(override, SearchDbMsgs.UPDATE_DOCUMENT_TIME, opResult, indexName, document.getId());
shutdownConnection(conn);
return opResult;
}
- private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
+ @Override
+ public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
throws DocumentStoreOperationException {
+ final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId())
+ .replaceQuery(URL_QUERY_VERSION + document.getVersion()));
- DocumentOperationResult response = new DocumentOperationResult();
- // Initialize operation result with a failure codes / fault string
- response.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
- response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
-
- // Grab the current time so we can use it to generate a metrics log.
MdcOverride override = getStartTime(new MdcOverride());
- String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
- HttpURLConnection conn = initializeConnection(fullUrl);
+ HttpURLConnection conn = createConnection(url, HttpMethod.DELETE);
- try {
- conn.setRequestMethod("POST");
- } catch (ProtocolException e) {
- shutdownConnection(conn);
- throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
+ DocumentOperationResult opResult = getOperationResult(conn);
+ buildDocumentResult(opResult, indexName);
+ // supress the etag and url in response for delete as they are not required
+ if (opResult.getDocument() != null) {
+ opResult.getDocument().setEtag(null);
+ opResult.getDocument().setUrl(null);
}
- attachDocument(conn, document);
-
- logger.debug("Sending 'POST' request to: " + conn.getURL());
-
- handleResponse(conn, response);
- buildDocumentResult(response, indexName);
-
- // Generate a metrics log so we can track how long the operation took.
- metricsLogger
- .info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
- new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
- .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
- override, indexName);
+ logMetricsInfo(override, SearchDbMsgs.DELETE_DOCUMENT_TIME, opResult, indexName, document.getId());
shutdownConnection(conn);
- return response;
- }
-
- private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
- throws DocumentStoreOperationException {
- conn.setRequestProperty("Connection", "Close");
- attachContent(conn, doc.getContentInJson());
+ return opResult;
}
- private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
+ @Override
+ public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
throws DocumentStoreOperationException {
- DocumentOperationResult opResult = new DocumentOperationResult();
-
- // Initialize operation result with a failure codes / fault string
- opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
-
- // Grab the current time so we can use it to generate a metrics log.
- MdcOverride override = getStartTime(new MdcOverride());
-
- String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
- HttpURLConnection conn = initializeConnection(fullUrl);
-
- try {
- conn.setRequestMethod("HEAD");
- } catch (ProtocolException e) {
- shutdownConnection(conn);
- throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
- }
-
- logger.debug("Sending 'HEAD' request to: " + conn.getURL());
-
- int resultCode;
- try {
- resultCode = conn.getResponseCode();
- } catch (IOException e) {
- shutdownConnection(conn);
- throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
+ final UriBuilder uriBuilder = createUriBuilder(indexName, DEFAULT_TYPE, document.getId());
+ if (document.getVersion() != null) {
+ uriBuilder.replaceQuery(URL_QUERY_VERSION + document.getVersion());
}
- logger.debug(MSG_RESPONSE_CODE + resultCode);
+ MdcOverride override = getStartTime(new MdcOverride());
+ HttpURLConnection conn = createConnection(buildUrl(uriBuilder), "GET");
- opResult.setResultCode(resultCode);
+ DocumentOperationResult opResult = getOperationResult(conn);
+ buildDocumentResult(opResult, indexName);
- // Generate a metrics log so we can track how long the operation took.
- metricsLogger
- .info(SearchDbMsgs.GET_DOCUMENT_TIME,
- new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
- .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
- override, indexName, docId);
+ logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, document.getId());
shutdownConnection(conn);
}
@Override
- public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
- boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
-
- if (!allowImplicitIndexCreation) {
- // Before we do anything, make sure that the specified index actually exists in the
- // document store - we don't want to rely on ElasticSearch to fail the document
- // create because it could be configured to implicitly create a non-existent index,
- // which can lead to hard-to-debug behaviour with queries down the road.
- OperationResult indexExistsResult = checkIndexExistence(indexName);
- if (!isSuccess(indexExistsResult)) {
- DocumentOperationResult opResult = new DocumentOperationResult();
- opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
- String resultMsg = "Document Index '" + indexName + "' does not exist.";
- opResult.setResult(resultMsg);
- opResult.setFailureCause(resultMsg);
- return opResult;
- }
- }
+ public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
+ final URL url = buildUrl(createUriBuilder(indexName, "_search").replaceQuery(queryString));
- DocumentOperationResult opResult = new DocumentOperationResult();
+ MdcOverride override = getStartTime(new MdcOverride());
- // Initialize operation result with a failure codes / fault string
- opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
- opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
+ HttpURLConnection conn = createConnection(url, "GET");
+ SearchOperationResult opResult = getSearchOperationResult(conn);
+ buildSearchResult(opResult, indexName);
- // Grab the current time so we can use it to generate a metrics log.
- MdcOverride override = getStartTime(new MdcOverride());
+ logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, queryString);
- String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
- + document.getVersion(), false);
- HttpURLConnection conn = initializeConnection(fullUrl);
+ return opResult;
+ }
- try {
- conn.setRequestMethod("PUT");
- } catch (ProtocolException e) {
- shutdownConnection(conn);
- throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
+ @Override
+ public SearchOperationResult searchWithPayload(String indexName, String query)
+ throws DocumentStoreOperationException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Querying index: " + indexName + " with query string: " + query);
}
+ final URL url = buildUrl(createUriBuilder(indexName, "_search"));
- attachDocument(conn, document);
+ MdcOverride override = getStartTime(new MdcOverride());
- logger.debug("Sending 'PUT' request to: " + conn.getURL());
+ HttpURLConnection conn = createConnection(url, HttpMethod.POST);
+ attachContent(conn, query);
+ logger.debug("Request body = Elasticsearch query = " + query);
- handleResponse(conn, opResult);
- buildDocumentResult(opResult, indexName);
+ SearchOperationResult opResult = getSearchOperationResult(conn);
+ buildSearchResult(opResult, indexName);
- // Generate a metrics log so we can track how long the operation took.
- metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
- new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
- .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
- override, indexName, document.getId());
+ logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
shutdownConnection(conn);
}
@Override
- public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
+ public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
throws DocumentStoreOperationException {
- DocumentOperationResult opResult = new DocumentOperationResult();
-
- // Initialize operation result with a failure codes / fault string
- opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
- opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
+ }
- // Grab the current time so we can use it to generate a metrics log.
MdcOverride override = getStartTime(new MdcOverride());
+ HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName, "_suggest")), HttpMethod.POST);
+ attachContent(conn, query);
- String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
- + document.getVersion(), false);
- HttpURLConnection conn = initializeConnection(fullUrl);
-
- try {
- conn.setRequestMethod("DELETE");
- } catch (ProtocolException e) {
- shutdownConnection(conn);
- throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
- }
-
- logger.debug("\nSending 'DELETE' request to " + conn.getURL());
+ logger.debug("Request body = Elasticsearch query = " + query);
- handleResponse(conn, opResult);
- buildDocumentResult(opResult, indexName);
- // supress the etag and url in response for delete as they are not required
- if (opResult.getDocument() != null) {
- opResult.getDocument().setEtag(null);
- opResult.getDocument().setUrl(null);
- }
+ SearchOperationResult opResult = getSearchOperationResult(conn);
+ buildSuggestResult(opResult, indexName);
- // Generate a metrics log so we can track how long the operation took.
- metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
- new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
- .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
- override, indexName, document.getId());
+ logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
shutdownConnection(conn);
}
@Override
- public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
- throws DocumentStoreOperationException {
- DocumentOperationResult opResult = new DocumentOperationResult();
+ public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
+ if (logger.isDebugEnabled()) {
+ StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: ");
- // Initialize operation result with a failure codes / fault string
- opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
- opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
+ for (BulkRequest request : requests) {
+ dbgString.append("[").append(request).append("] ");
+ }
+
+ logger.debug(dbgString.toString());
+ }
- // Grab the current time so we can use it to generate a metrics log.
MdcOverride override = getStartTime(new MdcOverride());
- String fullUrl = null;
- if (document.getVersion() == null) {
- fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
- } else {
- fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
- + document.getVersion(), false);
+ // Parse the supplied set of operations.
+ // Iterate over the list of operations which we were provided and
+ // translate them into a format that ElasticSearh understands.
+ int opCount = 0;
+ StringBuilder esOperationSet = new StringBuilder(128);
+ List<ElasticSearchResultItem> rejected = new ArrayList<>();
+ for (BulkRequest request : requests) {
+
+ // Convert the request to the syntax ElasticSearch likes.
+ if (buildEsOperation(request, esOperationSet, rejected)) {
+ opCount++;
+ }
}
- HttpURLConnection conn = initializeConnection(fullUrl);
- logger.debug("\nSending 'GET' request to: " + conn.getURL());
+ ElasticSearchBulkOperationResult opResult = null;
+ if (opCount > 0) {
+ HttpURLConnection conn;
+ try {
+ conn = (HttpURLConnection) buildUrl(createUriBuilder("_bulk")).openConnection();
+ conn.setRequestMethod(HttpMethod.PUT);
+ conn.setDoOutput(true);
+ conn.setRequestProperty(CONTENT_TYPE, APPLICATION_FORM_URLENCODED);
+ conn.setRequestProperty("Connection", "Close");
- handleResponse(conn, opResult);
- buildDocumentResult(opResult, indexName);
+ } catch (IOException e) {
- // Generate a metrics log so we can track how long the operation took.
- metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
- new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
- .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
- override, indexName, document.getId());
+ logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
+ if (logger.isDebugEnabled()) {
+ logger.debug(Throwables.getStackTraceAsString(e));
+ }
- shutdownConnection(conn);
+ throw new DocumentStoreOperationException(
+ "Failed to open connection to document store. Cause: " + e.getMessage(), e);
+ }
- return opResult;
- }
+ StringBuilder bulkResult = new StringBuilder(128);
+ try {
+ // Create an output stream to write our request to.
+ OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
- @Override
- public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
- SearchOperationResult opResult = new SearchOperationResult();
+ if (logger.isDebugEnabled()) {
+ logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
+ logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
+ }
- // Initialize operation result with a failure codes / fault string
- opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
- opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
+ // Write the resulting request string to our output stream. (this sends the request to ES?)
+ out.write(esOperationSet.toString());
+ out.close();
- String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
+ // Open an input stream on our connection in order to read back the results.
+ InputStream is = conn.getInputStream();
+ InputStreamReader inputstreamreader = new InputStreamReader(is);
+ BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
- // Grab the current time so we can use it to generate a metrics log.
- MdcOverride override = getStartTime(new MdcOverride());
+ // Read the contents of the input stream into our result string...
+ String esResponseString = null;
- HttpURLConnection conn = initializeConnection(fullUrl);
+ while ((esResponseString = bufferedreader.readLine()) != null) {
+ bulkResult.append(esResponseString).append("\n");
+ }
- try {
- conn.setRequestMethod("GET");
- } catch (ProtocolException e) {
- shutdownConnection(conn);
- throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
- }
+ } catch (IOException e) {
- logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
+ logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
+ if (logger.isDebugEnabled()) {
+ StringWriter sw = new StringWriter();
+ e.printStackTrace(new PrintWriter(sw));
+ logger.debug(sw.toString());
+ }
- handleResponse(conn, opResult);
- buildSearchResult(opResult, indexName);
+ throw new DocumentStoreOperationException(
+ "Failure interacting with document store. Cause: " + e.getMessage(), e);
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
+ }
- metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
- new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
- .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
- override, indexName, queryString);
+ // ...and marshal the resulting string into a Java object.
+ try {
+ opResult = marshallEsBulkResult(bulkResult.toString());
- return opResult;
- }
+ } catch (IOException e) {
+ logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
+ if (logger.isDebugEnabled()) {
+ logger.debug(Throwables.getStackTraceAsString(e));
+ }
- @Override
- public SearchOperationResult searchWithPayload(String indexName, String query)
- throws DocumentStoreOperationException {
- SearchOperationResult opResult = new SearchOperationResult();
+ throw new DocumentStoreOperationException("Failed to marshal response body. Cause: " + e.getMessage(),
+ e);
+ }
+ }
- if (logger.isDebugEnabled()) {
- logger.debug("Querying index: " + indexName + " with query string: " + query);
+ OperationResult result = new OperationResultBuilder() //
+ .resultCode(HttpStatus.MULTI_STATUS_207) //
+ .result(buildGenericBulkResultSet(opResult, rejected)) //
+ .build();
+
+ // In the success case we don't want the entire result string to be dumped into the metrics log, so concatenate
+ // it.
+ String resultStringForMetricsLog = result.getResult();
+ if (isSuccess(result)) {
+ resultStringForMetricsLog =
+ resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
}
- // Initialize operation result with a failure codes / fault string
- opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
- opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
+ metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
+ new LogFields() //
+ .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
+ .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
+ override);
+
+ return result;
+ }
+
+
+ /**
+ * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
+ *
+ * @param indexName - The index to look for.
+ * @return - An operation result indicating the success or failure of the check.
+ * @throws DocumentStoreOperationException
+ */
+ private OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
+ MdcOverride override = getStartTime(new MdcOverride());
+
+ HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.HEAD);
+ int resultCode;
+ try {
+ resultCode = conn.getResponseCode();
+ } catch (IOException ex) {
+ shutdownConnection(conn);
+ throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex);
+ }
+ logger.debug(MSG_RESPONSE_CODE + resultCode);
- String fullUrl = getFullUrl("/" + indexName + "/_search", false);
+ OperationResult opResult = new OperationResultBuilder().useDefaults().resultCode(resultCode).build();
+ logMetricsInfo(override, SearchDbMsgs.CHECK_INDEX_TIME, opResult, indexName);
+ shutdownConnection(conn);
- // Grab the current time so we can use it to generate a metrics log.
- MdcOverride override = getStartTime(new MdcOverride());
+ return opResult;
+ }
- HttpURLConnection conn = initializeConnection(fullUrl);
+ private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
+ throws DocumentStoreOperationException {
+ // check if the document already exists
+ DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
- try {
- conn.setRequestMethod("POST");
- } catch (ProtocolException e) {
- shutdownConnection(conn);
- throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
+ if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
+ if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) {
+ opResult.setFailureCause("A document with the same id already exists.");
+ } else {
+ opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
+ }
+ opResult.setResultCode(Status.CONFLICT.getStatusCode());
+ return opResult;
}
- attachContent(conn, query);
+ final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId()));
+ MdcOverride override = getStartTime(new MdcOverride());
- logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
- logger.debug("Request body = Elasticsearch query = " + query);
+ HttpURLConnection conn = createConnection(url, HttpMethod.PUT);
+ attachDocument(conn, document);
- handleResponse(conn, opResult);
- buildSearchResult(opResult, indexName);
+ opResult = getOperationResult(conn);
+ buildDocumentResult(opResult, indexName);
- metricsLogger
- .info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
- new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
- .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
- override, indexName, query);
+ logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, opResult, indexName);
shutdownConnection(conn);
return opResult;
}
-
- @Override
- public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
+ private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
throws DocumentStoreOperationException {
+ final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE));
- SearchOperationResult opResult = new SearchOperationResult();
+ MdcOverride override = getStartTime(new MdcOverride());
+ HttpURLConnection conn = createConnection(url, HttpMethod.POST);
+ attachDocument(conn, document);
- if (logger.isDebugEnabled()) {
- logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
- }
+ DocumentOperationResult response = getOperationResult(conn);
+ buildDocumentResult(response, indexName);
- // Initialize operation result with a failure codes / fault string
- opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
- opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
+ logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, response, indexName);
- String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
+ shutdownConnection(conn);
- // Grab the current time so we can use it to generate a metrics log.
- MdcOverride override = getStartTime(new MdcOverride());
+ return response;
+ }
- HttpURLConnection conn = initializeConnection(fullUrl);
+ private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
+ throws DocumentStoreOperationException {
+ conn.setRequestProperty("Connection", "Close");
+ attachContent(conn, doc.getContentInJson());
+ }
+ private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
+ throws DocumentStoreOperationException {
+ MdcOverride override = getStartTime(new MdcOverride());
+ HttpURLConnection conn =
+ createConnection(buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, docId)), HttpMethod.HEAD);
+ int resultCode;
try {
- conn.setRequestMethod("POST");
- } catch (ProtocolException e) {
+ resultCode = conn.getResponseCode();
+ } catch (IOException ex) {
shutdownConnection(conn);
- throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
+ throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex);
}
- attachContent(conn, query);
-
- logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
- logger.debug("Request body = Elasticsearch query = " + query);
-
- handleResponse(conn, opResult);
- buildSuggestResult(opResult, indexName);
+ logger.debug(MSG_RESPONSE_CODE + resultCode);
- metricsLogger
- .info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
- new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
- .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
- override, indexName, query);
+ DocumentOperationResult opResult = (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT)
+ .useDefaults().resultCode(resultCode).build();
+ logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, docId);
shutdownConnection(conn);
return opResult;
}
}
- private HttpURLConnection initializeConnection(String fullUrl) throws DocumentStoreOperationException {
- URL url = null;
+ private HttpURLConnection initializeConnection(URL url) throws DocumentStoreOperationException {
HttpURLConnection conn = null;
-
- try {
- url = new URL(fullUrl);
- } catch (MalformedURLException e) {
- throw new DocumentStoreOperationException("Error building a URL with " + url, e);
- }
-
try {
conn = (HttpURLConnection) url.openConnection();
conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON);
return conn;
}
- private void handleResponse(HttpURLConnection conn, OperationResult opResult)
+ private OperationResult handleResponse(HttpURLConnection conn) throws DocumentStoreOperationException {
+ return handleResponse(conn, new OperationResultBuilder().useDefaults());
+ }
+
+ private OperationResult handleResponse(HttpURLConnection conn, OperationResultBuilder rb)
throws DocumentStoreOperationException {
int resultCode;
try {
resultCode = conn.getResponseCode();
- } catch (IOException e) {
+ } catch (IOException ex) {
shutdownConnection(conn);
- throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
+ throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex);
}
logger.debug(MSG_RESPONSE_CODE + resultCode);
InputStream inputStream = null;
- if (!isSuccessCode(resultCode)) {
+ if (!ApiUtils.isSuccessStatusCode(resultCode)) {
inputStream = conn.getErrorStream();
} else {
try {
}
if (resultCode == Status.CONFLICT.getStatusCode()) {
- opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode());
+ rb.resultCode(Status.PRECONDITION_FAILED.getStatusCode());
} else {
- opResult.setResultCode(resultCode);
+ rb.resultCode(resultCode);
}
if (logger.isDebugEnabled()) {
logger.debug("Raw result string from ElasticSearch = " + result.toString());
}
- opResult.setResult(result.toString());
- opResult.setResultVersion(extractVersion(result.toString()));
+ rb.result(result.toString());
+ rb.resultVersion(extractVersion(result.toString()));
+ return rb.build();
}
private String extractVersion(String result) {
// Grab the current time...
long startTimeInMs = System.currentTimeMillis();
- // ...and add it as an attribute to the supplied MDC Override
- // object.
+ // ...and add it as an attribute to the supplied MDC Override object.
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
}
private boolean isSuccess(OperationResult result) {
- return isSuccessCode(result.getResultCode());
+ return ApiUtils.isSuccessStatusCode(result.getResultCode());
}
- private boolean isSuccessCode(int statusCode) {
- return Family.familyOf(statusCode).equals(Family.SUCCESSFUL);
+ private UriBuilder createUriBuilder(String path, String... paths) {
+ UriBuilder builder = UriBuilder.fromPath(path);
+ for (String other : paths) {
+ builder.path(other);
+ }
+ builder.host(config.getIpAddress());
+ String port = Optional.ofNullable(config.getHttpPort()).orElse("0");
+ builder.port(Integer.valueOf(port));
+ builder.scheme("http");
+ return builder;
}
- @Override
- public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
-
- if (logger.isDebugEnabled()) {
- StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: ");
-
- for (BulkRequest request : requests) {
- dbgString.append("[").append(request).append("] ");
- }
-
- logger.debug(dbgString.toString());
+ private URL buildUrl(UriBuilder builder) throws DocumentStoreOperationException {
+ try {
+ return builder.build().toURL();
+ } catch (MalformedURLException e) {
+ logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "buildUrl", e.getLocalizedMessage());
+ throw new DocumentStoreOperationException("Error building a URL with " + builder.toString(), e);
}
+ }
- // Grab the current time so we can use it to generate a metrics log.
- MdcOverride override = getStartTime(new MdcOverride());
-
- // Parse the supplied set of operations.
- // Iterate over the list of operations which we were provided and
- // translate them into a format that ElasticSearh understands.
- int opCount = 0;
- StringBuilder esOperationSet = new StringBuilder(128);
- List<ElasticSearchResultItem> rejected = new ArrayList<>();
- for (BulkRequest request : requests) {
-
- // Convert the request to the syntax ElasticSearch likes.
- if (buildEsOperation(request, esOperationSet, rejected)) {
- opCount++;
- }
+ private HttpURLConnection createConnection(final URL url, final String method)
+ throws DocumentStoreOperationException {
+ HttpURLConnection conn = initializeConnection(url);
+ try {
+ logger.debug("\nSending '" + method + "' request to URL : " + conn.getURL());
+ conn.setRequestMethod(method);
+ } catch (ProtocolException e) {
+ shutdownConnection(conn);
+ throw new DocumentStoreOperationException(ErrorMessage.SET_REQUEST_METHOD_FAILED, e, method);
}
+ return conn;
+ }
- ElasticSearchBulkOperationResult opResult = null;
- if (opCount > 0) {
-
- // Open an HTTP connection to the ElasticSearch back end.
- String fullUrl = getFullUrl("/_bulk", false);
- URL url;
- HttpURLConnection conn;
- try {
-
- url = new URL(fullUrl);
- conn = (HttpURLConnection) url.openConnection();
- conn.setRequestMethod("PUT");
- conn.setDoOutput(true);
- conn.setRequestProperty(CONTENT_TYPE, APPLICATION_FORM_URLENCODED);
- conn.setRequestProperty("Connection", "Close");
-
- } catch (IOException e) {
-
- logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
- if (logger.isDebugEnabled()) {
- logger.debug(Throwables.getStackTraceAsString(e));
- }
-
- throw new DocumentStoreOperationException(
- "Failed to open connection to document store. Cause: " + e.getMessage(), e);
- }
-
- StringBuilder bulkResult = new StringBuilder(128);
- try {
- // Create an output stream to write our request to.
- OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
-
- if (logger.isDebugEnabled()) {
- logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
- logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
- }
-
- // Write the resulting request string to our output stream. (this sends the request to ES?)
- out.write(esOperationSet.toString());
- out.close();
-
- // Open an input stream on our connection in order to read back the results.
- InputStream is = conn.getInputStream();
- InputStreamReader inputstreamreader = new InputStreamReader(is);
- BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
-
- // Read the contents of the input stream into our result string...
- String esResponseString = null;
+ private OperationResult checkConnection() throws IOException, DocumentStoreOperationException {
+ HttpURLConnection conn = createConnection(buildUrl(createUriBuilder("_cluster/health")), HttpMethod.GET);
+ int resultCode = conn.getResponseCode();
+ logger.debug("getClusterHealth() response Code : " + resultCode);
+ shutdownConnection(conn);
+ return new OperationResultBuilder().resultCode(resultCode).build();
+ }
- while ((esResponseString = bufferedreader.readLine()) != null) {
- bulkResult.append(esResponseString).append("\n");
- }
+ private void shutdownConnection(HttpURLConnection connection) {
+ if (connection == null) {
+ return;
+ }
- } catch (IOException e) {
+ final String methodName = "shutdownConnection";
+ InputStream inputstream = null;
+ OutputStream outputstream = null;
- logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
- if (logger.isDebugEnabled()) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- logger.debug(sw.toString());
+ try {
+ inputstream = connection.getInputStream();
+ } catch (IOException e) {
+ logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
+ } finally {
+ if (inputstream != null) {
+ try {
+ inputstream.close();
+ } catch (IOException e) {
+ logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
}
-
- throw new DocumentStoreOperationException(
- "Failure interacting with document store. Cause: " + e.getMessage(), e);
- }
-
- if (logger.isDebugEnabled()) {
- logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
}
+ }
- // ...and marshal the resulting string into a Java object.
- try {
- opResult = marshallEsBulkResult(bulkResult.toString());
-
- } catch (IOException e) {
-
- logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
- if (logger.isDebugEnabled()) {
- logger.debug(Throwables.getStackTraceAsString(e));
+ try {
+ outputstream = connection.getOutputStream();
+ } catch (IOException e) {
+ logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
+ } finally {
+ if (outputstream != null) {
+ try {
+ outputstream.close();
+ } catch (IOException e) {
+ logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
}
-
- throw new DocumentStoreOperationException("Failed to marshal response body. Cause: " + e.getMessage(),
- e);
}
}
- // Finally, build the operation result and return it to the caller.
- OperationResult result = new OperationResult();
- result.setResultCode(207);
- result.setResult(buildGenericBulkResultSet(opResult, rejected));
-
- // In the success case we don't want the entire result string to be
- // dumped into the metrics log, so concatenate it.
- String resultStringForMetricsLog = result.getResult();
- if (isSuccess(result)) {
- resultStringForMetricsLog =
- resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
- }
-
- metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
- new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
- .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
- override);
-
- return result;
+ connection.disconnect();
}
-
/**
* This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch.
*
// Increment the operation counts.
totalOps++;
- if (isSuccessCode(item.operationStatus().getStatus())) {
+ if (ApiUtils.isSuccessStatusCode(item.operationStatus().getStatus())) {
totalSuccess++;
} else {
totalFails++;
+ "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}";
}
-
- /**
- * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
- *
- * @param indexName - The index to look for.
- * @return - An operation result indicating the success or failure of the check.
- * @throws DocumentStoreOperationException
- */
- public OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
-
- // Initialize operation result with a failure codes / fault string
- OperationResult opResult = new OperationResult();
- opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
-
- // Grab the current time so we can use it to generate a metrics log.
- MdcOverride override = getStartTime(new MdcOverride());
-
- String fullUrl = getFullUrl("/" + indexName, false);
- HttpURLConnection conn = initializeConnection(fullUrl);
-
- try {
- conn.setRequestMethod("HEAD");
-
- } catch (ProtocolException e) {
- shutdownConnection(conn);
- throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
- }
-
- logger.debug("Sending 'HEAD' request to: " + conn.getURL());
-
- int resultCode;
- try {
- resultCode = conn.getResponseCode();
- } catch (IOException e) {
- shutdownConnection(conn);
- throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
- }
- logger.debug(MSG_RESPONSE_CODE + resultCode);
-
- opResult.setResultCode(resultCode);
-
- // Generate a metrics log so we can track how long the operation took.
- metricsLogger
- .info(SearchDbMsgs.CHECK_INDEX_TIME,
- new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
- .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
- override, indexName);
-
- shutdownConnection(conn);
-
- return opResult;
+ private DocumentOperationResult getOperationResult(HttpURLConnection conn) throws DocumentStoreOperationException {
+ return (DocumentOperationResult) handleResponse(conn, new OperationResultBuilder(Type.DOCUMENT).useDefaults());
}
+ private SearchOperationResult getSearchOperationResult(HttpURLConnection conn)
+ throws DocumentStoreOperationException {
+ return (SearchOperationResult) handleResponse(conn, new OperationResultBuilder(Type.SEARCH).useDefaults());
+ }
private void buildDocumentResult(DocumentOperationResult result, String index)
throws DocumentStoreOperationException {
}
private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
-
JSONParser parser = new JSONParser();
JSONObject root;
} catch (Exception e) {
throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
}
-
}
private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
}
}
+
+ /**
+ * Record the timing of the operation in the metrics log.
+ *
+ */
+ private void logMetricsInfo(MdcOverride override, SearchDbMsgs message, OperationResult operationResult,
+ String... args) {
+ metricsLogger.info(message,
+ new LogFields() //
+ .setField(LogLine.DefinedFields.RESPONSE_CODE, operationResult.getResultCode())
+ .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, operationResult.getResult()),
+ override, args);
+ }
}