2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
24 import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
25 import static javax.ws.rs.core.MediaType.APPLICATION_FORM_URLENCODED;
26 import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
28 import com.fasterxml.jackson.annotation.JsonInclude.Include;
29 import com.fasterxml.jackson.core.JsonParseException;
30 import com.fasterxml.jackson.core.JsonProcessingException;
31 import com.fasterxml.jackson.databind.JsonMappingException;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33 import com.google.common.base.Throwables;
34 import java.io.BufferedReader;
36 import java.io.FileInputStream;
37 import java.io.IOException;
38 import java.io.InputStream;
39 import java.io.InputStreamReader;
40 import java.io.OutputStream;
41 import java.io.OutputStreamWriter;
42 import java.io.PrintWriter;
43 import java.io.StringWriter;
44 import java.net.HttpURLConnection;
45 import java.net.MalformedURLException;
46 import java.net.ProtocolException;
48 import java.text.SimpleDateFormat;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.List;
52 import java.util.Properties;
53 import java.util.concurrent.atomic.AtomicBoolean;
54 import javax.ws.rs.core.Response.Status;
55 import javax.ws.rs.core.Response.Status.Family;
56 import org.json.simple.JSONArray;
57 import org.json.simple.JSONObject;
58 import org.json.simple.parser.JSONParser;
59 import org.json.simple.parser.ParseException;
60 import org.onap.aai.cl.api.LogFields;
61 import org.onap.aai.cl.api.LogLine;
62 import org.onap.aai.cl.api.Logger;
63 import org.onap.aai.cl.eelf.LoggerFactory;
64 import org.onap.aai.cl.mdc.MdcContext;
65 import org.onap.aai.cl.mdc.MdcOverride;
66 import org.onap.aai.sa.rest.AnalysisConfiguration;
67 import org.onap.aai.sa.rest.ApiUtils;
68 import org.onap.aai.sa.rest.BulkRequest;
69 import org.onap.aai.sa.rest.BulkRequest.OperationType;
70 import org.onap.aai.sa.rest.DocumentSchema;
71 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
72 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
73 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
74 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
75 import org.onap.aai.sa.searchdbabstraction.entity.Document;
76 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
77 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
78 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
79 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
80 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
81 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
82 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
83 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
84 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
85 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
86 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
87 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
88 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
91 * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface.
93 public class ElasticSearchHttpController implements DocumentStoreInterface {
95 private static ElasticSearchHttpController instance = null;
97 private static final Logger logger =
98 LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
99 private static final Logger metricsLogger =
100 LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
102 private static final String JSON_ATTR_VERSION = "_version";
103 private static final String JSON_ATTR_ERROR = "error";
104 private static final String JSON_ATTR_REASON = "reason";
106 private static final String DEFAULT_TYPE = "default";
107 private static final String QUERY_PARAM_VERSION = "?version=";
109 private static final String MSG_RESOURCE_MISSING = "Specified resource does not exist: ";
110 private static final String MSG_RESPONSE_CODE = "Response Code : ";
111 private static final String MSG_INVALID_DOCUMENT_URL = "Invalid document URL: ";
112 private static final String MSG_HTTP_PUT_FAILED = "Failed to set HTTP request method to PUT.";
113 private static final String MSG_HTTP_POST_FAILED = "Failed to set HTTP request method to POST.";
114 private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
115 "Internal Error: ElasticSearch operation fault occurred";
116 private static final String FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION =
117 "Failed to get the response code from the connection.";
118 private static final String FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE = "Failed to parse Elastic Search response.";
120 private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
121 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
122 private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
123 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
124 private static final String BULK_IMPORT_INDEX_TEMPLATE =
125 "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
126 private static final String BULK_DELETE_TEMPLATE =
127 "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
129 private final ElasticSearchConfig config;
131 protected AnalysisConfiguration analysisConfig;
134 public ElasticSearchHttpController(ElasticSearchConfig config) {
135 this.config = config;
136 analysisConfig = new AnalysisConfiguration();
139 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
141 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
142 } catch (Exception e) {
143 logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, getFullUrl("", false),
148 public static ElasticSearchHttpController getInstance() {
149 synchronized (ElasticSearchHttpController.class) {
150 if (instance == null) {
151 Properties properties = new Properties();
152 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
154 properties.load(new FileInputStream(file));
155 } catch (Exception e) {
156 logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance",
157 e.getLocalizedMessage());
160 ElasticSearchConfig config = new ElasticSearchConfig(properties);
161 instance = new ElasticSearchHttpController(config);
168 public AnalysisConfiguration getAnalysisConfig() {
169 return analysisConfig;
173 public OperationResult createIndex(String index, DocumentSchema documentSchema) {
174 OperationResult result = new OperationResult();
175 result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
178 // Submit the request to ElasticSearch to create the index using a
179 // default document type.
180 result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
181 DocumentSchemaUtil.generateDocumentMappings(documentSchema));
183 // ElasticSearch will return us a 200 code on success when we
184 // want to report a 201, so translate the result here.
185 if (result.getResultCode() == Status.OK.getStatusCode()) {
186 result.setResultCode(Status.CREATED.getStatusCode());
189 if (isSuccess(result)) {
190 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
192 } catch (DocumentStoreOperationException | IOException e) {
193 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
200 public OperationResult createDynamicIndex(String index, String dynamicSchema) {
201 OperationResult result = new OperationResult();
202 result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
205 result = createTable(index, dynamicSchema);
207 // ElasticSearch will return us a 200 code on success when we
208 // want to report a 201, so translate the result here.
209 if (result.getResultCode() == Status.OK.getStatusCode()) {
210 result.setResultCode(Status.CREATED.getStatusCode());
212 if (isSuccess(result)) {
213 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
215 } catch (DocumentStoreOperationException e) {
216 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
223 public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
225 // Initialize operation result with a failure codes / fault string
226 OperationResult opResult = new OperationResult();
227 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
228 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
230 // Grab the current time so we can use it to generate a metrics log.
231 MdcOverride override = getStartTime(new MdcOverride());
233 String fullUrl = getFullUrl("/" + indexName + "/", false);
234 HttpURLConnection conn = initializeConnection(fullUrl);
236 logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
239 conn.setRequestMethod("DELETE");
240 } catch (ProtocolException e) {
241 shutdownConnection(conn);
242 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
245 handleResponse(conn, opResult);
247 logMetricsInfo(override, SearchDbMsgs.DELETE_INDEX_TIME, opResult, indexName);
249 shutdownConnection(conn);
254 private OperationResult checkConnection() throws IOException {
255 String fullUrl = getFullUrl("/_cluster/health", false);
257 HttpURLConnection conn = null;
259 url = new URL(fullUrl);
260 conn = (HttpURLConnection) url.openConnection();
261 conn.setRequestMethod("GET");
262 conn.setDoOutput(true);
263 logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
265 int resultCode = conn.getResponseCode();
266 logger.debug("getClusterHealth() response Code : " + resultCode);
267 OperationResult opResult = new OperationResult();
268 opResult.setResultCode(resultCode);
270 shutdownConnection(conn);
275 private String getFullUrl(String resourceUrl, boolean isSecure) {
277 final String host = config.getIpAddress();
278 final String port = config.getHttpPort();
281 return String.format("https://%s:%s%s", host, port, resourceUrl);
283 return String.format("http://%s:%s%s", host, port, resourceUrl);
287 private void shutdownConnection(HttpURLConnection connection) {
288 if (connection == null) {
292 final String methodName = "shutdownConnection";
293 InputStream inputstream = null;
294 OutputStream outputstream = null;
297 inputstream = connection.getInputStream();
298 } catch (IOException e) {
299 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
301 if (inputstream != null) {
304 } catch (IOException e) {
305 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
311 outputstream = connection.getOutputStream();
312 } catch (IOException e) {
313 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
315 if (outputstream != null) {
317 outputstream.close();
318 } catch (IOException e) {
319 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
324 connection.disconnect();
328 protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings)
329 throws DocumentStoreOperationException {
331 if (indexSettings == null) {
332 logger.debug("No settings provided.");
335 if (indexMappings == null) {
336 logger.debug("No mappings provided.");
339 OperationResult opResult = new OperationResult();
341 // Initialize operation result with a failure codes / fault string
342 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
343 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
345 // Grab the current time so we can use it to generate a metrics log.
346 MdcOverride override = getStartTime(new MdcOverride());
348 String fullUrl = getFullUrl("/" + indexName + "/", false);
349 HttpURLConnection conn = initializeConnection(fullUrl);
352 conn.setRequestMethod("PUT");
353 } catch (ProtocolException e) {
354 shutdownConnection(conn);
355 throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
358 StringBuilder sb = new StringBuilder(128);
359 sb.append("{ \"settings\" : ");
360 sb.append(indexSettings);
363 sb.append("\"mappings\" : {");
364 sb.append("\"" + typeName + "\" :");
365 sb.append(indexMappings);
369 attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
370 } catch (IOException e) {
371 logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
372 throw new DocumentStoreOperationException(e.getMessage(), e);
375 logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
376 logger.debug("Request content: " + sb.toString());
378 handleResponse(conn, opResult);
380 shutdownConnection(conn);
382 logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, opResult, indexName);
388 * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the
391 * @param indexName - The name of the index to be created
392 * @param settingsAndMappings - The actual JSON object that will define the index
393 * @return - The operation result of writing into Elasticsearch
394 * @throws DocumentStoreOperationException
396 protected OperationResult createTable(String indexName, String settingsAndMappings)
397 throws DocumentStoreOperationException {
398 OperationResult result = new OperationResult();
399 result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
400 result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
402 // Grab the current time so we can use it to generate a metrics log.
403 MdcOverride override = getStartTime(new MdcOverride());
405 String fullUrl = getFullUrl("/" + indexName + "/", false);
406 HttpURLConnection conn = initializeConnection(fullUrl);
409 conn.setRequestMethod("PUT");
410 } catch (ProtocolException e) {
411 shutdownConnection(conn);
412 throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
416 attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
417 } catch (IOException e) {
418 logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
419 throw new DocumentStoreOperationException(e.getMessage());
421 handleResponse(conn, result);
423 logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, result, indexName);
429 public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
430 boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
432 if (!allowImplicitIndexCreation) {
433 // Before we do anything, make sure that the specified index actually exists in the
434 // document store - we don't want to rely on ElasticSearch to fail the document
435 // create because it could be configured to implicitly create a non-existent index,
436 // which can lead to hard-to-debug behaviour with queries down the road.
437 OperationResult indexExistsResult = checkIndexExistence(indexName);
438 if (!isSuccess(indexExistsResult)) {
439 DocumentOperationResult opResult = new DocumentOperationResult();
440 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
441 String resultMsg = "Document Index '" + indexName + "' does not exist.";
442 opResult.setResult(resultMsg);
443 opResult.setFailureCause(resultMsg);
448 if (document.getId() == null || document.getId().isEmpty()) {
449 return createDocumentWithoutId(indexName, document);
451 return createDocumentWithId(indexName, document);
455 private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
456 throws DocumentStoreOperationException {
457 // check if the document already exists
458 DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
460 if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
461 if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) {
462 opResult.setFailureCause("A document with the same id already exists.");
464 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
466 opResult.setResultCode(Status.CONFLICT.getStatusCode());
471 // Grab the current time so we can use it to generate a metrics log.
472 MdcOverride override = getStartTime(new MdcOverride());
474 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
475 HttpURLConnection conn = initializeConnection(fullUrl);
478 conn.setRequestMethod("PUT");
479 } catch (ProtocolException e) {
480 shutdownConnection(conn);
481 throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
484 attachDocument(conn, document);
486 logger.debug("Sending 'PUT' request to: " + conn.getURL());
488 opResult = getOperationResult(conn);
489 buildDocumentResult(opResult, indexName);
491 logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, opResult, indexName);
493 shutdownConnection(conn);
498 private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
499 throws DocumentStoreOperationException {
500 // Grab the current time so we can use it to generate a metrics log.
501 MdcOverride override = getStartTime(new MdcOverride());
503 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
504 HttpURLConnection conn = initializeConnection(fullUrl);
507 conn.setRequestMethod("POST");
508 } catch (ProtocolException e) {
509 shutdownConnection(conn);
510 throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
513 attachDocument(conn, document);
515 logger.debug("Sending 'POST' request to: " + conn.getURL());
517 DocumentOperationResult response = getOperationResult(conn);
518 buildDocumentResult(response, indexName);
520 logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, response, indexName);
522 shutdownConnection(conn);
527 private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
528 throws DocumentStoreOperationException {
529 conn.setRequestProperty("Connection", "Close");
530 attachContent(conn, doc.getContentInJson());
533 private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
534 throws DocumentStoreOperationException {
535 // Grab the current time so we can use it to generate a metrics log.
536 MdcOverride override = getStartTime(new MdcOverride());
538 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
539 HttpURLConnection conn = initializeConnection(fullUrl);
542 conn.setRequestMethod("HEAD");
543 } catch (ProtocolException e) {
544 shutdownConnection(conn);
545 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
548 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
552 resultCode = conn.getResponseCode();
553 } catch (IOException e) {
554 shutdownConnection(conn);
555 throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
558 logger.debug(MSG_RESPONSE_CODE + resultCode);
560 DocumentOperationResult opResult = createDefaultOperationResult();
561 opResult.setResultCode(resultCode);
563 logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, docId);
565 shutdownConnection(conn);
571 public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
572 boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
574 if (!allowImplicitIndexCreation) {
575 // Before we do anything, make sure that the specified index actually exists in the
576 // document store - we don't want to rely on ElasticSearch to fail the document
577 // create because it could be configured to implicitly create a non-existent index,
578 // which can lead to hard-to-debug behaviour with queries down the road.
579 OperationResult indexExistsResult = checkIndexExistence(indexName);
580 if (!isSuccess(indexExistsResult)) {
581 DocumentOperationResult opResult = new DocumentOperationResult();
582 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
583 String resultMsg = "Document Index '" + indexName + "' does not exist.";
584 opResult.setResult(resultMsg);
585 opResult.setFailureCause(resultMsg);
590 // Grab the current time so we can use it to generate a metrics log.
591 MdcOverride override = getStartTime(new MdcOverride());
593 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
594 + document.getVersion(), false);
595 HttpURLConnection conn = initializeConnection(fullUrl);
598 conn.setRequestMethod("PUT");
599 } catch (ProtocolException e) {
600 shutdownConnection(conn);
601 throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
604 attachDocument(conn, document);
606 logger.debug("Sending 'PUT' request to: " + conn.getURL());
608 DocumentOperationResult opResult = getOperationResult(conn);
609 buildDocumentResult(opResult, indexName);
611 logMetricsInfo(override, SearchDbMsgs.UPDATE_DOCUMENT_TIME, opResult, indexName, document.getId());
613 shutdownConnection(conn);
619 public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
620 throws DocumentStoreOperationException {
621 // Grab the current time so we can use it to generate a metrics log.
622 MdcOverride override = getStartTime(new MdcOverride());
624 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
625 + document.getVersion(), false);
626 HttpURLConnection conn = initializeConnection(fullUrl);
629 conn.setRequestMethod("DELETE");
630 } catch (ProtocolException e) {
631 shutdownConnection(conn);
632 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
635 logger.debug("\nSending 'DELETE' request to " + conn.getURL());
637 DocumentOperationResult opResult = getOperationResult(conn);
638 buildDocumentResult(opResult, indexName);
639 // supress the etag and url in response for delete as they are not required
640 if (opResult.getDocument() != null) {
641 opResult.getDocument().setEtag(null);
642 opResult.getDocument().setUrl(null);
645 logMetricsInfo(override, SearchDbMsgs.DELETE_DOCUMENT_TIME, opResult, indexName, document.getId());
647 shutdownConnection(conn);
653 public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
654 throws DocumentStoreOperationException {
655 // Grab the current time so we can use it to generate a metrics log.
656 MdcOverride override = getStartTime(new MdcOverride());
658 String fullUrl = null;
659 if (document.getVersion() == null) {
660 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
662 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
663 + document.getVersion(), false);
665 HttpURLConnection conn = initializeConnection(fullUrl);
667 logger.debug("\nSending 'GET' request to: " + conn.getURL());
669 DocumentOperationResult opResult = getOperationResult(conn);
670 buildDocumentResult(opResult, indexName);
672 logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, document.getId());
674 shutdownConnection(conn);
680 public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
682 String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
684 // Grab the current time so we can use it to generate a metrics log.
685 MdcOverride override = getStartTime(new MdcOverride());
687 HttpURLConnection conn = initializeConnection(fullUrl);
690 conn.setRequestMethod("GET");
691 } catch (ProtocolException e) {
692 shutdownConnection(conn);
693 throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
696 logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
698 SearchOperationResult opResult = getSearchOperationResult(conn);
699 buildSearchResult(opResult, indexName);
701 logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, queryString);
707 public SearchOperationResult searchWithPayload(String indexName, String query)
708 throws DocumentStoreOperationException {
709 if (logger.isDebugEnabled()) {
710 logger.debug("Querying index: " + indexName + " with query string: " + query);
713 String fullUrl = getFullUrl("/" + indexName + "/_search", false);
715 // Grab the current time so we can use it to generate a metrics log.
716 MdcOverride override = getStartTime(new MdcOverride());
718 HttpURLConnection conn = initializeConnection(fullUrl);
721 conn.setRequestMethod("POST");
722 } catch (ProtocolException e) {
723 shutdownConnection(conn);
724 throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
727 attachContent(conn, query);
729 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
730 logger.debug("Request body = Elasticsearch query = " + query);
732 SearchOperationResult opResult = getSearchOperationResult(conn);
734 buildSearchResult(opResult, indexName);
736 logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
738 shutdownConnection(conn);
745 public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
746 throws DocumentStoreOperationException {
747 if (logger.isDebugEnabled()) {
748 logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
751 String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
753 // Grab the current time so we can use it to generate a metrics log.
754 MdcOverride override = getStartTime(new MdcOverride());
756 HttpURLConnection conn = initializeConnection(fullUrl);
759 conn.setRequestMethod("POST");
760 } catch (ProtocolException e) {
761 shutdownConnection(conn);
762 throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
765 attachContent(conn, query);
767 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
768 logger.debug("Request body = Elasticsearch query = " + query);
770 SearchOperationResult opResult = getSearchOperationResult(conn);
771 buildSuggestResult(opResult, indexName);
773 logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
775 shutdownConnection(conn);
780 private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException {
781 OutputStream outputStream = null;
782 OutputStreamWriter out = null;
785 outputStream = conn.getOutputStream();
786 } catch (IOException e) {
787 shutdownConnection(conn);
788 throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
791 out = new OutputStreamWriter(outputStream);
796 } catch (IOException e) {
797 shutdownConnection(conn);
798 throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
802 private HttpURLConnection initializeConnection(String fullUrl) throws DocumentStoreOperationException {
804 HttpURLConnection conn = null;
807 url = new URL(fullUrl);
808 } catch (MalformedURLException e) {
809 throw new DocumentStoreOperationException("Error building a URL with " + url, e);
813 conn = (HttpURLConnection) url.openConnection();
814 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON);
815 conn.setDoOutput(true);
816 } catch (IOException e) {
817 shutdownConnection(conn);
818 throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
824 private void handleResponse(HttpURLConnection conn, OperationResult opResult)
825 throws DocumentStoreOperationException {
829 resultCode = conn.getResponseCode();
830 } catch (IOException e) {
831 shutdownConnection(conn);
832 throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
835 logger.debug(MSG_RESPONSE_CODE + resultCode);
837 InputStream inputStream = null;
839 if (!isSuccessCode(resultCode)) {
840 inputStream = conn.getErrorStream();
843 inputStream = conn.getInputStream();
844 } catch (IOException e) {
845 shutdownConnection(conn);
846 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
850 InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
851 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
853 StringBuilder result = new StringBuilder(128);
854 String string = null;
857 while ((string = bufferedreader.readLine()) != null) {
858 result.append(string).append("\n");
860 } catch (IOException e) {
861 shutdownConnection(conn);
862 throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
865 if (resultCode == Status.CONFLICT.getStatusCode()) {
866 opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode());
868 opResult.setResultCode(resultCode);
870 if (logger.isDebugEnabled()) {
871 logger.debug("Raw result string from ElasticSearch = " + result.toString());
873 opResult.setResult(result.toString());
874 opResult.setResultVersion(extractVersion(result.toString()));
877 private String extractVersion(String result) {
878 JSONParser parser = new JSONParser();
879 String version = null;
881 JSONObject root = (JSONObject) parser.parse(result);
882 if (root.get(JSON_ATTR_VERSION) != null) {
883 version = root.get(JSON_ATTR_VERSION).toString();
885 } catch (ParseException e) {
886 // Not all responses from ElasticSearch include a version, so
887 // if we don't get one back, just return an empty string rather
888 // than trigger a false failure.
895 * This convenience method gets the current system time and stores it in an attribute in the supplied
896 * {@link MdcOverride} object so that it can be used later by the metrics logger.
898 * @param override - The {@link MdcOverride} object to update.
899 * @return - The supplied {@link MdcOverride} object.
901 private MdcOverride getStartTime(MdcOverride override) {
903 // Grab the current time...
904 long startTimeInMs = System.currentTimeMillis();
906 // ...and add it as an attribute to the supplied MDC Override
908 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
909 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
911 // Return the MdcOverride object that we were passed.
912 // This looks odd, but it allows us to do stuff like:
914 // MdcOverride ov = getStartTime(new MdcOverride())
916 // which is quite handy, but also allows us to pass in an existing
917 // MdcOverride object which already has some attributes set.
921 private boolean isSuccess(OperationResult result) {
922 return isSuccessCode(result.getResultCode());
925 private boolean isSuccessCode(int statusCode) {
926 return Family.familyOf(statusCode).equals(Family.SUCCESSFUL);
930 public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
931 if (logger.isDebugEnabled()) {
932 StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: ");
934 for (BulkRequest request : requests) {
935 dbgString.append("[").append(request).append("] ");
938 logger.debug(dbgString.toString());
941 // Grab the current time so we can use it to generate a metrics log.
942 MdcOverride override = getStartTime(new MdcOverride());
944 // Parse the supplied set of operations.
945 // Iterate over the list of operations which we were provided and
946 // translate them into a format that ElasticSearh understands.
948 StringBuilder esOperationSet = new StringBuilder(128);
949 List<ElasticSearchResultItem> rejected = new ArrayList<>();
950 for (BulkRequest request : requests) {
952 // Convert the request to the syntax ElasticSearch likes.
953 if (buildEsOperation(request, esOperationSet, rejected)) {
958 ElasticSearchBulkOperationResult opResult = null;
961 // Open an HTTP connection to the ElasticSearch back end.
962 String fullUrl = getFullUrl("/_bulk", false);
964 HttpURLConnection conn;
967 url = new URL(fullUrl);
968 conn = (HttpURLConnection) url.openConnection();
969 conn.setRequestMethod("PUT");
970 conn.setDoOutput(true);
971 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_FORM_URLENCODED);
972 conn.setRequestProperty("Connection", "Close");
974 } catch (IOException e) {
976 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
977 if (logger.isDebugEnabled()) {
978 logger.debug(Throwables.getStackTraceAsString(e));
981 throw new DocumentStoreOperationException(
982 "Failed to open connection to document store. Cause: " + e.getMessage(), e);
985 StringBuilder bulkResult = new StringBuilder(128);
987 // Create an output stream to write our request to.
988 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
990 if (logger.isDebugEnabled()) {
991 logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
992 logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
995 // Write the resulting request string to our output stream. (this sends the request to ES?)
996 out.write(esOperationSet.toString());
999 // Open an input stream on our connection in order to read back the results.
1000 InputStream is = conn.getInputStream();
1001 InputStreamReader inputstreamreader = new InputStreamReader(is);
1002 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1004 // Read the contents of the input stream into our result string...
1005 String esResponseString = null;
1007 while ((esResponseString = bufferedreader.readLine()) != null) {
1008 bulkResult.append(esResponseString).append("\n");
1011 } catch (IOException e) {
1013 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1014 if (logger.isDebugEnabled()) {
1015 StringWriter sw = new StringWriter();
1016 e.printStackTrace(new PrintWriter(sw));
1017 logger.debug(sw.toString());
1020 throw new DocumentStoreOperationException(
1021 "Failure interacting with document store. Cause: " + e.getMessage(), e);
1024 if (logger.isDebugEnabled()) {
1025 logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
1028 // ...and marshal the resulting string into a Java object.
1030 opResult = marshallEsBulkResult(bulkResult.toString());
1032 } catch (IOException e) {
1034 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1035 if (logger.isDebugEnabled()) {
1036 logger.debug(Throwables.getStackTraceAsString(e));
1039 throw new DocumentStoreOperationException("Failed to marshal response body. Cause: " + e.getMessage(),
1044 // Finally, build the operation result and return it to the caller.
1045 OperationResult result = new OperationResult();
1046 result.setResultCode(207);
1047 result.setResult(buildGenericBulkResultSet(opResult, rejected));
1049 // In the success case we don't want the entire result string to be
1050 // dumped into the metrics log, so concatenate it.
1051 String resultStringForMetricsLog = result.getResult();
1052 if (isSuccess(result)) {
1053 resultStringForMetricsLog =
1054 resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1057 metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1059 .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1060 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1067 * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch.
1069 * @param request - The request to be performed.
1070 * @param sb - The string builder to append the json data to
1071 * @throws DocumentStoreOperationException
1073 private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails)
1074 throws DocumentStoreOperationException {
1076 boolean retVal = true;
1077 // What kind of operation are we performing?
1078 switch (request.getOperationType()) {
1080 // Create a new document.
1083 // Make sure that we were supplied a document payload.
1084 if (request.getOperation().getDocument() == null) {
1086 fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1087 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1091 // Make sure that the supplied document URL is formatted
1093 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1094 fails.add(generateRejectionEntry(request.getOperationType(),
1095 MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1096 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1100 // Validate that the specified index actually exists before we
1101 // try to perform the create.
1102 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1104 fails.add(generateRejectionEntry(request.getOperationType(),
1105 MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1106 request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1110 // If we were supplied an id for the new document, then
1111 // include it in the bulk operation to Elastic Search
1112 if (request.getId() == null) {
1114 sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
1116 // Otherwise, we just leave that parameter off and ElasticSearch
1117 // will generate one for us.
1119 sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
1124 // Append the document that we want to create.
1125 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1126 } catch (JsonProcessingException e) {
1127 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1132 // Update an existing document.
1135 // Make sure that we were supplied a document payload.
1136 if (request.getOperation().getDocument() == null) {
1138 fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1139 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1143 // Make sure that the supplied document URL is formatted
1145 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1146 fails.add(generateRejectionEntry(request.getOperationType(),
1147 MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1148 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1152 // Validate that the specified index actually exists before we
1153 // try to perform the update.
1154 if (!indexExists(request.getIndex())) {
1156 fails.add(generateRejectionEntry(request.getOperationType(),
1157 MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1158 request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1162 // Validate that the document we are trying to update actually
1163 // exists before we try to perform the update.
1164 if (!documentExists(request.getIndex(), request.getId())) {
1166 fails.add(generateRejectionEntry(request.getOperationType(),
1167 MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1168 request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1172 // It is mandatory that a version be supplied for an update operation,
1173 // so validate that now.
1174 if (request.getOperation().getMetaData().getEtag() == null) {
1176 fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1177 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1181 // Generate the update request...
1182 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1183 request.getOperation().getMetaData().getEtag()));
1185 // ...and append the document that we want to update.
1187 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1188 } catch (JsonProcessingException e) {
1189 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1193 // Delete an existing document.
1196 // Make sure that the supplied document URL is formatted
1198 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1199 fails.add(generateRejectionEntry(request.getOperationType(),
1200 MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1201 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1205 // Validate that the specified index actually exists before we
1206 // try to perform the delete.
1207 if (!indexExists(request.getIndex())) {
1209 fails.add(generateRejectionEntry(request.getOperationType(),
1210 MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1211 request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1215 // Validate that the document we are trying to update actually
1216 // exists before we try to perform the delete.
1217 if (!documentExists(request.getIndex(), request.getId())) {
1219 fails.add(generateRejectionEntry(request.getOperationType(),
1220 MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1221 request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1225 // It is mandatory that a version be supplied for a delete operation,
1226 // so validate that now.
1227 if (request.getOperation().getMetaData().getEtag() == null) {
1229 fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1230 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1234 // Generate the delete request.
1235 sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1236 request.getOperation().getMetaData().getEtag()));
1244 private boolean indexExists(String index) throws DocumentStoreOperationException {
1245 return isSuccess(checkIndexExistence(index));
1248 private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1249 return isSuccess(checkDocumentExistence(index, id));
1253 * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the
1256 * @param rejectReason - A message describing why the operation was rejected.
1257 * @param anId - The identifier associated with the document being acted on.
1258 * @param statusCode - An HTTP status code.
1259 * @return - A result set item.
1261 private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index,
1262 String anId, int statusCode, String originalUrl) {
1264 ElasticSearchError err = new ElasticSearchError();
1265 err.setReason(rejectReason);
1267 ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1270 op.setStatus(statusCode);
1272 op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1274 ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1278 rejectionResult.setCreate(op);
1281 rejectionResult.setIndex(op);
1284 rejectionResult.setDelete(op);
1289 return rejectionResult;
1293 * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and
1294 * marshals it into a Java object.
1296 * @param jsonResult - The bulk operations response returned from ElasticSearch.
1297 * @return - The marshalled response.
1298 * @throws JsonParseException
1299 * @throws JsonMappingException
1300 * @throws IOException
1302 private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException {
1303 if (jsonResult != null) {
1304 if (logger.isDebugEnabled()) {
1305 logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", ""));
1308 ObjectMapper mapper = new ObjectMapper();
1309 mapper.setSerializationInclusion(Include.NON_EMPTY);
1311 return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1318 * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload.
1320 * @param esResult - ElasticSearch bulk operations response.
1321 * @return - A generic result set.
1323 private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1324 List<ElasticSearchResultItem> rejectedOps) {
1326 int totalSuccess = 0;
1329 if (logger.isDebugEnabled()) {
1331 logger.debug("ESController: Build generic result set. ES Results: "
1332 + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString());
1335 // Build a combined list of result items from the results returned
1336 // from ElasticSearch and the list of operations that we rejected
1337 // without sending to ElasticSearch.
1338 List<ElasticSearchResultItem> combinedResults = new ArrayList<>();
1339 if (esResult != null) {
1340 combinedResults.addAll(Arrays.asList(esResult.getItems()));
1342 combinedResults.addAll(rejectedOps);
1344 // Iterate over the individual results in the resulting result set.
1345 StringBuilder resultsBuilder = new StringBuilder();
1346 AtomicBoolean firstItem = new AtomicBoolean(true);
1347 for (ElasticSearchResultItem item : combinedResults) {
1349 // Increment the operation counts.
1351 if (isSuccessCode(item.operationStatus().getStatus())) {
1357 // Prepend a comma to our response string unless this it the
1358 // first result in the set.
1359 if (!firstItem.compareAndSet(true, false)) {
1360 resultsBuilder.append(", ");
1363 // Append the current result as a generic json structure.
1364 resultsBuilder.append(item.toJson());
1367 return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", "
1368 + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}";
1373 * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
1375 * @param indexName - The index to look for.
1376 * @return - An operation result indicating the success or failure of the check.
1377 * @throws DocumentStoreOperationException
1379 public OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
1380 // Grab the current time so we can use it to generate a metrics log.
1381 MdcOverride override = getStartTime(new MdcOverride());
1383 String fullUrl = getFullUrl("/" + indexName, false);
1384 HttpURLConnection conn = initializeConnection(fullUrl);
1387 conn.setRequestMethod("HEAD");
1389 } catch (ProtocolException e) {
1390 shutdownConnection(conn);
1391 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1394 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1398 resultCode = conn.getResponseCode();
1399 } catch (IOException e) {
1400 shutdownConnection(conn);
1401 throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
1403 logger.debug(MSG_RESPONSE_CODE + resultCode);
1405 // Initialize operation result with a failure codes / fault string
1406 OperationResult opResult = new OperationResult();
1407 setDefaultOperationResultValues(opResult);
1408 opResult.setResultCode(resultCode);
1410 logMetricsInfo(override, SearchDbMsgs.CHECK_INDEX_TIME, opResult, indexName);
1412 shutdownConnection(conn);
1417 private DocumentOperationResult getOperationResult(HttpURLConnection conn) throws DocumentStoreOperationException {
1418 DocumentOperationResult opResult = createDefaultOperationResult();
1419 handleResponse(conn, opResult);
1423 private SearchOperationResult getSearchOperationResult(HttpURLConnection conn)
1424 throws DocumentStoreOperationException {
1425 SearchOperationResult opResult = createDefaultSearchOperationResult();
1426 handleResponse(conn, opResult);
1430 private DocumentOperationResult createDefaultOperationResult() {
1431 DocumentOperationResult opResult = new DocumentOperationResult();
1432 setDefaultOperationResultValues(opResult);
1436 private SearchOperationResult createDefaultSearchOperationResult() {
1437 SearchOperationResult opResult = new SearchOperationResult();
1438 setDefaultOperationResultValues(opResult);
1442 private void setDefaultOperationResultValues(OperationResult opResult) {
1443 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
1444 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
1447 private void buildDocumentResult(DocumentOperationResult result, String index)
1448 throws DocumentStoreOperationException {
1450 JSONParser parser = new JSONParser();
1453 root = (JSONObject) parser.parse(result.getResult());
1454 if (isSuccess(result)) {
1455 // Success response object
1456 Document doc = new Document();
1457 doc.setEtag(result.getResultVersion());
1458 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1460 doc.setContent((JSONObject) root.get("_source"));
1461 result.setDocument(doc);
1464 // Error response object
1465 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1466 if (error != null) {
1468 new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1472 } catch (Exception e) {
1473 throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1477 private String buildDocumentResponseUrl(String index, String id) {
1478 return ApiUtils.buildDocumentUri(index, id);
1481 private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1483 JSONParser parser = new JSONParser();
1487 root = (JSONObject) parser.parse(result.getResult());
1488 if (isSuccess(result)) {
1489 JSONObject hits = (JSONObject) root.get("hits");
1490 JSONArray hitArray = (JSONArray) hits.get("hits");
1491 SearchHits searchHits = new SearchHits();
1492 searchHits.setTotalHits(hits.get("total").toString());
1493 ArrayList<SearchHit> searchHitArray = new ArrayList<>();
1495 for (int i = 0; i < hitArray.size(); i++) {
1496 JSONObject hit = (JSONObject) hitArray.get(i);
1497 SearchHit searchHit = new SearchHit();
1498 searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1499 Document doc = new Document();
1500 if (hit.get(JSON_ATTR_VERSION) != null) {
1501 doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1505 buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1506 doc.setContent((JSONObject) hit.get("_source"));
1507 searchHit.setDocument(doc);
1508 searchHitArray.add(searchHit);
1510 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1511 result.setSearchResult(searchHits);
1513 JSONObject aggregations = (JSONObject) root.get("aggregations");
1514 if (aggregations != null) {
1515 AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1516 AggregationResults aggs = new AggregationResults();
1517 aggs.setAggregations(aggResults);
1518 result.setAggregationResult(aggs);
1523 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1524 if (error != null) {
1526 new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1529 } catch (Exception e) {
1530 throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1534 private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1535 JSONParser parser = new JSONParser();
1538 root = (JSONObject) parser.parse(result.getResult());
1539 if (isSuccess(result)) {
1540 JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
1541 JSONObject hitdata = (JSONObject) hitArray.get(0);
1542 JSONArray optionsArray = (JSONArray) hitdata.get("options");
1543 SuggestHits suggestHits = new SuggestHits();
1544 suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
1546 ArrayList<SuggestHit> suggestHitArray = new ArrayList<>();
1548 for (int i = 0; i < optionsArray.size(); i++) {
1549 JSONObject hit = (JSONObject) optionsArray.get(i);
1551 SuggestHit suggestHit = new SuggestHit();
1552 suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
1553 suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
1554 Document doc = new Document();
1555 if (hit.get(JSON_ATTR_VERSION) != null) {
1556 doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1559 buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1561 doc.setContent((JSONObject) hit.get("payload"));
1562 suggestHit.setDocument(doc);
1563 suggestHitArray.add(suggestHit);
1565 suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
1566 result.setSuggestResult(suggestHits);
1568 JSONObject aggregations = (JSONObject) root.get("aggregations");
1569 if (aggregations != null) {
1570 AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1571 AggregationResults aggs = new AggregationResults();
1572 aggs.setAggregations(aggResults);
1573 result.setAggregationResult(aggs);
1578 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1579 if (error != null) {
1581 new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1584 } catch (Exception e) {
1585 throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1590 * Record the timing of the operation in the metrics log.
1593 private void logMetricsInfo(MdcOverride override, SearchDbMsgs message, OperationResult operationResult,
1595 metricsLogger.info(message,
1597 .setField(LogLine.DefinedFields.RESPONSE_CODE, operationResult.getResultCode())
1598 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, operationResult.getResult()),