2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
25 import com.att.aft.dme2.internal.google.common.base.Throwables;
26 import com.fasterxml.jackson.annotation.JsonInclude.Include;
27 import com.fasterxml.jackson.core.JsonParseException;
28 import com.fasterxml.jackson.core.JsonProcessingException;
29 import com.fasterxml.jackson.databind.JsonMappingException;
30 import com.fasterxml.jackson.databind.ObjectMapper;
31 import edu.emory.mathcs.backport.java.util.Arrays;
32 import org.json.simple.JSONArray;
33 import org.json.simple.JSONObject;
34 import org.json.simple.parser.JSONParser;
35 import org.json.simple.parser.ParseException;
36 import org.onap.aai.sa.rest.AnalysisConfiguration;
37 import org.onap.aai.sa.rest.ApiUtils;
38 import org.onap.aai.sa.rest.BulkRequest;
39 import org.onap.aai.sa.rest.BulkRequest.OperationType;
40 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
41 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
42 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
43 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
44 import org.onap.aai.sa.searchdbabstraction.entity.Document;
45 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
46 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
47 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
48 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
49 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
50 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
51 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
52 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
53 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
54 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
55 import org.onap.aai.cl.api.LogFields;
56 import org.onap.aai.cl.api.LogLine;
57 import org.onap.aai.cl.api.Logger;
58 import org.onap.aai.cl.eelf.LoggerFactory;
59 import org.onap.aai.cl.mdc.MdcContext;
60 import org.onap.aai.cl.mdc.MdcOverride;
61 import org.onap.aai.sa.rest.DocumentSchema;
63 import java.io.BufferedReader;
65 import java.io.FileInputStream;
66 import java.io.IOException;
67 import java.io.InputStream;
68 import java.io.InputStreamReader;
69 import java.io.OutputStream;
70 import java.io.OutputStreamWriter;
71 import java.io.PrintWriter;
72 import java.io.StringWriter;
73 import java.net.HttpURLConnection;
74 import java.net.MalformedURLException;
75 import java.net.ProtocolException;
77 import java.text.SimpleDateFormat;
78 import java.util.ArrayList;
79 import java.util.List;
80 import java.util.Properties;
81 import java.util.concurrent.atomic.AtomicBoolean;
82 import javax.ws.rs.core.Response.Status;
86 * This class has the Elasticsearch implementation of the
87 * DB operations defined in DocumentStoreInterface.
89 public class ElasticSearchHttpController implements DocumentStoreInterface {
// Action-line templates for ElasticSearch bulk requests. The %s placeholders
// stand for index, type and (where the template has them) document id and
// version, in the order the keys appear in each literal.
// NOTE(review): despite their names, the two "create" templates differ in
// whether an "_id" is supplied, not in whether an index is supplied.
91 private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
92 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
93 private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
94 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
95 private static final String BULK_IMPORT_INDEX_TEMPLATE =
96 "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
97 private static final String BULK_DELETE_TEMPLATE =
98 "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
// Default result text placed in an OperationResult before a request is sent,
// so a failure part-way through still reports a meaningful fault string.
100 private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
101 "Internal Error: ElasticSearch operation fault occurred";
// General-purpose logger for this class.
102 private static final Logger logger = LoggerFactory.getInstance()
103 .getLogger(ElasticSearchHttpController.class.getName());
// Separate logger used for the per-operation timing (metrics) records.
104 private static final Logger metricsLogger = LoggerFactory.getInstance()
105 .getMetricsLogger(ElasticSearchHttpController.class.getName());
// Connection settings; getFullUrl() reads the IP address and HTTP port from it.
106 private final ElasticSearchConfig config;
// Document type name used in every /index/type/id URL built by this class.
108 private static final String DEFAULT_TYPE = "default";
// Lazily created shared instance handed out by getInstance().
110 private static ElasticSearchHttpController instance = null;
// Created in the constructor; supplies the index settings used by createIndex().
112 protected AnalysisConfiguration analysisConfig;
114 public static ElasticSearchHttpController getInstance() {
116 synchronized (ElasticSearchHttpController.class) {
118 if (instance == null) {
120 Properties properties = new Properties();
121 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
123 properties.load(new FileInputStream(file));
124 } catch (Exception e) {
125 logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL,
126 "ElasticSearchHTTPController.getInstance",
127 e.getLocalizedMessage());
130 ElasticSearchConfig config = new ElasticSearchConfig(properties);
131 instance = new ElasticSearchHttpController(config);
// Builds a controller around the supplied ElasticSearch configuration and
// creates a fresh AnalysisConfiguration. Logs a connection attempt followed by
// a success message; if an exception is caught, logs a connection-failure
// message containing the base URL and the exception message instead.
// NOTE(review): the statement between the two info logs is not visible in this
// view - presumably a connectivity probe such as checkConnection(); confirm.
138 public ElasticSearchHttpController(ElasticSearchConfig config) {
139 this.config = config;
140 analysisConfig = new AnalysisConfiguration();
143 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
145 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
146 } catch (Exception e) {
147 logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e,
148 getFullUrl("", false), e.getMessage());
153 public AnalysisConfiguration getAnalysisConfig() {
154 return analysisConfig;
// Creates an index in the document store for the supplied document schema.
// The index settings come from analysisConfig and the mappings are generated
// from documentSchema; the actual request is delegated to createTable().
// A DocumentStoreOperationException is not propagated: it is recorded as the
// result's failure cause instead.
158 public OperationResult createIndex(String index, DocumentSchema documentSchema) {
// Start from a failure code so an early exit still reports an error.
160 OperationResult result = new OperationResult();
161 result.setResultCode(500);
165 // Submit the request to ElasticSearch to create the index using a
166 // default document type.
167 result = createTable(index,
169 analysisConfig.getEsIndexSettings(),
170 DocumentSchemaUtil.generateDocumentMappings(documentSchema));
172 // ElasticSearch will return us a 200 code on success when we
173 // want to report a 201, so translate the result here.
174 result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
// On success the result body is replaced by a small JSON object carrying the index URI.
175 if (isSuccess(result)) {
176 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
177 //result.setResult("{\"index\": \"" + index + ", \"type\": \"" + DEFAULT_TYPE + "\"}");
180 } catch (DocumentStoreOperationException e) {
182 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
// Deletes the named index by issuing an HTTP DELETE to "/<indexName>/".
// The response code and body are copied into the returned result by
// handleResponse(), and a metrics record with the outcome is logged.
// Throws DocumentStoreOperationException if the request method cannot be set
// (the connection is shut down first) or if handleResponse() fails.
190 public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
192 //Initialize operation result with a failure codes / fault string
193 OperationResult opResult = new OperationResult();
194 opResult.setResultCode(500);
195 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
197 // Grab the current time so we can use it to generate a metrics log.
198 MdcOverride override = getStartTime(new MdcOverride());
200 String fullUrl = getFullUrl("/" + indexName + "/", false);
201 HttpURLConnection conn = initializeConnection(fullUrl);
203 logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
206 conn.setRequestMethod("DELETE");
207 } catch (ProtocolException e) {
208 shutdownConnection(conn);
209 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
212 handleResponse(conn, opResult);
214 // Generate a metrics log so we can track how long the operation took.
215 metricsLogger.info(SearchDbMsgs.DELETE_INDEX_TIME,
217 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
218 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
222 shutdownConnection(conn);
// Probes the document store by sending a GET to "/_cluster/health" and records
// the HTTP response code in a new OperationResult. Only the status code is
// captured; the response body is not read here.
// NOTE(review): setDoOutput(true) is set on a GET request that sends no body -
// confirm this is intentional.
228 private OperationResult checkConnection() throws Exception {
230 String fullUrl = getFullUrl("/_cluster/health", false);
232 HttpURLConnection conn = null;
234 url = new URL(fullUrl);
235 conn = (HttpURLConnection) url.openConnection();
236 conn.setRequestMethod("GET");
237 conn.setDoOutput(true);
238 logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
240 int resultCode = conn.getResponseCode();
241 logger.debug("getClusterHealth() response Code : " + resultCode);
242 OperationResult opResult = new OperationResult();
243 opResult.setResultCode(resultCode);
245 shutdownConnection(conn);
250 private String getFullUrl(String resourceUrl, boolean isSecure) {
252 final String host = config.getIpAddress();
253 final String port = config.getHttpPort();
256 return String.format("https://%s:%s%s", host, port, resourceUrl);
258 return String.format("http://%s:%s%s", host, port, resourceUrl);
// Best-effort release of an HTTP connection: does nothing further for a null
// connection, otherwise fetches the connection's input and output streams,
// closes the ones it obtained, and finally disconnects. IOExceptions raised
// along the way are logged at debug level and never propagated.
262 private void shutdownConnection(HttpURLConnection connection) {
263 if (connection == null) {
267 InputStream inputstream = null;
268 OutputStream outputstream = null;
// Obtaining the stream may itself fail (e.g. on an error response); that is only logged.
271 inputstream = connection.getInputStream();
272 } catch (IOException e) {
273 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
275 if (inputstream != null) {
278 } catch (IOException e) {
279 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
280 e.getLocalizedMessage());
// Same treatment for the output side of the connection.
286 outputstream = connection.getOutputStream();
287 } catch (IOException e) {
288 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
290 if (outputstream != null) {
292 outputstream.close();
293 } catch (IOException e) {
294 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
295 e.getLocalizedMessage());
300 connection.disconnect();
// Sends a PUT to "/<indexName>/" whose body combines the supplied index
// settings with the mappings for the given type name, then copies the
// response into the returned result via handleResponse() and logs a metrics
// record. Null settings or mappings are only reported at debug level here.
// Throws DocumentStoreOperationException if the request method cannot be set
// or if attaching the content / reading the response fails.
304 protected OperationResult createTable(String indexName, String typeName,
305 String indexSettings, String indexMappings)
306 throws DocumentStoreOperationException {
308 if (indexSettings == null) {
309 logger.debug("No settings provided.");
312 if (indexMappings == null) {
313 logger.debug("No mappings provided.");
316 OperationResult opResult = new OperationResult();
318 // Initialize operation result with a failure codes / fault string
319 opResult.setResultCode(500);
320 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
322 // Grab the current time so we can use it to generate a metrics log.
323 MdcOverride override = getStartTime(new MdcOverride());
325 String fullUrl = getFullUrl("/" + indexName + "/", false);
326 HttpURLConnection conn = initializeConnection(fullUrl);
329 conn.setRequestMethod("PUT");
330 } catch (ProtocolException e) {
331 shutdownConnection(conn);
332 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
// Assemble the request body: a "settings" member followed by a "mappings"
// member keyed by the type name.
335 StringBuilder sb = new StringBuilder(128);
336 sb.append("{ \"settings\" : ");
337 sb.append(indexSettings);
340 sb.append("\"mappings\" : {");
341 sb.append("\"" + typeName + "\" :");
342 sb.append(indexMappings);
345 attachContent(conn, sb.toString());
347 logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
348 logger.debug("Request content: " + sb.toString());
350 handleResponse(conn, opResult);
352 shutdownConnection(conn);
354 // Generate a metrics log so we can track how long the operation took.
// NOTE(review): RESPONSE_DESCRIPTION is populated from getResultCode() here,
// whereas the other metrics calls in this class use getResult() - looks like a slip.
355 metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
357 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
358 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
// Creates a document in the named index. When implicit index creation is not
// allowed, the index is checked first and a NOT_FOUND result is prepared if
// the existence check does not return a 2xx code. The create itself is
// delegated to createDocumentWithoutId() when the document has a null or
// empty id, and to createDocumentWithId() otherwise.
366 public DocumentOperationResult createDocument(String indexName,
367 DocumentStoreDataEntity document,
368 boolean allowImplicitIndexCreation)
369 throws DocumentStoreOperationException {
371 if(!allowImplicitIndexCreation) {
373 // Before we do anything, make sure that the specified index actually exists in the
374 // document store - we don't want to rely on ElasticSearch to fail the document
375 // create because it could be configured to implicitly create a non-existent index,
376 // which can lead to hard-to-debug behaviour with queries down the road.
377 OperationResult indexExistsResult = checkIndexExistence(indexName);
378 if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
380 DocumentOperationResult opResult = new DocumentOperationResult();
381 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
382 opResult.setResult("Document Index '" + indexName + "' does not exist.");
383 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
// Choose the create path according to whether the caller supplied an id.
388 if (document.getId() == null || document.getId().isEmpty()) {
389 return createDocumentWithoutId(indexName, document);
391 return createDocumentWithId(indexName, document);
// Creates a document under a caller-supplied id. An existence check is made
// first: any outcome other than NOT_FOUND is turned into a CONFLICT result,
// with a failure cause that distinguishes "already exists" (OK) from "could
// not verify" (anything else). Otherwise the document is PUT to
// "/<indexName>/default/<id>", the response is converted into a document
// result, and a metrics record is logged.
395 private DocumentOperationResult createDocumentWithId(String indexName,
396 DocumentStoreDataEntity document)
397 throws DocumentStoreOperationException {
398 // check if the document already exists
399 DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
402 if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
403 if (opResult.getResultCode() == Status.OK.getStatusCode()) {
404 opResult.setFailureCause("A document with the same id already exists.");
406 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
408 opResult.setResultCode(Status.CONFLICT.getStatusCode());
// The id is free: start over with a fresh result for the actual create request.
412 opResult = new DocumentOperationResult();
413 // Initialize operation result with a failure codes / fault string
414 opResult.setResultCode(500);
415 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
417 // Grab the current time so we can use it to generate a metrics log.
418 MdcOverride override = getStartTime(new MdcOverride());
420 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE
421 + "/" + document.getId(), false);
422 HttpURLConnection conn = initializeConnection(fullUrl);
425 conn.setRequestMethod("PUT");
426 } catch (ProtocolException e) {
427 shutdownConnection(conn);
428 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
431 attachDocument(conn, document);
433 logger.debug("Sending 'PUT' request to: " + conn.getURL());
435 handleResponse(conn, opResult);
436 buildDocumentResult(opResult, indexName);
438 // Generate a metrics log so we can track how long the operation took.
439 metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
441 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
442 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
446 shutdownConnection(conn);
// Creates a document without a caller-supplied id by POSTing its content to
// "/<indexName>/default". The response is copied into the result by
// handleResponse(), converted by buildDocumentResult(), and a metrics record
// is logged before the connection is shut down.
// Throws DocumentStoreOperationException if the request method cannot be set
// or if sending the document / reading the response fails.
452 private DocumentOperationResult createDocumentWithoutId(String indexName,
453 DocumentStoreDataEntity document)
454 throws DocumentStoreOperationException {
456 DocumentOperationResult response = new DocumentOperationResult();
457 // Initialize operation result with a failure codes / fault string
458 response.setResultCode(500);
459 response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
461 // Grab the current time so we can use it to generate a metrics log.
462 MdcOverride override = getStartTime(new MdcOverride());
464 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
465 HttpURLConnection conn = initializeConnection(fullUrl);
468 conn.setRequestMethod("POST");
469 } catch (ProtocolException e) {
470 shutdownConnection(conn);
471 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
474 attachDocument(conn, document);
476 logger.debug("Sending 'POST' request to: " + conn.getURL());
478 handleResponse(conn, response);
479 buildDocumentResult(response, indexName);
481 // Generate a metrics log so we can track how long the operation took.
482 metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
484 .setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
485 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
489 shutdownConnection(conn);
494 private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
495 throws DocumentStoreOperationException {
496 conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
497 conn.setRequestProperty("Connection", "Close");
499 attachContent(conn, doc.getContentInJson());
// Checks whether a document exists by sending a HEAD request to
// "/<indexName>/default/<docId>" and storing the HTTP response code in the
// returned result. A metrics record is logged and the connection shut down.
// Throws DocumentStoreOperationException if the request method cannot be set
// or the response code cannot be read; the connection is shut down first in
// both cases.
502 private DocumentOperationResult checkDocumentExistence(String indexName,
504 throws DocumentStoreOperationException {
505 DocumentOperationResult opResult = new DocumentOperationResult();
507 // Initialize operation result with a failure codes / fault string
508 opResult.setResultCode(500);
510 // Grab the current time so we can use it to generate a metrics log.
511 MdcOverride override = getStartTime(new MdcOverride());
513 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
514 HttpURLConnection conn = initializeConnection(fullUrl);
517 conn.setRequestMethod("HEAD");
518 } catch (ProtocolException e) {
519 shutdownConnection(conn);
520 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
523 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
// Only the status code matters for a HEAD request; no body is read.
527 resultCode = conn.getResponseCode();
528 } catch (IOException e) {
529 shutdownConnection(conn);
530 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
533 logger.debug("Response Code : " + resultCode);
535 opResult.setResultCode(resultCode);
537 // Generate a metrics log so we can track how long the operation took.
538 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
540 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
541 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
546 shutdownConnection(conn);
// Updates a document by PUTting its content to
// "/<indexName>/default/<id>?version=<version>". When implicit index creation
// is not allowed, the index is checked first and a NOT_FOUND result is
// prepared if the existence check does not return a 2xx code. The response is
// converted into a document result and a metrics record is logged.
// Throws DocumentStoreOperationException if the request method cannot be set
// or if sending the document / reading the response fails.
552 public DocumentOperationResult updateDocument(String indexName,
553 DocumentStoreDataEntity document,
554 boolean allowImplicitIndexCreation)
555 throws DocumentStoreOperationException {
557 if(!allowImplicitIndexCreation) {
559 // Before we do anything, make sure that the specified index actually exists in the
560 // document store - we don't want to rely on ElasticSearch to fail the document
561 // create because it could be configured to implicitly create a non-existent index,
562 // which can lead to hard-to-debug behaviour with queries down the road.
563 OperationResult indexExistsResult = checkIndexExistence(indexName);
564 if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
566 DocumentOperationResult opResult = new DocumentOperationResult();
567 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
568 opResult.setResult("Document Index '" + indexName + "' does not exist.");
569 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
574 DocumentOperationResult opResult = new DocumentOperationResult();
576 // Initialize operation result with a failure codes / fault string
577 opResult.setResultCode(500);
578 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
580 // Grab the current time so we can use it to generate a metrics log.
581 MdcOverride override = getStartTime(new MdcOverride());
// The document's version is passed as a query parameter on the update URL.
583 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
584 + "?version=" + document.getVersion(), false);
585 HttpURLConnection conn = initializeConnection(fullUrl);
588 conn.setRequestMethod("PUT");
589 } catch (ProtocolException e) {
590 shutdownConnection(conn);
591 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
594 attachDocument(conn, document);
596 logger.debug("Sending 'PUT' request to: " + conn.getURL());
598 handleResponse(conn, opResult);
599 buildDocumentResult(opResult, indexName);
601 // Generate a metrics log so we can track how long the operation took.
602 metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
604 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
605 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
610 shutdownConnection(conn);
// Deletes a document by sending an HTTP DELETE to
// "/<indexName>/default/<id>?version=<version>". The response is converted
// into a document result, the etag and url of any returned document are
// cleared, and a metrics record is logged before the connection is shut down.
// Throws DocumentStoreOperationException if the request method cannot be set
// or if reading the response fails.
616 public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
617 throws DocumentStoreOperationException {
618 DocumentOperationResult opResult = new DocumentOperationResult();
620 // Initialize operation result with a failure codes / fault string
621 opResult.setResultCode(500);
622 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
624 // Grab the current time so we can use it to generate a metrics log.
625 MdcOverride override = getStartTime(new MdcOverride());
627 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
628 + "?version=" + document.getVersion(), false);
629 HttpURLConnection conn = initializeConnection(fullUrl);
632 conn.setRequestMethod("DELETE");
633 } catch (ProtocolException e) {
634 shutdownConnection(conn);
635 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
638 logger.debug("\nSending 'DELETE' request to " + conn.getURL());
640 handleResponse(conn, opResult);
641 buildDocumentResult(opResult, indexName);
642 //supress the etag and url in response for delete as they are not required
643 if (opResult.getDocument() != null) {
644 opResult.getDocument().setEtag(null);
645 opResult.getDocument().setUrl(null);
648 // Generate a metrics log so we can track how long the operation took.
// NOTE(review): RESPONSE_CODE receives getResult() and RESPONSE_DESCRIPTION
// receives getResultCode() - the reverse of the other metrics calls in this
// class; the two arguments look swapped.
649 metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
651 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
652 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
657 shutdownConnection(conn);
// Retrieves a document from "/<indexName>/default/<id>", adding a
// "?version=" query parameter only when the supplied document carries a
// version. The response is converted into a document result and a metrics
// record is logged before the connection is shut down.
// No request method is set in the visible code, so the connection's default
// method applies.
663 public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
664 throws DocumentStoreOperationException {
665 DocumentOperationResult opResult = new DocumentOperationResult();
667 // Initialize operation result with a failure codes / fault string
668 opResult.setResultCode(500);
669 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
671 // Grab the current time so we can use it to generate a metrics log.
672 MdcOverride override = getStartTime(new MdcOverride());
// Build the URL with or without the version constraint.
674 String fullUrl = null;
675 if (document.getVersion() == null) {
676 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
678 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
679 + "?version=" + document.getVersion(), false);
681 HttpURLConnection conn = initializeConnection(fullUrl);
683 logger.debug("\nSending 'GET' request to: " + conn.getURL());
685 handleResponse(conn, opResult);
686 buildDocumentResult(opResult, indexName);
688 // Generate a metrics log so we can track how long the operation took.
689 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
691 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
692 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
697 shutdownConnection(conn);
// Runs a URI-style search: sends a GET to "/<indexName>/_search?<queryString>",
// copies the response into the result via handleResponse(), converts it with
// buildSearchResult(), and logs a metrics record.
// Throws DocumentStoreOperationException if the request method cannot be set
// or if reading the response fails.
// NOTE(review): queryString is appended to the URL as-is; presumably callers
// supply it already URL-encoded - confirm.
// NOTE(review): unlike searchWithPayload(), no shutdownConnection(conn) call is
// visible after the response is handled - confirm the connection is released.
702 public SearchOperationResult search(String indexName, String queryString)
703 throws DocumentStoreOperationException {
704 SearchOperationResult opResult = new SearchOperationResult();
706 // Initialize operation result with a failure codes / fault string
707 opResult.setResultCode(500);
708 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
710 String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
712 // Grab the current time so we can use it to generate a metrics log.
713 MdcOverride override = getStartTime(new MdcOverride());
715 HttpURLConnection conn = initializeConnection(fullUrl);
718 conn.setRequestMethod("GET");
719 } catch (ProtocolException e) {
720 shutdownConnection(conn);
721 throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
724 logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
726 handleResponse(conn, opResult);
727 buildSearchResult(opResult, indexName);
// Record how long the query took together with its outcome.
730 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
732 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
733 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
// Runs a search whose query is sent as the request body: POSTs the query to
// "/<indexName>/_search", copies the response into the result via
// handleResponse(), converts it with buildSearchResult(), logs a metrics
// record and shuts the connection down.
// Throws DocumentStoreOperationException if the request method cannot be set
// or if sending the query / reading the response fails.
741 public SearchOperationResult searchWithPayload(String indexName, String query)
742 throws DocumentStoreOperationException {
743 SearchOperationResult opResult = new SearchOperationResult();
745 if (logger.isDebugEnabled()) {
746 logger.debug("Querying index: " + indexName + " with query string: " + query);
749 // Initialize operation result with a failure codes / fault string
750 opResult.setResultCode(500);
751 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
753 String fullUrl = getFullUrl("/" + indexName + "/_search", false);
755 // Grab the current time so we can use it to generate a metrics log.
756 MdcOverride override = getStartTime(new MdcOverride());
758 HttpURLConnection conn = initializeConnection(fullUrl);
761 conn.setRequestMethod("POST");
762 } catch (ProtocolException e) {
763 shutdownConnection(conn);
764 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
// The query text is the request body.
767 attachContent(conn, query);
769 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
770 logger.debug("Request body = Elasticsearch query = " + query);
772 handleResponse(conn, opResult);
773 buildSearchResult(opResult, indexName);
// Record how long the query took together with its outcome.
775 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
777 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
778 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
783 shutdownConnection(conn);
// Writes the supplied content to the connection's output stream as the request
// body. If the output stream cannot be obtained, or writing fails, the
// connection is shut down and a DocumentStoreOperationException wrapping the
// IOException is thrown.
// The writer is created without an explicit charset, so the platform default
// encoding is used for the body.
788 private void attachContent(HttpURLConnection conn, String content)
789 throws DocumentStoreOperationException {
790 OutputStream outputStream = null;
791 OutputStreamWriter out = null;
794 outputStream = conn.getOutputStream();
795 } catch (IOException e) {
796 shutdownConnection(conn);
797 throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
800 out = new OutputStreamWriter(outputStream);
805 } catch (IOException e) {
806 shutdownConnection(conn);
807 throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
811 private HttpURLConnection initializeConnection(String fullUrl)
812 throws DocumentStoreOperationException {
814 HttpURLConnection conn = null;
817 url = new URL(fullUrl);
818 } catch (MalformedURLException e) {
819 throw new DocumentStoreOperationException("Error building a URL with " + url, e);
823 conn = (HttpURLConnection) url.openConnection();
824 conn.setDoOutput(true);
825 } catch (IOException e) {
826 shutdownConnection(conn);
827 throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
// Reads the HTTP response from the connection into the supplied result:
// the status code (with CONFLICT translated to PRECONDITION_FAILED), the body
// text, and the version extracted from the body by extractVersion().
// For non-2xx responses the body is taken from the error stream, otherwise
// from the input stream. Any IOException shuts the connection down and is
// rethrown wrapped in a DocumentStoreOperationException.
833 private void handleResponse(HttpURLConnection conn, OperationResult opResult)
834 throws DocumentStoreOperationException {
835 int resultCode = 200;
838 resultCode = conn.getResponseCode();
839 } catch (IOException e) {
840 shutdownConnection(conn);
841 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
844 logger.debug("Response Code : " + resultCode);
846 InputStream inputStream = null;
// NOTE(review): getErrorStream() can return null when the server sent no error
// body, which would make the InputStreamReader construction below fail - confirm
// whether that case needs guarding.
848 if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
849 inputStream = conn.getErrorStream();
852 inputStream = conn.getInputStream();
853 } catch (IOException e) {
854 shutdownConnection(conn);
855 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
// No charset is given, so the body is decoded with the platform default encoding.
859 InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
860 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
862 StringBuilder result = new StringBuilder(128);
863 String string = null;
// Accumulate the body line by line, re-adding a newline after each line.
866 while ((string = bufferedreader.readLine()) != null) {
867 result.append(string).append("\n");
869 } catch (IOException e) {
870 shutdownConnection(conn);
871 throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
// A 409 from the document store is reported to callers as 412.
874 if (resultCode == Status.CONFLICT.getStatusCode()) {
875 opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode());
877 opResult.setResultCode(resultCode);
879 if (logger.isDebugEnabled()) {
880 logger.debug("Raw result string from ElasticSearch = " + result.toString());
882 opResult.setResult(result.toString());
883 opResult.setResultVersion(extractVersion(result.toString()));
// Parses the response body as JSON and, when the root object has a "_version"
// member, captures its string form as the version.
// NOTE(review): the existing comment in the ParseException handler speaks of
// returning an empty string, while the local is initialised to null - confirm
// which value callers actually receive when no version is present.
886 private String extractVersion(String result) throws DocumentStoreOperationException {
888 JSONParser parser = new JSONParser();
889 String version = null;
891 JSONObject root = (JSONObject) parser.parse(result);
892 if (root.get("_version") != null) {
893 version = root.get("_version").toString();
896 } catch (ParseException e) {
898 // Not all responses from ElasticSearch include a version, so
899 // if we don't get one back, just return an empty string rather
900 // than trigger a false failure.
907 * This convenience method gets the current system time and stores
908 * it in an attribute in the supplied {@link MdcOverride} object so
909 * that it can be used later by the metrics logger.
911 * @param override - The {@link MdcOverride} object to update.
912 * @return - The supplied {@link MdcOverride} object.
914 private MdcOverride getStartTime(MdcOverride override) {
916 // Grab the current time...
917 long startTimeInMs = System.currentTimeMillis();
919 // ...and add it as an attribute to the supplied MDC Override
921 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
922 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
924 // Return the MdcOverride object that we were passed.
925 // This looks odd, but it allows us to do stuff like:
927 // MdcOverride ov = getStartTime(new MdcOverride())
929 // which is quite handy, but also allows us to pass in an existing
930 // MdcOverride object which already has some attributes set.
934 private boolean isSuccess(OperationResult result) {
936 return isSuccessCode(result.getResultCode());
940 private boolean isSuccessCode(int statusCode) {
941 return ((statusCode >= 200) && (statusCode < 300));
946 public OperationResult performBulkOperations(BulkRequest[] requests)
947 throws DocumentStoreOperationException {
949 if (logger.isDebugEnabled()) {
950 String dbgString = "ESController: performBulkOperations - Operations: ";
952 for (BulkRequest request : requests) {
953 dbgString += "[" + request.toString() + "] ";
956 logger.debug(dbgString);
959 // Grab the current time so we can use it to generate a metrics log.
960 MdcOverride override = getStartTime(new MdcOverride());
962 // Parse the supplied set of operations.
963 // Iterate over the list of operations which we were provided and
964 // translate them into a format that ElasticSearh understands.
966 StringBuilder esOperationSet = new StringBuilder(128);
967 List<ElasticSearchResultItem> rejected = new ArrayList<ElasticSearchResultItem>();
968 for (BulkRequest request : requests) {
970 // Convert the request to the syntax ElasticSearch likes.
971 if (buildEsOperation(request, esOperationSet, rejected)) {
976 ElasticSearchBulkOperationResult opResult = null;
979 // Open an HTTP connection to the ElasticSearch back end.
980 String fullUrl = getFullUrl("/_bulk", false);
982 HttpURLConnection conn;
985 url = new URL(fullUrl);
986 conn = (HttpURLConnection) url.openConnection();
987 conn.setRequestMethod("PUT");
988 conn.setDoOutput(true);
989 conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
990 conn.setRequestProperty("Connection", "Close");
992 } catch (IOException e) {
994 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
995 if (logger.isDebugEnabled()) {
996 logger.debug(Throwables.getStackTraceAsString(e));
999 throw new DocumentStoreOperationException("Failed to open connection to document store. Cause: "
1000 + e.getMessage(), e);
1003 StringBuilder bulkResult = new StringBuilder(128);
1005 // Create an output stream to write our request to.
1006 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1009 if (logger.isDebugEnabled()) {
1010 logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1011 logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n",
1015 // Write the resulting request string to our output stream. (this sends the request to ES?)
1016 out.write(esOperationSet.toString());
1019 // Open an input stream on our connection in order to read back the results.
1020 InputStream is = conn.getInputStream();
1021 InputStreamReader inputstreamreader = new InputStreamReader(is);
1022 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1024 // Read the contents of the input stream into our result string...
1025 String esResponseString = null;
1027 while ((esResponseString = bufferedreader.readLine()) != null) {
1028 bulkResult.append(esResponseString).append("\n");
1031 } catch (IOException e) {
1033 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1034 if (logger.isDebugEnabled()) {
1035 StringWriter sw = new StringWriter();
1036 e.printStackTrace(new PrintWriter(sw));
1037 logger.debug(sw.toString());
1040 throw new DocumentStoreOperationException("Failure interacting with document store. Cause: "
1041 + e.getMessage(), e);
1044 if (logger.isDebugEnabled()) {
1045 logger.debug("ESController: Received result string from ElasticSearch: = "
1046 + bulkResult.toString());
1049 // ...and marshal the resulting string into a Java object.
1051 opResult = marshallEsBulkResult(bulkResult.toString());
1053 } catch (IOException e) {
1055 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1056 if (logger.isDebugEnabled()) {
1057 logger.debug(Throwables.getStackTraceAsString(e));
1060 throw new DocumentStoreOperationException("Failed to marshal response body. Cause: "
1061 + e.getMessage(), e);
1065 // Finally, build the operation result and return it to the caller.
1066 OperationResult result = new OperationResult();
1067 result.setResultCode(207);
1068 result.setResult(buildGenericBulkResultSet(opResult, rejected));
1070 // In the success case we don't want the entire result string to be
1071 // dumped into the metrics log, so concatenate it.
1072 String resultStringForMetricsLog = result.getResult();
1073 if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1074 resultStringForMetricsLog = resultStringForMetricsLog.substring(0,
1075 Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1078 metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1080 .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1081 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1089 * This method converts a {@link BulkRequest} object into a json structure
1090 * which can be understood by ElasticSearch.
1092 * @param request - The request to be performed.
1093 * @param sb - The string builder to append the json data to
1094 * @throws DocumentStoreOperationException
// Translates a single BulkRequest into ElasticSearch bulk syntax, appending the
// generated text to 'sb', or records a rejection entry in 'fails' when one of the
// validations below does not pass.
//
// NOTE(review): the numeric prefixes on these lines skip values, i.e. this listing
// does not show every original line (case labels, return statements and several
// call arguments are absent).  The comments added here describe only what is visible.
//
// request - the operation to translate; its operation type selects the switch branch.
// sb      - accumulates the bulk payload: one String.format'ed action line per
//           operation, followed by the document json and a newline for the
//           operations that carry a document (create and update).
// fails   - receives one entry built by generateRejectionEntry(...) per failed check.
//
// Returns a boolean that performBulkOperations tests for every request - presumably
// false when the request was rejected; the return statements are not visible here.
// Throws DocumentStoreOperationException when the document cannot be serialised to
// json; indexExists(...) and documentExists(...) also declare this exception.
1096 private boolean buildEsOperation(BulkRequest request, StringBuilder sb,
1097 List<ElasticSearchResultItem> fails)
1098 throws DocumentStoreOperationException {
1100 boolean retVal = true;
// NOTE(review): indexExistsResult is not referenced by any of the visible lines.
1101 OperationResult indexExistsResult = null;
1103 // What kind of operation are we performing?
1104 switch (request.getOperationType()) {
1106 // Create a new document.
1109 // Make sure that we were supplied a document payload.
1110 if (request.getOperation().getDocument() == null) {
1112 fails.add(generateRejectionEntry(request.getOperationType(),
1113 "Missing document payload",
1117 request.getOperation().getMetaData().getUrl()));
1121 // Make sure that the supplied document URL is formatted
// The second argument is false here and true in the update and delete branches -
// presumably it states whether the URL must identify a specific document; verify
// against ApiUtils.validateDocumentUri.
1123 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1124 fails.add(generateRejectionEntry(request.getOperationType(),
1125 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1129 request.getOperation().getMetaData().getUrl()));
1133 // Validate that the specified index actually exists before we
1134 // try to perform the create.
// Here the index name is extracted from the document URL, whereas the update and
// delete branches use request.getIndex().  indexExists() goes through
// checkIndexExistence(), which sends a HEAD request to the document store, so this
// check costs one round trip per request.
1135 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1137 fails.add(generateRejectionEntry(request.getOperationType(),
1138 "Specified resource does not exist: "
1139 + request.getOperation().getMetaData().getUrl(),
1143 request.getOperation().getMetaData().getUrl()));
// NOTE(review): the two explanatory comments in this section read as if swapped
// relative to the code: the '== null' branch (no id supplied) uses the
// WITHOUT_INDEX template, the other branch uses the WITH_INDEX template.
1147 // If we were supplied an id for the new document, then
1148 // include it in the bulk operation to Elastic Search
1149 if (request.getId() == null) {
1151 sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE,
1155 // Otherwise, we just leave that parameter off and ElasticSearch
1156 // will generate one for us.
1158 sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE,
1165 // Append the document that we want to create.
1166 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1167 } catch (JsonProcessingException e) {
1168 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1173 // Update an existing document.
1176 // Make sure that we were supplied a document payload.
1177 if (request.getOperation().getDocument() == null) {
1179 fails.add(generateRejectionEntry(request.getOperationType(),
1180 "Missing document payload",
1184 request.getOperation().getMetaData().getUrl()));
1188 // Make sure that the supplied document URL is formatted
1190 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1191 fails.add(generateRejectionEntry(request.getOperationType(),
1192 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1196 request.getOperation().getMetaData().getUrl()));
1200 // Validate that the specified index actually exists before we
1201 // try to perform the update.
1202 if (!indexExists(request.getIndex())) {
1204 fails.add(generateRejectionEntry(request.getOperationType(),
1205 "Specified resource does not exist: "
1206 + request.getOperation().getMetaData().getUrl(),
1210 request.getOperation().getMetaData().getUrl()));
1214 // Validate that the document we are trying to update actually
1215 // exists before we try to perform the update.
1216 if (!documentExists(request.getIndex(), request.getId())) {
1218 fails.add(generateRejectionEntry(request.getOperationType(),
1219 "Specified resource does not exist: "
1220 + request.getOperation().getMetaData().getUrl(),
1224 request.getOperation().getMetaData().getUrl()));
1228 // It is mandatory that a version be supplied for an update operation,
1229 // so validate that now.
1230 if (request.getOperation().getMetaData().getEtag() == null) {
1232 fails.add(generateRejectionEntry(request.getOperationType(),
1233 "Missing mandatory ETag field",
1237 request.getOperation().getMetaData().getEtag()));
1241 // Generate the update request...
// The ETag supplied by the caller is passed as the last template argument.
1242 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE,
1246 request.getOperation().getMetaData().getEtag()));
1248 // ...and append the document that we want to update.
1250 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1251 } catch (JsonProcessingException e) {
1252 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1256 // Delete an existing document.
1259 // Make sure that the supplied document URL is formatted
1261 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1262 fails.add(generateRejectionEntry(request.getOperationType(),
1263 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1267 request.getOperation().getMetaData().getUrl()));
1271 // Validate that the specified index actually exists before we
1272 // try to perform the delete.
1273 if (!indexExists(request.getIndex())) {
1275 fails.add(generateRejectionEntry(request.getOperationType(),
1276 "Specified resource does not exist: "
1277 + request.getOperation().getMetaData().getUrl(),
1281 request.getOperation().getMetaData().getUrl()));
1285 // Validate that the document we are trying to delete actually
1286 // exists before we try to perform the delete.
1287 if (!documentExists(request.getIndex(), request.getId())) {
1289 fails.add(generateRejectionEntry(request.getOperationType(),
1290 "Specified resource does not exist: "
1291 + request.getOperation().getMetaData().getUrl(),
1295 request.getOperation().getMetaData().getUrl()));
1299 // It is mandatory that a version be supplied for a delete operation,
1300 // so validate that now.
1301 if (request.getOperation().getMetaData().getEtag() == null) {
1303 fails.add(generateRejectionEntry(request.getOperationType(),
1304 "Missing mandatory ETag field",
1308 request.getOperation().getMetaData().getUrl()));
1312 // Generate the delete request.
// Unlike create and update, no document body is appended after the action line.
1313 sb.append(String.format(BULK_DELETE_TEMPLATE,
1317 request.getOperation().getMetaData().getEtag()));
1325 private boolean indexExists(String index) throws DocumentStoreOperationException {
1327 OperationResult indexExistsResult = checkIndexExistence(index);
1329 return ((indexExistsResult.getResultCode() >= 200)
1330 && (indexExistsResult.getResultCode() < 300));
1333 private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1335 OperationResult docExistsResult = checkDocumentExistence(index, id);
1337 return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1341 * This method constructs a status entry for a bulk operation which has
1342 * been rejected before even sending it to the document store.
1344 * @param rejectReason - A message describing why the operation was rejected.
1345 * @param anId - The identifier associated with the document being
1347 * @param statusCode - An HTTP status code.
1348 * @return - A result set item.
// Builds the result item that reports an operation which was rejected locally,
// without being sent to the document store.
//
// NOTE(review): the numeric prefixes skip values, so part of this method is not
// shown - three parameter lines (1352-1354, which must include 'statusCode' since
// it is used below) and the logic that chooses between setCreate / setIndex /
// setDelete.  That choice is presumably driven by 'opType'; confirm against the
// original file.
1350 private ElasticSearchResultItem generateRejectionEntry(OperationType opType,
1351 String rejectReason,
1355 String originalUrl) {
// The rejection reason travels in an ElasticSearchError attached to the status.
1357 ElasticSearchError err = new ElasticSearchError();
1358 err.setReason(rejectReason);
1360 ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1363 op.setStatus(statusCode);
// Keep the URL of the originating request alongside the status.
1365 op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1367 ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
// The same status object is stored in exactly one of the item's operation slots.
1371 rejectionResult.setCreate(op);
1374 rejectionResult.setIndex(op);
1377 rejectionResult.setDelete(op);
1382 return rejectionResult;
1387 * This method takes the json structure returned from ElasticSearch in
1388 * response to a bulk operations request and marshals it into a Java
1391 * @param jsonResult - The bulk operations response returned from
1393 * @return - The marshalled response.
1394 * @throws JsonParseException
1395 * @throws JsonMappingException
1396 * @throws IOException
1398 private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult)
1399 throws JsonParseException, JsonMappingException, IOException {
1401 if (jsonResult != null) {
1402 if (logger.isDebugEnabled()) {
1403 logger.debug("ESController: Marshalling ES result set from json: "
1404 + jsonResult.replaceAll("\n", ""));
1407 ObjectMapper mapper = new ObjectMapper();
1408 mapper.setSerializationInclusion(Include.NON_EMPTY);
1410 return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1418 * This method takes the marshalled ElasticSearch bulk response and
1419 * converts it into a generic response payload.
1421 * @param esResult - ElasticSearch bulk operations response.
1422 * @return - A generic result set.
// Merges the results returned by ElasticSearch with the operations that were
// rejected locally and renders the combined list as one json summary string.
//
// esResult    - the marshalled bulk response; may be null, in which case only the
//               rejected operations are reported.
// rejectedOps - the operations that were rejected without being sent.
// Returns the json text assembled in 'responseBody'.
//
// NOTE(review): the numeric prefixes skip values, so some original lines are not
// shown: the declarations and increments of 'totalOps' and 'totalFails', the
// increment of 'totalSuccess', and parts of the response literal.
1424 private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1425 List<ElasticSearchResultItem> rejectedOps) {
1428 int totalSuccess = 0;
1431 if (logger.isDebugEnabled()) {
1433 logger.debug("ESController: Build generic result set. ES Results: "
1434 + ((esResult != null) ? esResult.toString() : "[]")
1435 + " Rejected Ops: " + rejectedOps.toString());
1438 // Build a combined list of result items from the results returned
1439 // from ElasticSearch and the list of operations that we rejected
1440 // without sending to ElasticSearch.
1441 List<ElasticSearchResultItem> combinedResults = new ArrayList<ElasticSearchResultItem>();
1442 if (esResult != null) {
// 'Arrays' is the backport class imported at the top of the file
// (edu.emory.mathcs.backport.java.util.Arrays), not java.util.Arrays.
1443 combinedResults.addAll(Arrays.asList(esResult.getItems()));
1445 combinedResults.addAll(rejectedOps);
1447 // Iterate over the individual results in the resulting result set.
1448 StringBuilder resultsBuilder = new StringBuilder();
// Used purely as a "first iteration" flag: compareAndSet(true, false) succeeds only
// once, so the separator is skipped for the first item and emitted for all others.
1449 AtomicBoolean firstItem = new AtomicBoolean(true);
1450 for (ElasticSearchResultItem item : combinedResults) {
1452 // Increment the operation counts.
// An item counts as a success when its status lies in the 2xx range.
1454 if (isSuccessCode(item.operationStatus().getStatus())) {
1460 // Prepend a comma to our response string unless this is the
1461 // first result in the set.
1462 if (!firstItem.compareAndSet(true, false)) {
1463 resultsBuilder.append(", ");
1466 // Append the current result as a generic json structure.
1467 resultsBuilder.append(item.toJson());
1470 // Now, build the result string and return it.
1471 String responseBody = "{ \"total_operations\": " + totalOps + ", "
1472 + "\"total_success\": " + totalSuccess + ", "
1473 + "\"total_fails\": " + totalFails + ", "
1475 + resultsBuilder.toString()
1478 return responseBody;
1483 * This method queries ElasticSearch to determine if the supplied
1484 * index is present in the document store.
1486 * @param indexName - The index to look for.
1487 * @return - An operation result indicating the success or failure of
1489 * @throws DocumentStoreOperationException
// Sends an HTTP HEAD request for "/<indexName>" to the document store and records
// the HTTP status code of the response in the operation result.  indexExists()
// treats a 2xx code from this method as "the index exists".
//
// indexName - the index to look for.
// Throws DocumentStoreOperationException when the request method cannot be set or
// the response code cannot be read; the connection is shut down before throwing.
//
// NOTE(review): the numeric prefixes skip values, so some original lines are not
// shown - among them the declaration of 'resultCode', the remaining arguments of
// the metrics logger call and the return statement (presumably 'opResult').
1491 public OperationResult checkIndexExistence(String indexName)
1492 throws DocumentStoreOperationException {
1494 // Initialize operation result with a failure codes / fault string
1495 OperationResult opResult = new OperationResult();
1496 opResult.setResultCode(500);
1498 // Grab the current time so we can use it to generate a metrics log.
1499 MdcOverride override = getStartTime(new MdcOverride());
1501 String fullUrl = getFullUrl("/" + indexName, false);
1502 HttpURLConnection conn = initializeConnection(fullUrl);
// HEAD is sufficient: only the status code of the response is consulted below.
1505 conn.setRequestMethod("HEAD");
1507 } catch (ProtocolException e) {
1508 shutdownConnection(conn);
1509 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1512 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1516 resultCode = conn.getResponseCode();
1517 } catch (IOException e) {
1518 shutdownConnection(conn);
1519 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1521 logger.debug("Response Code : " + resultCode);
// Overwrite the pessimistic 500 set above with the code actually returned.
1523 opResult.setResultCode(resultCode);
1525 // Generate a metrics log so we can track how long the operation took.
// Note that the response description field is filled with the result code as well.
1526 metricsLogger.info(SearchDbMsgs.CHECK_INDEX_TIME,
1528 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1529 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1533 shutdownConnection(conn);
1539 private void buildDocumentResult(DocumentOperationResult result, String index)
1540 throws DocumentStoreOperationException {
1542 JSONParser parser = new JSONParser();
1545 root = (JSONObject) parser.parse(result.getResult());
1547 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1548 // Success response object
1549 Document doc = new Document();
1550 doc.setEtag(result.getResultVersion());
1551 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1553 doc.setContent((JSONObject) root.get("_source"));
1554 result.setDocument(doc);
1557 // Error response object
1558 JSONObject error = (JSONObject) root.get("error");
1559 if (error != null) {
1560 result.setError(new ErrorResult(error.get("type").toString(),
1561 error.get("reason").toString()));
1565 } catch (Exception e) {
1566 throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1567 + result.getResult());
1573 private String buildDocumentResponseUrl(String index, String id) {
1574 return ApiUtils.buildDocumentUri(index, id);
1577 private void buildSearchResult(SearchOperationResult result, String index)
1578 throws DocumentStoreOperationException {
1580 JSONParser parser = new JSONParser();
1584 root = (JSONObject) parser.parse(result.getResult());
1585 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1586 JSONObject hits = (JSONObject) root.get("hits");
1587 JSONArray hitArray = (JSONArray) hits.get("hits");
1588 SearchHits searchHits = new SearchHits();
1589 searchHits.setTotalHits(hits.get("total").toString());
1590 ArrayList<SearchHit> searchHitArray = new ArrayList<SearchHit>();
1592 for (int i = 0; i < hitArray.size(); i++) {
1593 JSONObject hit = (JSONObject) hitArray.get(i);
1594 SearchHit searchHit = new SearchHit();
1595 searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1596 Document doc = new Document();
1597 if (hit.get("_version") != null) {
1598 doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1601 doc.setUrl(buildDocumentResponseUrl(index, (hit.get("_id") != null)
1602 ? hit.get("_id").toString() : ""));
1603 doc.setContent((JSONObject) hit.get("_source"));
1604 searchHit.setDocument(doc);
1605 searchHitArray.add(searchHit);
1607 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1608 result.setSearchResult(searchHits);
1610 JSONObject aggregations = (JSONObject) root.get("aggregations");
1611 if (aggregations != null) {
1612 AggregationResult[] aggResults =
1613 AggregationParsingUtil.parseAggregationResults(aggregations);
1614 AggregationResults aggs = new AggregationResults();
1615 aggs.setAggregations(aggResults);
1616 result.setAggregationResult(aggs);
1621 JSONObject error = (JSONObject) root.get("error");
1622 if (error != null) {
1623 result.setError(new ErrorResult(error.get("type").toString(),
1624 error.get("reason").toString()));
1627 } catch (Exception e) {
1628 throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1629 + result.getResult());