+ SearchOperationResult opResult = getSearchOperationResult(conn);
+ buildSearchResult(opResult, indexName);
+
+ logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
+
+ shutdownConnection(conn);
+
+ return opResult;
+ }
+
+
+ @Override
+ public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
+ throws DocumentStoreOperationException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
+ }
+
+ String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
+
+ // Grab the current time so we can use it to generate a metrics log.
+ MdcOverride override = getStartTime(new MdcOverride());
+
+ HttpURLConnection conn = initializeConnection(fullUrl);
+
+ try {
+ conn.setRequestMethod("POST");
+ } catch (ProtocolException e) {
+ shutdownConnection(conn);
+ throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
+ }
+
+ attachContent(conn, query);
+
+ logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
+ logger.debug("Request body = Elasticsearch query = " + query);
+
+ SearchOperationResult opResult = getSearchOperationResult(conn);
+ buildSuggestResult(opResult, indexName);
+
+ logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
+
+ shutdownConnection(conn);
+
+ return opResult;
+ }
+
+ @Override
+ public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
+ if (logger.isDebugEnabled()) {
+ StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: ");
+
+ for (BulkRequest request : requests) {
+ dbgString.append("[").append(request).append("] ");
+ }
+
+ logger.debug(dbgString.toString());
+ }
+
+ // Grab the current time so we can use it to generate a metrics log.
+ MdcOverride override = getStartTime(new MdcOverride());
+
+ // Parse the supplied set of operations.
+ // Iterate over the list of operations which we were provided and
+ // translate them into a format that ElasticSearh understands.
+ int opCount = 0;
+ StringBuilder esOperationSet = new StringBuilder(128);
+ List<ElasticSearchResultItem> rejected = new ArrayList<>();
+ for (BulkRequest request : requests) {
+
+ // Convert the request to the syntax ElasticSearch likes.
+ if (buildEsOperation(request, esOperationSet, rejected)) {
+ opCount++;
+ }
+ }
+
+ ElasticSearchBulkOperationResult opResult = null;
+ if (opCount > 0) {
+
+ // Open an HTTP connection to the ElasticSearch back end.
+ String fullUrl = getFullUrl("/_bulk", false);
+ URL url;
+ HttpURLConnection conn;
+ try {
+
+ url = new URL(fullUrl);
+ conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("PUT");
+ conn.setDoOutput(true);
+ conn.setRequestProperty(CONTENT_TYPE, APPLICATION_FORM_URLENCODED);
+ conn.setRequestProperty("Connection", "Close");
+
+ } catch (IOException e) {
+
+ logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
+ if (logger.isDebugEnabled()) {
+ logger.debug(Throwables.getStackTraceAsString(e));
+ }
+
+ throw new DocumentStoreOperationException(
+ "Failed to open connection to document store. Cause: " + e.getMessage(), e);
+ }
+
+ StringBuilder bulkResult = new StringBuilder(128);
+ try {
+ // Create an output stream to write our request to.
+ OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
+ logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
+ }
+
+ // Write the resulting request string to our output stream. (this sends the request to ES?)
+ out.write(esOperationSet.toString());
+ out.close();
+
+ // Open an input stream on our connection in order to read back the results.
+ InputStream is = conn.getInputStream();
+ InputStreamReader inputstreamreader = new InputStreamReader(is);
+ BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
+
+ // Read the contents of the input stream into our result string...
+ String esResponseString = null;
+
+ while ((esResponseString = bufferedreader.readLine()) != null) {
+ bulkResult.append(esResponseString).append("\n");
+ }
+
+ } catch (IOException e) {
+
+ logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
+ if (logger.isDebugEnabled()) {
+ StringWriter sw = new StringWriter();
+ e.printStackTrace(new PrintWriter(sw));
+ logger.debug(sw.toString());
+ }
+
+ throw new DocumentStoreOperationException(
+ "Failure interacting with document store. Cause: " + e.getMessage(), e);
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
+ }
+
+ // ...and marshal the resulting string into a Java object.
+ try {
+ opResult = marshallEsBulkResult(bulkResult.toString());
+
+ } catch (IOException e) {
+ logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
+ if (logger.isDebugEnabled()) {
+ logger.debug(Throwables.getStackTraceAsString(e));
+ }
+
+ throw new DocumentStoreOperationException("Failed to marshal response body. Cause: " + e.getMessage(),
+ e);
+ }
+ }
+
+ OperationResult result = new OperationResultBuilder() //
+ .resultCode(HttpStatus.MULTI_STATUS_207) //
+ .result(buildGenericBulkResultSet(opResult, rejected)) //
+ .build();
+
+ // In the success case we don't want the entire result string to be dumped into the metrics log, so concatenate
+ // it.
+ String resultStringForMetricsLog = result.getResult();
+ if (isSuccess(result)) {
+ resultStringForMetricsLog =
+ resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
+ }
+
+ metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
+ new LogFields() //
+ .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
+ .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
+ override);
+
+ return result;
+ }
+
+
+ /**
+ * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
+ *
+ * @param indexName - The index to look for.
+ * @return - An operation result indicating the success or failure of the check.
+ * @throws DocumentStoreOperationException
+ */
+ private OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
+ // Grab the current time so we can use it to generate a metrics log.
+ MdcOverride override = getStartTime(new MdcOverride());
+
+ String fullUrl = getFullUrl("/" + indexName, false);
+ HttpURLConnection conn = initializeConnection(fullUrl);
+
+ try {
+ conn.setRequestMethod("HEAD");
+
+ } catch (ProtocolException e) {
+ shutdownConnection(conn);
+ throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
+ }
+
+ logger.debug("Sending 'HEAD' request to: " + conn.getURL());
+
+ int resultCode;
+ try {
+ resultCode = conn.getResponseCode();
+ } catch (IOException e) {
+ shutdownConnection(conn);
+ throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
+ }
+ logger.debug(MSG_RESPONSE_CODE + resultCode);
+
+ OperationResult opResult = new OperationResultBuilder().useDefaults().resultCode(resultCode).build();
+ logMetricsInfo(override, SearchDbMsgs.CHECK_INDEX_TIME, opResult, indexName);
+ shutdownConnection(conn);
+
+ return opResult;
+ }
+
+ private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
+ throws DocumentStoreOperationException {
+ // check if the document already exists
+ DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
+
+ if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
+ if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) {
+ opResult.setFailureCause("A document with the same id already exists.");
+ } else {
+ opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
+ }
+ opResult.setResultCode(Status.CONFLICT.getStatusCode());
+ return opResult;
+ }
+
+ // Grab the current time so we can use it to generate a metrics log.
+ MdcOverride override = getStartTime(new MdcOverride());
+
+ String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
+ HttpURLConnection conn = initializeConnection(fullUrl);
+
+ try {
+ conn.setRequestMethod("PUT");
+ } catch (ProtocolException e) {
+ shutdownConnection(conn);
+ throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
+ }
+
+ attachDocument(conn, document);
+
+ logger.debug("Sending 'PUT' request to: " + conn.getURL());
+
+ opResult = getOperationResult(conn);
+ buildDocumentResult(opResult, indexName);
+
+ logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, opResult, indexName);
+
+ shutdownConnection(conn);
+
+ return opResult;
+ }
+
+ private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
+ throws DocumentStoreOperationException {
+ // Grab the current time so we can use it to generate a metrics log.
+ MdcOverride override = getStartTime(new MdcOverride());
+
+ String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
+ HttpURLConnection conn = initializeConnection(fullUrl);
+
+ try {
+ conn.setRequestMethod("POST");
+ } catch (ProtocolException e) {
+ shutdownConnection(conn);
+ throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
+ }
+
+ attachDocument(conn, document);
+
+ logger.debug("Sending 'POST' request to: " + conn.getURL());
+
+ DocumentOperationResult response = getOperationResult(conn);
+ buildDocumentResult(response, indexName);