2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
23 import com.att.aft.dme2.internal.google.common.base.Throwables;
24 import com.fasterxml.jackson.annotation.JsonInclude.Include;
25 import com.fasterxml.jackson.core.JsonParseException;
26 import com.fasterxml.jackson.core.JsonProcessingException;
27 import com.fasterxml.jackson.databind.JsonMappingException;
28 import com.fasterxml.jackson.databind.ObjectMapper;
29 import edu.emory.mathcs.backport.java.util.Arrays;
30 import org.json.simple.JSONArray;
31 import org.json.simple.JSONObject;
32 import org.json.simple.parser.JSONParser;
33 import org.json.simple.parser.ParseException;
34 import org.onap.aai.sa.rest.AnalysisConfiguration;
35 import org.onap.aai.sa.rest.ApiUtils;
36 import org.onap.aai.sa.rest.BulkRequest;
37 import org.onap.aai.sa.rest.BulkRequest.OperationType;
38 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
39 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
40 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
41 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
42 import org.onap.aai.sa.searchdbabstraction.entity.Document;
43 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
44 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
45 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
46 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
47 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
48 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
49 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
50 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
51 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
52 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
53 import org.onap.aai.cl.api.LogFields;
54 import org.onap.aai.cl.api.LogLine;
55 import org.onap.aai.cl.api.Logger;
56 import org.onap.aai.cl.eelf.LoggerFactory;
57 import org.onap.aai.cl.mdc.MdcContext;
58 import org.onap.aai.cl.mdc.MdcOverride;
59 import org.onap.aai.sa.rest.DocumentSchema;
61 import java.io.BufferedReader;
63 import java.io.FileInputStream;
64 import java.io.IOException;
65 import java.io.InputStream;
66 import java.io.InputStreamReader;
67 import java.io.OutputStream;
68 import java.io.OutputStreamWriter;
69 import java.io.PrintWriter;
70 import java.io.StringWriter;
71 import java.net.HttpURLConnection;
72 import java.net.MalformedURLException;
73 import java.net.ProtocolException;
75 import java.text.SimpleDateFormat;
76 import java.util.ArrayList;
77 import java.util.List;
78 import java.util.Properties;
79 import java.util.concurrent.atomic.AtomicBoolean;
80 import javax.ws.rs.core.Response.Status;
/**
 * This class provides the ElasticSearch implementation of the
 * DB operations defined in DocumentStoreInterface.
 */
87 public class ElasticSearchHttpController implements DocumentStoreInterface {
89 private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
90 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
91 private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
92 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
93 private static final String BULK_IMPORT_INDEX_TEMPLATE =
94 "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
95 private static final String BULK_DELETE_TEMPLATE =
96 "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
98 private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
99 "Internal Error: ElasticSearch operation fault occurred";
100 private static final Logger logger = LoggerFactory.getInstance()
101 .getLogger(ElasticSearchHttpController.class.getName());
102 private static final Logger metricsLogger = LoggerFactory.getInstance()
103 .getMetricsLogger(ElasticSearchHttpController.class.getName());
104 private final ElasticSearchConfig config;
106 private static final String DEFAULT_TYPE = "default";
108 private static ElasticSearchHttpController instance = null;
110 protected AnalysisConfiguration analysisConfig;
112 public static ElasticSearchHttpController getInstance() {
114 synchronized (ElasticSearchHttpController.class) {
116 if (instance == null) {
118 Properties properties = new Properties();
119 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
121 properties.load(new FileInputStream(file));
122 } catch (Exception e) {
123 logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL,
124 "ElasticSearchHTTPController.getInstance",
125 e.getLocalizedMessage());
128 ElasticSearchConfig config = new ElasticSearchConfig(properties);
129 instance = new ElasticSearchHttpController(config);
136 public ElasticSearchHttpController(ElasticSearchConfig config) {
137 this.config = config;
138 analysisConfig = new AnalysisConfiguration();
141 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
143 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
144 } catch (Exception e) {
145 logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e,
146 getFullUrl("", false), e.getMessage());
151 public AnalysisConfiguration getAnalysisConfig() {
152 return analysisConfig;
156 public OperationResult createIndex(String index, DocumentSchema documentSchema) {
158 OperationResult result = new OperationResult();
159 result.setResultCode(500);
163 // Submit the request to ElasticSearch to create the index using a
164 // default document type.
165 result = createTable(index,
167 analysisConfig.getEsIndexSettings(),
168 DocumentSchemaUtil.generateDocumentMappings(documentSchema));
170 // ElasticSearch will return us a 200 code on success when we
171 // want to report a 201, so translate the result here.
172 result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
173 if (isSuccess(result)) {
174 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
175 //result.setResult("{\"index\": \"" + index + ", \"type\": \"" + DEFAULT_TYPE + "\"}");
178 } catch (DocumentStoreOperationException e) {
180 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
188 public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
190 //Initialize operation result with a failure codes / fault string
191 OperationResult opResult = new OperationResult();
192 opResult.setResultCode(500);
193 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
195 // Grab the current time so we can use it to generate a metrics log.
196 MdcOverride override = getStartTime(new MdcOverride());
198 String fullUrl = getFullUrl("/" + indexName + "/", false);
199 HttpURLConnection conn = initializeConnection(fullUrl);
201 logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
204 conn.setRequestMethod("DELETE");
205 } catch (ProtocolException e) {
206 shutdownConnection(conn);
207 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
210 handleResponse(conn, opResult);
212 // Generate a metrics log so we can track how long the operation took.
213 metricsLogger.info(SearchDbMsgs.DELETE_INDEX_TIME,
215 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
216 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
220 shutdownConnection(conn);
226 private OperationResult checkConnection() throws Exception {
228 String fullUrl = getFullUrl("/_cluster/health", false);
230 HttpURLConnection conn = null;
232 url = new URL(fullUrl);
233 conn = (HttpURLConnection) url.openConnection();
234 conn.setRequestMethod("GET");
235 conn.setDoOutput(true);
236 logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
238 int resultCode = conn.getResponseCode();
239 logger.debug("getClusterHealth() response Code : " + resultCode);
240 OperationResult opResult = new OperationResult();
241 opResult.setResultCode(resultCode);
243 shutdownConnection(conn);
248 private String getFullUrl(String resourceUrl, boolean isSecure) {
250 final String host = config.getIpAddress();
251 final String port = config.getHttpPort();
254 return String.format("https://%s:%s%s", host, port, resourceUrl);
256 return String.format("http://%s:%s%s", host, port, resourceUrl);
260 private void shutdownConnection(HttpURLConnection connection) {
261 if (connection == null) {
265 InputStream inputstream = null;
266 OutputStream outputstream = null;
269 inputstream = connection.getInputStream();
270 } catch (IOException e) {
271 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
273 if (inputstream != null) {
276 } catch (IOException e) {
277 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
278 e.getLocalizedMessage());
284 outputstream = connection.getOutputStream();
285 } catch (IOException e) {
286 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
288 if (outputstream != null) {
290 outputstream.close();
291 } catch (IOException e) {
292 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
293 e.getLocalizedMessage());
298 connection.disconnect();
302 protected OperationResult createTable(String indexName, String typeName,
303 String indexSettings, String indexMappings)
304 throws DocumentStoreOperationException {
306 if (indexSettings == null) {
307 logger.debug("No settings provided.");
310 if (indexMappings == null) {
311 logger.debug("No mappings provided.");
314 OperationResult opResult = new OperationResult();
316 // Initialize operation result with a failure codes / fault string
317 opResult.setResultCode(500);
318 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
320 // Grab the current time so we can use it to generate a metrics log.
321 MdcOverride override = getStartTime(new MdcOverride());
323 String fullUrl = getFullUrl("/" + indexName + "/", false);
324 HttpURLConnection conn = initializeConnection(fullUrl);
327 conn.setRequestMethod("PUT");
328 } catch (ProtocolException e) {
329 shutdownConnection(conn);
330 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
333 StringBuilder sb = new StringBuilder(128);
334 sb.append("{ \"settings\" : ");
335 sb.append(indexSettings);
338 sb.append("\"mappings\" : {");
339 sb.append("\"" + typeName + "\" :");
340 sb.append(indexMappings);
343 attachContent(conn, sb.toString());
345 logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
346 logger.debug("Request content: " + sb.toString());
348 handleResponse(conn, opResult);
350 shutdownConnection(conn);
352 // Generate a metrics log so we can track how long the operation took.
353 metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
355 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
356 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
364 public DocumentOperationResult createDocument(String indexName,
365 DocumentStoreDataEntity document,
366 boolean allowImplicitIndexCreation)
367 throws DocumentStoreOperationException {
369 if(!allowImplicitIndexCreation) {
371 // Before we do anything, make sure that the specified index actually exists in the
372 // document store - we don't want to rely on ElasticSearch to fail the document
373 // create because it could be configured to implicitly create a non-existent index,
374 // which can lead to hard-to-debug behaviour with queries down the road.
375 OperationResult indexExistsResult = checkIndexExistence(indexName);
376 if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
378 DocumentOperationResult opResult = new DocumentOperationResult();
379 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
380 opResult.setResult("Document Index '" + indexName + "' does not exist.");
381 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
386 if (document.getId() == null || document.getId().isEmpty()) {
387 return createDocumentWithoutId(indexName, document);
389 return createDocumentWithId(indexName, document);
393 private DocumentOperationResult createDocumentWithId(String indexName,
394 DocumentStoreDataEntity document)
395 throws DocumentStoreOperationException {
396 // check if the document already exists
397 DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
400 if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
401 if (opResult.getResultCode() == Status.OK.getStatusCode()) {
402 opResult.setFailureCause("A document with the same id already exists.");
404 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
406 opResult.setResultCode(Status.CONFLICT.getStatusCode());
410 opResult = new DocumentOperationResult();
411 // Initialize operation result with a failure codes / fault string
412 opResult.setResultCode(500);
413 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
415 // Grab the current time so we can use it to generate a metrics log.
416 MdcOverride override = getStartTime(new MdcOverride());
418 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE
419 + "/" + document.getId(), false);
420 HttpURLConnection conn = initializeConnection(fullUrl);
423 conn.setRequestMethod("PUT");
424 } catch (ProtocolException e) {
425 shutdownConnection(conn);
426 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
429 attachDocument(conn, document);
431 logger.debug("Sending 'PUT' request to: " + conn.getURL());
433 handleResponse(conn, opResult);
434 buildDocumentResult(opResult, indexName);
436 // Generate a metrics log so we can track how long the operation took.
437 metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
439 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
440 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
444 shutdownConnection(conn);
450 private DocumentOperationResult createDocumentWithoutId(String indexName,
451 DocumentStoreDataEntity document)
452 throws DocumentStoreOperationException {
454 DocumentOperationResult response = new DocumentOperationResult();
455 // Initialize operation result with a failure codes / fault string
456 response.setResultCode(500);
457 response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
459 // Grab the current time so we can use it to generate a metrics log.
460 MdcOverride override = getStartTime(new MdcOverride());
462 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
463 HttpURLConnection conn = initializeConnection(fullUrl);
466 conn.setRequestMethod("POST");
467 } catch (ProtocolException e) {
468 shutdownConnection(conn);
469 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
472 attachDocument(conn, document);
474 logger.debug("Sending 'POST' request to: " + conn.getURL());
476 handleResponse(conn, response);
477 buildDocumentResult(response, indexName);
479 // Generate a metrics log so we can track how long the operation took.
480 metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
482 .setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
483 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
487 shutdownConnection(conn);
492 private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
493 throws DocumentStoreOperationException {
494 conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
495 conn.setRequestProperty("Connection", "Close");
497 attachContent(conn, doc.getContentInJson());
500 private DocumentOperationResult checkDocumentExistence(String indexName,
502 throws DocumentStoreOperationException {
503 DocumentOperationResult opResult = new DocumentOperationResult();
505 // Initialize operation result with a failure codes / fault string
506 opResult.setResultCode(500);
508 // Grab the current time so we can use it to generate a metrics log.
509 MdcOverride override = getStartTime(new MdcOverride());
511 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
512 HttpURLConnection conn = initializeConnection(fullUrl);
515 conn.setRequestMethod("HEAD");
516 } catch (ProtocolException e) {
517 shutdownConnection(conn);
518 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
521 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
525 resultCode = conn.getResponseCode();
526 } catch (IOException e) {
527 shutdownConnection(conn);
528 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
531 logger.debug("Response Code : " + resultCode);
533 opResult.setResultCode(resultCode);
535 // Generate a metrics log so we can track how long the operation took.
536 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
538 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
539 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
544 shutdownConnection(conn);
550 public DocumentOperationResult updateDocument(String indexName,
551 DocumentStoreDataEntity document,
552 boolean allowImplicitIndexCreation)
553 throws DocumentStoreOperationException {
555 if(!allowImplicitIndexCreation) {
557 // Before we do anything, make sure that the specified index actually exists in the
558 // document store - we don't want to rely on ElasticSearch to fail the document
559 // create because it could be configured to implicitly create a non-existent index,
560 // which can lead to hard-to-debug behaviour with queries down the road.
561 OperationResult indexExistsResult = checkIndexExistence(indexName);
562 if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
564 DocumentOperationResult opResult = new DocumentOperationResult();
565 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
566 opResult.setResult("Document Index '" + indexName + "' does not exist.");
567 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
572 DocumentOperationResult opResult = new DocumentOperationResult();
574 // Initialize operation result with a failure codes / fault string
575 opResult.setResultCode(500);
576 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
578 // Grab the current time so we can use it to generate a metrics log.
579 MdcOverride override = getStartTime(new MdcOverride());
581 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
582 + "?version=" + document.getVersion(), false);
583 HttpURLConnection conn = initializeConnection(fullUrl);
586 conn.setRequestMethod("PUT");
587 } catch (ProtocolException e) {
588 shutdownConnection(conn);
589 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
592 attachDocument(conn, document);
594 logger.debug("Sending 'PUT' request to: " + conn.getURL());
596 handleResponse(conn, opResult);
597 buildDocumentResult(opResult, indexName);
599 // Generate a metrics log so we can track how long the operation took.
600 metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
602 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
603 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
608 shutdownConnection(conn);
614 public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
615 throws DocumentStoreOperationException {
616 DocumentOperationResult opResult = new DocumentOperationResult();
618 // Initialize operation result with a failure codes / fault string
619 opResult.setResultCode(500);
620 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
622 // Grab the current time so we can use it to generate a metrics log.
623 MdcOverride override = getStartTime(new MdcOverride());
625 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
626 + "?version=" + document.getVersion(), false);
627 HttpURLConnection conn = initializeConnection(fullUrl);
630 conn.setRequestMethod("DELETE");
631 } catch (ProtocolException e) {
632 shutdownConnection(conn);
633 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
636 logger.debug("\nSending 'DELETE' request to " + conn.getURL());
638 handleResponse(conn, opResult);
639 buildDocumentResult(opResult, indexName);
640 //supress the etag and url in response for delete as they are not required
641 if (opResult.getDocument() != null) {
642 opResult.getDocument().setEtag(null);
643 opResult.getDocument().setUrl(null);
646 // Generate a metrics log so we can track how long the operation took.
647 metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
649 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
650 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
655 shutdownConnection(conn);
661 public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
662 throws DocumentStoreOperationException {
663 DocumentOperationResult opResult = new DocumentOperationResult();
665 // Initialize operation result with a failure codes / fault string
666 opResult.setResultCode(500);
667 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
669 // Grab the current time so we can use it to generate a metrics log.
670 MdcOverride override = getStartTime(new MdcOverride());
672 String fullUrl = null;
673 if (document.getVersion() == null) {
674 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
676 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
677 + "?version=" + document.getVersion(), false);
679 HttpURLConnection conn = initializeConnection(fullUrl);
681 logger.debug("\nSending 'GET' request to: " + conn.getURL());
683 handleResponse(conn, opResult);
684 buildDocumentResult(opResult, indexName);
686 // Generate a metrics log so we can track how long the operation took.
687 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
689 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
690 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
695 shutdownConnection(conn);
700 public SearchOperationResult search(String indexName, String queryString)
701 throws DocumentStoreOperationException {
702 SearchOperationResult opResult = new SearchOperationResult();
704 // Initialize operation result with a failure codes / fault string
705 opResult.setResultCode(500);
706 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
708 String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
710 // Grab the current time so we can use it to generate a metrics log.
711 MdcOverride override = getStartTime(new MdcOverride());
713 HttpURLConnection conn = initializeConnection(fullUrl);
716 conn.setRequestMethod("GET");
717 } catch (ProtocolException e) {
718 shutdownConnection(conn);
719 throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
722 logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
724 handleResponse(conn, opResult);
725 buildSearchResult(opResult, indexName);
728 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
730 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
731 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
739 public SearchOperationResult searchWithPayload(String indexName, String query)
740 throws DocumentStoreOperationException {
741 SearchOperationResult opResult = new SearchOperationResult();
743 if (logger.isDebugEnabled()) {
744 logger.debug("Querying index: " + indexName + " with query string: " + query);
747 // Initialize operation result with a failure codes / fault string
748 opResult.setResultCode(500);
749 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
751 String fullUrl = getFullUrl("/" + indexName + "/_search", false);
753 // Grab the current time so we can use it to generate a metrics log.
754 MdcOverride override = getStartTime(new MdcOverride());
756 HttpURLConnection conn = initializeConnection(fullUrl);
759 conn.setRequestMethod("POST");
760 } catch (ProtocolException e) {
761 shutdownConnection(conn);
762 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
765 attachContent(conn, query);
767 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
768 logger.debug("Request body = Elasticsearch query = " + query);
770 handleResponse(conn, opResult);
771 buildSearchResult(opResult, indexName);
773 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
775 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
776 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
781 shutdownConnection(conn);
786 private void attachContent(HttpURLConnection conn, String content)
787 throws DocumentStoreOperationException {
788 OutputStream outputStream = null;
789 OutputStreamWriter out = null;
792 outputStream = conn.getOutputStream();
793 } catch (IOException e) {
794 shutdownConnection(conn);
795 throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
798 out = new OutputStreamWriter(outputStream);
803 } catch (IOException e) {
804 shutdownConnection(conn);
805 throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
809 private HttpURLConnection initializeConnection(String fullUrl)
810 throws DocumentStoreOperationException {
812 HttpURLConnection conn = null;
815 url = new URL(fullUrl);
816 } catch (MalformedURLException e) {
817 throw new DocumentStoreOperationException("Error building a URL with " + url, e);
821 conn = (HttpURLConnection) url.openConnection();
822 conn.setDoOutput(true);
823 } catch (IOException e) {
824 shutdownConnection(conn);
825 throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
831 private void handleResponse(HttpURLConnection conn, OperationResult opResult)
832 throws DocumentStoreOperationException {
833 int resultCode = 200;
836 resultCode = conn.getResponseCode();
837 } catch (IOException e) {
838 shutdownConnection(conn);
839 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
842 logger.debug("Response Code : " + resultCode);
844 InputStream inputStream = null;
846 if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
847 inputStream = conn.getErrorStream();
850 inputStream = conn.getInputStream();
851 } catch (IOException e) {
852 shutdownConnection(conn);
853 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
857 InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
858 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
860 StringBuilder result = new StringBuilder(128);
861 String string = null;
864 while ((string = bufferedreader.readLine()) != null) {
865 result.append(string).append("\n");
867 } catch (IOException e) {
868 shutdownConnection(conn);
869 throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
872 if (resultCode == Status.CONFLICT.getStatusCode()) {
873 opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode());
875 opResult.setResultCode(resultCode);
877 if (logger.isDebugEnabled()) {
878 logger.debug("Raw result string from ElasticSearch = " + result.toString());
880 opResult.setResult(result.toString());
881 opResult.setResultVersion(extractVersion(result.toString()));
884 private String extractVersion(String result) throws DocumentStoreOperationException {
886 JSONParser parser = new JSONParser();
887 String version = null;
889 JSONObject root = (JSONObject) parser.parse(result);
890 if (root.get("_version") != null) {
891 version = root.get("_version").toString();
894 } catch (ParseException e) {
896 // Not all responses from ElasticSearch include a version, so
897 // if we don't get one back, just return an empty string rather
898 // than trigger a false failure.
905 * This convenience method gets the current system time and stores
906 * it in an attribute in the supplied {@link MdcOverride} object so
907 * that it can be used later by the metrics logger.
909 * @param override - The {@link MdcOverride} object to update.
910 * @return - The supplied {@link MdcOverride} object.
912 private MdcOverride getStartTime(MdcOverride override) {
914 // Grab the current time...
915 long startTimeInMs = System.currentTimeMillis();
917 // ...and add it as an attribute to the supplied MDC Override
919 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
920 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
922 // Return the MdcOverride object that we were passed.
923 // This looks odd, but it allows us to do stuff like:
925 // MdcOverride ov = getStartTime(new MdcOverride())
927 // which is quite handy, but also allows us to pass in an existing
928 // MdcOverride object which already has some attributes set.
932 private boolean isSuccess(OperationResult result) {
934 return isSuccessCode(result.getResultCode());
938 private boolean isSuccessCode(int statusCode) {
939 return ((statusCode >= 200) && (statusCode < 300));
/**
 * Sends a set of document operations to ElasticSearch as one '_bulk' request.
 *
 * Each request is first passed to buildEsOperation, which appends the
 * translated operation to the request payload and collects the operations
 * that were rejected locally.  The ElasticSearch response is marshalled and
 * merged with those rejections into a generic result set, which becomes the
 * body of an {@link OperationResult} whose result code is set to 207.
 *
 * @param requests - The bulk operations to perform.
 * @return - The operation result described above.
 * @throws DocumentStoreOperationException if the connection to the document
 *         store cannot be opened, the exchange with it fails, or the
 *         response body cannot be marshalled.
 */
944 public OperationResult performBulkOperations(BulkRequest[] requests)
945 throws DocumentStoreOperationException {
947 if (logger.isDebugEnabled()) {
948 String dbgString = "ESController: performBulkOperations - Operations: ";
950 for (BulkRequest request : requests) {
951 dbgString += "[" + request.toString() + "] ";
954 logger.debug(dbgString);
957 // Grab the current time so we can use it to generate a metrics log.
958 MdcOverride override = getStartTime(new MdcOverride());
960 // Parse the supplied set of operations.
961 // Iterate over the list of operations which we were provided and
962 // translate them into a format that ElasticSearch understands.
964 StringBuilder esOperationSet = new StringBuilder(128);
965 List<ElasticSearchResultItem> rejected = new ArrayList<ElasticSearchResultItem>();
966 for (BulkRequest request : requests) {
968 // Convert the request to the syntax ElasticSearch likes.
969 if (buildEsOperation(request, esOperationSet, rejected)) {
974 ElasticSearchBulkOperationResult opResult = null;
977 // Open an HTTP connection to the ElasticSearch back end.
978 String fullUrl = getFullUrl("/_bulk", false);
980 HttpURLConnection conn;
983 url = new URL(fullUrl);
984 conn = (HttpURLConnection) url.openConnection();
985 conn.setRequestMethod("PUT");
986 conn.setDoOutput(true);
987 conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
988 conn.setRequestProperty("Connection", "Close");
990 } catch (IOException e) {
992 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
993 if (logger.isDebugEnabled()) {
994 logger.debug(Throwables.getStackTraceAsString(e));
997 throw new DocumentStoreOperationException("Failed to open connection to document store. Cause: "
998 + e.getMessage(), e);
1001 StringBuilder bulkResult = new StringBuilder(128);
1003 // Create an output stream to write our request to.
1004 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1007 if (logger.isDebugEnabled()) {
1008 logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1009 logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n",
1013 // Write the resulting request string to our output stream. (this sends the request to ES?)
1014 out.write(esOperationSet.toString());
1017 // Open an input stream on our connection in order to read back the results.
1018 InputStream is = conn.getInputStream();
1019 InputStreamReader inputstreamreader = new InputStreamReader(is);
1020 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1022 // Read the contents of the input stream into our result string...
1023 String esResponseString = null;
1025 while ((esResponseString = bufferedreader.readLine()) != null) {
1026 bulkResult.append(esResponseString).append("\n");
1029 } catch (IOException e) {
1031 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1032 if (logger.isDebugEnabled()) {
1033 StringWriter sw = new StringWriter();
1034 e.printStackTrace(new PrintWriter(sw));
1035 logger.debug(sw.toString());
1038 throw new DocumentStoreOperationException("Failure interacting with document store. Cause: "
1039 + e.getMessage(), e);
1042 if (logger.isDebugEnabled()) {
1043 logger.debug("ESController: Received result string from ElasticSearch: = "
1044 + bulkResult.toString());
1047 // ...and marshal the resulting string into a Java object.
1049 opResult = marshallEsBulkResult(bulkResult.toString());
1051 } catch (IOException e) {
1053 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1054 if (logger.isDebugEnabled()) {
1055 logger.debug(Throwables.getStackTraceAsString(e));
1058 throw new DocumentStoreOperationException("Failed to marshal response body. Cause: "
1059 + e.getMessage(), e);
1063 // Finally, build the operation result and return it to the caller.
1064 OperationResult result = new OperationResult();
1065 result.setResultCode(207);
1066 result.setResult(buildGenericBulkResultSet(opResult, rejected));
1068 // In the success case we don't want the entire result string to be
1069 // dumped into the metrics log, so truncate it.
// Math.min caps the cut-off at the string's own length: results shorter
// than 85 characters no longer make substring() throw, and longer results
// are actually shortened to their first 85 characters.
1070 String resultStringForMetricsLog = result.getResult();
1071 if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1072 resultStringForMetricsLog = resultStringForMetricsLog.substring(0,
1073 Math.min(resultStringForMetricsLog.length(), 85)) + "...";
1076 metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1078 .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1079 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1087 * This method converts a {@link BulkRequest} object into a json structure
1088 * which can be understood by ElasticSearch.
1090 * @param request - The request to be performed.
1091 * @param sb - The string builder to append the json data to
1092 * @throws DocumentStoreOperationException
1094 private boolean buildEsOperation(BulkRequest request, StringBuilder sb,
1095 List<ElasticSearchResultItem> fails)
1096 throws DocumentStoreOperationException {
// Each validation below records a rejection entry in 'fails' when it does
// not hold; the bulk action for an accepted operation is appended to 'sb'.
1098 boolean retVal = true;
// NOTE(review): indexExistsResult is not referenced on any of the lines
// visible here - confirm whether it is still needed.
1099 OperationResult indexExistsResult = null;
1101 // What kind of operation are we performing?
1102 switch (request.getOperationType()) {
1104 // Create a new document.
1107 // Make sure that we were supplied a document payload.
1108 if (request.getOperation().getDocument() == null) {
1110 fails.add(generateRejectionEntry(request.getOperationType(),
1111 "Missing document payload",
1115 request.getOperation().getMetaData().getUrl()));
1119 // Make sure that the supplied document URL is formatted
// NOTE(review): the second argument is false here but true in the update
// and delete cases - presumably it says whether the URL must carry a
// document id; confirm against ApiUtils.
1121 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1122 fails.add(generateRejectionEntry(request.getOperationType(),
1123 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1127 request.getOperation().getMetaData().getUrl()));
1131 // Validate that the specified index actually exists before we
1132 // try to perform the create.
// Unlike update and delete, the index name is extracted from the URL here
// rather than read from request.getIndex().
1133 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1135 fails.add(generateRejectionEntry(request.getOperationType(),
1136 "Specified resource does not exist: "
1137 + request.getOperation().getMetaData().getUrl(),
1141 request.getOperation().getMetaData().getUrl()));
1145 // If we were NOT supplied an id for the new document, use the
1146 // template variant without one (presumably ElasticSearch then generates it).
1147 if (request.getId() == null) {
1149 sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE,
1153 // Otherwise, use the template variant intended for requests
1154 // which do carry an id.
1156 sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE,
1163 // Append the document that we want to create.
1164 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1165 } catch (JsonProcessingException e) {
1166 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1171 // Update an existing document.
1174 // Make sure that we were supplied a document payload.
1175 if (request.getOperation().getDocument() == null) {
1177 fails.add(generateRejectionEntry(request.getOperationType(),
1178 "Missing document payload",
1182 request.getOperation().getMetaData().getUrl()));
1186 // Make sure that the supplied document URL is formatted
1188 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1189 fails.add(generateRejectionEntry(request.getOperationType(),
1190 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1194 request.getOperation().getMetaData().getUrl()));
1198 // Validate that the specified index actually exists before we
1199 // try to perform the update.
1200 if (!indexExists(request.getIndex())) {
1202 fails.add(generateRejectionEntry(request.getOperationType(),
1203 "Specified resource does not exist: "
1204 + request.getOperation().getMetaData().getUrl(),
1208 request.getOperation().getMetaData().getUrl()));
1212 // Validate that the document we are trying to update actually
1213 // exists before we try to perform the update.
1214 if (!documentExists(request.getIndex(), request.getId())) {
1216 fails.add(generateRejectionEntry(request.getOperationType(),
1217 "Specified resource does not exist: "
1218 + request.getOperation().getMetaData().getUrl(),
1222 request.getOperation().getMetaData().getUrl()));
1226 // It is mandatory that a version be supplied for an update operation,
1227 // so validate that now.
1228 if (request.getOperation().getMetaData().getEtag() == null) {
1230 fails.add(generateRejectionEntry(request.getOperationType(),
1231 "Missing mandatory ETag field",
1235 request.getOperation().getMetaData().getUrl()));
1239 // Generate the update request...
1240 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE,
1244 request.getOperation().getMetaData().getEtag()));
1246 // ...and append the document that we want to update.
1248 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1249 } catch (JsonProcessingException e) {
1250 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1254 // Delete an existing document.
1257 // Make sure that the supplied document URL is formatted
1259 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1260 fails.add(generateRejectionEntry(request.getOperationType(),
1261 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1265 request.getOperation().getMetaData().getUrl()));
1269 // Validate that the specified index actually exists before we
1270 // try to perform the delete.
1271 if (!indexExists(request.getIndex())) {
1273 fails.add(generateRejectionEntry(request.getOperationType(),
1274 "Specified resource does not exist: "
1275 + request.getOperation().getMetaData().getUrl(),
1279 request.getOperation().getMetaData().getUrl()));
1283 // Validate that the document we are trying to delete actually
1284 // exists before we try to perform the delete.
1285 if (!documentExists(request.getIndex(), request.getId())) {
1287 fails.add(generateRejectionEntry(request.getOperationType(),
1288 "Specified resource does not exist: "
1289 + request.getOperation().getMetaData().getUrl(),
1293 request.getOperation().getMetaData().getUrl()));
1297 // It is mandatory that a version be supplied for a delete operation,
1298 // so validate that now.
1299 if (request.getOperation().getMetaData().getEtag() == null) {
1301 fails.add(generateRejectionEntry(request.getOperationType(),
1302 "Missing mandatory ETag field",
1306 request.getOperation().getMetaData().getUrl()));
1310 // Generate the delete request.
1311 sb.append(String.format(BULK_DELETE_TEMPLATE,
1315 request.getOperation().getMetaData().getEtag()));
1323 private boolean indexExists(String index) throws DocumentStoreOperationException {
1325 OperationResult indexExistsResult = checkIndexExistence(index);
1327 return ((indexExistsResult.getResultCode() >= 200)
1328 && (indexExistsResult.getResultCode() < 300));
1331 private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1333 OperationResult docExistsResult = checkDocumentExistence(index, id);
1335 return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1339 * This method constructs a status entry for a bulk operation which has
1340 * been rejected before even sending it to the document store.
1342 * @param rejectReason - A message describing why the operation was rejected.
1343 * @param anId - The identifier associated with the document being
1345 * @param statusCode - An HTTP status code.
1346 * @return - A result set item.
1348 private ElasticSearchResultItem generateRejectionEntry(OperationType opType,
1349 String rejectReason,
1353 String originalUrl) {
// Capture the reason for the rejection in an error object.
1355 ElasticSearchError err = new ElasticSearchError();
1356 err.setReason(rejectReason);
// Build the per-operation status, carrying the supplied status code and
// the URL of the original request.
1358 ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1361 op.setStatus(statusCode);
1363 op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
// Attach the status to the result item under its create, index or delete
// slot.
// NOTE(review): which slot is used presumably depends on opType - the
// selecting statements are not among the lines visible here; confirm.
1365 ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1369 rejectionResult.setCreate(op);
1372 rejectionResult.setIndex(op);
1375 rejectionResult.setDelete(op);
1380 return rejectionResult;
1385 * This method takes the json structure returned from ElasticSearch in
1386 * response to a bulk operations request and marshals it into a Java
1389 * @param jsonResult - The bulk operations response returned from
1391 * @return - The marshalled response.
1392 * @throws JsonParseException
1393 * @throws JsonMappingException
1394 * @throws IOException
1396 private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult)
1397 throws JsonParseException, JsonMappingException, IOException {
1399 if (jsonResult != null) {
1400 if (logger.isDebugEnabled()) {
1401 logger.debug("ESController: Marshalling ES result set from json: "
1402 + jsonResult.replaceAll("\n", ""));
1405 ObjectMapper mapper = new ObjectMapper();
1406 mapper.setSerializationInclusion(Include.NON_EMPTY);
1408 return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1416 * This method takes the marshalled ElasticSearch bulk response and
1417 * converts it into a generic response payload.
1419 * @param esResult - ElasticSearch bulk operations response.
1420 * @return - A generic result set.
1422 private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1423 List<ElasticSearchResultItem> rejectedOps) {
// NOTE(review): totalOps and totalFails are used when the response body is
// built below; presumably they are declared next to totalSuccess on lines
// not visible here.
1426 int totalSuccess = 0;
1429 if (logger.isDebugEnabled()) {
1431 logger.debug("ESController: Build generic result set. ES Results: "
1432 + ((esResult != null) ? esResult.toString() : "[]")
1433 + " Rejected Ops: " + rejectedOps.toString());
1436 // Build a combined list of result items from the results returned
1437 // from ElasticSearch and the list of operations that we rejected
1438 // without sending to ElasticSearch.
1439 List<ElasticSearchResultItem> combinedResults = new ArrayList<ElasticSearchResultItem>();
// esResult may be null, in which case only the rejected operations are
// reported.
1440 if (esResult != null) {
1441 combinedResults.addAll(Arrays.asList(esResult.getItems()));
1443 combinedResults.addAll(rejectedOps);
1445 // Iterate over the individual results in the resulting result set.
1446 StringBuilder resultsBuilder = new StringBuilder();
// compareAndSet(true, false) succeeds only on the first pass through the
// loop, so the separator is emitted before every item except the first.
1447 AtomicBoolean firstItem = new AtomicBoolean(true);
1448 for (ElasticSearchResultItem item : combinedResults) {
1450 // Increment the operation counts.
1452 if (isSuccessCode(item.operationStatus().getStatus())) {
1458 // Prepend a comma to our response string unless this is the
1459 // first result in the set.
1460 if (!firstItem.compareAndSet(true, false)) {
1461 resultsBuilder.append(", ");
1464 // Append the current result as a generic json structure.
1465 resultsBuilder.append(item.toJson());
1468 // Now, build the result string and return it.
1469 String responseBody = "{ \"total_operations\": " + totalOps + ", "
1470 + "\"total_success\": " + totalSuccess + ", "
1471 + "\"total_fails\": " + totalFails + ", "
1473 + resultsBuilder.toString()
1476 return responseBody;
1481 * This method queries ElasticSearch to determine if the supplied
1482 * index is present in the document store.
1484 * @param indexName - The index to look for.
1485 * @return - An operation result indicating the success or failure of
1487 * @throws DocumentStoreOperationException
1489 public OperationResult checkIndexExistence(String indexName)
1490 throws DocumentStoreOperationException {
1492 // Initialize the operation result with a failure code (500); it is
// overwritten further down with the response code actually received.
1493 OperationResult opResult = new OperationResult();
1494 opResult.setResultCode(500);
1496 // Grab the current time so we can use it to generate a metrics log.
1497 MdcOverride override = getStartTime(new MdcOverride());
1499 String fullUrl = getFullUrl("/" + indexName, false);
1500 HttpURLConnection conn = initializeConnection(fullUrl);
// Only the response status is read below, so a HEAD request is issued.
1503 conn.setRequestMethod("HEAD");
1505 } catch (ProtocolException e) {
// Release the connection before reporting the failure.
1506 shutdownConnection(conn);
1507 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1510 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1514 resultCode = conn.getResponseCode();
1515 } catch (IOException e) {
// Release the connection before reporting the failure.
1516 shutdownConnection(conn);
1517 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1519 logger.debug("Response Code : " + resultCode);
1521 opResult.setResultCode(resultCode);
1523 // Generate a metrics log so we can track how long the operation took.
// NOTE(review): RESPONSE_DESCRIPTION is populated with the result code,
// the same value as RESPONSE_CODE - confirm that this is intended.
1524 metricsLogger.info(SearchDbMsgs.CHECK_INDEX_TIME,
1526 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1527 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1531 shutdownConnection(conn);
1537 private void buildDocumentResult(DocumentOperationResult result, String index)
1538 throws DocumentStoreOperationException {
1540 JSONParser parser = new JSONParser();
1543 root = (JSONObject) parser.parse(result.getResult());
1545 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1546 // Success response object
1547 Document doc = new Document();
1548 doc.setEtag(result.getResultVersion());
1549 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1551 doc.setContent((JSONObject) root.get("_source"));
1552 result.setDocument(doc);
1555 // Error response object
1556 JSONObject error = (JSONObject) root.get("error");
1557 if (error != null) {
1558 result.setError(new ErrorResult(error.get("type").toString(),
1559 error.get("reason").toString()));
1563 } catch (Exception e) {
1564 throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1565 + result.getResult());
1571 private String buildDocumentResponseUrl(String index, String id) {
1572 return ApiUtils.buildDocumentUri(index, id);
1575 private void buildSearchResult(SearchOperationResult result, String index)
1576 throws DocumentStoreOperationException {
1578 JSONParser parser = new JSONParser();
1582 root = (JSONObject) parser.parse(result.getResult());
1583 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1584 JSONObject hits = (JSONObject) root.get("hits");
1585 JSONArray hitArray = (JSONArray) hits.get("hits");
1586 SearchHits searchHits = new SearchHits();
1587 searchHits.setTotalHits(hits.get("total").toString());
1588 ArrayList<SearchHit> searchHitArray = new ArrayList<SearchHit>();
1590 for (int i = 0; i < hitArray.size(); i++) {
1591 JSONObject hit = (JSONObject) hitArray.get(i);
1592 SearchHit searchHit = new SearchHit();
1593 searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1594 Document doc = new Document();
1595 if (hit.get("_version") != null) {
1596 doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1599 doc.setUrl(buildDocumentResponseUrl(index, (hit.get("_id") != null)
1600 ? hit.get("_id").toString() : ""));
1601 doc.setContent((JSONObject) hit.get("_source"));
1602 searchHit.setDocument(doc);
1603 searchHitArray.add(searchHit);
1605 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1606 result.setSearchResult(searchHits);
1608 JSONObject aggregations = (JSONObject) root.get("aggregations");
1609 if (aggregations != null) {
1610 AggregationResult[] aggResults =
1611 AggregationParsingUtil.parseAggregationResults(aggregations);
1612 AggregationResults aggs = new AggregationResults();
1613 aggs.setAggregations(aggResults);
1614 result.setAggregationResult(aggs);
1619 JSONObject error = (JSONObject) root.get("error");
1620 if (error != null) {
1621 result.setError(new ErrorResult(error.get("type").toString(),
1622 error.get("reason").toString()));
1625 } catch (Exception e) {
1626 throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1627 + result.getResult());