2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property.
6 * Copyright © 2017 Amdocs
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
22 * ECOMP and OpenECOMP are trademarks
23 * and service marks of AT&T Intellectual Property.
25 package org.openecomp.sa.searchdbabstraction.elasticsearch.dao;
27 import com.att.aft.dme2.internal.google.common.base.Throwables;
28 import com.fasterxml.jackson.annotation.JsonInclude.Include;
29 import com.fasterxml.jackson.core.JsonParseException;
30 import com.fasterxml.jackson.core.JsonProcessingException;
31 import com.fasterxml.jackson.databind.JsonMappingException;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33 import edu.emory.mathcs.backport.java.util.Arrays;
34 import org.json.simple.JSONArray;
35 import org.json.simple.JSONObject;
36 import org.json.simple.parser.JSONParser;
37 import org.json.simple.parser.ParseException;
38 import org.openecomp.cl.api.LogFields;
39 import org.openecomp.cl.api.LogLine;
40 import org.openecomp.cl.api.Logger;
41 import org.openecomp.cl.eelf.LoggerFactory;
42 import org.openecomp.cl.mdc.MdcContext;
43 import org.openecomp.cl.mdc.MdcOverride;
44 import org.openecomp.sa.rest.AnalysisConfiguration;
45 import org.openecomp.sa.rest.ApiUtils;
46 import org.openecomp.sa.rest.BulkRequest;
47 import org.openecomp.sa.rest.BulkRequest.OperationType;
48 import org.openecomp.sa.rest.DocumentSchema;
49 import org.openecomp.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
50 import org.openecomp.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
51 import org.openecomp.sa.searchdbabstraction.entity.AggregationResult;
52 import org.openecomp.sa.searchdbabstraction.entity.AggregationResults;
53 import org.openecomp.sa.searchdbabstraction.entity.Document;
54 import org.openecomp.sa.searchdbabstraction.entity.DocumentOperationResult;
55 import org.openecomp.sa.searchdbabstraction.entity.ErrorResult;
56 import org.openecomp.sa.searchdbabstraction.entity.OperationResult;
57 import org.openecomp.sa.searchdbabstraction.entity.SearchHit;
58 import org.openecomp.sa.searchdbabstraction.entity.SearchHits;
59 import org.openecomp.sa.searchdbabstraction.entity.SearchOperationResult;
60 import org.openecomp.sa.searchdbabstraction.logging.SearchDbMsgs;
61 import org.openecomp.sa.searchdbabstraction.util.AggregationParsingUtil;
62 import org.openecomp.sa.searchdbabstraction.util.DocumentSchemaUtil;
63 import org.openecomp.sa.searchdbabstraction.util.SearchDbConstants;
65 import java.io.BufferedReader;
67 import java.io.FileInputStream;
68 import java.io.IOException;
69 import java.io.InputStream;
70 import java.io.InputStreamReader;
71 import java.io.OutputStream;
72 import java.io.OutputStreamWriter;
73 import java.io.PrintWriter;
74 import java.io.StringWriter;
75 import java.net.HttpURLConnection;
76 import java.net.MalformedURLException;
77 import java.net.ProtocolException;
79 import java.text.SimpleDateFormat;
80 import java.util.ArrayList;
81 import java.util.List;
82 import java.util.Properties;
83 import java.util.concurrent.atomic.AtomicBoolean;
84 import javax.ws.rs.core.Response.Status;
88 * This class has the Elasticsearch implementation of the
89 * DB operations defined in DocumentStoreInterface.
91 public class ElasticSearchHttpController implements DocumentStoreInterface {
93 private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
94 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
95 private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
96 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
97 private static final String BULK_IMPORT_INDEX_TEMPLATE =
98 "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
99 private static final String BULK_DELETE_TEMPLATE =
100 "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
102 private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
103 "Internal Error: ElasticSearch operation fault occurred";
104 private static final Logger logger = LoggerFactory.getInstance()
105 .getLogger(ElasticSearchHttpController.class.getName());
106 private static final Logger metricsLogger = LoggerFactory.getInstance()
107 .getMetricsLogger(ElasticSearchHttpController.class.getName());
108 private final ElasticSearchConfig config;
110 private static final String DEFAULT_TYPE = "default";
112 private static ElasticSearchHttpController instance = null;
114 protected AnalysisConfiguration analysisConfig;
116 public static ElasticSearchHttpController getInstance() {
118 synchronized (ElasticSearchHttpController.class) {
120 if (instance == null) {
122 Properties properties = new Properties();
123 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
125 properties.load(new FileInputStream(file));
126 } catch (Exception e) {
127 logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL,
128 "ElasticSearchHTTPController.getInstance",
129 e.getLocalizedMessage());
132 ElasticSearchConfig config = new ElasticSearchConfig(properties);
133 instance = new ElasticSearchHttpController(config);
/**
 * Creates a controller that talks to the document store described by the
 * supplied configuration.
 *
 * <p>A connection attempt is logged, and either a success or a failure entry
 * follows. A failure is only logged; it does not prevent construction.
 *
 * @param config - Connection settings for the document store.
 */
public ElasticSearchHttpController(ElasticSearchConfig config) {
  this.config = config;
  this.analysisConfig = new AnalysisConfiguration();

  try {
    logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
    // NOTE(review): the probe call is reconstructed from context - confirm against the
    // complete file.
    checkConnection();
    logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
  } catch (Exception e) {
    logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e,
        getFullUrl("", false), e.getMessage());
  }
}
155 public AnalysisConfiguration getAnalysisConfig() {
156 return analysisConfig;
160 public OperationResult createIndex(String index, DocumentSchema documentSchema) {
162 OperationResult result = new OperationResult();
163 result.setResultCode(500);
167 // Submit the request to ElasticSearch to create the index using a
168 // default document type.
169 result = createTable(index,
171 analysisConfig.getEsIndexSettings(),
172 DocumentSchemaUtil.generateDocumentMappings(documentSchema));
174 // ElasticSearch will return us a 200 code on success when we
175 // want to report a 201, so translate the result here.
176 result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
177 if (isSuccess(result)) {
178 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
179 //result.setResult("{\"index\": \"" + index + ", \"type\": \"" + DEFAULT_TYPE + "\"}");
182 } catch (DocumentStoreOperationException e) {
184 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
192 public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
194 //Initialize operation result with a failure codes / fault string
195 OperationResult opResult = new OperationResult();
196 opResult.setResultCode(500);
197 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
199 // Grab the current time so we can use it to generate a metrics log.
200 MdcOverride override = getStartTime(new MdcOverride());
202 String fullUrl = getFullUrl("/" + indexName + "/", false);
203 HttpURLConnection conn = initializeConnection(fullUrl);
205 logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
208 conn.setRequestMethod("DELETE");
209 } catch (ProtocolException e) {
210 shutdownConnection(conn);
211 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
214 handleResponse(conn, opResult);
216 // Generate a metrics log so we can track how long the operation took.
217 metricsLogger.info(SearchDbMsgs.DELETE_INDEX_TIME,
219 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
220 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
224 shutdownConnection(conn);
230 private OperationResult checkConnection() throws Exception {
232 String fullUrl = getFullUrl("/_cluster/health", false);
234 HttpURLConnection conn = null;
236 url = new URL(fullUrl);
237 conn = (HttpURLConnection) url.openConnection();
238 conn.setRequestMethod("GET");
239 conn.setDoOutput(true);
240 logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
242 int resultCode = conn.getResponseCode();
243 logger.debug("getClusterHealth() response Code : " + resultCode);
244 OperationResult opResult = new OperationResult();
245 opResult.setResultCode(resultCode);
247 shutdownConnection(conn);
252 private String getFullUrl(String resourceUrl, boolean isSecure) {
254 final String host = config.getIpAddress();
255 final String port = config.getHttpPort();
258 return String.format("https://%s:%s%s", host, port, resourceUrl);
260 return String.format("http://%s:%s%s", host, port, resourceUrl);
264 private void shutdownConnection(HttpURLConnection connection) {
265 if (connection == null) {
269 InputStream inputstream = null;
270 OutputStream outputstream = null;
273 inputstream = connection.getInputStream();
274 } catch (IOException e) {
275 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
277 if (inputstream != null) {
280 } catch (IOException e) {
281 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
282 e.getLocalizedMessage());
288 outputstream = connection.getOutputStream();
289 } catch (IOException e) {
290 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
292 if (outputstream != null) {
294 outputstream.close();
295 } catch (IOException e) {
296 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
297 e.getLocalizedMessage());
302 connection.disconnect();
306 protected OperationResult createTable(String indexName, String typeName,
307 String indexSettings, String indexMappings)
308 throws DocumentStoreOperationException {
310 if (indexSettings == null) {
311 logger.debug("No settings provided.");
314 if (indexMappings == null) {
315 logger.debug("No mappings provided.");
318 OperationResult opResult = new OperationResult();
320 // Initialize operation result with a failure codes / fault string
321 opResult.setResultCode(500);
322 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
324 // Grab the current time so we can use it to generate a metrics log.
325 MdcOverride override = getStartTime(new MdcOverride());
327 String fullUrl = getFullUrl("/" + indexName + "/", false);
328 HttpURLConnection conn = initializeConnection(fullUrl);
331 conn.setRequestMethod("PUT");
332 } catch (ProtocolException e) {
333 shutdownConnection(conn);
334 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
337 StringBuilder sb = new StringBuilder(128);
338 sb.append("{ \"settings\" : ");
339 sb.append(indexSettings);
342 sb.append("\"mappings\" : {");
343 sb.append("\"" + typeName + "\" :");
344 sb.append(indexMappings);
347 attachContent(conn, sb.toString());
349 logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
350 logger.debug("Request content: " + sb.toString());
352 handleResponse(conn, opResult);
354 shutdownConnection(conn);
356 // Generate a metrics log so we can track how long the operation took.
357 metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
359 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
360 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
368 public DocumentOperationResult createDocument(String indexName,
369 DocumentStoreDataEntity document,
370 boolean allowImplicitIndexCreation)
371 throws DocumentStoreOperationException {
373 if(!allowImplicitIndexCreation) {
375 // Before we do anything, make sure that the specified index actually exists in the
376 // document store - we don't want to rely on ElasticSearch to fail the document
377 // create because it could be configured to implicitly create a non-existent index,
378 // which can lead to hard-to-debug behaviour with queries down the road.
379 OperationResult indexExistsResult = checkIndexExistence(indexName);
380 if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
382 DocumentOperationResult opResult = new DocumentOperationResult();
383 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
384 opResult.setResult("Document Index '" + indexName + "' does not exist.");
385 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
390 if (document.getId() == null || document.getId().isEmpty()) {
391 return createDocumentWithoutId(indexName, document);
393 return createDocumentWithId(indexName, document);
397 private DocumentOperationResult createDocumentWithId(String indexName,
398 DocumentStoreDataEntity document)
399 throws DocumentStoreOperationException {
400 // check if the document already exists
401 DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
404 if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
405 if (opResult.getResultCode() == Status.OK.getStatusCode()) {
406 opResult.setFailureCause("A document with the same id already exists.");
408 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
410 opResult.setResultCode(Status.CONFLICT.getStatusCode());
414 opResult = new DocumentOperationResult();
415 // Initialize operation result with a failure codes / fault string
416 opResult.setResultCode(500);
417 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
419 // Grab the current time so we can use it to generate a metrics log.
420 MdcOverride override = getStartTime(new MdcOverride());
422 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE
423 + "/" + document.getId(), false);
424 HttpURLConnection conn = initializeConnection(fullUrl);
427 conn.setRequestMethod("PUT");
428 } catch (ProtocolException e) {
429 shutdownConnection(conn);
430 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
433 attachDocument(conn, document);
435 logger.debug("Sending 'PUT' request to: " + conn.getURL());
437 handleResponse(conn, opResult);
438 buildDocumentResult(opResult, indexName);
440 // Generate a metrics log so we can track how long the operation took.
441 metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
443 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
444 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
448 shutdownConnection(conn);
454 private DocumentOperationResult createDocumentWithoutId(String indexName,
455 DocumentStoreDataEntity document)
456 throws DocumentStoreOperationException {
458 DocumentOperationResult response = new DocumentOperationResult();
459 // Initialize operation result with a failure codes / fault string
460 response.setResultCode(500);
461 response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
463 // Grab the current time so we can use it to generate a metrics log.
464 MdcOverride override = getStartTime(new MdcOverride());
466 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
467 HttpURLConnection conn = initializeConnection(fullUrl);
470 conn.setRequestMethod("POST");
471 } catch (ProtocolException e) {
472 shutdownConnection(conn);
473 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
476 attachDocument(conn, document);
478 logger.debug("Sending 'POST' request to: " + conn.getURL());
480 handleResponse(conn, response);
481 buildDocumentResult(response, indexName);
483 // Generate a metrics log so we can track how long the operation took.
484 metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
486 .setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
487 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
491 shutdownConnection(conn);
496 private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
497 throws DocumentStoreOperationException {
498 conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
499 conn.setRequestProperty("Connection", "Close");
501 attachContent(conn, doc.getContentInJson());
504 private DocumentOperationResult checkDocumentExistence(String indexName,
506 throws DocumentStoreOperationException {
507 DocumentOperationResult opResult = new DocumentOperationResult();
509 // Initialize operation result with a failure codes / fault string
510 opResult.setResultCode(500);
512 // Grab the current time so we can use it to generate a metrics log.
513 MdcOverride override = getStartTime(new MdcOverride());
515 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
516 HttpURLConnection conn = initializeConnection(fullUrl);
519 conn.setRequestMethod("HEAD");
520 } catch (ProtocolException e) {
521 shutdownConnection(conn);
522 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
525 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
529 resultCode = conn.getResponseCode();
530 } catch (IOException e) {
531 shutdownConnection(conn);
532 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
535 logger.debug("Response Code : " + resultCode);
537 opResult.setResultCode(resultCode);
539 // Generate a metrics log so we can track how long the operation took.
540 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
542 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
543 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
548 shutdownConnection(conn);
554 public DocumentOperationResult updateDocument(String indexName,
555 DocumentStoreDataEntity document,
556 boolean allowImplicitIndexCreation)
557 throws DocumentStoreOperationException {
559 if(!allowImplicitIndexCreation) {
561 // Before we do anything, make sure that the specified index actually exists in the
562 // document store - we don't want to rely on ElasticSearch to fail the document
563 // create because it could be configured to implicitly create a non-existent index,
564 // which can lead to hard-to-debug behaviour with queries down the road.
565 OperationResult indexExistsResult = checkIndexExistence(indexName);
566 if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
568 DocumentOperationResult opResult = new DocumentOperationResult();
569 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
570 opResult.setResult("Document Index '" + indexName + "' does not exist.");
571 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
576 DocumentOperationResult opResult = new DocumentOperationResult();
578 // Initialize operation result with a failure codes / fault string
579 opResult.setResultCode(500);
580 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
582 // Grab the current time so we can use it to generate a metrics log.
583 MdcOverride override = getStartTime(new MdcOverride());
585 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
586 + "?version=" + document.getVersion(), false);
587 HttpURLConnection conn = initializeConnection(fullUrl);
590 conn.setRequestMethod("PUT");
591 } catch (ProtocolException e) {
592 shutdownConnection(conn);
593 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
596 attachDocument(conn, document);
598 logger.debug("Sending 'PUT' request to: " + conn.getURL());
600 handleResponse(conn, opResult);
601 buildDocumentResult(opResult, indexName);
603 // Generate a metrics log so we can track how long the operation took.
604 metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
606 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
607 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
612 shutdownConnection(conn);
618 public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
619 throws DocumentStoreOperationException {
620 DocumentOperationResult opResult = new DocumentOperationResult();
622 // Initialize operation result with a failure codes / fault string
623 opResult.setResultCode(500);
624 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
626 // Grab the current time so we can use it to generate a metrics log.
627 MdcOverride override = getStartTime(new MdcOverride());
629 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
630 + "?version=" + document.getVersion(), false);
631 HttpURLConnection conn = initializeConnection(fullUrl);
634 conn.setRequestMethod("DELETE");
635 } catch (ProtocolException e) {
636 shutdownConnection(conn);
637 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
640 logger.debug("\nSending 'DELETE' request to " + conn.getURL());
642 handleResponse(conn, opResult);
643 buildDocumentResult(opResult, indexName);
644 //supress the etag and url in response for delete as they are not required
645 if (opResult.getDocument() != null) {
646 opResult.getDocument().setEtag(null);
647 opResult.getDocument().setUrl(null);
650 // Generate a metrics log so we can track how long the operation took.
651 metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
653 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
654 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
659 shutdownConnection(conn);
665 public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
666 throws DocumentStoreOperationException {
667 DocumentOperationResult opResult = new DocumentOperationResult();
669 // Initialize operation result with a failure codes / fault string
670 opResult.setResultCode(500);
671 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
673 // Grab the current time so we can use it to generate a metrics log.
674 MdcOverride override = getStartTime(new MdcOverride());
676 String fullUrl = null;
677 if (document.getVersion() == null) {
678 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
680 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
681 + "?version=" + document.getVersion(), false);
683 HttpURLConnection conn = initializeConnection(fullUrl);
685 logger.debug("\nSending 'GET' request to: " + conn.getURL());
687 handleResponse(conn, opResult);
688 buildDocumentResult(opResult, indexName);
690 // Generate a metrics log so we can track how long the operation took.
691 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
693 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
694 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
699 shutdownConnection(conn);
704 public SearchOperationResult search(String indexName, String queryString)
705 throws DocumentStoreOperationException {
706 SearchOperationResult opResult = new SearchOperationResult();
708 // Initialize operation result with a failure codes / fault string
709 opResult.setResultCode(500);
710 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
712 String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
714 // Grab the current time so we can use it to generate a metrics log.
715 MdcOverride override = getStartTime(new MdcOverride());
717 HttpURLConnection conn = initializeConnection(fullUrl);
720 conn.setRequestMethod("GET");
721 } catch (ProtocolException e) {
722 shutdownConnection(conn);
723 throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
726 logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
728 handleResponse(conn, opResult);
729 buildSearchResult(opResult, indexName);
732 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
734 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
735 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
743 public SearchOperationResult searchWithPayload(String indexName, String query)
744 throws DocumentStoreOperationException {
745 SearchOperationResult opResult = new SearchOperationResult();
747 if (logger.isDebugEnabled()) {
748 logger.debug("Querying index: " + indexName + " with query string: " + query);
751 // Initialize operation result with a failure codes / fault string
752 opResult.setResultCode(500);
753 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
755 String fullUrl = getFullUrl("/" + indexName + "/_search", false);
757 // Grab the current time so we can use it to generate a metrics log.
758 MdcOverride override = getStartTime(new MdcOverride());
760 HttpURLConnection conn = initializeConnection(fullUrl);
763 conn.setRequestMethod("POST");
764 } catch (ProtocolException e) {
765 shutdownConnection(conn);
766 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
769 attachContent(conn, query);
771 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
772 logger.debug("Request body = Elasticsearch query = " + query);
774 handleResponse(conn, opResult);
775 buildSearchResult(opResult, indexName);
777 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
779 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
780 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
785 shutdownConnection(conn);
790 private void attachContent(HttpURLConnection conn, String content)
791 throws DocumentStoreOperationException {
792 OutputStream outputStream = null;
793 OutputStreamWriter out = null;
796 outputStream = conn.getOutputStream();
797 } catch (IOException e) {
798 shutdownConnection(conn);
799 throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
802 out = new OutputStreamWriter(outputStream);
807 } catch (IOException e) {
808 shutdownConnection(conn);
809 throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
813 private HttpURLConnection initializeConnection(String fullUrl)
814 throws DocumentStoreOperationException {
816 HttpURLConnection conn = null;
819 url = new URL(fullUrl);
820 } catch (MalformedURLException e) {
821 throw new DocumentStoreOperationException("Error building a URL with " + url, e);
825 conn = (HttpURLConnection) url.openConnection();
826 conn.setDoOutput(true);
827 } catch (IOException e) {
828 shutdownConnection(conn);
829 throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
835 private void handleResponse(HttpURLConnection conn, OperationResult opResult)
836 throws DocumentStoreOperationException {
837 int resultCode = 200;
840 resultCode = conn.getResponseCode();
841 } catch (IOException e) {
842 shutdownConnection(conn);
843 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
846 logger.debug("Response Code : " + resultCode);
848 InputStream inputStream = null;
850 if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
851 inputStream = conn.getErrorStream();
854 inputStream = conn.getInputStream();
855 } catch (IOException e) {
856 shutdownConnection(conn);
857 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
861 InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
862 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
864 StringBuilder result = new StringBuilder(128);
865 String string = null;
868 while ((string = bufferedreader.readLine()) != null) {
869 result.append(string).append("\n");
871 } catch (IOException e) {
872 shutdownConnection(conn);
873 throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
876 if (resultCode == Status.CONFLICT.getStatusCode()) {
877 opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode());
879 opResult.setResultCode(resultCode);
881 if (logger.isDebugEnabled()) {
882 logger.debug("Raw result string from ElasticSearch = " + result.toString());
884 opResult.setResult(result.toString());
885 opResult.setResultVersion(extractVersion(result.toString()));
888 private String extractVersion(String result) throws DocumentStoreOperationException {
890 JSONParser parser = new JSONParser();
891 String version = null;
893 JSONObject root = (JSONObject) parser.parse(result);
894 if (root.get("_version") != null) {
895 version = root.get("_version").toString();
898 } catch (ParseException e) {
900 // Not all responses from ElasticSearch include a version, so
901 // if we don't get one back, just return an empty string rather
902 // than trigger a false failure.
909 * This convenience method gets the current system time and stores
910 * it in an attribute in the supplied {@link MdcOverride} object so
911 * that it can be used later by the metrics logger.
913 * @param override - The {@link MdcOverride} object to update.
914 * @return - The supplied {@link MdcOverride} object.
916 private MdcOverride getStartTime(MdcOverride override) {
918 // Grab the current time...
919 long startTimeInMs = System.currentTimeMillis();
921 // ...and add it as an attribute to the supplied MDC Override
923 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
924 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
926 // Return the MdcOverride object that we were passed.
927 // This looks odd, but it allows us to do stuff like:
929 // MdcOverride ov = getStartTime(new MdcOverride())
931 // which is quite handy, but also allows us to pass in an existing
932 // MdcOverride object which already has some attributes set.
936 private boolean isSuccess(OperationResult result) {
938 return isSuccessCode(result.getResultCode());
942 private boolean isSuccessCode(int statusCode) {
943 return ((statusCode >= 200) && (statusCode < 300));
948 public OperationResult performBulkOperations(BulkRequest[] requests)
949 throws DocumentStoreOperationException {
951 if (logger.isDebugEnabled()) {
952 String dbgString = "ESController: performBulkOperations - Operations: ";
954 for (BulkRequest request : requests) {
955 dbgString += "[" + request.toString() + "] ";
958 logger.debug(dbgString);
961 // Grab the current time so we can use it to generate a metrics log.
962 MdcOverride override = getStartTime(new MdcOverride());
964 // Parse the supplied set of operations.
965 // Iterate over the list of operations which we were provided and
966 // translate them into a format that ElasticSearch understands.
968 StringBuilder esOperationSet = new StringBuilder(128);
969 List<ElasticSearchResultItem> rejected = new ArrayList<ElasticSearchResultItem>();
970 for (BulkRequest request : requests) {
972 // Convert the request to the syntax ElasticSearch likes.
973 if (buildEsOperation(request, esOperationSet, rejected)) {
978 ElasticSearchBulkOperationResult opResult = null;
981 // Open an HTTP connection to the ElasticSearch back end.
982 String fullUrl = getFullUrl("/_bulk", false);
984 HttpURLConnection conn;
987 url = new URL(fullUrl);
988 conn = (HttpURLConnection) url.openConnection();
989 conn.setRequestMethod("PUT");
990 conn.setDoOutput(true);
991 conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
992 conn.setRequestProperty("Connection", "Close");
994 } catch (IOException e) {
996 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
997 if (logger.isDebugEnabled()) {
998 logger.debug(Throwables.getStackTraceAsString(e));
1001 throw new DocumentStoreOperationException("Failed to open connection to document store. Cause: "
1002 + e.getMessage(), e);
1005 StringBuilder bulkResult = new StringBuilder(128);
1007 // Create an output stream to write our request to.
1008 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1011 if (logger.isDebugEnabled()) {
1012 logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1013 logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n",
1017 // Write the resulting request string to our output stream. (this sends the request to ES?)
1018 out.write(esOperationSet.toString());
1021 // Open an input stream on our connection in order to read back the results.
1022 InputStream is = conn.getInputStream();
1023 InputStreamReader inputstreamreader = new InputStreamReader(is);
1024 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1026 // Read the contents of the input stream into our result string...
1027 String esResponseString = null;
1029 while ((esResponseString = bufferedreader.readLine()) != null) {
1030 bulkResult.append(esResponseString).append("\n");
1033 } catch (IOException e) {
1035 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1036 if (logger.isDebugEnabled()) {
1037 StringWriter sw = new StringWriter();
1038 e.printStackTrace(new PrintWriter(sw));
1039 logger.debug(sw.toString());
1042 throw new DocumentStoreOperationException("Failure interacting with document store. Cause: "
1043 + e.getMessage(), e);
1046 if (logger.isDebugEnabled()) {
1047 logger.debug("ESController: Received result string from ElasticSearch: = "
1048 + bulkResult.toString());
1051 // ...and marshal the resulting string into a Java object.
1053 opResult = marshallEsBulkResult(bulkResult.toString());
1055 } catch (IOException e) {
1057 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1058 if (logger.isDebugEnabled()) {
1059 logger.debug(Throwables.getStackTraceAsString(e));
1062 throw new DocumentStoreOperationException("Failed to marshal response body. Cause: "
1063 + e.getMessage(), e);
1067 // Finally, build the operation result and return it to the caller.
1068 OperationResult result = new OperationResult();
1069 result.setResultCode(207);
1070 result.setResult(buildGenericBulkResultSet(opResult, rejected));
1072 // In the success case we don't want the entire result string to be
1073 // dumped into the metrics log, so concatenate it.
1074 String resultStringForMetricsLog = result.getResult();
1075 if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1076 resultStringForMetricsLog = resultStringForMetricsLog.substring(0,
1077 Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1080 metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1082 .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1083 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1091 * This method converts a {@link BulkRequest} object into a json structure
1092 * which can be understood by ElasticSearch.
1094 * @param request - The request to be performed.
1095 * @param sb - The string builder to append the json data to
1096 * @throws DocumentStoreOperationException
// Validates a single bulk request and, when it passes, appends its
// ElasticSearch bulk representation to 'sb'. Requests that fail a validation
// step get a rejection entry added to 'fails' instead.
// The boolean result is what performBulkOperations tests for each request;
// presumably it is false for a rejected request - the assignments to retVal
// are not visible in this listing, so confirm against the full file.
1098 private boolean buildEsOperation(BulkRequest request, StringBuilder sb,
1099 List<ElasticSearchResultItem> fails)
1100 throws DocumentStoreOperationException {
1102 boolean retVal = true;
// NOTE(review): indexExistsResult is not referenced anywhere in the visible
// part of this method - possibly a leftover local.
1103 OperationResult indexExistsResult = null;
1105 // What kind of operation are we performing?
1106 switch (request.getOperationType()) {
1108 // Create a new document.
1111 // Make sure that we were supplied a document payload.
1112 if (request.getOperation().getDocument() == null) {
1114 fails.add(generateRejectionEntry(request.getOperationType(),
1115 "Missing document payload",
1119 request.getOperation().getMetaData().getUrl()));
1123 // Make sure that the supplied document URL is formatted
// For a create the second argument to validateDocumentUri is false, whereas
// update and delete pass true.
1125 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1126 fails.add(generateRejectionEntry(request.getOperationType(),
1127 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1131 request.getOperation().getMetaData().getUrl()));
1135 // Validate that the specified index actually exists before we
1136 // try to perform the create.
// Here the index name is extracted from the request URL; the update and
// delete branches use request.getIndex() for the same check.
1137 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1139 fails.add(generateRejectionEntry(request.getOperationType(),
1140 "Specified resource does not exist: "
1141 + request.getOperation().getMetaData().getUrl(),
1145 request.getOperation().getMetaData().getUrl()));
1149 // If we were NOT supplied an id for the new document, use the
1150 // template without one (presumably so that ElasticSearch generates it - confirm against the template).
1151 if (request.getId() == null) {
1153 sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE,
1157 // Otherwise an id was supplied, so use the template
1158 // that takes one.
1160 sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE,
1167 // Append the document that we want to create.
// Each appended document is terminated with a newline.
1168 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1169 } catch (JsonProcessingException e) {
1170 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1175 // Update an existing document.
1178 // Make sure that we were supplied a document payload.
1179 if (request.getOperation().getDocument() == null) {
1181 fails.add(generateRejectionEntry(request.getOperationType(),
1182 "Missing document payload",
1186 request.getOperation().getMetaData().getUrl()));
1190 // Make sure that the supplied document URL is formatted
1192 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1193 fails.add(generateRejectionEntry(request.getOperationType(),
1194 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1198 request.getOperation().getMetaData().getUrl()));
1202 // Validate that the specified index actually exists before we
1203 // try to perform the update.
1204 if (!indexExists(request.getIndex())) {
1206 fails.add(generateRejectionEntry(request.getOperationType(),
1207 "Specified resource does not exist: "
1208 + request.getOperation().getMetaData().getUrl(),
1212 request.getOperation().getMetaData().getUrl()));
1216 // Validate that the document we are trying to update actually
1217 // exists before we try to perform the update.
1218 if (!documentExists(request.getIndex(), request.getId())) {
1220 fails.add(generateRejectionEntry(request.getOperationType(),
1221 "Specified resource does not exist: "
1222 + request.getOperation().getMetaData().getUrl(),
1226 request.getOperation().getMetaData().getUrl()));
1230 // It is mandatory that a version be supplied for an update operation,
1231 // so validate that now.
1232 if (request.getOperation().getMetaData().getEtag() == null) {
1234 fails.add(generateRejectionEntry(request.getOperationType(),
1235 "Missing mandatory ETag field",
1239 request.getOperation().getMetaData().getUrl()));
1243 // Generate the update request...
// The ETag is the last argument passed to the template.
1244 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE,
1248 request.getOperation().getMetaData().getEtag()));
1250 // ...and append the document that we want to update.
1252 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1253 } catch (JsonProcessingException e) {
1254 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1258 // Delete an existing document.
1261 // Make sure that the supplied document URL is formatted
1263 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1264 fails.add(generateRejectionEntry(request.getOperationType(),
1265 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1269 request.getOperation().getMetaData().getUrl()));
1273 // Validate that the specified index actually exists before we
1274 // try to perform the delete.
1275 if (!indexExists(request.getIndex())) {
1277 fails.add(generateRejectionEntry(request.getOperationType(),
1278 "Specified resource does not exist: "
1279 + request.getOperation().getMetaData().getUrl(),
1283 request.getOperation().getMetaData().getUrl()));
1287 // Validate that the document we are trying to delete actually
1288 // exists before we try to perform the delete.
1289 if (!documentExists(request.getIndex(), request.getId())) {
1291 fails.add(generateRejectionEntry(request.getOperationType(),
1292 "Specified resource does not exist: "
1293 + request.getOperation().getMetaData().getUrl(),
1297 request.getOperation().getMetaData().getUrl()));
1301 // It is mandatory that a version be supplied for a delete operation,
1302 // so validate that now.
1303 if (request.getOperation().getMetaData().getEtag() == null) {
1305 fails.add(generateRejectionEntry(request.getOperationType(),
1306 "Missing mandatory ETag field",
1310 request.getOperation().getMetaData().getUrl()));
1314 // Generate the delete request.
// A delete carries no document payload, so only the action line is appended.
1315 sb.append(String.format(BULK_DELETE_TEMPLATE,
1319 request.getOperation().getMetaData().getEtag()));
// Returns true when checkIndexExistence reports a 2xx result code for the
// supplied index name. The range test duplicates the one in isSuccessCode.
1327 private boolean indexExists(String index) throws DocumentStoreOperationException {
1329 OperationResult indexExistsResult = checkIndexExistence(index);
1331 return ((indexExistsResult.getResultCode() >= 200)
1332 && (indexExistsResult.getResultCode() < 300));
// Returns true when checkDocumentExistence reports a 2xx result code for the
// supplied index / document id pair. The range test duplicates isSuccessCode.
1335 private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1337 OperationResult docExistsResult = checkDocumentExistence(index, id);
1339 return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1343 * This method constructs a status entry for a bulk operation which has
1344 * been rejected before even sending it to the document store.
1346 * @param rejectReason - A message describing why the operation was rejected.
1347 * @param anId - The identifier associated with the document being
1349 * @param statusCode - An HTTP status code.
1350 * @return - A result set item.
// Builds the result item that stands in for an operation rejected locally.
// The item wraps an operation status carrying the status code and the
// original request URL.
1352 private ElasticSearchResultItem generateRejectionEntry(OperationType opType,
1353 String rejectReason,
1357 String originalUrl) {
// The rejection reason is carried in an ElasticSearchError.
1359 ElasticSearchError err = new ElasticSearchError();
1360 err.setReason(rejectReason);
1362 ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1365 op.setStatus(statusCode);
// Keep the URL of the original request with the status entry.
1367 op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1369 ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
// The same status object is attached through setCreate / setIndex / setDelete;
// presumably exactly one of these runs, selected by opType - the selecting
// statements are not visible in this listing, so confirm against the full file.
1373 rejectionResult.setCreate(op);
1376 rejectionResult.setIndex(op);
1379 rejectionResult.setDelete(op);
1384 return rejectionResult;
1389 * This method takes the json structure returned from ElasticSearch in
1390 * response to a bulk operations request and marshals it into a Java
1393 * @param jsonResult - The bulk operations response returned from
1395 * @return - The marshalled response.
1396 * @throws JsonParseException
1397 * @throws JsonMappingException
1398 * @throws IOException
// Deserializes the json text returned for a bulk request into an
// ElasticSearchBulkOperationResult using a Jackson ObjectMapper.
// What is returned for a null input is not visible in this listing.
1400 private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult)
1401 throws JsonParseException, JsonMappingException, IOException {
1403 if (jsonResult != null) {
1404 if (logger.isDebugEnabled()) {
// Newlines are stripped so the payload is logged on a single line.
1405 logger.debug("ESController: Marshalling ES result set from json: "
1406 + jsonResult.replaceAll("\n", ""));
// A new mapper is created for every call.
1409 ObjectMapper mapper = new ObjectMapper();
// NOTE(review): this setting governs serialization output; only readValue
// is called on this mapper here, so it may have no effect - confirm.
1410 mapper.setSerializationInclusion(Include.NON_EMPTY);
1412 return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1420 * This method takes the marshalled ElasticSearch bulk response and
1421 * converts it into a generic response payload.
1423 * @param esResult - ElasticSearch bulk operations response.
1424 * @return - A generic result set.
// Merges the items returned by ElasticSearch (if any) with the operations
// rejected locally, counts the outcomes, and renders everything as one json
// string that starts with the total_operations / total_success / total_fails
// counters followed by the individual item results.
1426 private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1427 List<ElasticSearchResultItem> rejectedOps) {
1430 int totalSuccess = 0;
1433 if (logger.isDebugEnabled()) {
1435 logger.debug("ESController: Build generic result set. ES Results: "
1436 + ((esResult != null) ? esResult.toString() : "[]")
1437 + " Rejected Ops: " + rejectedOps.toString());
1440 // Build a combined list of result items from the results returned
1441 // from ElasticSearch and the list of operations that we rejected
1442 // without sending to ElasticSearch.
1443 List<ElasticSearchResultItem> combinedResults = new ArrayList<ElasticSearchResultItem>();
// esResult may be null (the caller initialises it to null), so guard it.
1444 if (esResult != null) {
// NOTE(review): the imports visible at the top of the file bring in Arrays
// from edu.emory.mathcs.backport.java.util, not java.util - confirm intended.
1445 combinedResults.addAll(Arrays.asList(esResult.getItems()));
1447 combinedResults.addAll(rejectedOps);
1449 // Iterate over the individual results in the resulting result set.
1450 StringBuilder resultsBuilder = new StringBuilder();
// Used purely as a "first element" flag for comma placement below.
1451 AtomicBoolean firstItem = new AtomicBoolean(true);
1452 for (ElasticSearchResultItem item : combinedResults) {
1454 // Increment the operation counts.
// An item counts as a success when its status is in the 2xx range.
1456 if (isSuccessCode(item.operationStatus().getStatus())) {
1462 // Prepend a comma to our response string unless this is the
1463 // first result in the set.
// compareAndSet(true, false) succeeds only on the first pass, so the
// negated test is true for every item after the first.
1464 if (!firstItem.compareAndSet(true, false)) {
1465 resultsBuilder.append(", ");
1468 // Append the current result as a generic json structure.
1469 resultsBuilder.append(item.toJson());
1472 // Now, build the result string and return it.
1473 String responseBody = "{ \"total_operations\": " + totalOps + ", "
1474 + "\"total_success\": " + totalSuccess + ", "
1475 + "\"total_fails\": " + totalFails + ", "
1477 + resultsBuilder.toString()
1480 return responseBody;
1485 * This method queries ElasticSearch to determine if the supplied
1486 * index is present in the document store.
1488 * @param indexName - The index to look for.
1489 * @return - An operation result indicating the success or failure of
1491 * @throws DocumentStoreOperationException
// Issues an HTTP HEAD request for "/<indexName>" against the document store
// and reports the HTTP response code in the returned OperationResult.
// Throws DocumentStoreOperationException when the request method cannot be
// set or the response code cannot be read; the connection is shut down on
// both of those paths as well as after the metrics log is written.
1493 public OperationResult checkIndexExistence(String indexName)
1494 throws DocumentStoreOperationException {
1496 // Initialize operation result with a failure codes / fault string
1497 OperationResult opResult = new OperationResult();
// 500 is only a placeholder; it is overwritten with the real response code below.
1498 opResult.setResultCode(500);
1500 // Grab the current time so we can use it to generate a metrics log.
1501 MdcOverride override = getStartTime(new MdcOverride());
1503 String fullUrl = getFullUrl("/" + indexName, false);
1504 HttpURLConnection conn = initializeConnection(fullUrl);
// HEAD asks only for the status line and headers, without a response body.
1507 conn.setRequestMethod("HEAD");
1509 } catch (ProtocolException e) {
1510 shutdownConnection(conn);
1511 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1514 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
// getResponseCode() is the call that actually performs the exchange.
1518 resultCode = conn.getResponseCode();
1519 } catch (IOException e) {
1520 shutdownConnection(conn);
1521 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1523 logger.debug("Response Code : " + resultCode);
1525 opResult.setResultCode(resultCode);
1527 // Generate a metrics log so we can track how long the operation took.
// Note that RESPONSE_DESCRIPTION is also set to the numeric result code.
1528 metricsLogger.info(SearchDbMsgs.CHECK_INDEX_TIME,
1530 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1531 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1535 shutdownConnection(conn);
// Parses the raw json held in result.getResult() and populates 'result':
// for a 2xx result code a Document (etag, url, content) is attached,
// otherwise an ErrorResult is attached when the payload has an "error" object.
// Throws DocumentStoreOperationException when anything in the parsing or
// extraction fails.
1541 private void buildDocumentResult(DocumentOperationResult result, String index)
1542 throws DocumentStoreOperationException {
1544 JSONParser parser = new JSONParser();
1547 root = (JSONObject) parser.parse(result.getResult());
1549 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1550 // Success response object
1551 Document doc = new Document();
// The etag comes from the result's version, not from the parsed payload.
1552 doc.setEtag(result.getResultVersion());
// A payload without "_id" would raise a NullPointerException here, which the
// catch-all below turns into a DocumentStoreOperationException.
1553 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1555 doc.setContent((JSONObject) root.get("_source"));
1556 result.setDocument(doc);
1559 // Error response object
1560 JSONObject error = (JSONObject) root.get("error");
1561 if (error != null) {
1562 result.setError(new ErrorResult(error.get("type").toString(),
1563 error.get("reason").toString()));
// NOTE(review): the caught exception is not passed on as the cause, so the
// original failure detail is lost; the message also has no separator between
// "response." and the appended payload.
1567 } catch (Exception e) {
1568 throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1569 + result.getResult());
// Thin wrapper that delegates to ApiUtils.buildDocumentUri to produce the
// URL reported for a document with the given index and id.
1575 private String buildDocumentResponseUrl(String index, String id) {
1576 return ApiUtils.buildDocumentUri(index, id);
// Parses the raw json held in result.getResult() and populates 'result':
// for a 2xx result code the "hits" section is converted into SearchHits and
// any "aggregations" section into AggregationResults; otherwise an
// ErrorResult is attached when the payload has an "error" object.
// Throws DocumentStoreOperationException when anything in the parsing or
// extraction fails.
1579 private void buildSearchResult(SearchOperationResult result, String index)
1580 throws DocumentStoreOperationException {
1582 JSONParser parser = new JSONParser();
1586 root = (JSONObject) parser.parse(result.getResult());
1587 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
// "hits" is dereferenced without a null check; a payload lacking it ends up
// in the catch-all below.
1588 JSONObject hits = (JSONObject) root.get("hits");
1589 JSONArray hitArray = (JSONArray) hits.get("hits");
1590 SearchHits searchHits = new SearchHits();
1591 searchHits.setTotalHits(hits.get("total").toString());
1592 ArrayList<SearchHit> searchHitArray = new ArrayList<SearchHit>();
// Convert every raw hit into a SearchHit wrapping a Document.
1594 for (int i = 0; i < hitArray.size(); i++) {
1595 JSONObject hit = (JSONObject) hitArray.get(i);
1596 SearchHit searchHit = new SearchHit();
// A missing "_score" is reported as an empty string.
1597 searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1598 Document doc = new Document();
// The etag is only set when the hit carries a "_version".
// NOTE(review): the inner null test repeats the enclosing condition, so its
// "" alternative can never be selected.
1599 if (hit.get("_version") != null) {
1600 doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
// A missing "_id" yields a URL built from an empty id.
1603 doc.setUrl(buildDocumentResponseUrl(index, (hit.get("_id") != null)
1604 ? hit.get("_id").toString() : ""));
1605 doc.setContent((JSONObject) hit.get("_source"));
1606 searchHit.setDocument(doc);
1607 searchHitArray.add(searchHit);
1609 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1610 result.setSearchResult(searchHits);
// Aggregations are optional in the response.
1612 JSONObject aggregations = (JSONObject) root.get("aggregations");
1613 if (aggregations != null) {
1614 AggregationResult[] aggResults =
1615 AggregationParsingUtil.parseAggregationResults(aggregations);
1616 AggregationResults aggs = new AggregationResults();
1617 aggs.setAggregations(aggResults);
1618 result.setAggregationResult(aggs);
// Error response object.
1623 JSONObject error = (JSONObject) root.get("error");
1624 if (error != null) {
1625 result.setError(new ErrorResult(error.get("type").toString(),
1626 error.get("reason").toString()));
// NOTE(review): the caught exception is not passed on as the cause, so the
// original failure detail is lost; the message also has no separator between
// "response." and the appended payload.
1629 } catch (Exception e) {
1630 throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1631 + result.getResult());