2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
24 import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
25 import static javax.ws.rs.core.MediaType.APPLICATION_FORM_URLENCODED;
26 import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
28 import com.fasterxml.jackson.annotation.JsonInclude.Include;
29 import com.fasterxml.jackson.core.JsonParseException;
30 import com.fasterxml.jackson.core.JsonProcessingException;
31 import com.fasterxml.jackson.databind.JsonMappingException;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33 import com.google.common.base.Throwables;
34 import java.io.BufferedReader;
36 import java.io.FileInputStream;
37 import java.io.IOException;
38 import java.io.InputStream;
39 import java.io.InputStreamReader;
40 import java.io.OutputStream;
41 import java.io.OutputStreamWriter;
42 import java.io.PrintWriter;
43 import java.io.StringWriter;
44 import java.net.HttpURLConnection;
45 import java.net.MalformedURLException;
46 import java.net.ProtocolException;
48 import java.text.SimpleDateFormat;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.List;
52 import java.util.Properties;
53 import java.util.concurrent.atomic.AtomicBoolean;
54 import javax.ws.rs.core.Response.Status;
55 import javax.ws.rs.core.Response.Status.Family;
56 import org.eclipse.jetty.http.HttpStatus;
57 import org.json.simple.JSONArray;
58 import org.json.simple.JSONObject;
59 import org.json.simple.parser.JSONParser;
60 import org.json.simple.parser.ParseException;
61 import org.onap.aai.cl.api.LogFields;
62 import org.onap.aai.cl.api.LogLine;
63 import org.onap.aai.cl.api.Logger;
64 import org.onap.aai.cl.eelf.LoggerFactory;
65 import org.onap.aai.cl.mdc.MdcContext;
66 import org.onap.aai.cl.mdc.MdcOverride;
67 import org.onap.aai.sa.rest.AnalysisConfiguration;
68 import org.onap.aai.sa.rest.ApiUtils;
69 import org.onap.aai.sa.rest.BulkRequest;
70 import org.onap.aai.sa.rest.BulkRequest.OperationType;
71 import org.onap.aai.sa.rest.DocumentSchema;
72 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
73 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
74 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
75 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
76 import org.onap.aai.sa.searchdbabstraction.entity.Document;
77 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
78 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
79 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
80 import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder;
81 import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder.Type;
82 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
83 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
84 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
85 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
86 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
87 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
88 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
89 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
90 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
91 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
94 * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface.
96 public class ElasticSearchHttpController implements DocumentStoreInterface {
98 private static ElasticSearchHttpController instance = null;
100 private static final Logger logger =
101 LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
102 private static final Logger metricsLogger =
103 LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
105 private static final String JSON_ATTR_VERSION = "_version";
106 private static final String JSON_ATTR_ERROR = "error";
107 private static final String JSON_ATTR_REASON = "reason";
109 private static final String DEFAULT_TYPE = "default";
110 private static final String QUERY_PARAM_VERSION = "?version=";
112 private static final String MSG_RESOURCE_MISSING = "Specified resource does not exist: ";
113 private static final String MSG_RESPONSE_CODE = "Response Code : ";
114 private static final String MSG_INVALID_DOCUMENT_URL = "Invalid document URL: ";
115 private static final String MSG_HTTP_PUT_FAILED = "Failed to set HTTP request method to PUT.";
116 private static final String MSG_HTTP_POST_FAILED = "Failed to set HTTP request method to POST.";
117 private static final String FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION =
118 "Failed to get the response code from the connection.";
119 private static final String FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE = "Failed to parse Elastic Search response.";
121 private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
122 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
123 private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
124 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
125 private static final String BULK_IMPORT_INDEX_TEMPLATE =
126 "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
127 private static final String BULK_DELETE_TEMPLATE =
128 "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
130 private final ElasticSearchConfig config;
132 protected AnalysisConfiguration analysisConfig;
135 public ElasticSearchHttpController(ElasticSearchConfig config) {
136 this.config = config;
137 analysisConfig = new AnalysisConfiguration();
140 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
142 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
143 } catch (Exception e) {
144 logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, getFullUrl("", false),
149 public static ElasticSearchHttpController getInstance() {
150 synchronized (ElasticSearchHttpController.class) {
151 if (instance == null) {
152 Properties properties = new Properties();
153 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
155 properties.load(new FileInputStream(file));
156 } catch (Exception e) {
157 logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance",
158 e.getLocalizedMessage());
161 ElasticSearchConfig config = new ElasticSearchConfig(properties);
162 instance = new ElasticSearchHttpController(config);
169 public AnalysisConfiguration getAnalysisConfig() {
170 return analysisConfig;
174 public OperationResult createIndex(String index, DocumentSchema documentSchema) {
176 // Submit the request to ElasticSearch to create the index using a default document type.
177 OperationResult result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
178 DocumentSchemaUtil.generateDocumentMappings(documentSchema));
180 // ElasticSearch will return us a 200 code on success when we
181 // want to report a 201, so translate the result here.
182 if (result.getResultCode() == Status.OK.getStatusCode()) {
183 result.setResultCode(Status.CREATED.getStatusCode());
186 if (isSuccess(result)) {
187 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
190 } catch (DocumentStoreOperationException | IOException e) {
191 return new OperationResultBuilder().useDefaults()
192 .failureCause("Document store operation failure. Cause: " + e.getMessage()).build();
198 public OperationResult createDynamicIndex(String index, String dynamicSchema) {
200 OperationResult result = createTable(index, dynamicSchema);
202 // ElasticSearch will return us a 200 code on success when we
203 // want to report a 201, so translate the result here.
204 if (result.getResultCode() == Status.OK.getStatusCode()) {
205 result.setResultCode(Status.CREATED.getStatusCode());
207 if (isSuccess(result)) {
208 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
211 } catch (DocumentStoreOperationException e) {
212 return new OperationResultBuilder().useDefaults()
213 .failureCause("Document store operation failure. Cause: " + e.getMessage()).build();
218 public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
219 // Grab the current time so we can use it to generate a metrics log.
220 MdcOverride override = getStartTime(new MdcOverride());
222 String fullUrl = getFullUrl("/" + indexName + "/", false);
223 HttpURLConnection conn = initializeConnection(fullUrl);
225 logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
228 conn.setRequestMethod("DELETE");
229 } catch (ProtocolException e) {
230 shutdownConnection(conn);
231 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
234 OperationResult opResult = handleResponse(conn);
235 logMetricsInfo(override, SearchDbMsgs.DELETE_INDEX_TIME, opResult, indexName);
236 shutdownConnection(conn);
242 protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings)
243 throws DocumentStoreOperationException {
244 if (indexSettings == null) {
245 logger.debug("No settings provided.");
248 if (indexMappings == null) {
249 logger.debug("No mappings provided.");
252 // Grab the current time so we can use it to generate a metrics log.
253 MdcOverride override = getStartTime(new MdcOverride());
255 String fullUrl = getFullUrl("/" + indexName + "/", false);
256 HttpURLConnection conn = initializeConnection(fullUrl);
259 conn.setRequestMethod("PUT");
260 } catch (ProtocolException e) {
261 shutdownConnection(conn);
262 throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
265 StringBuilder sb = new StringBuilder(128);
266 sb.append("{ \"settings\" : ");
267 sb.append(indexSettings);
270 sb.append("\"mappings\" : {");
271 sb.append("\"" + typeName + "\" :");
272 sb.append(indexMappings);
276 attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
277 } catch (IOException e) {
278 logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
279 throw new DocumentStoreOperationException(e.getMessage(), e);
282 logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
283 logger.debug("Request content: " + sb);
285 OperationResult opResult = handleResponse(conn);
286 shutdownConnection(conn);
287 logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, opResult, indexName);
293 * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the
296 * @param indexName - The name of the index to be created
297 * @param settingsAndMappings - The actual JSON object that will define the index
298 * @return - The operation result of writing into Elasticsearch
299 * @throws DocumentStoreOperationException
301 protected OperationResult createTable(String indexName, String settingsAndMappings)
302 throws DocumentStoreOperationException {
303 // Grab the current time so we can use it to generate a metrics log.
304 MdcOverride override = getStartTime(new MdcOverride());
306 String fullUrl = getFullUrl("/" + indexName + "/", false);
307 HttpURLConnection conn = initializeConnection(fullUrl);
310 conn.setRequestMethod("PUT");
311 } catch (ProtocolException e) {
312 shutdownConnection(conn);
313 throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
317 attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
318 } catch (IOException e) {
319 logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
320 throw new DocumentStoreOperationException(e.getMessage());
323 OperationResult result = handleResponse(conn);
324 logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, result, indexName);
330 public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
331 boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
333 if (!allowImplicitIndexCreation) {
334 // Before we do anything, make sure that the specified index actually exists in the
335 // document store - we don't want to rely on ElasticSearch to fail the document
336 // create because it could be configured to implicitly create a non-existent index,
337 // which can lead to hard-to-debug behaviour with queries down the road.
338 OperationResult indexExistsResult = checkIndexExistence(indexName);
339 if (!isSuccess(indexExistsResult)) {
340 String resultMsg = "Document Index '" + indexName + "' does not exist.";
341 return (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT).status(Status.NOT_FOUND)
342 .result(resultMsg).failureCause(resultMsg).build();
346 if (document.getId() == null || document.getId().isEmpty()) {
347 return createDocumentWithoutId(indexName, document);
349 return createDocumentWithId(indexName, document);
354 public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
355 boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
357 if (!allowImplicitIndexCreation) {
358 // Before we do anything, make sure that the specified index actually exists in the
359 // document store - we don't want to rely on ElasticSearch to fail the document
360 // create because it could be configured to implicitly create a non-existent index,
361 // which can lead to hard-to-debug behaviour with queries down the road.
362 OperationResult indexExistsResult = checkIndexExistence(indexName);
363 if (!isSuccess(indexExistsResult)) {
364 DocumentOperationResult opResult = new DocumentOperationResult();
365 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
366 String resultMsg = "Document Index '" + indexName + "' does not exist.";
367 opResult.setResult(resultMsg);
368 opResult.setFailureCause(resultMsg);
373 // Grab the current time so we can use it to generate a metrics log.
374 MdcOverride override = getStartTime(new MdcOverride());
376 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
377 + document.getVersion(), false);
378 HttpURLConnection conn = initializeConnection(fullUrl);
381 conn.setRequestMethod("PUT");
382 } catch (ProtocolException e) {
383 shutdownConnection(conn);
384 throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
387 attachDocument(conn, document);
389 logger.debug("Sending 'PUT' request to: " + conn.getURL());
391 DocumentOperationResult opResult = getOperationResult(conn);
392 buildDocumentResult(opResult, indexName);
394 logMetricsInfo(override, SearchDbMsgs.UPDATE_DOCUMENT_TIME, opResult, indexName, document.getId());
396 shutdownConnection(conn);
402 public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
403 throws DocumentStoreOperationException {
404 // Grab the current time so we can use it to generate a metrics log.
405 MdcOverride override = getStartTime(new MdcOverride());
407 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
408 + document.getVersion(), false);
409 HttpURLConnection conn = initializeConnection(fullUrl);
412 conn.setRequestMethod("DELETE");
413 } catch (ProtocolException e) {
414 shutdownConnection(conn);
415 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
418 logger.debug("\nSending 'DELETE' request to " + conn.getURL());
420 DocumentOperationResult opResult = getOperationResult(conn);
421 buildDocumentResult(opResult, indexName);
422 // supress the etag and url in response for delete as they are not required
423 if (opResult.getDocument() != null) {
424 opResult.getDocument().setEtag(null);
425 opResult.getDocument().setUrl(null);
428 logMetricsInfo(override, SearchDbMsgs.DELETE_DOCUMENT_TIME, opResult, indexName, document.getId());
430 shutdownConnection(conn);
436 public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
437 throws DocumentStoreOperationException {
438 // Grab the current time so we can use it to generate a metrics log.
439 MdcOverride override = getStartTime(new MdcOverride());
441 String fullUrl = null;
442 if (document.getVersion() == null) {
443 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
445 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
446 + document.getVersion(), false);
448 HttpURLConnection conn = initializeConnection(fullUrl);
450 logger.debug("\nSending 'GET' request to: " + conn.getURL());
452 DocumentOperationResult opResult = getOperationResult(conn);
453 buildDocumentResult(opResult, indexName);
455 logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, document.getId());
457 shutdownConnection(conn);
463 public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
465 String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
467 // Grab the current time so we can use it to generate a metrics log.
468 MdcOverride override = getStartTime(new MdcOverride());
470 HttpURLConnection conn = initializeConnection(fullUrl);
473 conn.setRequestMethod("GET");
474 } catch (ProtocolException e) {
475 shutdownConnection(conn);
476 throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
479 logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
481 SearchOperationResult opResult = getSearchOperationResult(conn);
482 buildSearchResult(opResult, indexName);
484 logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, queryString);
490 public SearchOperationResult searchWithPayload(String indexName, String query)
491 throws DocumentStoreOperationException {
492 if (logger.isDebugEnabled()) {
493 logger.debug("Querying index: " + indexName + " with query string: " + query);
496 String fullUrl = getFullUrl("/" + indexName + "/_search", false);
498 // Grab the current time so we can use it to generate a metrics log.
499 MdcOverride override = getStartTime(new MdcOverride());
501 HttpURLConnection conn = initializeConnection(fullUrl);
504 conn.setRequestMethod("POST");
505 } catch (ProtocolException e) {
506 shutdownConnection(conn);
507 throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
510 attachContent(conn, query);
512 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
513 logger.debug("Request body = Elasticsearch query = " + query);
515 SearchOperationResult opResult = getSearchOperationResult(conn);
516 buildSearchResult(opResult, indexName);
518 logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
520 shutdownConnection(conn);
527 public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
528 throws DocumentStoreOperationException {
529 if (logger.isDebugEnabled()) {
530 logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
533 String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
535 // Grab the current time so we can use it to generate a metrics log.
536 MdcOverride override = getStartTime(new MdcOverride());
538 HttpURLConnection conn = initializeConnection(fullUrl);
541 conn.setRequestMethod("POST");
542 } catch (ProtocolException e) {
543 shutdownConnection(conn);
544 throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
547 attachContent(conn, query);
549 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
550 logger.debug("Request body = Elasticsearch query = " + query);
552 SearchOperationResult opResult = getSearchOperationResult(conn);
553 buildSuggestResult(opResult, indexName);
555 logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
557 shutdownConnection(conn);
563 public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
564 if (logger.isDebugEnabled()) {
565 StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: ");
567 for (BulkRequest request : requests) {
568 dbgString.append("[").append(request).append("] ");
571 logger.debug(dbgString.toString());
574 // Grab the current time so we can use it to generate a metrics log.
575 MdcOverride override = getStartTime(new MdcOverride());
577 // Parse the supplied set of operations.
578 // Iterate over the list of operations which we were provided and
579 // translate them into a format that ElasticSearh understands.
581 StringBuilder esOperationSet = new StringBuilder(128);
582 List<ElasticSearchResultItem> rejected = new ArrayList<>();
583 for (BulkRequest request : requests) {
585 // Convert the request to the syntax ElasticSearch likes.
586 if (buildEsOperation(request, esOperationSet, rejected)) {
591 ElasticSearchBulkOperationResult opResult = null;
594 // Open an HTTP connection to the ElasticSearch back end.
595 String fullUrl = getFullUrl("/_bulk", false);
597 HttpURLConnection conn;
600 url = new URL(fullUrl);
601 conn = (HttpURLConnection) url.openConnection();
602 conn.setRequestMethod("PUT");
603 conn.setDoOutput(true);
604 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_FORM_URLENCODED);
605 conn.setRequestProperty("Connection", "Close");
607 } catch (IOException e) {
609 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
610 if (logger.isDebugEnabled()) {
611 logger.debug(Throwables.getStackTraceAsString(e));
614 throw new DocumentStoreOperationException(
615 "Failed to open connection to document store. Cause: " + e.getMessage(), e);
618 StringBuilder bulkResult = new StringBuilder(128);
620 // Create an output stream to write our request to.
621 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
623 if (logger.isDebugEnabled()) {
624 logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
625 logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
628 // Write the resulting request string to our output stream. (this sends the request to ES?)
629 out.write(esOperationSet.toString());
632 // Open an input stream on our connection in order to read back the results.
633 InputStream is = conn.getInputStream();
634 InputStreamReader inputstreamreader = new InputStreamReader(is);
635 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
637 // Read the contents of the input stream into our result string...
638 String esResponseString = null;
640 while ((esResponseString = bufferedreader.readLine()) != null) {
641 bulkResult.append(esResponseString).append("\n");
644 } catch (IOException e) {
646 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
647 if (logger.isDebugEnabled()) {
648 StringWriter sw = new StringWriter();
649 e.printStackTrace(new PrintWriter(sw));
650 logger.debug(sw.toString());
653 throw new DocumentStoreOperationException(
654 "Failure interacting with document store. Cause: " + e.getMessage(), e);
657 if (logger.isDebugEnabled()) {
658 logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
661 // ...and marshal the resulting string into a Java object.
663 opResult = marshallEsBulkResult(bulkResult.toString());
665 } catch (IOException e) {
666 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
667 if (logger.isDebugEnabled()) {
668 logger.debug(Throwables.getStackTraceAsString(e));
671 throw new DocumentStoreOperationException("Failed to marshal response body. Cause: " + e.getMessage(),
676 OperationResult result = new OperationResultBuilder() //
677 .resultCode(HttpStatus.MULTI_STATUS_207) //
678 .result(buildGenericBulkResultSet(opResult, rejected)) //
681 // In the success case we don't want the entire result string to be dumped into the metrics log, so concatenate
683 String resultStringForMetricsLog = result.getResult();
684 if (isSuccess(result)) {
685 resultStringForMetricsLog =
686 resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
689 metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
691 .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
692 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
700 * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
702 * @param indexName - The index to look for.
703 * @return - An operation result indicating the success or failure of the check.
704 * @throws DocumentStoreOperationException
706 private OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
707 // Grab the current time so we can use it to generate a metrics log.
708 MdcOverride override = getStartTime(new MdcOverride());
710 String fullUrl = getFullUrl("/" + indexName, false);
711 HttpURLConnection conn = initializeConnection(fullUrl);
714 conn.setRequestMethod("HEAD");
716 } catch (ProtocolException e) {
717 shutdownConnection(conn);
718 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
721 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
725 resultCode = conn.getResponseCode();
726 } catch (IOException e) {
727 shutdownConnection(conn);
728 throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
730 logger.debug(MSG_RESPONSE_CODE + resultCode);
732 OperationResult opResult = new OperationResultBuilder().useDefaults().resultCode(resultCode).build();
733 logMetricsInfo(override, SearchDbMsgs.CHECK_INDEX_TIME, opResult, indexName);
734 shutdownConnection(conn);
739 private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
740 throws DocumentStoreOperationException {
741 // check if the document already exists
742 DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
744 if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
745 if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) {
746 opResult.setFailureCause("A document with the same id already exists.");
748 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
750 opResult.setResultCode(Status.CONFLICT.getStatusCode());
754 // Grab the current time so we can use it to generate a metrics log.
755 MdcOverride override = getStartTime(new MdcOverride());
757 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
758 HttpURLConnection conn = initializeConnection(fullUrl);
761 conn.setRequestMethod("PUT");
762 } catch (ProtocolException e) {
763 shutdownConnection(conn);
764 throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
767 attachDocument(conn, document);
769 logger.debug("Sending 'PUT' request to: " + conn.getURL());
771 opResult = getOperationResult(conn);
772 buildDocumentResult(opResult, indexName);
774 logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, opResult, indexName);
776 shutdownConnection(conn);
781 private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
782 throws DocumentStoreOperationException {
783 // Grab the current time so we can use it to generate a metrics log.
784 MdcOverride override = getStartTime(new MdcOverride());
786 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
787 HttpURLConnection conn = initializeConnection(fullUrl);
790 conn.setRequestMethod("POST");
791 } catch (ProtocolException e) {
792 shutdownConnection(conn);
793 throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
796 attachDocument(conn, document);
798 logger.debug("Sending 'POST' request to: " + conn.getURL());
800 DocumentOperationResult response = getOperationResult(conn);
801 buildDocumentResult(response, indexName);
803 logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, response, indexName);
805 shutdownConnection(conn);
810 private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
811 throws DocumentStoreOperationException {
812 conn.setRequestProperty("Connection", "Close");
813 attachContent(conn, doc.getContentInJson());
816 private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
817 throws DocumentStoreOperationException {
818 // Grab the current time so we can use it to generate a metrics log.
819 MdcOverride override = getStartTime(new MdcOverride());
821 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
822 HttpURLConnection conn = initializeConnection(fullUrl);
825 conn.setRequestMethod("HEAD");
826 } catch (ProtocolException e) {
827 shutdownConnection(conn);
828 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
831 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
835 resultCode = conn.getResponseCode();
836 } catch (IOException e) {
837 shutdownConnection(conn);
838 throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
841 logger.debug(MSG_RESPONSE_CODE + resultCode);
843 DocumentOperationResult opResult = (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT)
844 .useDefaults().resultCode(resultCode).build();
846 logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, docId);
847 shutdownConnection(conn);
852 private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException {
853 OutputStream outputStream = null;
854 OutputStreamWriter out = null;
857 outputStream = conn.getOutputStream();
858 } catch (IOException e) {
859 shutdownConnection(conn);
860 throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
863 out = new OutputStreamWriter(outputStream);
868 } catch (IOException e) {
869 shutdownConnection(conn);
870 throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
874 private HttpURLConnection initializeConnection(String fullUrl) throws DocumentStoreOperationException {
876 HttpURLConnection conn = null;
879 url = new URL(fullUrl);
880 } catch (MalformedURLException e) {
881 throw new DocumentStoreOperationException("Error building a URL with " + url, e);
885 conn = (HttpURLConnection) url.openConnection();
886 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON);
887 conn.setDoOutput(true);
888 } catch (IOException e) {
889 shutdownConnection(conn);
890 throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
896 private OperationResult handleResponse(HttpURLConnection conn) throws DocumentStoreOperationException {
897 return handleResponse(conn, new OperationResultBuilder().useDefaults());
900 private OperationResult handleResponse(HttpURLConnection conn, OperationResultBuilder rb)
901 throws DocumentStoreOperationException {
905 resultCode = conn.getResponseCode();
906 } catch (IOException e) {
907 shutdownConnection(conn);
908 throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
911 logger.debug(MSG_RESPONSE_CODE + resultCode);
913 InputStream inputStream = null;
915 if (!isSuccessCode(resultCode)) {
916 inputStream = conn.getErrorStream();
919 inputStream = conn.getInputStream();
920 } catch (IOException e) {
921 shutdownConnection(conn);
922 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
926 InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
927 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
929 StringBuilder result = new StringBuilder(128);
930 String string = null;
933 while ((string = bufferedreader.readLine()) != null) {
934 result.append(string).append("\n");
936 } catch (IOException e) {
937 shutdownConnection(conn);
938 throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
941 if (resultCode == Status.CONFLICT.getStatusCode()) {
942 rb.resultCode(Status.PRECONDITION_FAILED.getStatusCode());
944 rb.resultCode(resultCode);
946 if (logger.isDebugEnabled()) {
947 logger.debug("Raw result string from ElasticSearch = " + result.toString());
949 rb.result(result.toString());
950 rb.resultVersion(extractVersion(result.toString()));
954 private String extractVersion(String result) {
955 JSONParser parser = new JSONParser();
956 String version = null;
958 JSONObject root = (JSONObject) parser.parse(result);
959 if (root.get(JSON_ATTR_VERSION) != null) {
960 version = root.get(JSON_ATTR_VERSION).toString();
962 } catch (ParseException e) {
963 // Not all responses from ElasticSearch include a version, so
964 // if we don't get one back, just return an empty string rather
965 // than trigger a false failure.
972 * This convenience method gets the current system time and stores it in an attribute in the supplied
973 * {@link MdcOverride} object so that it can be used later by the metrics logger.
975 * @param override - The {@link MdcOverride} object to update.
976 * @return - The supplied {@link MdcOverride} object.
978 private MdcOverride getStartTime(MdcOverride override) {
980 // Grab the current time...
981 long startTimeInMs = System.currentTimeMillis();
983 // ...and add it as an attribute to the supplied MDC Override
985 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
986 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
988 // Return the MdcOverride object that we were passed.
989 // This looks odd, but it allows us to do stuff like:
991 // MdcOverride ov = getStartTime(new MdcOverride())
993 // which is quite handy, but also allows us to pass in an existing
994 // MdcOverride object which already has some attributes set.
998 private boolean isSuccess(OperationResult result) {
999 return isSuccessCode(result.getResultCode());
1002 private boolean isSuccessCode(int statusCode) {
1003 return Family.familyOf(statusCode).equals(Family.SUCCESSFUL);
1006 private OperationResult checkConnection() throws IOException {
1007 String fullUrl = getFullUrl("/_cluster/health", false);
1009 HttpURLConnection conn = null;
1011 url = new URL(fullUrl);
1012 conn = (HttpURLConnection) url.openConnection();
1013 conn.setRequestMethod("GET");
1014 conn.setDoOutput(true);
1015 logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
1017 int resultCode = conn.getResponseCode();
1018 logger.debug("getClusterHealth() response Code : " + resultCode);
1020 shutdownConnection(conn);
1022 return new OperationResultBuilder().resultCode(resultCode).build();
1025 private String getFullUrl(String resourceUrl, boolean isSecure) {
1027 final String host = config.getIpAddress();
1028 final String port = config.getHttpPort();
1031 return String.format("https://%s:%s%s", host, port, resourceUrl);
1033 return String.format("http://%s:%s%s", host, port, resourceUrl);
1037 private void shutdownConnection(HttpURLConnection connection) {
1038 if (connection == null) {
1042 final String methodName = "shutdownConnection";
1043 InputStream inputstream = null;
1044 OutputStream outputstream = null;
1047 inputstream = connection.getInputStream();
1048 } catch (IOException e) {
1049 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
1051 if (inputstream != null) {
1053 inputstream.close();
1054 } catch (IOException e) {
1055 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
1061 outputstream = connection.getOutputStream();
1062 } catch (IOException e) {
1063 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
1065 if (outputstream != null) {
1067 outputstream.close();
1068 } catch (IOException e) {
1069 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
1074 connection.disconnect();
1078 * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch.
1080 * @param request - The request to be performed.
1081 * @param sb - The string builder to append the json data to
1082 * @throws DocumentStoreOperationException
1084 private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails)
1085 throws DocumentStoreOperationException {
1087 boolean retVal = true;
1088 // What kind of operation are we performing?
1089 switch (request.getOperationType()) {
1091 // Create a new document.
1094 // Make sure that we were supplied a document payload.
1095 if (request.getOperation().getDocument() == null) {
1097 fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1098 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1102 // Make sure that the supplied document URL is formatted
1104 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1105 fails.add(generateRejectionEntry(request.getOperationType(),
1106 MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1107 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1111 // Validate that the specified index actually exists before we
1112 // try to perform the create.
1113 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1115 fails.add(generateRejectionEntry(request.getOperationType(),
1116 MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1117 request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1121 // If we were supplied an id for the new document, then
1122 // include it in the bulk operation to Elastic Search
1123 if (request.getId() == null) {
1125 sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
1127 // Otherwise, we just leave that parameter off and ElasticSearch
1128 // will generate one for us.
1130 sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
1135 // Append the document that we want to create.
1136 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1137 } catch (JsonProcessingException e) {
1138 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1143 // Update an existing document.
1146 // Make sure that we were supplied a document payload.
1147 if (request.getOperation().getDocument() == null) {
1149 fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1150 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1154 // Make sure that the supplied document URL is formatted
1156 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1157 fails.add(generateRejectionEntry(request.getOperationType(),
1158 MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1159 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1163 // Validate that the specified index actually exists before we
1164 // try to perform the update.
1165 if (!indexExists(request.getIndex())) {
1167 fails.add(generateRejectionEntry(request.getOperationType(),
1168 MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1169 request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1173 // Validate that the document we are trying to update actually
1174 // exists before we try to perform the update.
1175 if (!documentExists(request.getIndex(), request.getId())) {
1177 fails.add(generateRejectionEntry(request.getOperationType(),
1178 MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1179 request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1183 // It is mandatory that a version be supplied for an update operation,
1184 // so validate that now.
1185 if (request.getOperation().getMetaData().getEtag() == null) {
1187 fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1188 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1192 // Generate the update request...
1193 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1194 request.getOperation().getMetaData().getEtag()));
1196 // ...and append the document that we want to update.
1198 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1199 } catch (JsonProcessingException e) {
1200 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1204 // Delete an existing document.
1207 // Make sure that the supplied document URL is formatted
1209 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1210 fails.add(generateRejectionEntry(request.getOperationType(),
1211 MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1212 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1216 // Validate that the specified index actually exists before we
1217 // try to perform the delete.
1218 if (!indexExists(request.getIndex())) {
1220 fails.add(generateRejectionEntry(request.getOperationType(),
1221 MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1222 request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1226 // Validate that the document we are trying to update actually
1227 // exists before we try to perform the delete.
1228 if (!documentExists(request.getIndex(), request.getId())) {
1230 fails.add(generateRejectionEntry(request.getOperationType(),
1231 MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1232 request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1236 // It is mandatory that a version be supplied for a delete operation,
1237 // so validate that now.
1238 if (request.getOperation().getMetaData().getEtag() == null) {
1240 fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1241 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1245 // Generate the delete request.
1246 sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1247 request.getOperation().getMetaData().getEtag()));
1255 private boolean indexExists(String index) throws DocumentStoreOperationException {
1256 return isSuccess(checkIndexExistence(index));
1259 private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1260 return isSuccess(checkDocumentExistence(index, id));
1264 * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the
1267 * @param rejectReason - A message describing why the operation was rejected.
1268 * @param anId - The identifier associated with the document being acted on.
1269 * @param statusCode - An HTTP status code.
1270 * @return - A result set item.
1272 private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index,
1273 String anId, int statusCode, String originalUrl) {
1275 ElasticSearchError err = new ElasticSearchError();
1276 err.setReason(rejectReason);
1278 ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1281 op.setStatus(statusCode);
1283 op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1285 ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1289 rejectionResult.setCreate(op);
1292 rejectionResult.setIndex(op);
1295 rejectionResult.setDelete(op);
1300 return rejectionResult;
1304 * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and
1305 * marshals it into a Java object.
1307 * @param jsonResult - The bulk operations response returned from ElasticSearch.
1308 * @return - The marshalled response.
1309 * @throws JsonParseException
1310 * @throws JsonMappingException
1311 * @throws IOException
1313 private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException {
1314 if (jsonResult != null) {
1315 if (logger.isDebugEnabled()) {
1316 logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", ""));
1319 ObjectMapper mapper = new ObjectMapper();
1320 mapper.setSerializationInclusion(Include.NON_EMPTY);
1322 return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1329 * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload.
1331 * @param esResult - ElasticSearch bulk operations response.
1332 * @return - A generic result set.
1334 private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1335 List<ElasticSearchResultItem> rejectedOps) {
1337 int totalSuccess = 0;
1340 if (logger.isDebugEnabled()) {
1342 logger.debug("ESController: Build generic result set. ES Results: "
1343 + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString());
1346 // Build a combined list of result items from the results returned
1347 // from ElasticSearch and the list of operations that we rejected
1348 // without sending to ElasticSearch.
1349 List<ElasticSearchResultItem> combinedResults = new ArrayList<>();
1350 if (esResult != null) {
1351 combinedResults.addAll(Arrays.asList(esResult.getItems()));
1353 combinedResults.addAll(rejectedOps);
1355 // Iterate over the individual results in the resulting result set.
1356 StringBuilder resultsBuilder = new StringBuilder();
1357 AtomicBoolean firstItem = new AtomicBoolean(true);
1358 for (ElasticSearchResultItem item : combinedResults) {
1360 // Increment the operation counts.
1362 if (isSuccessCode(item.operationStatus().getStatus())) {
1368 // Prepend a comma to our response string unless this it the
1369 // first result in the set.
1370 if (!firstItem.compareAndSet(true, false)) {
1371 resultsBuilder.append(", ");
1374 // Append the current result as a generic json structure.
1375 resultsBuilder.append(item.toJson());
1378 return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", "
1379 + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}";
1382 private DocumentOperationResult getOperationResult(HttpURLConnection conn) throws DocumentStoreOperationException {
1383 return (DocumentOperationResult) handleResponse(conn, new OperationResultBuilder(Type.DOCUMENT).useDefaults());
1386 private SearchOperationResult getSearchOperationResult(HttpURLConnection conn)
1387 throws DocumentStoreOperationException {
1388 return (SearchOperationResult) handleResponse(conn, new OperationResultBuilder(Type.SEARCH).useDefaults());
1391 private void buildDocumentResult(DocumentOperationResult result, String index)
1392 throws DocumentStoreOperationException {
1394 JSONParser parser = new JSONParser();
1397 root = (JSONObject) parser.parse(result.getResult());
1398 if (isSuccess(result)) {
1399 // Success response object
1400 Document doc = new Document();
1401 doc.setEtag(result.getResultVersion());
1402 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1404 doc.setContent((JSONObject) root.get("_source"));
1405 result.setDocument(doc);
1408 // Error response object
1409 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1410 if (error != null) {
1412 new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1416 } catch (Exception e) {
1417 throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1421 private String buildDocumentResponseUrl(String index, String id) {
1422 return ApiUtils.buildDocumentUri(index, id);
1425 private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1426 JSONParser parser = new JSONParser();
1430 root = (JSONObject) parser.parse(result.getResult());
1431 if (isSuccess(result)) {
1432 JSONObject hits = (JSONObject) root.get("hits");
1433 JSONArray hitArray = (JSONArray) hits.get("hits");
1434 SearchHits searchHits = new SearchHits();
1435 searchHits.setTotalHits(hits.get("total").toString());
1436 ArrayList<SearchHit> searchHitArray = new ArrayList<>();
1438 for (int i = 0; i < hitArray.size(); i++) {
1439 JSONObject hit = (JSONObject) hitArray.get(i);
1440 SearchHit searchHit = new SearchHit();
1441 searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1442 Document doc = new Document();
1443 if (hit.get(JSON_ATTR_VERSION) != null) {
1444 doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1448 buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1449 doc.setContent((JSONObject) hit.get("_source"));
1450 searchHit.setDocument(doc);
1451 searchHitArray.add(searchHit);
1453 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1454 result.setSearchResult(searchHits);
1456 JSONObject aggregations = (JSONObject) root.get("aggregations");
1457 if (aggregations != null) {
1458 AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1459 AggregationResults aggs = new AggregationResults();
1460 aggs.setAggregations(aggResults);
1461 result.setAggregationResult(aggs);
1466 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1467 if (error != null) {
1469 new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1472 } catch (Exception e) {
1473 throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1477 private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1478 JSONParser parser = new JSONParser();
1481 root = (JSONObject) parser.parse(result.getResult());
1482 if (isSuccess(result)) {
1483 JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
1484 JSONObject hitdata = (JSONObject) hitArray.get(0);
1485 JSONArray optionsArray = (JSONArray) hitdata.get("options");
1486 SuggestHits suggestHits = new SuggestHits();
1487 suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
1489 ArrayList<SuggestHit> suggestHitArray = new ArrayList<>();
1491 for (int i = 0; i < optionsArray.size(); i++) {
1492 JSONObject hit = (JSONObject) optionsArray.get(i);
1494 SuggestHit suggestHit = new SuggestHit();
1495 suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
1496 suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
1497 Document doc = new Document();
1498 if (hit.get(JSON_ATTR_VERSION) != null) {
1499 doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1502 buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1504 doc.setContent((JSONObject) hit.get("payload"));
1505 suggestHit.setDocument(doc);
1506 suggestHitArray.add(suggestHit);
1508 suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
1509 result.setSuggestResult(suggestHits);
1511 JSONObject aggregations = (JSONObject) root.get("aggregations");
1512 if (aggregations != null) {
1513 AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1514 AggregationResults aggs = new AggregationResults();
1515 aggs.setAggregations(aggResults);
1516 result.setAggregationResult(aggs);
1521 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1522 if (error != null) {
1524 new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1527 } catch (Exception e) {
1528 throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1533 * Record the timing of the operation in the metrics log.
1536 private void logMetricsInfo(MdcOverride override, SearchDbMsgs message, OperationResult operationResult,
1538 metricsLogger.info(message,
1540 .setField(LogLine.DefinedFields.RESPONSE_CODE, operationResult.getResultCode())
1541 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, operationResult.getResult()),