2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
24 import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
25 import static javax.ws.rs.core.MediaType.APPLICATION_FORM_URLENCODED;
26 import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
28 import com.fasterxml.jackson.annotation.JsonInclude.Include;
29 import com.fasterxml.jackson.core.JsonParseException;
30 import com.fasterxml.jackson.core.JsonProcessingException;
31 import com.fasterxml.jackson.databind.JsonMappingException;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33 import com.google.common.base.Throwables;
34 import java.io.BufferedReader;
36 import java.io.FileInputStream;
37 import java.io.IOException;
38 import java.io.InputStream;
39 import java.io.InputStreamReader;
40 import java.io.OutputStream;
41 import java.io.OutputStreamWriter;
42 import java.io.PrintWriter;
43 import java.io.StringWriter;
44 import java.net.HttpURLConnection;
45 import java.net.MalformedURLException;
46 import java.net.ProtocolException;
48 import java.text.SimpleDateFormat;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.List;
52 import java.util.Properties;
53 import java.util.concurrent.atomic.AtomicBoolean;
54 import javax.ws.rs.core.Response.Status;
55 import javax.ws.rs.core.Response.Status.Family;
56 import org.eclipse.jetty.http.HttpStatus;
57 import org.json.simple.JSONArray;
58 import org.json.simple.JSONObject;
59 import org.json.simple.parser.JSONParser;
60 import org.json.simple.parser.ParseException;
61 import org.onap.aai.cl.api.LogFields;
62 import org.onap.aai.cl.api.LogLine;
63 import org.onap.aai.cl.api.Logger;
64 import org.onap.aai.cl.eelf.LoggerFactory;
65 import org.onap.aai.cl.mdc.MdcContext;
66 import org.onap.aai.cl.mdc.MdcOverride;
67 import org.onap.aai.sa.rest.AnalysisConfiguration;
68 import org.onap.aai.sa.rest.ApiUtils;
69 import org.onap.aai.sa.rest.BulkRequest;
70 import org.onap.aai.sa.rest.BulkRequest.OperationType;
71 import org.onap.aai.sa.rest.DocumentSchema;
72 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
73 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
74 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
75 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
76 import org.onap.aai.sa.searchdbabstraction.entity.Document;
77 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
78 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
79 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
80 import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder;
81 import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder.Type;
82 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
83 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
84 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
85 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
86 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
87 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
88 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
89 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
90 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
91 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
94 * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface.
96 public class ElasticSearchHttpController implements DocumentStoreInterface {
98 private static ElasticSearchHttpController instance = null;
100 private static final Logger logger =
101 LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
102 private static final Logger metricsLogger =
103 LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
105 private static final String JSON_ATTR_VERSION = "_version";
106 private static final String JSON_ATTR_ERROR = "error";
107 private static final String JSON_ATTR_REASON = "reason";
109 private static final String DEFAULT_TYPE = "default";
110 private static final String QUERY_PARAM_VERSION = "?version=";
112 private static final String MSG_RESOURCE_MISSING = "Specified resource does not exist: ";
113 private static final String MSG_RESPONSE_CODE = "Response Code : ";
114 private static final String MSG_INVALID_DOCUMENT_URL = "Invalid document URL: ";
115 private static final String MSG_HTTP_PUT_FAILED = "Failed to set HTTP request method to PUT.";
116 private static final String MSG_HTTP_POST_FAILED = "Failed to set HTTP request method to POST.";
117 private static final String FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION =
118 "Failed to get the response code from the connection.";
119 private static final String FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE = "Failed to parse Elastic Search response.";
121 private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
122 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
123 private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
124 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
125 private static final String BULK_IMPORT_INDEX_TEMPLATE =
126 "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
127 private static final String BULK_DELETE_TEMPLATE =
128 "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
130 private final ElasticSearchConfig config;
132 protected AnalysisConfiguration analysisConfig;
135 public ElasticSearchHttpController(ElasticSearchConfig config) {
136 this.config = config;
137 analysisConfig = new AnalysisConfiguration();
140 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
142 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
143 } catch (Exception e) {
144 logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, getFullUrl("", false),
149 public static ElasticSearchHttpController getInstance() {
150 synchronized (ElasticSearchHttpController.class) {
151 if (instance == null) {
152 Properties properties = new Properties();
153 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
155 properties.load(new FileInputStream(file));
156 } catch (Exception e) {
157 logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance",
158 e.getLocalizedMessage());
161 ElasticSearchConfig config = new ElasticSearchConfig(properties);
162 instance = new ElasticSearchHttpController(config);
169 public AnalysisConfiguration getAnalysisConfig() {
170 return analysisConfig;
174 public OperationResult createIndex(String index, DocumentSchema documentSchema) {
176 // Submit the request to ElasticSearch to create the index using a default document type.
177 OperationResult result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
178 DocumentSchemaUtil.generateDocumentMappings(documentSchema));
180 // ElasticSearch will return us a 200 code on success when we
181 // want to report a 201, so translate the result here.
182 if (result.getResultCode() == Status.OK.getStatusCode()) {
183 result.setResultCode(Status.CREATED.getStatusCode());
186 if (isSuccess(result)) {
187 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
190 } catch (DocumentStoreOperationException | IOException e) {
191 return new OperationResultBuilder().useDefaults()
192 .failureCause("Document store operation failure. Cause: " + e.getMessage()).build();
198 public OperationResult createDynamicIndex(String index, String dynamicSchema) {
200 OperationResult result = createTable(index, dynamicSchema);
202 // ElasticSearch will return us a 200 code on success when we
203 // want to report a 201, so translate the result here.
204 if (result.getResultCode() == Status.OK.getStatusCode()) {
205 result.setResultCode(Status.CREATED.getStatusCode());
207 if (isSuccess(result)) {
208 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
211 } catch (DocumentStoreOperationException e) {
212 return new OperationResultBuilder().useDefaults()
213 .failureCause("Document store operation failure. Cause: " + e.getMessage()).build();
218 public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
219 // Grab the current time so we can use it to generate a metrics log.
220 MdcOverride override = getStartTime(new MdcOverride());
222 String fullUrl = getFullUrl("/" + indexName + "/", false);
223 HttpURLConnection conn = initializeConnection(fullUrl);
225 logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
228 conn.setRequestMethod("DELETE");
229 } catch (ProtocolException e) {
230 shutdownConnection(conn);
231 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
234 OperationResult opResult = handleResponse(conn);
235 logMetricsInfo(override, SearchDbMsgs.DELETE_INDEX_TIME, opResult, indexName);
236 shutdownConnection(conn);
241 private OperationResult checkConnection() throws IOException {
242 String fullUrl = getFullUrl("/_cluster/health", false);
244 HttpURLConnection conn = null;
246 url = new URL(fullUrl);
247 conn = (HttpURLConnection) url.openConnection();
248 conn.setRequestMethod("GET");
249 conn.setDoOutput(true);
250 logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
252 int resultCode = conn.getResponseCode();
253 logger.debug("getClusterHealth() response Code : " + resultCode);
255 shutdownConnection(conn);
257 return new OperationResultBuilder().resultCode(resultCode).build();
260 private String getFullUrl(String resourceUrl, boolean isSecure) {
262 final String host = config.getIpAddress();
263 final String port = config.getHttpPort();
266 return String.format("https://%s:%s%s", host, port, resourceUrl);
268 return String.format("http://%s:%s%s", host, port, resourceUrl);
272 private void shutdownConnection(HttpURLConnection connection) {
273 if (connection == null) {
277 final String methodName = "shutdownConnection";
278 InputStream inputstream = null;
279 OutputStream outputstream = null;
282 inputstream = connection.getInputStream();
283 } catch (IOException e) {
284 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
286 if (inputstream != null) {
289 } catch (IOException e) {
290 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
296 outputstream = connection.getOutputStream();
297 } catch (IOException e) {
298 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
300 if (outputstream != null) {
302 outputstream.close();
303 } catch (IOException e) {
304 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
309 connection.disconnect();
313 protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings)
314 throws DocumentStoreOperationException {
315 if (indexSettings == null) {
316 logger.debug("No settings provided.");
319 if (indexMappings == null) {
320 logger.debug("No mappings provided.");
323 // Grab the current time so we can use it to generate a metrics log.
324 MdcOverride override = getStartTime(new MdcOverride());
326 String fullUrl = getFullUrl("/" + indexName + "/", false);
327 HttpURLConnection conn = initializeConnection(fullUrl);
330 conn.setRequestMethod("PUT");
331 } catch (ProtocolException e) {
332 shutdownConnection(conn);
333 throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
336 StringBuilder sb = new StringBuilder(128);
337 sb.append("{ \"settings\" : ");
338 sb.append(indexSettings);
341 sb.append("\"mappings\" : {");
342 sb.append("\"" + typeName + "\" :");
343 sb.append(indexMappings);
347 attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
348 } catch (IOException e) {
349 logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
350 throw new DocumentStoreOperationException(e.getMessage(), e);
353 logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
354 logger.debug("Request content: " + sb);
356 OperationResult opResult = handleResponse(conn);
357 shutdownConnection(conn);
358 logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, opResult, indexName);
364 * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the
367 * @param indexName - The name of the index to be created
368 * @param settingsAndMappings - The actual JSON object that will define the index
369 * @return - The operation result of writing into Elasticsearch
370 * @throws DocumentStoreOperationException
372 protected OperationResult createTable(String indexName, String settingsAndMappings)
373 throws DocumentStoreOperationException {
374 // Grab the current time so we can use it to generate a metrics log.
375 MdcOverride override = getStartTime(new MdcOverride());
377 String fullUrl = getFullUrl("/" + indexName + "/", false);
378 HttpURLConnection conn = initializeConnection(fullUrl);
381 conn.setRequestMethod("PUT");
382 } catch (ProtocolException e) {
383 shutdownConnection(conn);
384 throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
388 attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
389 } catch (IOException e) {
390 logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
391 throw new DocumentStoreOperationException(e.getMessage());
394 OperationResult result = handleResponse(conn);
395 logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, result, indexName);
401 public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
402 boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
404 if (!allowImplicitIndexCreation) {
405 // Before we do anything, make sure that the specified index actually exists in the
406 // document store - we don't want to rely on ElasticSearch to fail the document
407 // create because it could be configured to implicitly create a non-existent index,
408 // which can lead to hard-to-debug behaviour with queries down the road.
409 OperationResult indexExistsResult = checkIndexExistence(indexName);
410 if (!isSuccess(indexExistsResult)) {
411 String resultMsg = "Document Index '" + indexName + "' does not exist.";
412 return (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT).status(Status.NOT_FOUND)
413 .result(resultMsg).failureCause(resultMsg).build();
417 if (document.getId() == null || document.getId().isEmpty()) {
418 return createDocumentWithoutId(indexName, document);
420 return createDocumentWithId(indexName, document);
424 private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
425 throws DocumentStoreOperationException {
426 // check if the document already exists
427 DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
429 if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
430 if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) {
431 opResult.setFailureCause("A document with the same id already exists.");
433 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
435 opResult.setResultCode(Status.CONFLICT.getStatusCode());
439 // Grab the current time so we can use it to generate a metrics log.
440 MdcOverride override = getStartTime(new MdcOverride());
442 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
443 HttpURLConnection conn = initializeConnection(fullUrl);
446 conn.setRequestMethod("PUT");
447 } catch (ProtocolException e) {
448 shutdownConnection(conn);
449 throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
452 attachDocument(conn, document);
454 logger.debug("Sending 'PUT' request to: " + conn.getURL());
456 opResult = getOperationResult(conn);
457 buildDocumentResult(opResult, indexName);
459 logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, opResult, indexName);
461 shutdownConnection(conn);
466 private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
467 throws DocumentStoreOperationException {
468 // Grab the current time so we can use it to generate a metrics log.
469 MdcOverride override = getStartTime(new MdcOverride());
471 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
472 HttpURLConnection conn = initializeConnection(fullUrl);
475 conn.setRequestMethod("POST");
476 } catch (ProtocolException e) {
477 shutdownConnection(conn);
478 throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
481 attachDocument(conn, document);
483 logger.debug("Sending 'POST' request to: " + conn.getURL());
485 DocumentOperationResult response = getOperationResult(conn);
486 buildDocumentResult(response, indexName);
488 logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, response, indexName);
490 shutdownConnection(conn);
495 private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
496 throws DocumentStoreOperationException {
497 conn.setRequestProperty("Connection", "Close");
498 attachContent(conn, doc.getContentInJson());
501 private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
502 throws DocumentStoreOperationException {
503 // Grab the current time so we can use it to generate a metrics log.
504 MdcOverride override = getStartTime(new MdcOverride());
506 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
507 HttpURLConnection conn = initializeConnection(fullUrl);
510 conn.setRequestMethod("HEAD");
511 } catch (ProtocolException e) {
512 shutdownConnection(conn);
513 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
516 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
520 resultCode = conn.getResponseCode();
521 } catch (IOException e) {
522 shutdownConnection(conn);
523 throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
526 logger.debug(MSG_RESPONSE_CODE + resultCode);
528 DocumentOperationResult opResult = (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT)
529 .useDefaults().resultCode(resultCode).build();
531 logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, docId);
532 shutdownConnection(conn);
538 public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
539 boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
541 if (!allowImplicitIndexCreation) {
542 // Before we do anything, make sure that the specified index actually exists in the
543 // document store - we don't want to rely on ElasticSearch to fail the document
544 // create because it could be configured to implicitly create a non-existent index,
545 // which can lead to hard-to-debug behaviour with queries down the road.
546 OperationResult indexExistsResult = checkIndexExistence(indexName);
547 if (!isSuccess(indexExistsResult)) {
548 DocumentOperationResult opResult = new DocumentOperationResult();
549 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
550 String resultMsg = "Document Index '" + indexName + "' does not exist.";
551 opResult.setResult(resultMsg);
552 opResult.setFailureCause(resultMsg);
557 // Grab the current time so we can use it to generate a metrics log.
558 MdcOverride override = getStartTime(new MdcOverride());
560 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
561 + document.getVersion(), false);
562 HttpURLConnection conn = initializeConnection(fullUrl);
565 conn.setRequestMethod("PUT");
566 } catch (ProtocolException e) {
567 shutdownConnection(conn);
568 throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
571 attachDocument(conn, document);
573 logger.debug("Sending 'PUT' request to: " + conn.getURL());
575 DocumentOperationResult opResult = getOperationResult(conn);
576 buildDocumentResult(opResult, indexName);
578 logMetricsInfo(override, SearchDbMsgs.UPDATE_DOCUMENT_TIME, opResult, indexName, document.getId());
580 shutdownConnection(conn);
586 public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
587 throws DocumentStoreOperationException {
588 // Grab the current time so we can use it to generate a metrics log.
589 MdcOverride override = getStartTime(new MdcOverride());
591 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
592 + document.getVersion(), false);
593 HttpURLConnection conn = initializeConnection(fullUrl);
596 conn.setRequestMethod("DELETE");
597 } catch (ProtocolException e) {
598 shutdownConnection(conn);
599 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
602 logger.debug("\nSending 'DELETE' request to " + conn.getURL());
604 DocumentOperationResult opResult = getOperationResult(conn);
605 buildDocumentResult(opResult, indexName);
606 // supress the etag and url in response for delete as they are not required
607 if (opResult.getDocument() != null) {
608 opResult.getDocument().setEtag(null);
609 opResult.getDocument().setUrl(null);
612 logMetricsInfo(override, SearchDbMsgs.DELETE_DOCUMENT_TIME, opResult, indexName, document.getId());
614 shutdownConnection(conn);
620 public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
621 throws DocumentStoreOperationException {
622 // Grab the current time so we can use it to generate a metrics log.
623 MdcOverride override = getStartTime(new MdcOverride());
625 String fullUrl = null;
626 if (document.getVersion() == null) {
627 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
629 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
630 + document.getVersion(), false);
632 HttpURLConnection conn = initializeConnection(fullUrl);
634 logger.debug("\nSending 'GET' request to: " + conn.getURL());
636 DocumentOperationResult opResult = getOperationResult(conn);
637 buildDocumentResult(opResult, indexName);
639 logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, document.getId());
641 shutdownConnection(conn);
647 public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
649 String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
651 // Grab the current time so we can use it to generate a metrics log.
652 MdcOverride override = getStartTime(new MdcOverride());
654 HttpURLConnection conn = initializeConnection(fullUrl);
657 conn.setRequestMethod("GET");
658 } catch (ProtocolException e) {
659 shutdownConnection(conn);
660 throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
663 logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
665 SearchOperationResult opResult = getSearchOperationResult(conn);
666 buildSearchResult(opResult, indexName);
668 logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, queryString);
674 public SearchOperationResult searchWithPayload(String indexName, String query)
675 throws DocumentStoreOperationException {
676 if (logger.isDebugEnabled()) {
677 logger.debug("Querying index: " + indexName + " with query string: " + query);
680 String fullUrl = getFullUrl("/" + indexName + "/_search", false);
682 // Grab the current time so we can use it to generate a metrics log.
683 MdcOverride override = getStartTime(new MdcOverride());
685 HttpURLConnection conn = initializeConnection(fullUrl);
688 conn.setRequestMethod("POST");
689 } catch (ProtocolException e) {
690 shutdownConnection(conn);
691 throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
694 attachContent(conn, query);
696 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
697 logger.debug("Request body = Elasticsearch query = " + query);
699 SearchOperationResult opResult = getSearchOperationResult(conn);
700 buildSearchResult(opResult, indexName);
702 logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
704 shutdownConnection(conn);
711 public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
712 throws DocumentStoreOperationException {
713 if (logger.isDebugEnabled()) {
714 logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
717 String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
719 // Grab the current time so we can use it to generate a metrics log.
720 MdcOverride override = getStartTime(new MdcOverride());
722 HttpURLConnection conn = initializeConnection(fullUrl);
725 conn.setRequestMethod("POST");
726 } catch (ProtocolException e) {
727 shutdownConnection(conn);
728 throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
731 attachContent(conn, query);
733 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
734 logger.debug("Request body = Elasticsearch query = " + query);
736 SearchOperationResult opResult = getSearchOperationResult(conn);
737 buildSuggestResult(opResult, indexName);
739 logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
741 shutdownConnection(conn);
746 private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException {
747 OutputStream outputStream = null;
748 OutputStreamWriter out = null;
751 outputStream = conn.getOutputStream();
752 } catch (IOException e) {
753 shutdownConnection(conn);
754 throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
757 out = new OutputStreamWriter(outputStream);
762 } catch (IOException e) {
763 shutdownConnection(conn);
764 throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
768 private HttpURLConnection initializeConnection(String fullUrl) throws DocumentStoreOperationException {
770 HttpURLConnection conn = null;
773 url = new URL(fullUrl);
774 } catch (MalformedURLException e) {
775 throw new DocumentStoreOperationException("Error building a URL with " + url, e);
779 conn = (HttpURLConnection) url.openConnection();
780 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON);
781 conn.setDoOutput(true);
782 } catch (IOException e) {
783 shutdownConnection(conn);
784 throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
790 private OperationResult handleResponse(HttpURLConnection conn) throws DocumentStoreOperationException {
791 return handleResponse(conn, new OperationResultBuilder().useDefaults());
794 private OperationResult handleResponse(HttpURLConnection conn, OperationResultBuilder rb)
795 throws DocumentStoreOperationException {
799 resultCode = conn.getResponseCode();
800 } catch (IOException e) {
801 shutdownConnection(conn);
802 throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
805 logger.debug(MSG_RESPONSE_CODE + resultCode);
807 InputStream inputStream = null;
809 if (!isSuccessCode(resultCode)) {
810 inputStream = conn.getErrorStream();
813 inputStream = conn.getInputStream();
814 } catch (IOException e) {
815 shutdownConnection(conn);
816 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
820 InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
821 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
823 StringBuilder result = new StringBuilder(128);
824 String string = null;
827 while ((string = bufferedreader.readLine()) != null) {
828 result.append(string).append("\n");
830 } catch (IOException e) {
831 shutdownConnection(conn);
832 throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
835 if (resultCode == Status.CONFLICT.getStatusCode()) {
836 rb.resultCode(Status.PRECONDITION_FAILED.getStatusCode());
838 rb.resultCode(resultCode);
840 if (logger.isDebugEnabled()) {
841 logger.debug("Raw result string from ElasticSearch = " + result.toString());
843 rb.result(result.toString());
844 rb.resultVersion(extractVersion(result.toString()));
848 private String extractVersion(String result) {
849 JSONParser parser = new JSONParser();
850 String version = null;
852 JSONObject root = (JSONObject) parser.parse(result);
853 if (root.get(JSON_ATTR_VERSION) != null) {
854 version = root.get(JSON_ATTR_VERSION).toString();
856 } catch (ParseException e) {
857 // Not all responses from ElasticSearch include a version, so
858 // if we don't get one back, just return an empty string rather
859 // than trigger a false failure.
866 * This convenience method gets the current system time and stores it in an attribute in the supplied
867 * {@link MdcOverride} object so that it can be used later by the metrics logger.
869 * @param override - The {@link MdcOverride} object to update.
870 * @return - The supplied {@link MdcOverride} object.
872 private MdcOverride getStartTime(MdcOverride override) {
874 // Grab the current time...
875 long startTimeInMs = System.currentTimeMillis();
877 // ...and add it as an attribute to the supplied MDC Override
879 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
880 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
882 // Return the MdcOverride object that we were passed.
883 // This looks odd, but it allows us to do stuff like:
885 // MdcOverride ov = getStartTime(new MdcOverride())
887 // which is quite handy, but also allows us to pass in an existing
888 // MdcOverride object which already has some attributes set.
892 private boolean isSuccess(OperationResult result) {
893 return isSuccessCode(result.getResultCode());
896 private boolean isSuccessCode(int statusCode) {
897 return Family.familyOf(statusCode).equals(Family.SUCCESSFUL);
901 public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
902 if (logger.isDebugEnabled()) {
903 StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: ");
905 for (BulkRequest request : requests) {
906 dbgString.append("[").append(request).append("] ");
909 logger.debug(dbgString.toString());
912 // Grab the current time so we can use it to generate a metrics log.
913 MdcOverride override = getStartTime(new MdcOverride());
915 // Parse the supplied set of operations.
916 // Iterate over the list of operations which we were provided and
917 // translate them into a format that ElasticSearh understands.
919 StringBuilder esOperationSet = new StringBuilder(128);
920 List<ElasticSearchResultItem> rejected = new ArrayList<>();
921 for (BulkRequest request : requests) {
923 // Convert the request to the syntax ElasticSearch likes.
924 if (buildEsOperation(request, esOperationSet, rejected)) {
929 ElasticSearchBulkOperationResult opResult = null;
932 // Open an HTTP connection to the ElasticSearch back end.
933 String fullUrl = getFullUrl("/_bulk", false);
935 HttpURLConnection conn;
938 url = new URL(fullUrl);
939 conn = (HttpURLConnection) url.openConnection();
940 conn.setRequestMethod("PUT");
941 conn.setDoOutput(true);
942 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_FORM_URLENCODED);
943 conn.setRequestProperty("Connection", "Close");
945 } catch (IOException e) {
947 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
948 if (logger.isDebugEnabled()) {
949 logger.debug(Throwables.getStackTraceAsString(e));
952 throw new DocumentStoreOperationException(
953 "Failed to open connection to document store. Cause: " + e.getMessage(), e);
956 StringBuilder bulkResult = new StringBuilder(128);
958 // Create an output stream to write our request to.
959 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
961 if (logger.isDebugEnabled()) {
962 logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
963 logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
966 // Write the resulting request string to our output stream. (this sends the request to ES?)
967 out.write(esOperationSet.toString());
970 // Open an input stream on our connection in order to read back the results.
971 InputStream is = conn.getInputStream();
972 InputStreamReader inputstreamreader = new InputStreamReader(is);
973 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
975 // Read the contents of the input stream into our result string...
976 String esResponseString = null;
978 while ((esResponseString = bufferedreader.readLine()) != null) {
979 bulkResult.append(esResponseString).append("\n");
982 } catch (IOException e) {
984 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
985 if (logger.isDebugEnabled()) {
986 StringWriter sw = new StringWriter();
987 e.printStackTrace(new PrintWriter(sw));
988 logger.debug(sw.toString());
991 throw new DocumentStoreOperationException(
992 "Failure interacting with document store. Cause: " + e.getMessage(), e);
995 if (logger.isDebugEnabled()) {
996 logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
999 // ...and marshal the resulting string into a Java object.
1001 opResult = marshallEsBulkResult(bulkResult.toString());
1003 } catch (IOException e) {
1005 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1006 if (logger.isDebugEnabled()) {
1007 logger.debug(Throwables.getStackTraceAsString(e));
1010 throw new DocumentStoreOperationException("Failed to marshal response body. Cause: " + e.getMessage(),
1015 OperationResult result = new OperationResultBuilder() //
1016 .resultCode(HttpStatus.MULTI_STATUS_207) //
1017 .result(buildGenericBulkResultSet(opResult, rejected)) //
1020 // In the success case we don't want the entire result string to be dumped into the metrics log, so concatenate
1022 String resultStringForMetricsLog = result.getResult();
1023 if (isSuccess(result)) {
1024 resultStringForMetricsLog =
1025 resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1028 metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1030 .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1031 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1038 * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch.
1040 * @param request - The request to be performed.
1041 * @param sb - The string builder to append the json data to
1042 * @throws DocumentStoreOperationException
1044 private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails)
1045 throws DocumentStoreOperationException {
1047 boolean retVal = true;
1048 // What kind of operation are we performing?
1049 switch (request.getOperationType()) {
1051 // Create a new document.
1054 // Make sure that we were supplied a document payload.
1055 if (request.getOperation().getDocument() == null) {
1057 fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1058 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1062 // Make sure that the supplied document URL is formatted
1064 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1065 fails.add(generateRejectionEntry(request.getOperationType(),
1066 MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1067 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1071 // Validate that the specified index actually exists before we
1072 // try to perform the create.
1073 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1075 fails.add(generateRejectionEntry(request.getOperationType(),
1076 MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1077 request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1081 // If we were supplied an id for the new document, then
1082 // include it in the bulk operation to Elastic Search
1083 if (request.getId() == null) {
1085 sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
1087 // Otherwise, we just leave that parameter off and ElasticSearch
1088 // will generate one for us.
1090 sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
1095 // Append the document that we want to create.
1096 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1097 } catch (JsonProcessingException e) {
1098 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1103 // Update an existing document.
1106 // Make sure that we were supplied a document payload.
1107 if (request.getOperation().getDocument() == null) {
1109 fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1110 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1114 // Make sure that the supplied document URL is formatted
1116 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1117 fails.add(generateRejectionEntry(request.getOperationType(),
1118 MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1119 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1123 // Validate that the specified index actually exists before we
1124 // try to perform the update.
1125 if (!indexExists(request.getIndex())) {
1127 fails.add(generateRejectionEntry(request.getOperationType(),
1128 MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1129 request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1133 // Validate that the document we are trying to update actually
1134 // exists before we try to perform the update.
1135 if (!documentExists(request.getIndex(), request.getId())) {
1137 fails.add(generateRejectionEntry(request.getOperationType(),
1138 MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1139 request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1143 // It is mandatory that a version be supplied for an update operation,
1144 // so validate that now.
1145 if (request.getOperation().getMetaData().getEtag() == null) {
1147 fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1148 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1152 // Generate the update request...
1153 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1154 request.getOperation().getMetaData().getEtag()));
1156 // ...and append the document that we want to update.
1158 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1159 } catch (JsonProcessingException e) {
1160 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1164 // Delete an existing document.
1167 // Make sure that the supplied document URL is formatted
1169 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1170 fails.add(generateRejectionEntry(request.getOperationType(),
1171 MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1172 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1176 // Validate that the specified index actually exists before we
1177 // try to perform the delete.
1178 if (!indexExists(request.getIndex())) {
1180 fails.add(generateRejectionEntry(request.getOperationType(),
1181 MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1182 request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1186 // Validate that the document we are trying to update actually
1187 // exists before we try to perform the delete.
1188 if (!documentExists(request.getIndex(), request.getId())) {
1190 fails.add(generateRejectionEntry(request.getOperationType(),
1191 MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1192 request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1196 // It is mandatory that a version be supplied for a delete operation,
1197 // so validate that now.
1198 if (request.getOperation().getMetaData().getEtag() == null) {
1200 fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1201 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1205 // Generate the delete request.
1206 sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1207 request.getOperation().getMetaData().getEtag()));
1215 private boolean indexExists(String index) throws DocumentStoreOperationException {
1216 return isSuccess(checkIndexExistence(index));
1219 private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1220 return isSuccess(checkDocumentExistence(index, id));
1224 * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the
1227 * @param rejectReason - A message describing why the operation was rejected.
1228 * @param anId - The identifier associated with the document being acted on.
1229 * @param statusCode - An HTTP status code.
1230 * @return - A result set item.
1232 private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index,
1233 String anId, int statusCode, String originalUrl) {
1235 ElasticSearchError err = new ElasticSearchError();
1236 err.setReason(rejectReason);
1238 ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1241 op.setStatus(statusCode);
1243 op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1245 ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1249 rejectionResult.setCreate(op);
1252 rejectionResult.setIndex(op);
1255 rejectionResult.setDelete(op);
1260 return rejectionResult;
1264 * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and
1265 * marshals it into a Java object.
1267 * @param jsonResult - The bulk operations response returned from ElasticSearch.
1268 * @return - The marshalled response.
1269 * @throws JsonParseException
1270 * @throws JsonMappingException
1271 * @throws IOException
1273 private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException {
1274 if (jsonResult != null) {
1275 if (logger.isDebugEnabled()) {
1276 logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", ""));
1279 ObjectMapper mapper = new ObjectMapper();
1280 mapper.setSerializationInclusion(Include.NON_EMPTY);
1282 return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1289 * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload.
1291 * @param esResult - ElasticSearch bulk operations response.
1292 * @return - A generic result set.
1294 private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1295 List<ElasticSearchResultItem> rejectedOps) {
1297 int totalSuccess = 0;
1300 if (logger.isDebugEnabled()) {
1302 logger.debug("ESController: Build generic result set. ES Results: "
1303 + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString());
1306 // Build a combined list of result items from the results returned
1307 // from ElasticSearch and the list of operations that we rejected
1308 // without sending to ElasticSearch.
1309 List<ElasticSearchResultItem> combinedResults = new ArrayList<>();
1310 if (esResult != null) {
1311 combinedResults.addAll(Arrays.asList(esResult.getItems()));
1313 combinedResults.addAll(rejectedOps);
1315 // Iterate over the individual results in the resulting result set.
1316 StringBuilder resultsBuilder = new StringBuilder();
1317 AtomicBoolean firstItem = new AtomicBoolean(true);
1318 for (ElasticSearchResultItem item : combinedResults) {
1320 // Increment the operation counts.
1322 if (isSuccessCode(item.operationStatus().getStatus())) {
1328 // Prepend a comma to our response string unless this it the
1329 // first result in the set.
1330 if (!firstItem.compareAndSet(true, false)) {
1331 resultsBuilder.append(", ");
1334 // Append the current result as a generic json structure.
1335 resultsBuilder.append(item.toJson());
1338 return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", "
1339 + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}";
1344 * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
1346 * @param indexName - The index to look for.
1347 * @return - An operation result indicating the success or failure of the check.
1348 * @throws DocumentStoreOperationException
1350 public OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
1351 // Grab the current time so we can use it to generate a metrics log.
1352 MdcOverride override = getStartTime(new MdcOverride());
1354 String fullUrl = getFullUrl("/" + indexName, false);
1355 HttpURLConnection conn = initializeConnection(fullUrl);
1358 conn.setRequestMethod("HEAD");
1360 } catch (ProtocolException e) {
1361 shutdownConnection(conn);
1362 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1365 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1369 resultCode = conn.getResponseCode();
1370 } catch (IOException e) {
1371 shutdownConnection(conn);
1372 throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
1374 logger.debug(MSG_RESPONSE_CODE + resultCode);
1376 OperationResult opResult = new OperationResultBuilder().useDefaults().resultCode(resultCode).build();
1377 logMetricsInfo(override, SearchDbMsgs.CHECK_INDEX_TIME, opResult, indexName);
1378 shutdownConnection(conn);
1383 private DocumentOperationResult getOperationResult(HttpURLConnection conn) throws DocumentStoreOperationException {
1384 return (DocumentOperationResult) handleResponse(conn, new OperationResultBuilder(Type.DOCUMENT).useDefaults());
1387 private SearchOperationResult getSearchOperationResult(HttpURLConnection conn)
1388 throws DocumentStoreOperationException {
1389 return (SearchOperationResult) handleResponse(conn, new OperationResultBuilder(Type.SEARCH).useDefaults());
1392 private void buildDocumentResult(DocumentOperationResult result, String index)
1393 throws DocumentStoreOperationException {
1395 JSONParser parser = new JSONParser();
1398 root = (JSONObject) parser.parse(result.getResult());
1399 if (isSuccess(result)) {
1400 // Success response object
1401 Document doc = new Document();
1402 doc.setEtag(result.getResultVersion());
1403 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1405 doc.setContent((JSONObject) root.get("_source"));
1406 result.setDocument(doc);
1409 // Error response object
1410 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1411 if (error != null) {
1413 new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1417 } catch (Exception e) {
1418 throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1422 private String buildDocumentResponseUrl(String index, String id) {
1423 return ApiUtils.buildDocumentUri(index, id);
1426 private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1427 JSONParser parser = new JSONParser();
1431 root = (JSONObject) parser.parse(result.getResult());
1432 if (isSuccess(result)) {
1433 JSONObject hits = (JSONObject) root.get("hits");
1434 JSONArray hitArray = (JSONArray) hits.get("hits");
1435 SearchHits searchHits = new SearchHits();
1436 searchHits.setTotalHits(hits.get("total").toString());
1437 ArrayList<SearchHit> searchHitArray = new ArrayList<>();
1439 for (int i = 0; i < hitArray.size(); i++) {
1440 JSONObject hit = (JSONObject) hitArray.get(i);
1441 SearchHit searchHit = new SearchHit();
1442 searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1443 Document doc = new Document();
1444 if (hit.get(JSON_ATTR_VERSION) != null) {
1445 doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1449 buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1450 doc.setContent((JSONObject) hit.get("_source"));
1451 searchHit.setDocument(doc);
1452 searchHitArray.add(searchHit);
1454 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1455 result.setSearchResult(searchHits);
1457 JSONObject aggregations = (JSONObject) root.get("aggregations");
1458 if (aggregations != null) {
1459 AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1460 AggregationResults aggs = new AggregationResults();
1461 aggs.setAggregations(aggResults);
1462 result.setAggregationResult(aggs);
1467 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1468 if (error != null) {
1470 new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1473 } catch (Exception e) {
1474 throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1478 private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1479 JSONParser parser = new JSONParser();
1482 root = (JSONObject) parser.parse(result.getResult());
1483 if (isSuccess(result)) {
1484 JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
1485 JSONObject hitdata = (JSONObject) hitArray.get(0);
1486 JSONArray optionsArray = (JSONArray) hitdata.get("options");
1487 SuggestHits suggestHits = new SuggestHits();
1488 suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
1490 ArrayList<SuggestHit> suggestHitArray = new ArrayList<>();
1492 for (int i = 0; i < optionsArray.size(); i++) {
1493 JSONObject hit = (JSONObject) optionsArray.get(i);
1495 SuggestHit suggestHit = new SuggestHit();
1496 suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
1497 suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
1498 Document doc = new Document();
1499 if (hit.get(JSON_ATTR_VERSION) != null) {
1500 doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1503 buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1505 doc.setContent((JSONObject) hit.get("payload"));
1506 suggestHit.setDocument(doc);
1507 suggestHitArray.add(suggestHit);
1509 suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
1510 result.setSuggestResult(suggestHits);
1512 JSONObject aggregations = (JSONObject) root.get("aggregations");
1513 if (aggregations != null) {
1514 AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1515 AggregationResults aggs = new AggregationResults();
1516 aggs.setAggregations(aggResults);
1517 result.setAggregationResult(aggs);
1522 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1523 if (error != null) {
1525 new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1528 } catch (Exception e) {
1529 throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1534 * Record the timing of the operation in the metrics log.
1537 private void logMetricsInfo(MdcOverride override, SearchDbMsgs message, OperationResult operationResult,
1539 metricsLogger.info(message,
1541 .setField(LogLine.DefinedFields.RESPONSE_CODE, operationResult.getResultCode())
1542 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, operationResult.getResult()),