2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
23 import com.att.aft.dme2.internal.google.common.base.Throwables;
24 import com.fasterxml.jackson.annotation.JsonInclude.Include;
25 import com.fasterxml.jackson.core.JsonParseException;
26 import com.fasterxml.jackson.core.JsonProcessingException;
27 import com.fasterxml.jackson.databind.JsonMappingException;
28 import com.fasterxml.jackson.databind.ObjectMapper;
29 import edu.emory.mathcs.backport.java.util.Arrays;
30 import org.json.simple.JSONArray;
31 import org.json.simple.JSONObject;
32 import org.json.simple.parser.JSONParser;
33 import org.json.simple.parser.ParseException;
34 import org.onap.aai.sa.rest.AnalysisConfiguration;
35 import org.onap.aai.sa.rest.ApiUtils;
36 import org.onap.aai.sa.rest.BulkRequest;
37 import org.onap.aai.sa.rest.BulkRequest.OperationType;
38 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
39 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
40 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
41 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
42 import org.onap.aai.sa.searchdbabstraction.entity.Document;
43 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
44 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
45 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
46 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
47 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
48 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
49 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
50 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
51 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
52 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
53 import org.onap.aai.cl.api.LogFields;
54 import org.onap.aai.cl.api.LogLine;
55 import org.onap.aai.cl.api.Logger;
56 import org.onap.aai.cl.eelf.LoggerFactory;
57 import org.onap.aai.cl.mdc.MdcContext;
58 import org.onap.aai.cl.mdc.MdcOverride;
59 import org.onap.aai.sa.rest.DocumentSchema;
61 import java.io.BufferedReader;
63 import java.io.FileInputStream;
64 import java.io.IOException;
65 import java.io.InputStream;
66 import java.io.InputStreamReader;
67 import java.io.OutputStream;
68 import java.io.OutputStreamWriter;
69 import java.io.PrintWriter;
70 import java.io.StringWriter;
71 import java.net.HttpURLConnection;
72 import java.net.MalformedURLException;
73 import java.net.ProtocolException;
75 import java.text.SimpleDateFormat;
76 import java.util.ArrayList;
77 import java.util.List;
78 import java.util.Properties;
79 import java.util.concurrent.atomic.AtomicBoolean;
80 import javax.ws.rs.core.Response.Status;
84 * This class has the Elasticsearch implementation of the
85 * DB operations defined in DocumentStoreInterface.
87 public class ElasticSearchHttpController implements DocumentStoreInterface {
89 private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
90 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
91 private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
92 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
93 private static final String BULK_IMPORT_INDEX_TEMPLATE =
94 "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
95 private static final String BULK_DELETE_TEMPLATE =
96 "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
98 private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
99 "Internal Error: ElasticSearch operation fault occurred";
100 private static final Logger logger = LoggerFactory.getInstance()
101 .getLogger(ElasticSearchHttpController.class.getName());
102 private static final Logger metricsLogger = LoggerFactory.getInstance()
103 .getMetricsLogger(ElasticSearchHttpController.class.getName());
104 private final ElasticSearchConfig config;
106 private static final String DEFAULT_TYPE = "default";
108 private static ElasticSearchHttpController instance = null;
110 protected AnalysisConfiguration analysisConfig;
112 public static ElasticSearchHttpController getInstance() {
114 synchronized (ElasticSearchHttpController.class) {
116 if (instance == null) {
118 Properties properties = new Properties();
119 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
121 properties.load(new FileInputStream(file));
122 } catch (Exception e) {
123 logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL,
124 "ElasticSearchHTTPController.getInstance",
125 e.getLocalizedMessage());
128 ElasticSearchConfig config = new ElasticSearchConfig(properties);
129 instance = new ElasticSearchHttpController(config);
136 public ElasticSearchHttpController(ElasticSearchConfig config) {
137 this.config = config;
138 analysisConfig = new AnalysisConfiguration();
141 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
143 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
144 } catch (Exception e) {
145 logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e,
146 getFullUrl("", false), e.getMessage());
151 public AnalysisConfiguration getAnalysisConfig() {
152 return analysisConfig;
156 public OperationResult createIndex(String index, DocumentSchema documentSchema) {
158 OperationResult result = new OperationResult();
159 result.setResultCode(500);
163 // Submit the request to ElasticSearch to create the index using a
164 // default document type.
165 result = createTable(index,
167 analysisConfig.getEsIndexSettings(),
168 DocumentSchemaUtil.generateDocumentMappings(documentSchema));
170 // ElasticSearch will return us a 200 code on success when we
171 // want to report a 201, so translate the result here.
172 result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
173 if (isSuccess(result)) {
174 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
175 //result.setResult("{\"index\": \"" + index + ", \"type\": \"" + DEFAULT_TYPE + "\"}");
178 } catch (DocumentStoreOperationException e) {
180 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
187 public OperationResult createDynamicIndex(String index, String dynamicSchema) {
188 OperationResult result = new OperationResult();
189 result.setResultCode(500);
192 result = createTable(index, dynamicSchema);
194 // ElasticSearch will return us a 200 code on success when we
195 // want to report a 201, so translate the result here.
196 result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
197 if (isSuccess(result)) {
198 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
200 } catch (DocumentStoreOperationException e) {
201 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
208 public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
210 //Initialize operation result with a failure codes / fault string
211 OperationResult opResult = new OperationResult();
212 opResult.setResultCode(500);
213 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
215 // Grab the current time so we can use it to generate a metrics log.
216 MdcOverride override = getStartTime(new MdcOverride());
218 String fullUrl = getFullUrl("/" + indexName + "/", false);
219 HttpURLConnection conn = initializeConnection(fullUrl);
221 logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
224 conn.setRequestMethod("DELETE");
225 } catch (ProtocolException e) {
226 shutdownConnection(conn);
227 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
230 handleResponse(conn, opResult);
232 // Generate a metrics log so we can track how long the operation took.
233 metricsLogger.info(SearchDbMsgs.DELETE_INDEX_TIME,
235 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
236 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
240 shutdownConnection(conn);
246 private OperationResult checkConnection() throws Exception {
248 String fullUrl = getFullUrl("/_cluster/health", false);
250 HttpURLConnection conn = null;
252 url = new URL(fullUrl);
253 conn = (HttpURLConnection) url.openConnection();
254 conn.setRequestMethod("GET");
255 conn.setDoOutput(true);
256 logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
258 int resultCode = conn.getResponseCode();
259 logger.debug("getClusterHealth() response Code : " + resultCode);
260 OperationResult opResult = new OperationResult();
261 opResult.setResultCode(resultCode);
263 shutdownConnection(conn);
268 private String getFullUrl(String resourceUrl, boolean isSecure) {
270 final String host = config.getIpAddress();
271 final String port = config.getHttpPort();
274 return String.format("https://%s:%s%s", host, port, resourceUrl);
276 return String.format("http://%s:%s%s", host, port, resourceUrl);
280 private void shutdownConnection(HttpURLConnection connection) {
281 if (connection == null) {
285 InputStream inputstream = null;
286 OutputStream outputstream = null;
289 inputstream = connection.getInputStream();
290 } catch (IOException e) {
291 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
293 if (inputstream != null) {
296 } catch (IOException e) {
297 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
298 e.getLocalizedMessage());
304 outputstream = connection.getOutputStream();
305 } catch (IOException e) {
306 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
308 if (outputstream != null) {
310 outputstream.close();
311 } catch (IOException e) {
312 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
313 e.getLocalizedMessage());
318 connection.disconnect();
322 protected OperationResult createTable(String indexName, String typeName,
323 String indexSettings, String indexMappings)
324 throws DocumentStoreOperationException {
326 if (indexSettings == null) {
327 logger.debug("No settings provided.");
330 if (indexMappings == null) {
331 logger.debug("No mappings provided.");
334 OperationResult opResult = new OperationResult();
336 // Initialize operation result with a failure codes / fault string
337 opResult.setResultCode(500);
338 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
340 // Grab the current time so we can use it to generate a metrics log.
341 MdcOverride override = getStartTime(new MdcOverride());
343 String fullUrl = getFullUrl("/" + indexName + "/", false);
344 HttpURLConnection conn = initializeConnection(fullUrl);
347 conn.setRequestMethod("PUT");
348 } catch (ProtocolException e) {
349 shutdownConnection(conn);
350 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
353 StringBuilder sb = new StringBuilder(128);
354 sb.append("{ \"settings\" : ");
355 sb.append(indexSettings);
358 sb.append("\"mappings\" : {");
359 sb.append("\"" + typeName + "\" :");
360 sb.append(indexMappings);
363 attachContent(conn, sb.toString());
365 logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
366 logger.debug("Request content: " + sb.toString());
368 handleResponse(conn, opResult);
370 shutdownConnection(conn);
372 // Generate a metrics log so we can track how long the operation took.
373 metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
375 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
376 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
384 * Will send the passed in JSON payload to Elasticsearch using the
385 * provided index name in an attempt to create the index.
387 * @param indexName - The name of the index to be created
388 * @param settingsAndMappings - The actual JSON object that will define the index
389 * @return - The operation result of writing into Elasticsearch
390 * @throws DocumentStoreOperationException
392 protected OperationResult createTable(String indexName, String settingsAndMappings) throws DocumentStoreOperationException {
393 OperationResult result = new OperationResult();
394 result.setResultCode(500);
395 result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
397 // Grab the current time so we can use it to generate a metrics log.
398 MdcOverride override = getStartTime(new MdcOverride());
400 String fullUrl = getFullUrl("/" + indexName + "/", false);
401 HttpURLConnection conn = initializeConnection(fullUrl);
404 conn.setRequestMethod("PUT");
405 } catch (ProtocolException e) {
406 shutdownConnection(conn);
407 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
410 attachContent(conn, settingsAndMappings);
411 handleResponse(conn, result);
413 // Generate a metrics log so we can track how long the operation took.
414 metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
416 .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
417 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()),
425 public DocumentOperationResult createDocument(String indexName,
426 DocumentStoreDataEntity document,
427 boolean allowImplicitIndexCreation)
428 throws DocumentStoreOperationException {
430 if(!allowImplicitIndexCreation) {
432 // Before we do anything, make sure that the specified index actually exists in the
433 // document store - we don't want to rely on ElasticSearch to fail the document
434 // create because it could be configured to implicitly create a non-existent index,
435 // which can lead to hard-to-debug behaviour with queries down the road.
436 OperationResult indexExistsResult = checkIndexExistence(indexName);
437 if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
439 DocumentOperationResult opResult = new DocumentOperationResult();
440 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
441 opResult.setResult("Document Index '" + indexName + "' does not exist.");
442 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
447 if (document.getId() == null || document.getId().isEmpty()) {
448 return createDocumentWithoutId(indexName, document);
450 return createDocumentWithId(indexName, document);
454 private DocumentOperationResult createDocumentWithId(String indexName,
455 DocumentStoreDataEntity document)
456 throws DocumentStoreOperationException {
457 // check if the document already exists
458 DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
461 if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
462 if (opResult.getResultCode() == Status.OK.getStatusCode()) {
463 opResult.setFailureCause("A document with the same id already exists.");
465 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
467 opResult.setResultCode(Status.CONFLICT.getStatusCode());
471 opResult = new DocumentOperationResult();
472 // Initialize operation result with a failure codes / fault string
473 opResult.setResultCode(500);
474 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
476 // Grab the current time so we can use it to generate a metrics log.
477 MdcOverride override = getStartTime(new MdcOverride());
479 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE
480 + "/" + document.getId(), false);
481 HttpURLConnection conn = initializeConnection(fullUrl);
484 conn.setRequestMethod("PUT");
485 } catch (ProtocolException e) {
486 shutdownConnection(conn);
487 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
490 attachDocument(conn, document);
492 logger.debug("Sending 'PUT' request to: " + conn.getURL());
494 handleResponse(conn, opResult);
495 buildDocumentResult(opResult, indexName);
497 // Generate a metrics log so we can track how long the operation took.
498 metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
500 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
501 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
505 shutdownConnection(conn);
511 private DocumentOperationResult createDocumentWithoutId(String indexName,
512 DocumentStoreDataEntity document)
513 throws DocumentStoreOperationException {
515 DocumentOperationResult response = new DocumentOperationResult();
516 // Initialize operation result with a failure codes / fault string
517 response.setResultCode(500);
518 response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
520 // Grab the current time so we can use it to generate a metrics log.
521 MdcOverride override = getStartTime(new MdcOverride());
523 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
524 HttpURLConnection conn = initializeConnection(fullUrl);
527 conn.setRequestMethod("POST");
528 } catch (ProtocolException e) {
529 shutdownConnection(conn);
530 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
533 attachDocument(conn, document);
535 logger.debug("Sending 'POST' request to: " + conn.getURL());
537 handleResponse(conn, response);
538 buildDocumentResult(response, indexName);
540 // Generate a metrics log so we can track how long the operation took.
541 metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
543 .setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
544 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
548 shutdownConnection(conn);
553 private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
554 throws DocumentStoreOperationException {
555 conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
556 conn.setRequestProperty("Connection", "Close");
558 attachContent(conn, doc.getContentInJson());
561 private DocumentOperationResult checkDocumentExistence(String indexName,
563 throws DocumentStoreOperationException {
564 DocumentOperationResult opResult = new DocumentOperationResult();
566 // Initialize operation result with a failure codes / fault string
567 opResult.setResultCode(500);
569 // Grab the current time so we can use it to generate a metrics log.
570 MdcOverride override = getStartTime(new MdcOverride());
572 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
573 HttpURLConnection conn = initializeConnection(fullUrl);
576 conn.setRequestMethod("HEAD");
577 } catch (ProtocolException e) {
578 shutdownConnection(conn);
579 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
582 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
586 resultCode = conn.getResponseCode();
587 } catch (IOException e) {
588 shutdownConnection(conn);
589 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
592 logger.debug("Response Code : " + resultCode);
594 opResult.setResultCode(resultCode);
596 // Generate a metrics log so we can track how long the operation took.
597 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
599 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
600 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
605 shutdownConnection(conn);
611 public DocumentOperationResult updateDocument(String indexName,
612 DocumentStoreDataEntity document,
613 boolean allowImplicitIndexCreation)
614 throws DocumentStoreOperationException {
616 if(!allowImplicitIndexCreation) {
618 // Before we do anything, make sure that the specified index actually exists in the
619 // document store - we don't want to rely on ElasticSearch to fail the document
620 // create because it could be configured to implicitly create a non-existent index,
621 // which can lead to hard-to-debug behaviour with queries down the road.
622 OperationResult indexExistsResult = checkIndexExistence(indexName);
623 if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
625 DocumentOperationResult opResult = new DocumentOperationResult();
626 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
627 opResult.setResult("Document Index '" + indexName + "' does not exist.");
628 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
633 DocumentOperationResult opResult = new DocumentOperationResult();
635 // Initialize operation result with a failure codes / fault string
636 opResult.setResultCode(500);
637 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
639 // Grab the current time so we can use it to generate a metrics log.
640 MdcOverride override = getStartTime(new MdcOverride());
642 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
643 + "?version=" + document.getVersion(), false);
644 HttpURLConnection conn = initializeConnection(fullUrl);
647 conn.setRequestMethod("PUT");
648 } catch (ProtocolException e) {
649 shutdownConnection(conn);
650 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
653 attachDocument(conn, document);
655 logger.debug("Sending 'PUT' request to: " + conn.getURL());
657 handleResponse(conn, opResult);
658 buildDocumentResult(opResult, indexName);
660 // Generate a metrics log so we can track how long the operation took.
661 metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
663 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
664 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
669 shutdownConnection(conn);
675 public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
676 throws DocumentStoreOperationException {
677 DocumentOperationResult opResult = new DocumentOperationResult();
679 // Initialize operation result with a failure codes / fault string
680 opResult.setResultCode(500);
681 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
683 // Grab the current time so we can use it to generate a metrics log.
684 MdcOverride override = getStartTime(new MdcOverride());
686 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
687 + "?version=" + document.getVersion(), false);
688 HttpURLConnection conn = initializeConnection(fullUrl);
691 conn.setRequestMethod("DELETE");
692 } catch (ProtocolException e) {
693 shutdownConnection(conn);
694 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
697 logger.debug("\nSending 'DELETE' request to " + conn.getURL());
699 handleResponse(conn, opResult);
700 buildDocumentResult(opResult, indexName);
701 //supress the etag and url in response for delete as they are not required
702 if (opResult.getDocument() != null) {
703 opResult.getDocument().setEtag(null);
704 opResult.getDocument().setUrl(null);
707 // Generate a metrics log so we can track how long the operation took.
708 metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
710 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
711 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
716 shutdownConnection(conn);
722 public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
723 throws DocumentStoreOperationException {
724 DocumentOperationResult opResult = new DocumentOperationResult();
726 // Initialize operation result with a failure codes / fault string
727 opResult.setResultCode(500);
728 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
730 // Grab the current time so we can use it to generate a metrics log.
731 MdcOverride override = getStartTime(new MdcOverride());
733 String fullUrl = null;
734 if (document.getVersion() == null) {
735 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
737 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
738 + "?version=" + document.getVersion(), false);
740 HttpURLConnection conn = initializeConnection(fullUrl);
742 logger.debug("\nSending 'GET' request to: " + conn.getURL());
744 handleResponse(conn, opResult);
745 buildDocumentResult(opResult, indexName);
747 // Generate a metrics log so we can track how long the operation took.
748 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
750 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
751 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
756 shutdownConnection(conn);
761 public SearchOperationResult search(String indexName, String queryString)
762 throws DocumentStoreOperationException {
763 SearchOperationResult opResult = new SearchOperationResult();
765 // Initialize operation result with a failure codes / fault string
766 opResult.setResultCode(500);
767 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
769 String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
771 // Grab the current time so we can use it to generate a metrics log.
772 MdcOverride override = getStartTime(new MdcOverride());
774 HttpURLConnection conn = initializeConnection(fullUrl);
777 conn.setRequestMethod("GET");
778 } catch (ProtocolException e) {
779 shutdownConnection(conn);
780 throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
783 logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
785 handleResponse(conn, opResult);
786 buildSearchResult(opResult, indexName);
789 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
791 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
792 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
800 public SearchOperationResult searchWithPayload(String indexName, String query)
801 throws DocumentStoreOperationException {
802 SearchOperationResult opResult = new SearchOperationResult();
804 if (logger.isDebugEnabled()) {
805 logger.debug("Querying index: " + indexName + " with query string: " + query);
808 // Initialize operation result with a failure codes / fault string
809 opResult.setResultCode(500);
810 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
812 String fullUrl = getFullUrl("/" + indexName + "/_search", false);
814 // Grab the current time so we can use it to generate a metrics log.
815 MdcOverride override = getStartTime(new MdcOverride());
817 HttpURLConnection conn = initializeConnection(fullUrl);
820 conn.setRequestMethod("POST");
821 } catch (ProtocolException e) {
822 shutdownConnection(conn);
823 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
826 attachContent(conn, query);
828 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
829 logger.debug("Request body = Elasticsearch query = " + query);
831 handleResponse(conn, opResult);
832 buildSearchResult(opResult, indexName);
834 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
836 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
837 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
842 shutdownConnection(conn);
847 private void attachContent(HttpURLConnection conn, String content)
848 throws DocumentStoreOperationException {
849 OutputStream outputStream = null;
850 OutputStreamWriter out = null;
853 outputStream = conn.getOutputStream();
854 } catch (IOException e) {
855 shutdownConnection(conn);
856 throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
859 out = new OutputStreamWriter(outputStream);
864 } catch (IOException e) {
865 shutdownConnection(conn);
866 throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
870 private HttpURLConnection initializeConnection(String fullUrl)
871 throws DocumentStoreOperationException {
873 HttpURLConnection conn = null;
876 url = new URL(fullUrl);
877 } catch (MalformedURLException e) {
878 throw new DocumentStoreOperationException("Error building a URL with " + url, e);
882 conn = (HttpURLConnection) url.openConnection();
883 conn.setDoOutput(true);
884 } catch (IOException e) {
885 shutdownConnection(conn);
886 throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
892 private void handleResponse(HttpURLConnection conn, OperationResult opResult)
893 throws DocumentStoreOperationException {
894 int resultCode = 200;
897 resultCode = conn.getResponseCode();
898 } catch (IOException e) {
899 shutdownConnection(conn);
900 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
903 logger.debug("Response Code : " + resultCode);
905 InputStream inputStream = null;
907 if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
908 inputStream = conn.getErrorStream();
911 inputStream = conn.getInputStream();
912 } catch (IOException e) {
913 shutdownConnection(conn);
914 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
918 InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
919 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
921 StringBuilder result = new StringBuilder(128);
922 String string = null;
925 while ((string = bufferedreader.readLine()) != null) {
926 result.append(string).append("\n");
928 } catch (IOException e) {
929 shutdownConnection(conn);
930 throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
933 if (resultCode == Status.CONFLICT.getStatusCode()) {
934 opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode());
936 opResult.setResultCode(resultCode);
938 if (logger.isDebugEnabled()) {
939 logger.debug("Raw result string from ElasticSearch = " + result.toString());
941 opResult.setResult(result.toString());
942 opResult.setResultVersion(extractVersion(result.toString()));
945 private String extractVersion(String result) throws DocumentStoreOperationException {
947 JSONParser parser = new JSONParser();
948 String version = null;
950 JSONObject root = (JSONObject) parser.parse(result);
951 if (root.get("_version") != null) {
952 version = root.get("_version").toString();
955 } catch (ParseException e) {
957 // Not all responses from ElasticSearch include a version, so
958 // if we don't get one back, just return an empty string rather
959 // than trigger a false failure.
966 * This convenience method gets the current system time and stores
967 * it in an attribute in the supplied {@link MdcOverride} object so
968 * that it can be used later by the metrics logger.
970 * @param override - The {@link MdcOverride} object to update.
971 * @return - The supplied {@link MdcOverride} object.
973 private MdcOverride getStartTime(MdcOverride override) {
975 // Grab the current time...
976 long startTimeInMs = System.currentTimeMillis();
978 // ...and add it as an attribute to the supplied MDC Override
980 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
981 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
983 // Return the MdcOverride object that we were passed.
984 // This looks odd, but it allows us to do stuff like:
986 // MdcOverride ov = getStartTime(new MdcOverride())
988 // which is quite handy, but also allows us to pass in an existing
989 // MdcOverride object which already has some attributes set.
993 private boolean isSuccess(OperationResult result) {
995 return isSuccessCode(result.getResultCode());
999 private boolean isSuccessCode(int statusCode) {
1000 return ((statusCode >= 200) && (statusCode < 300));
1005 public OperationResult performBulkOperations(BulkRequest[] requests)
1006 throws DocumentStoreOperationException {
1008 if (logger.isDebugEnabled()) {
1009 String dbgString = "ESController: performBulkOperations - Operations: ";
1011 for (BulkRequest request : requests) {
1012 dbgString += "[" + request.toString() + "] ";
1015 logger.debug(dbgString);
1018 // Grab the current time so we can use it to generate a metrics log.
1019 MdcOverride override = getStartTime(new MdcOverride());
1021 // Parse the supplied set of operations.
1022 // Iterate over the list of operations which we were provided and
1023 // translate them into a format that ElasticSearh understands.
1025 StringBuilder esOperationSet = new StringBuilder(128);
1026 List<ElasticSearchResultItem> rejected = new ArrayList<ElasticSearchResultItem>();
1027 for (BulkRequest request : requests) {
1029 // Convert the request to the syntax ElasticSearch likes.
1030 if (buildEsOperation(request, esOperationSet, rejected)) {
1035 ElasticSearchBulkOperationResult opResult = null;
1038 // Open an HTTP connection to the ElasticSearch back end.
1039 String fullUrl = getFullUrl("/_bulk", false);
1041 HttpURLConnection conn;
1044 url = new URL(fullUrl);
1045 conn = (HttpURLConnection) url.openConnection();
1046 conn.setRequestMethod("PUT");
1047 conn.setDoOutput(true);
1048 conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
1049 conn.setRequestProperty("Connection", "Close");
1051 } catch (IOException e) {
1053 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1054 if (logger.isDebugEnabled()) {
1055 logger.debug(Throwables.getStackTraceAsString(e));
1058 throw new DocumentStoreOperationException("Failed to open connection to document store. Cause: "
1059 + e.getMessage(), e);
1062 StringBuilder bulkResult = new StringBuilder(128);
1064 // Create an output stream to write our request to.
1065 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1068 if (logger.isDebugEnabled()) {
1069 logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1070 logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n",
1074 // Write the resulting request string to our output stream. (this sends the request to ES?)
1075 out.write(esOperationSet.toString());
1078 // Open an input stream on our connection in order to read back the results.
1079 InputStream is = conn.getInputStream();
1080 InputStreamReader inputstreamreader = new InputStreamReader(is);
1081 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1083 // Read the contents of the input stream into our result string...
1084 String esResponseString = null;
1086 while ((esResponseString = bufferedreader.readLine()) != null) {
1087 bulkResult.append(esResponseString).append("\n");
1090 } catch (IOException e) {
1092 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1093 if (logger.isDebugEnabled()) {
1094 StringWriter sw = new StringWriter();
1095 e.printStackTrace(new PrintWriter(sw));
1096 logger.debug(sw.toString());
1099 throw new DocumentStoreOperationException("Failure interacting with document store. Cause: "
1100 + e.getMessage(), e);
1103 if (logger.isDebugEnabled()) {
1104 logger.debug("ESController: Received result string from ElasticSearch: = "
1105 + bulkResult.toString());
1108 // ...and marshal the resulting string into a Java object.
1110 opResult = marshallEsBulkResult(bulkResult.toString());
1112 } catch (IOException e) {
1114 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1115 if (logger.isDebugEnabled()) {
1116 logger.debug(Throwables.getStackTraceAsString(e));
1119 throw new DocumentStoreOperationException("Failed to marshal response body. Cause: "
1120 + e.getMessage(), e);
1124 // Finally, build the operation result and return it to the caller.
1125 OperationResult result = new OperationResult();
1126 result.setResultCode(207);
1127 result.setResult(buildGenericBulkResultSet(opResult, rejected));
1129 // In the success case we don't want the entire result string to be
1130 // dumped into the metrics log, so concatenate it.
1131 String resultStringForMetricsLog = result.getResult();
1132 if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1133 resultStringForMetricsLog = resultStringForMetricsLog.substring(0,
1134 Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1137 metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1139 .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1140 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1148 * This method converts a {@link BulkRequest} object into a json structure
1149 * which can be understood by ElasticSearch.
1151 * @param request - The request to be performed.
1152 * @param sb - The string builder to append the json data to
1153 * @throws DocumentStoreOperationException
1155 private boolean buildEsOperation(BulkRequest request, StringBuilder sb,
1156 List<ElasticSearchResultItem> fails)
1157 throws DocumentStoreOperationException {
1159 boolean retVal = true;
1160 OperationResult indexExistsResult = null;
1162 // What kind of operation are we performing?
1163 switch (request.getOperationType()) {
1165 // Create a new document.
1168 // Make sure that we were supplied a document payload.
1169 if (request.getOperation().getDocument() == null) {
1171 fails.add(generateRejectionEntry(request.getOperationType(),
1172 "Missing document payload",
1176 request.getOperation().getMetaData().getUrl()));
1180 // Make sure that the supplied document URL is formatted
1182 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1183 fails.add(generateRejectionEntry(request.getOperationType(),
1184 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1188 request.getOperation().getMetaData().getUrl()));
1192 // Validate that the specified index actually exists before we
1193 // try to perform the create.
1194 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1196 fails.add(generateRejectionEntry(request.getOperationType(),
1197 "Specified resource does not exist: "
1198 + request.getOperation().getMetaData().getUrl(),
1202 request.getOperation().getMetaData().getUrl()));
1206 // If we were supplied an id for the new document, then
1207 // include it in the bulk operation to Elastic Search
1208 if (request.getId() == null) {
1210 sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE,
1214 // Otherwise, we just leave that parameter off and ElasticSearch
1215 // will generate one for us.
1217 sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE,
1224 // Append the document that we want to create.
1225 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1226 } catch (JsonProcessingException e) {
1227 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1232 // Update an existing document.
1235 // Make sure that we were supplied a document payload.
1236 if (request.getOperation().getDocument() == null) {
1238 fails.add(generateRejectionEntry(request.getOperationType(),
1239 "Missing document payload",
1243 request.getOperation().getMetaData().getUrl()));
1247 // Make sure that the supplied document URL is formatted
1249 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1250 fails.add(generateRejectionEntry(request.getOperationType(),
1251 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1255 request.getOperation().getMetaData().getUrl()));
1259 // Validate that the specified index actually exists before we
1260 // try to perform the update.
1261 if (!indexExists(request.getIndex())) {
1263 fails.add(generateRejectionEntry(request.getOperationType(),
1264 "Specified resource does not exist: "
1265 + request.getOperation().getMetaData().getUrl(),
1269 request.getOperation().getMetaData().getUrl()));
1273 // Validate that the document we are trying to update actually
1274 // exists before we try to perform the update.
1275 if (!documentExists(request.getIndex(), request.getId())) {
1277 fails.add(generateRejectionEntry(request.getOperationType(),
1278 "Specified resource does not exist: "
1279 + request.getOperation().getMetaData().getUrl(),
1283 request.getOperation().getMetaData().getUrl()));
1287 // It is mandatory that a version be supplied for an update operation,
1288 // so validate that now.
1289 if (request.getOperation().getMetaData().getEtag() == null) {
1291 fails.add(generateRejectionEntry(request.getOperationType(),
1292 "Missing mandatory ETag field",
1296 request.getOperation().getMetaData().getUrl()));
1300 // Generate the update request...
1301 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE,
1305 request.getOperation().getMetaData().getEtag()));
1307 // ...and append the document that we want to update.
1309 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1310 } catch (JsonProcessingException e) {
1311 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1315 // Delete an existing document.
1318 // Make sure that the supplied document URL is formatted
1320 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1321 fails.add(generateRejectionEntry(request.getOperationType(),
1322 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1326 request.getOperation().getMetaData().getUrl()));
1330 // Validate that the specified index actually exists before we
1331 // try to perform the delete.
1332 if (!indexExists(request.getIndex())) {
1334 fails.add(generateRejectionEntry(request.getOperationType(),
1335 "Specified resource does not exist: "
1336 + request.getOperation().getMetaData().getUrl(),
1340 request.getOperation().getMetaData().getUrl()));
1344 // Validate that the document we are trying to update actually
1345 // exists before we try to perform the delete.
1346 if (!documentExists(request.getIndex(), request.getId())) {
1348 fails.add(generateRejectionEntry(request.getOperationType(),
1349 "Specified resource does not exist: "
1350 + request.getOperation().getMetaData().getUrl(),
1354 request.getOperation().getMetaData().getUrl()));
1358 // It is mandatory that a version be supplied for a delete operation,
1359 // so validate that now.
1360 if (request.getOperation().getMetaData().getEtag() == null) {
1362 fails.add(generateRejectionEntry(request.getOperationType(),
1363 "Missing mandatory ETag field",
1367 request.getOperation().getMetaData().getUrl()));
1371 // Generate the delete request.
1372 sb.append(String.format(BULK_DELETE_TEMPLATE,
1376 request.getOperation().getMetaData().getEtag()));
1384 private boolean indexExists(String index) throws DocumentStoreOperationException {
1386 OperationResult indexExistsResult = checkIndexExistence(index);
1388 return ((indexExistsResult.getResultCode() >= 200)
1389 && (indexExistsResult.getResultCode() < 300));
1392 private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1394 OperationResult docExistsResult = checkDocumentExistence(index, id);
1396 return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1400 * This method constructs a status entry for a bulk operation which has
1401 * been rejected before even sending it to the document store.
1403 * @param rejectReason - A message describing why the operation was rejected.
1404 * @param anId - The identifier associated with the document being
1406 * @param statusCode - An HTTP status code.
1407 * @return - A result set item.
1409 private ElasticSearchResultItem generateRejectionEntry(OperationType opType,
1410 String rejectReason,
1414 String originalUrl) {
1416 ElasticSearchError err = new ElasticSearchError();
1417 err.setReason(rejectReason);
1419 ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1422 op.setStatus(statusCode);
1424 op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1426 ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1430 rejectionResult.setCreate(op);
1433 rejectionResult.setIndex(op);
1436 rejectionResult.setDelete(op);
1441 return rejectionResult;
1446 * This method takes the json structure returned from ElasticSearch in
1447 * response to a bulk operations request and marshals it into a Java
1450 * @param jsonResult - The bulk operations response returned from
1452 * @return - The marshalled response.
1453 * @throws JsonParseException
1454 * @throws JsonMappingException
1455 * @throws IOException
1457 private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult)
1458 throws JsonParseException, JsonMappingException, IOException {
1460 if (jsonResult != null) {
1461 if (logger.isDebugEnabled()) {
1462 logger.debug("ESController: Marshalling ES result set from json: "
1463 + jsonResult.replaceAll("\n", ""));
1466 ObjectMapper mapper = new ObjectMapper();
1467 mapper.setSerializationInclusion(Include.NON_EMPTY);
1469 return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1477 * This method takes the marshalled ElasticSearch bulk response and
1478 * converts it into a generic response payload.
1480 * @param esResult - ElasticSearch bulk operations response.
1481 * @return - A generic result set.
1483 private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1484 List<ElasticSearchResultItem> rejectedOps) {
1487 int totalSuccess = 0;
1490 if (logger.isDebugEnabled()) {
1492 logger.debug("ESController: Build generic result set. ES Results: "
1493 + ((esResult != null) ? esResult.toString() : "[]")
1494 + " Rejected Ops: " + rejectedOps.toString());
1497 // Build a combined list of result items from the results returned
1498 // from ElasticSearch and the list of operations that we rejected
1499 // without sending to ElasticSearch.
1500 List<ElasticSearchResultItem> combinedResults = new ArrayList<ElasticSearchResultItem>();
1501 if (esResult != null) {
1502 combinedResults.addAll(Arrays.asList(esResult.getItems()));
1504 combinedResults.addAll(rejectedOps);
1506 // Iterate over the individual results in the resulting result set.
1507 StringBuilder resultsBuilder = new StringBuilder();
1508 AtomicBoolean firstItem = new AtomicBoolean(true);
1509 for (ElasticSearchResultItem item : combinedResults) {
1511 // Increment the operation counts.
1513 if (isSuccessCode(item.operationStatus().getStatus())) {
1519 // Prepend a comma to our response string unless this it the
1520 // first result in the set.
1521 if (!firstItem.compareAndSet(true, false)) {
1522 resultsBuilder.append(", ");
1525 // Append the current result as a generic json structure.
1526 resultsBuilder.append(item.toJson());
1529 // Now, build the result string and return it.
1530 String responseBody = "{ \"total_operations\": " + totalOps + ", "
1531 + "\"total_success\": " + totalSuccess + ", "
1532 + "\"total_fails\": " + totalFails + ", "
1534 + resultsBuilder.toString()
1537 return responseBody;
1542 * This method queryies ElasticSearch to determine if the supplied
1543 * index is present in the document store.
1545 * @param indexName - The index to look for.
1546 * @return - An operation result indicating the success or failure of
1548 * @throws DocumentStoreOperationException
1550 public OperationResult checkIndexExistence(String indexName)
1551 throws DocumentStoreOperationException {
1553 // Initialize operation result with a failure codes / fault string
1554 OperationResult opResult = new OperationResult();
1555 opResult.setResultCode(500);
1557 // Grab the current time so we can use it to generate a metrics log.
1558 MdcOverride override = getStartTime(new MdcOverride());
1560 String fullUrl = getFullUrl("/" + indexName, false);
1561 HttpURLConnection conn = initializeConnection(fullUrl);
1564 conn.setRequestMethod("HEAD");
1566 } catch (ProtocolException e) {
1567 shutdownConnection(conn);
1568 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1571 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1575 resultCode = conn.getResponseCode();
1576 } catch (IOException e) {
1577 shutdownConnection(conn);
1578 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1580 logger.debug("Response Code : " + resultCode);
1582 opResult.setResultCode(resultCode);
1584 // Generate a metrics log so we can track how long the operation took.
1585 metricsLogger.info(SearchDbMsgs.CHECK_INDEX_TIME,
1587 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1588 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1592 shutdownConnection(conn);
1598 private void buildDocumentResult(DocumentOperationResult result, String index)
1599 throws DocumentStoreOperationException {
1601 JSONParser parser = new JSONParser();
1604 root = (JSONObject) parser.parse(result.getResult());
1606 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1607 // Success response object
1608 Document doc = new Document();
1609 doc.setEtag(result.getResultVersion());
1610 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1612 doc.setContent((JSONObject) root.get("_source"));
1613 result.setDocument(doc);
1616 // Error response object
1617 JSONObject error = (JSONObject) root.get("error");
1618 if (error != null) {
1619 result.setError(new ErrorResult(error.get("type").toString(),
1620 error.get("reason").toString()));
1624 } catch (Exception e) {
1625 throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1626 + result.getResult());
1632 private String buildDocumentResponseUrl(String index, String id) {
1633 return ApiUtils.buildDocumentUri(index, id);
1636 private void buildSearchResult(SearchOperationResult result, String index)
1637 throws DocumentStoreOperationException {
1639 JSONParser parser = new JSONParser();
1643 root = (JSONObject) parser.parse(result.getResult());
1644 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1645 JSONObject hits = (JSONObject) root.get("hits");
1646 JSONArray hitArray = (JSONArray) hits.get("hits");
1647 SearchHits searchHits = new SearchHits();
1648 searchHits.setTotalHits(hits.get("total").toString());
1649 ArrayList<SearchHit> searchHitArray = new ArrayList<SearchHit>();
1651 for (int i = 0; i < hitArray.size(); i++) {
1652 JSONObject hit = (JSONObject) hitArray.get(i);
1653 SearchHit searchHit = new SearchHit();
1654 searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1655 Document doc = new Document();
1656 if (hit.get("_version") != null) {
1657 doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1660 doc.setUrl(buildDocumentResponseUrl(index, (hit.get("_id") != null)
1661 ? hit.get("_id").toString() : ""));
1662 doc.setContent((JSONObject) hit.get("_source"));
1663 searchHit.setDocument(doc);
1664 searchHitArray.add(searchHit);
1666 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1667 result.setSearchResult(searchHits);
1669 JSONObject aggregations = (JSONObject) root.get("aggregations");
1670 if (aggregations != null) {
1671 AggregationResult[] aggResults =
1672 AggregationParsingUtil.parseAggregationResults(aggregations);
1673 AggregationResults aggs = new AggregationResults();
1674 aggs.setAggregations(aggResults);
1675 result.setAggregationResult(aggs);
1680 JSONObject error = (JSONObject) root.get("error");
1681 if (error != null) {
1682 result.setError(new ErrorResult(error.get("type").toString(),
1683 error.get("reason").toString()));
1686 } catch (Exception e) {
1687 throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1688 + result.getResult());