2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property.
6 * Copyright © 2017 Amdocs
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License ati
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
22 * ECOMP and OpenECOMP are trademarks
23 * and service marks of AT&T Intellectual Property.
25 package org.openecomp.sa.searchdbabstraction.elasticsearch.dao;
27 import com.att.aft.dme2.internal.google.common.base.Throwables;
28 import com.fasterxml.jackson.annotation.JsonInclude.Include;
29 import com.fasterxml.jackson.core.JsonParseException;
30 import com.fasterxml.jackson.core.JsonProcessingException;
31 import com.fasterxml.jackson.databind.JsonMappingException;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33 import edu.emory.mathcs.backport.java.util.Arrays;
34 import org.json.simple.JSONArray;
35 import org.json.simple.JSONObject;
36 import org.json.simple.parser.JSONParser;
37 import org.json.simple.parser.ParseException;
38 import org.openecomp.cl.api.LogFields;
39 import org.openecomp.cl.api.LogLine;
40 import org.openecomp.cl.api.Logger;
41 import org.openecomp.cl.eelf.LoggerFactory;
42 import org.openecomp.cl.mdc.MdcContext;
43 import org.openecomp.cl.mdc.MdcOverride;
44 import org.openecomp.sa.rest.AnalysisConfiguration;
45 import org.openecomp.sa.rest.ApiUtils;
46 import org.openecomp.sa.rest.BulkRequest;
47 import org.openecomp.sa.rest.BulkRequest.OperationType;
48 import org.openecomp.sa.rest.DocumentSchema;
49 import org.openecomp.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
50 import org.openecomp.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
51 import org.openecomp.sa.searchdbabstraction.entity.AggregationResult;
52 import org.openecomp.sa.searchdbabstraction.entity.AggregationResults;
53 import org.openecomp.sa.searchdbabstraction.entity.Document;
54 import org.openecomp.sa.searchdbabstraction.entity.DocumentOperationResult;
55 import org.openecomp.sa.searchdbabstraction.entity.ErrorResult;
56 import org.openecomp.sa.searchdbabstraction.entity.OperationResult;
57 import org.openecomp.sa.searchdbabstraction.entity.SearchHit;
58 import org.openecomp.sa.searchdbabstraction.entity.SearchHits;
59 import org.openecomp.sa.searchdbabstraction.entity.SearchOperationResult;
60 import org.openecomp.sa.searchdbabstraction.logging.SearchDbMsgs;
61 import org.openecomp.sa.searchdbabstraction.util.AggregationParsingUtil;
62 import org.openecomp.sa.searchdbabstraction.util.DocumentSchemaUtil;
63 import org.openecomp.sa.searchdbabstraction.util.SearchDbConstants;
65 import java.io.BufferedReader;
67 import java.io.FileInputStream;
68 import java.io.IOException;
69 import java.io.InputStream;
70 import java.io.InputStreamReader;
71 import java.io.OutputStream;
72 import java.io.OutputStreamWriter;
73 import java.io.PrintWriter;
74 import java.io.StringWriter;
75 import java.net.HttpURLConnection;
76 import java.net.MalformedURLException;
77 import java.net.ProtocolException;
79 import java.text.SimpleDateFormat;
80 import java.util.ArrayList;
81 import java.util.List;
82 import java.util.Properties;
83 import java.util.concurrent.atomic.AtomicBoolean;
84 import javax.ws.rs.core.Response.Status;
88 * This class has the Elasticsearch implementation of the
89 * DB operations defined in DocumentStoreInterface.
91 public class ElasticSearchHttpController implements DocumentStoreInterface {
93 private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
94 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
95 private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
96 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
97 private static final String BULK_IMPORT_INDEX_TEMPLATE =
98 "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
99 private static final String BULK_DELETE_TEMPLATE =
100 "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
102 private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
103 "Internal Error: ElasticSearch operation fault occurred";
104 private static final Logger logger = LoggerFactory.getInstance()
105 .getLogger(ElasticSearchHttpController.class.getName());
106 private static final Logger metricsLogger = LoggerFactory.getInstance()
107 .getMetricsLogger(ElasticSearchHttpController.class.getName());
108 private final ElasticSearchConfig config;
110 private static final String DEFAULT_TYPE = "default";
112 private static ElasticSearchHttpController instance = null;
114 protected AnalysisConfiguration analysisConfig;
116 public static ElasticSearchHttpController getInstance() {
118 synchronized (ElasticSearchHttpController.class) {
120 if (instance == null) {
122 Properties properties = new Properties();
123 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
125 properties.load(new FileInputStream(file));
126 } catch (Exception e) {
127 logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL,
128 "ElasticSearchHTTPController.getInstance",
129 e.getLocalizedMessage());
132 ElasticSearchConfig config = new ElasticSearchConfig(properties);
133 instance = new ElasticSearchHttpController(config);
140 public ElasticSearchHttpController(ElasticSearchConfig config) {
141 this.config = config;
142 analysisConfig = new AnalysisConfiguration();
145 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
147 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
148 } catch (Exception e) {
149 logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e,
150 getFullUrl("", false), e.getMessage());
155 public AnalysisConfiguration getAnalysisConfig() {
156 return analysisConfig;
160 public OperationResult createIndex(String index, DocumentSchema documentSchema) {
162 OperationResult result = new OperationResult();
163 result.setResultCode(500);
167 // Submit the request to ElasticSearch to create the index using a
168 // default document type.
169 result = createTable(index,
171 analysisConfig.getEsIndexSettings(),
172 DocumentSchemaUtil.generateDocumentMappings(documentSchema));
174 // ElasticSearch will return us a 200 code on success when we
175 // want to report a 201, so translate the result here.
176 result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
177 if (isSuccess(result)) {
178 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
179 //result.setResult("{\"index\": \"" + index + ", \"type\": \"" + DEFAULT_TYPE + "\"}");
182 } catch (DocumentStoreOperationException e) {
184 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
192 public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
194 //Initialize operation result with a failure codes / fault string
195 OperationResult opResult = new OperationResult();
196 opResult.setResultCode(500);
197 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
199 // Grab the current time so we can use it to generate a metrics log.
200 MdcOverride override = getStartTime(new MdcOverride());
202 String fullUrl = getFullUrl("/" + indexName + "/", false);
203 HttpURLConnection conn = initializeConnection(fullUrl);
205 logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
208 conn.setRequestMethod("DELETE");
209 } catch (ProtocolException e) {
210 shutdownConnection(conn);
211 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
214 handleResponse(conn, opResult);
216 // Generate a metrics log so we can track how long the operation took.
217 metricsLogger.info(SearchDbMsgs.DELETE_INDEX_TIME,
219 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
220 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
224 shutdownConnection(conn);
230 private OperationResult checkConnection() throws Exception {
232 String fullUrl = getFullUrl("/_cluster/health", false);
234 HttpURLConnection conn = null;
236 url = new URL(fullUrl);
237 conn = (HttpURLConnection) url.openConnection();
238 conn.setRequestMethod("GET");
239 conn.setDoOutput(true);
240 logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
242 int resultCode = conn.getResponseCode();
243 logger.debug("getClusterHealth() response Code : " + resultCode);
244 OperationResult opResult = new OperationResult();
245 opResult.setResultCode(resultCode);
247 shutdownConnection(conn);
252 private String getFullUrl(String resourceUrl, boolean isSecure) {
254 final String host = config.getIpAddress();
255 final String port = config.getHttpPort();
258 return String.format("https://%s:%s%s", host, port, resourceUrl);
260 return String.format("http://%s:%s%s", host, port, resourceUrl);
264 private void shutdownConnection(HttpURLConnection connection) {
265 if (connection == null) {
269 InputStream inputstream = null;
270 OutputStream outputstream = null;
273 inputstream = connection.getInputStream();
274 } catch (IOException e) {
275 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
277 if (inputstream != null) {
280 } catch (IOException e) {
281 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
282 e.getLocalizedMessage());
288 outputstream = connection.getOutputStream();
289 } catch (IOException e) {
290 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
292 if (outputstream != null) {
294 outputstream.close();
295 } catch (IOException e) {
296 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
297 e.getLocalizedMessage());
302 connection.disconnect();
306 protected OperationResult createTable(String indexName, String typeName,
307 String indexSettings, String indexMappings)
308 throws DocumentStoreOperationException {
310 if (indexSettings == null) {
311 logger.debug("No settings provided.");
314 if (indexMappings == null) {
315 logger.debug("No mappings provided.");
318 OperationResult opResult = new OperationResult();
320 // Initialize operation result with a failure codes / fault string
321 opResult.setResultCode(500);
322 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
324 // Grab the current time so we can use it to generate a metrics log.
325 MdcOverride override = getStartTime(new MdcOverride());
327 String fullUrl = getFullUrl("/" + indexName + "/", false);
328 HttpURLConnection conn = initializeConnection(fullUrl);
331 conn.setRequestMethod("PUT");
332 } catch (ProtocolException e) {
333 shutdownConnection(conn);
334 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
337 StringBuilder sb = new StringBuilder(128);
338 sb.append("{ \"settings\" : ");
339 sb.append(indexSettings);
342 sb.append("\"mappings\" : {");
343 sb.append("\"" + typeName + "\" :");
344 sb.append(indexMappings);
347 attachContent(conn, sb.toString());
349 logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
350 logger.debug("Request content: " + sb.toString());
352 handleResponse(conn, opResult);
354 shutdownConnection(conn);
356 // Generate a metrics log so we can track how long the operation took.
357 metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
359 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
360 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
368 public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document)
369 throws DocumentStoreOperationException {
370 if (document.getId() == null || document.getId().isEmpty()) {
371 return createDocumentWithoutId(indexName, document);
373 return createDocumentWithId(indexName, document);
377 private DocumentOperationResult createDocumentWithId(String indexName,
378 DocumentStoreDataEntity document)
379 throws DocumentStoreOperationException {
380 // check if the document already exists
381 DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
384 if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
385 if (opResult.getResultCode() == Status.OK.getStatusCode()) {
386 opResult.setFailureCause("A document with the same id already exists.");
388 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
390 opResult.setResultCode(Status.CONFLICT.getStatusCode());
394 opResult = new DocumentOperationResult();
395 // Initialize operation result with a failure codes / fault string
396 opResult.setResultCode(500);
397 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
399 // Grab the current time so we can use it to generate a metrics log.
400 MdcOverride override = getStartTime(new MdcOverride());
402 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE
403 + "/" + document.getId(), false);
404 HttpURLConnection conn = initializeConnection(fullUrl);
407 conn.setRequestMethod("PUT");
408 } catch (ProtocolException e) {
409 shutdownConnection(conn);
410 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
413 attachDocument(conn, document);
415 logger.debug("Sending 'PUT' request to: " + conn.getURL());
417 handleResponse(conn, opResult);
418 buildDocumentResult(opResult, indexName);
420 // Generate a metrics log so we can track how long the operation took.
421 metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
423 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
424 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
428 shutdownConnection(conn);
434 private DocumentOperationResult createDocumentWithoutId(String indexName,
435 DocumentStoreDataEntity document)
436 throws DocumentStoreOperationException {
438 DocumentOperationResult response = new DocumentOperationResult();
439 // Initialize operation result with a failure codes / fault string
440 response.setResultCode(500);
441 response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
443 // Grab the current time so we can use it to generate a metrics log.
444 MdcOverride override = getStartTime(new MdcOverride());
446 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
447 HttpURLConnection conn = initializeConnection(fullUrl);
450 conn.setRequestMethod("POST");
451 } catch (ProtocolException e) {
452 shutdownConnection(conn);
453 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
456 attachDocument(conn, document);
458 logger.debug("Sending 'POST' request to: " + conn.getURL());
460 handleResponse(conn, response);
461 buildDocumentResult(response, indexName);
463 // Generate a metrics log so we can track how long the operation took.
464 metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
466 .setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
467 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
471 shutdownConnection(conn);
476 private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
477 throws DocumentStoreOperationException {
478 conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
479 conn.setRequestProperty("Connection", "Close");
481 attachContent(conn, doc.getContentInJson());
484 private DocumentOperationResult checkDocumentExistence(String indexName,
486 throws DocumentStoreOperationException {
487 DocumentOperationResult opResult = new DocumentOperationResult();
489 // Initialize operation result with a failure codes / fault string
490 opResult.setResultCode(500);
492 // Grab the current time so we can use it to generate a metrics log.
493 MdcOverride override = getStartTime(new MdcOverride());
495 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
496 HttpURLConnection conn = initializeConnection(fullUrl);
499 conn.setRequestMethod("HEAD");
500 } catch (ProtocolException e) {
501 shutdownConnection(conn);
502 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
505 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
509 resultCode = conn.getResponseCode();
510 } catch (IOException e) {
511 shutdownConnection(conn);
512 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
515 logger.debug("Response Code : " + resultCode);
517 opResult.setResultCode(resultCode);
519 // Generate a metrics log so we can track how long the operation took.
520 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
522 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
523 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
528 shutdownConnection(conn);
534 public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document)
535 throws DocumentStoreOperationException {
536 DocumentOperationResult opResult = new DocumentOperationResult();
538 // Initialize operation result with a failure codes / fault string
539 opResult.setResultCode(500);
540 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
542 // Grab the current time so we can use it to generate a metrics log.
543 MdcOverride override = getStartTime(new MdcOverride());
545 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
546 + "?version=" + document.getVersion(), false);
547 HttpURLConnection conn = initializeConnection(fullUrl);
550 conn.setRequestMethod("PUT");
551 } catch (ProtocolException e) {
552 shutdownConnection(conn);
553 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
556 attachDocument(conn, document);
558 logger.debug("Sending 'PUT' request to: " + conn.getURL());
560 handleResponse(conn, opResult);
561 buildDocumentResult(opResult, indexName);
563 // Generate a metrics log so we can track how long the operation took.
564 metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
566 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
567 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
572 shutdownConnection(conn);
578 public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
579 throws DocumentStoreOperationException {
580 DocumentOperationResult opResult = new DocumentOperationResult();
582 // Initialize operation result with a failure codes / fault string
583 opResult.setResultCode(500);
584 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
586 // Grab the current time so we can use it to generate a metrics log.
587 MdcOverride override = getStartTime(new MdcOverride());
589 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
590 + "?version=" + document.getVersion(), false);
591 HttpURLConnection conn = initializeConnection(fullUrl);
594 conn.setRequestMethod("DELETE");
595 } catch (ProtocolException e) {
596 shutdownConnection(conn);
597 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
600 logger.debug("\nSending 'DELETE' request to " + conn.getURL());
602 handleResponse(conn, opResult);
603 buildDocumentResult(opResult, indexName);
604 //supress the etag and url in response for delete as they are not required
605 if (opResult.getDocument() != null) {
606 opResult.getDocument().setEtag(null);
607 opResult.getDocument().setUrl(null);
610 // Generate a metrics log so we can track how long the operation took.
611 metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
613 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
614 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
619 shutdownConnection(conn);
625 public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
626 throws DocumentStoreOperationException {
627 DocumentOperationResult opResult = new DocumentOperationResult();
629 // Initialize operation result with a failure codes / fault string
630 opResult.setResultCode(500);
631 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
633 // Grab the current time so we can use it to generate a metrics log.
634 MdcOverride override = getStartTime(new MdcOverride());
636 String fullUrl = null;
637 if (document.getVersion() == null) {
638 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
640 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
641 + "?version=" + document.getVersion(), false);
643 HttpURLConnection conn = initializeConnection(fullUrl);
645 logger.debug("\nSending 'GET' request to: " + conn.getURL());
647 handleResponse(conn, opResult);
648 buildDocumentResult(opResult, indexName);
650 // Generate a metrics log so we can track how long the operation took.
651 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
653 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
654 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
659 shutdownConnection(conn);
664 public SearchOperationResult search(String indexName, String queryString)
665 throws DocumentStoreOperationException {
666 SearchOperationResult opResult = new SearchOperationResult();
668 // Initialize operation result with a failure codes / fault string
669 opResult.setResultCode(500);
670 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
672 String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
674 // Grab the current time so we can use it to generate a metrics log.
675 MdcOverride override = getStartTime(new MdcOverride());
677 HttpURLConnection conn = initializeConnection(fullUrl);
680 conn.setRequestMethod("GET");
681 } catch (ProtocolException e) {
682 shutdownConnection(conn);
683 throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
686 logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
688 handleResponse(conn, opResult);
689 buildSearchResult(opResult, indexName);
692 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
694 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
695 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
703 public SearchOperationResult searchWithPayload(String indexName, String query)
704 throws DocumentStoreOperationException {
705 SearchOperationResult opResult = new SearchOperationResult();
707 if (logger.isDebugEnabled()) {
708 logger.debug("Querying index: " + indexName + " with query string: " + query);
711 // Initialize operation result with a failure codes / fault string
712 opResult.setResultCode(500);
713 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
715 String fullUrl = getFullUrl("/" + indexName + "/_search", false);
717 // Grab the current time so we can use it to generate a metrics log.
718 MdcOverride override = getStartTime(new MdcOverride());
720 HttpURLConnection conn = initializeConnection(fullUrl);
723 conn.setRequestMethod("POST");
724 } catch (ProtocolException e) {
725 shutdownConnection(conn);
726 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
729 attachContent(conn, query);
731 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
732 logger.debug("Request body = Elasticsearch query = " + query);
734 handleResponse(conn, opResult);
735 buildSearchResult(opResult, indexName);
737 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
739 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
740 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
745 shutdownConnection(conn);
750 private void attachContent(HttpURLConnection conn, String content)
751 throws DocumentStoreOperationException {
752 OutputStream outputStream = null;
753 OutputStreamWriter out = null;
756 outputStream = conn.getOutputStream();
757 } catch (IOException e) {
758 shutdownConnection(conn);
759 throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
762 out = new OutputStreamWriter(outputStream);
767 } catch (IOException e) {
768 shutdownConnection(conn);
769 throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
773 private HttpURLConnection initializeConnection(String fullUrl)
774 throws DocumentStoreOperationException {
776 HttpURLConnection conn = null;
779 url = new URL(fullUrl);
780 } catch (MalformedURLException e) {
781 throw new DocumentStoreOperationException("Error building a URL with " + url, e);
785 conn = (HttpURLConnection) url.openConnection();
786 conn.setDoOutput(true);
787 } catch (IOException e) {
788 shutdownConnection(conn);
789 throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
795 private void handleResponse(HttpURLConnection conn, OperationResult opResult)
796 throws DocumentStoreOperationException {
797 int resultCode = 200;
800 resultCode = conn.getResponseCode();
801 } catch (IOException e) {
802 shutdownConnection(conn);
803 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
806 logger.debug("Response Code : " + resultCode);
808 InputStream inputStream = null;
810 if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
811 inputStream = conn.getErrorStream();
814 inputStream = conn.getInputStream();
815 } catch (IOException e) {
816 shutdownConnection(conn);
817 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
821 InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
822 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
824 StringBuilder result = new StringBuilder(128);
825 String string = null;
828 while ((string = bufferedreader.readLine()) != null) {
829 result.append(string).append("\n");
831 } catch (IOException e) {
832 shutdownConnection(conn);
833 throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
836 if (resultCode == Status.CONFLICT.getStatusCode()) {
837 opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode());
839 opResult.setResultCode(resultCode);
841 if (logger.isDebugEnabled()) {
842 logger.debug("Raw result string from ElasticSearch = " + result.toString());
844 opResult.setResult(result.toString());
845 opResult.setResultVersion(extractVersion(result.toString()));
848 private String extractVersion(String result) throws DocumentStoreOperationException {
850 JSONParser parser = new JSONParser();
851 String version = null;
853 JSONObject root = (JSONObject) parser.parse(result);
854 if (root.get("_version") != null) {
855 version = root.get("_version").toString();
858 } catch (ParseException e) {
860 // Not all responses from ElasticSearch include a version, so
861 // if we don't get one back, just return an empty string rather
862 // than trigger a false failure.
869 * This convenience method gets the current system time and stores
870 * it in an attribute in the supplied {@link MdcOverride} object so
871 * that it can be used later by the metrics logger.
873 * @param override - The {@link MdcOverride} object to update.
874 * @return - The supplied {@link MdcOverride} object.
876 private MdcOverride getStartTime(MdcOverride override) {
878 // Grab the current time...
879 long startTimeInMs = System.currentTimeMillis();
881 // ...and add it as an attribute to the supplied MDC Override
883 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
884 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
886 // Return the MdcOverride object that we were passed.
887 // This looks odd, but it allows us to do stuff like:
889 // MdcOverride ov = getStartTime(new MdcOverride())
891 // which is quite handy, but also allows us to pass in an existing
892 // MdcOverride object which already has some attributes set.
896 private boolean isSuccess(OperationResult result) {
898 return isSuccessCode(result.getResultCode());
902 private boolean isSuccessCode(int statusCode) {
903 return ((statusCode >= 200) && (statusCode < 300));
908 public OperationResult performBulkOperations(BulkRequest[] requests)
909 throws DocumentStoreOperationException {
911 if (logger.isDebugEnabled()) {
912 String dbgString = "ESController: performBulkOperations - Operations: ";
914 for (BulkRequest request : requests) {
915 dbgString += "[" + request.toString() + "] ";
918 logger.debug(dbgString);
921 // Grab the current time so we can use it to generate a metrics log.
922 MdcOverride override = getStartTime(new MdcOverride());
924 // Parse the supplied set of operations.
925 // Iterate over the list of operations which we were provided and
926 // translate them into a format that ElasticSearh understands.
928 StringBuilder esOperationSet = new StringBuilder(128);
929 List<ElasticSearchResultItem> rejected = new ArrayList<ElasticSearchResultItem>();
930 for (BulkRequest request : requests) {
932 // Convert the request to the syntax ElasticSearch likes.
933 if (buildEsOperation(request, esOperationSet, rejected)) {
938 ElasticSearchBulkOperationResult opResult = null;
941 // Open an HTTP connection to the ElasticSearch back end.
942 String fullUrl = getFullUrl("/_bulk", false);
944 HttpURLConnection conn;
947 url = new URL(fullUrl);
948 conn = (HttpURLConnection) url.openConnection();
949 conn.setRequestMethod("PUT");
950 conn.setDoOutput(true);
951 conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
952 conn.setRequestProperty("Connection", "Close");
954 } catch (IOException e) {
956 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
957 if (logger.isDebugEnabled()) {
958 logger.debug(Throwables.getStackTraceAsString(e));
961 throw new DocumentStoreOperationException("Failed to open connection to document store. Cause: "
962 + e.getMessage(), e);
965 StringBuilder bulkResult = new StringBuilder(128);
967 // Create an output stream to write our request to.
968 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
971 if (logger.isDebugEnabled()) {
972 logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
973 logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n",
977 // Write the resulting request string to our output stream. (this sends the request to ES?)
978 out.write(esOperationSet.toString());
981 // Open an input stream on our connection in order to read back the results.
982 InputStream is = conn.getInputStream();
983 InputStreamReader inputstreamreader = new InputStreamReader(is);
984 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
986 // Read the contents of the input stream into our result string...
987 String esResponseString = null;
989 while ((esResponseString = bufferedreader.readLine()) != null) {
990 bulkResult.append(esResponseString).append("\n");
993 } catch (IOException e) {
995 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
996 if (logger.isDebugEnabled()) {
997 StringWriter sw = new StringWriter();
998 e.printStackTrace(new PrintWriter(sw));
999 logger.debug(sw.toString());
1002 throw new DocumentStoreOperationException("Failure interacting with document store. Cause: "
1003 + e.getMessage(), e);
1006 if (logger.isDebugEnabled()) {
1007 logger.debug("ESController: Received result string from ElasticSearch: = "
1008 + bulkResult.toString());
1011 // ...and marshal the resulting string into a Java object.
1013 opResult = marshallEsBulkResult(bulkResult.toString());
1015 } catch (IOException e) {
1017 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1018 if (logger.isDebugEnabled()) {
1019 logger.debug(Throwables.getStackTraceAsString(e));
1022 throw new DocumentStoreOperationException("Failed to marshal response body. Cause: "
1023 + e.getMessage(), e);
1027 // Finally, build the operation result and return it to the caller.
1028 OperationResult result = new OperationResult();
1029 result.setResultCode(207);
1030 result.setResult(buildGenericBulkResultSet(opResult, rejected));
1032 // In the success case we don't want the entire result string to be
1033 // dumped into the metrics log, so concatenate it.
1034 String resultStringForMetricsLog = result.getResult();
1035 if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1036 resultStringForMetricsLog = resultStringForMetricsLog.substring(0,
1037 Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1040 metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1042 .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1043 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1051 * This method converts a {@link BulkRequest} object into a json structure
1052 * which can be understood by ElasticSearch.
1054 * @param request - The request to be performed.
1055 * @param sb - The string builder to append the json data to
1056 * @throws DocumentStoreOperationException
1058 private boolean buildEsOperation(BulkRequest request, StringBuilder sb,
1059 List<ElasticSearchResultItem> fails)
1060 throws DocumentStoreOperationException {
1062 boolean retVal = true;
1063 OperationResult indexExistsResult = null;
1065 // What kind of operation are we performing?
1066 switch (request.getOperationType()) {
1068 // Create a new document.
1071 // Make sure that we were supplied a document payload.
1072 if (request.getOperation().getDocument() == null) {
1074 fails.add(generateRejectionEntry(request.getOperationType(),
1075 "Missing document payload",
1079 request.getOperation().getMetaData().getUrl()));
1083 // Make sure that the supplied document URL is formatted
1085 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1086 fails.add(generateRejectionEntry(request.getOperationType(),
1087 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1091 request.getOperation().getMetaData().getUrl()));
1095 // Validate that the specified index actually exists before we
1096 // try to perform the create.
1097 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1099 fails.add(generateRejectionEntry(request.getOperationType(),
1100 "Specified resource does not exist: "
1101 + request.getOperation().getMetaData().getUrl(),
1105 request.getOperation().getMetaData().getUrl()));
1109 // If we were supplied an id for the new document, then
1110 // include it in the bulk operation to Elastic Search
1111 if (request.getId() == null) {
1113 sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE,
1117 // Otherwise, we just leave that parameter off and ElasticSearch
1118 // will generate one for us.
1120 sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE,
1127 // Append the document that we want to create.
1128 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1129 } catch (JsonProcessingException e) {
1130 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1135 // Update an existing document.
1138 // Make sure that we were supplied a document payload.
1139 if (request.getOperation().getDocument() == null) {
1141 fails.add(generateRejectionEntry(request.getOperationType(),
1142 "Missing document payload",
1146 request.getOperation().getMetaData().getUrl()));
1150 // Make sure that the supplied document URL is formatted
1152 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1153 fails.add(generateRejectionEntry(request.getOperationType(),
1154 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1158 request.getOperation().getMetaData().getUrl()));
1162 // Validate that the specified index actually exists before we
1163 // try to perform the update.
1164 if (!indexExists(request.getIndex())) {
1166 fails.add(generateRejectionEntry(request.getOperationType(),
1167 "Specified resource does not exist: "
1168 + request.getOperation().getMetaData().getUrl(),
1172 request.getOperation().getMetaData().getUrl()));
1176 // Validate that the document we are trying to update actually
1177 // exists before we try to perform the update.
1178 if (!documentExists(request.getIndex(), request.getId())) {
1180 fails.add(generateRejectionEntry(request.getOperationType(),
1181 "Specified resource does not exist: "
1182 + request.getOperation().getMetaData().getUrl(),
1186 request.getOperation().getMetaData().getUrl()));
1190 // It is mandatory that a version be supplied for an update operation,
1191 // so validate that now.
1192 if (request.getOperation().getMetaData().getEtag() == null) {
1194 fails.add(generateRejectionEntry(request.getOperationType(),
1195 "Missing mandatory ETag field",
1199 request.getOperation().getMetaData().getUrl()));
1203 // Generate the update request...
1204 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE,
1208 request.getOperation().getMetaData().getEtag()));
1210 // ...and append the document that we want to update.
1212 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1213 } catch (JsonProcessingException e) {
1214 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1218 // Delete an existing document.
1221 // Make sure that the supplied document URL is formatted
1223 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1224 fails.add(generateRejectionEntry(request.getOperationType(),
1225 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1229 request.getOperation().getMetaData().getUrl()));
1233 // Validate that the specified index actually exists before we
1234 // try to perform the delete.
1235 if (!indexExists(request.getIndex())) {
1237 fails.add(generateRejectionEntry(request.getOperationType(),
1238 "Specified resource does not exist: "
1239 + request.getOperation().getMetaData().getUrl(),
1243 request.getOperation().getMetaData().getUrl()));
1247 // Validate that the document we are trying to update actually
1248 // exists before we try to perform the delete.
1249 if (!documentExists(request.getIndex(), request.getId())) {
1251 fails.add(generateRejectionEntry(request.getOperationType(),
1252 "Specified resource does not exist: "
1253 + request.getOperation().getMetaData().getUrl(),
1257 request.getOperation().getMetaData().getUrl()));
1261 // It is mandatory that a version be supplied for a delete operation,
1262 // so validate that now.
1263 if (request.getOperation().getMetaData().getEtag() == null) {
1265 fails.add(generateRejectionEntry(request.getOperationType(),
1266 "Missing mandatory ETag field",
1270 request.getOperation().getMetaData().getUrl()));
1274 // Generate the delete request.
1275 sb.append(String.format(BULK_DELETE_TEMPLATE,
1279 request.getOperation().getMetaData().getEtag()));
1287 private boolean indexExists(String index) throws DocumentStoreOperationException {
1289 OperationResult indexExistsResult = checkIndexExistence(index);
1291 return ((indexExistsResult.getResultCode() >= 200)
1292 && (indexExistsResult.getResultCode() < 300));
1295 private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1297 OperationResult docExistsResult = checkDocumentExistence(index, id);
1299 return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1303 * This method constructs a status entry for a bulk operation which has
1304 * been rejected before even sending it to the document store.
1306 * @param rejectReason - A message describing why the operation was rejected.
1307 * @param anId - The identifier associated with the document being
1309 * @param statusCode - An HTTP status code.
1310 * @return - A result set item.
1312 private ElasticSearchResultItem generateRejectionEntry(OperationType opType,
1313 String rejectReason,
1317 String originalUrl) {
1319 ElasticSearchError err = new ElasticSearchError();
1320 err.setReason(rejectReason);
1322 ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1325 op.setStatus(statusCode);
1327 op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1329 ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1333 rejectionResult.setCreate(op);
1336 rejectionResult.setIndex(op);
1339 rejectionResult.setDelete(op);
1344 return rejectionResult;
1349 * This method takes the json structure returned from ElasticSearch in
1350 * response to a bulk operations request and marshals it into a Java
1353 * @param jsonResult - The bulk operations response returned from
1355 * @return - The marshalled response.
1356 * @throws JsonParseException
1357 * @throws JsonMappingException
1358 * @throws IOException
1360 private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult)
1361 throws JsonParseException, JsonMappingException, IOException {
1363 if (jsonResult != null) {
1364 if (logger.isDebugEnabled()) {
1365 logger.debug("ESController: Marshalling ES result set from json: "
1366 + jsonResult.replaceAll("\n", ""));
1369 ObjectMapper mapper = new ObjectMapper();
1370 mapper.setSerializationInclusion(Include.NON_EMPTY);
1372 return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1380 * This method takes the marshalled ElasticSearch bulk response and
1381 * converts it into a generic response payload.
1383 * @param esResult - ElasticSearch bulk operations response.
1384 * @return - A generic result set.
1386 private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1387 List<ElasticSearchResultItem> rejectedOps) {
1390 int totalSuccess = 0;
1393 if (logger.isDebugEnabled()) {
1395 logger.debug("ESController: Build generic result set. ES Results: "
1396 + ((esResult != null) ? esResult.toString() : "[]")
1397 + " Rejected Ops: " + rejectedOps.toString());
1400 // Build a combined list of result items from the results returned
1401 // from ElasticSearch and the list of operations that we rejected
1402 // without sending to ElasticSearch.
1403 List<ElasticSearchResultItem> combinedResults = new ArrayList<ElasticSearchResultItem>();
1404 if (esResult != null) {
1405 combinedResults.addAll(Arrays.asList(esResult.getItems()));
1407 combinedResults.addAll(rejectedOps);
1409 // Iterate over the individual results in the resulting result set.
1410 StringBuilder resultsBuilder = new StringBuilder();
1411 AtomicBoolean firstItem = new AtomicBoolean(true);
1412 for (ElasticSearchResultItem item : combinedResults) {
1414 // Increment the operation counts.
1416 if (isSuccessCode(item.operationStatus().getStatus())) {
1422 // Prepend a comma to our response string unless this it the
1423 // first result in the set.
1424 if (!firstItem.compareAndSet(true, false)) {
1425 resultsBuilder.append(", ");
1428 // Append the current result as a generic json structure.
1429 resultsBuilder.append(item.toJson());
1432 // Now, build the result string and return it.
1433 String responseBody = "{ \"total_operations\": " + totalOps + ", "
1434 + "\"total_success\": " + totalSuccess + ", "
1435 + "\"total_fails\": " + totalFails + ", "
1437 + resultsBuilder.toString()
1440 return responseBody;
1445 * This method queryies ElasticSearch to determine if the supplied
1446 * index is present in the document store.
1448 * @param indexName - The index to look for.
1449 * @return - An operation result indicating the success or failure of
1451 * @throws DocumentStoreOperationException
1453 public OperationResult checkIndexExistence(String indexName)
1454 throws DocumentStoreOperationException {
1456 // Initialize operation result with a failure codes / fault string
1457 OperationResult opResult = new OperationResult();
1458 opResult.setResultCode(500);
1460 // Grab the current time so we can use it to generate a metrics log.
1461 MdcOverride override = getStartTime(new MdcOverride());
1463 String fullUrl = getFullUrl("/" + indexName, false);
1464 HttpURLConnection conn = initializeConnection(fullUrl);
1467 conn.setRequestMethod("HEAD");
1469 } catch (ProtocolException e) {
1470 shutdownConnection(conn);
1471 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1474 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1478 resultCode = conn.getResponseCode();
1479 } catch (IOException e) {
1480 shutdownConnection(conn);
1481 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1483 logger.debug("Response Code : " + resultCode);
1485 opResult.setResultCode(resultCode);
1487 // Generate a metrics log so we can track how long the operation took.
1488 metricsLogger.info(SearchDbMsgs.CHECK_INDEX_TIME,
1490 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1491 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1495 shutdownConnection(conn);
1501 private void buildDocumentResult(DocumentOperationResult result, String index)
1502 throws DocumentStoreOperationException {
1504 JSONParser parser = new JSONParser();
1507 root = (JSONObject) parser.parse(result.getResult());
1509 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1510 // Success response object
1511 Document doc = new Document();
1512 doc.setEtag(result.getResultVersion());
1513 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1515 doc.setContent((JSONObject) root.get("_source"));
1516 result.setDocument(doc);
1519 // Error response object
1520 JSONObject error = (JSONObject) root.get("error");
1521 if (error != null) {
1522 result.setError(new ErrorResult(error.get("type").toString(),
1523 error.get("reason").toString()));
1527 } catch (Exception e) {
1528 throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1529 + result.getResult());
1535 private String buildDocumentResponseUrl(String index, String id) {
1536 return ApiUtils.buildDocumentUri(index, id);
1539 private void buildSearchResult(SearchOperationResult result, String index)
1540 throws DocumentStoreOperationException {
1542 JSONParser parser = new JSONParser();
1546 root = (JSONObject) parser.parse(result.getResult());
1547 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1548 JSONObject hits = (JSONObject) root.get("hits");
1549 JSONArray hitArray = (JSONArray) hits.get("hits");
1550 SearchHits searchHits = new SearchHits();
1551 searchHits.setTotalHits(hits.get("total").toString());
1552 ArrayList<SearchHit> searchHitArray = new ArrayList<SearchHit>();
1554 for (int i = 0; i < hitArray.size(); i++) {
1555 JSONObject hit = (JSONObject) hitArray.get(i);
1556 SearchHit searchHit = new SearchHit();
1557 searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1558 Document doc = new Document();
1559 if (hit.get("_version") != null) {
1560 doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1563 doc.setUrl(buildDocumentResponseUrl(index, (hit.get("_id") != null)
1564 ? hit.get("_id").toString() : ""));
1565 doc.setContent((JSONObject) hit.get("_source"));
1566 searchHit.setDocument(doc);
1567 searchHitArray.add(searchHit);
1569 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1570 result.setSearchResult(searchHits);
1572 JSONObject aggregations = (JSONObject) root.get("aggregations");
1573 if (aggregations != null) {
1574 AggregationResult[] aggResults =
1575 AggregationParsingUtil.parseAggregationResults(aggregations);
1576 AggregationResults aggs = new AggregationResults();
1577 aggs.setAggregations(aggResults);
1578 result.setAggregationResult(aggs);
1583 JSONObject error = (JSONObject) root.get("error");
1584 if (error != null) {
1585 result.setError(new ErrorResult(error.get("type").toString(),
1586 error.get("reason").toString()));
1589 } catch (Exception e) {
1590 throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1591 + result.getResult());