2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
24 import com.fasterxml.jackson.annotation.JsonInclude.Include;
25 import com.fasterxml.jackson.core.JsonParseException;
26 import com.fasterxml.jackson.core.JsonProcessingException;
27 import com.fasterxml.jackson.databind.JsonMappingException;
28 import com.fasterxml.jackson.databind.ObjectMapper;
29 import com.google.common.base.Throwables;
30 import java.io.BufferedReader;
32 import java.io.FileInputStream;
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.io.InputStreamReader;
36 import java.io.OutputStream;
37 import java.io.OutputStreamWriter;
38 import java.io.PrintWriter;
39 import java.io.StringWriter;
40 import java.net.HttpURLConnection;
41 import java.net.MalformedURLException;
42 import java.net.ProtocolException;
44 import java.text.SimpleDateFormat;
45 import java.util.ArrayList;
46 import java.util.Arrays;
47 import java.util.List;
48 import java.util.Properties;
49 import java.util.concurrent.atomic.AtomicBoolean;
50 import org.json.simple.JSONArray;
51 import org.json.simple.JSONObject;
52 import org.json.simple.parser.JSONParser;
53 import org.json.simple.parser.ParseException;
54 import org.onap.aai.cl.api.LogFields;
55 import org.onap.aai.cl.api.LogLine;
56 import org.onap.aai.cl.api.Logger;
57 import org.onap.aai.cl.eelf.LoggerFactory;
58 import org.onap.aai.cl.mdc.MdcContext;
59 import org.onap.aai.cl.mdc.MdcOverride;
60 import org.onap.aai.sa.rest.AnalysisConfiguration;
61 import org.onap.aai.sa.rest.ApiUtils;
62 import org.onap.aai.sa.rest.BulkRequest;
63 import org.onap.aai.sa.rest.BulkRequest.OperationType;
64 import org.onap.aai.sa.rest.DocumentSchema;
65 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
66 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
67 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
68 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
69 import org.onap.aai.sa.searchdbabstraction.entity.Document;
70 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
71 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
72 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
73 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
74 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
75 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
76 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
77 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
78 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
79 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
80 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
81 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
82 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
83 import org.springframework.http.HttpStatus;
86 * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface.
88 public class ElasticSearchHttpController implements DocumentStoreInterface {
90 private static ElasticSearchHttpController instance = null;
92 private static final Logger logger =
93 LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
94 private static final Logger metricsLogger =
95 LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
97 private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
98 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
99 private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
100 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
101 private static final String BULK_IMPORT_INDEX_TEMPLATE =
102 "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
103 private static final String BULK_DELETE_TEMPLATE =
104 "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
106 private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
107 "Internal Error: ElasticSearch operation fault occurred";
108 private final ElasticSearchConfig config;
110 private static final String DEFAULT_TYPE = "default";
112 protected AnalysisConfiguration analysisConfig;
114 public static ElasticSearchHttpController getInstance() {
116 synchronized (ElasticSearchHttpController.class) {
118 if (instance == null) {
120 Properties properties = new Properties();
121 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
123 properties.load(new FileInputStream(file));
124 } catch (Exception e) {
125 logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance",
126 e.getLocalizedMessage());
129 ElasticSearchConfig config = new ElasticSearchConfig(properties);
130 instance = new ElasticSearchHttpController(config);
137 public ElasticSearchHttpController(ElasticSearchConfig config) {
138 this.config = config;
139 analysisConfig = new AnalysisConfiguration();
142 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
144 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
145 } catch (Exception e) {
146 logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, getFullUrl("", false),
152 public AnalysisConfiguration getAnalysisConfig() {
153 return analysisConfig;
157 public OperationResult createIndex(String index, DocumentSchema documentSchema) {
159 OperationResult result = new OperationResult();
160 result.setResultCode(500);
164 // Submit the request to ElasticSearch to create the index using a
165 // default document type.
166 result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
167 DocumentSchemaUtil.generateDocumentMappings(documentSchema));
169 // ElasticSearch will return us a 200 code on success when we
170 // want to report a 201, so translate the result here.
171 result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
172 if (isSuccess(result)) {
173 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
176 } catch (DocumentStoreOperationException | IOException e) {
178 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
185 public OperationResult createDynamicIndex(String index, String dynamicSchema) {
186 OperationResult result = new OperationResult();
187 result.setResultCode(500);
190 result = createTable(index, dynamicSchema);
192 // ElasticSearch will return us a 200 code on success when we
193 // want to report a 201, so translate the result here.
194 result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
195 if (isSuccess(result)) {
196 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
198 } catch (DocumentStoreOperationException e) {
199 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
207 public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
209 // Initialize operation result with a failure codes / fault string
210 OperationResult opResult = new OperationResult();
211 opResult.setResultCode(500);
212 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
214 // Grab the current time so we can use it to generate a metrics log.
215 MdcOverride override = getStartTime(new MdcOverride());
217 String fullUrl = getFullUrl("/" + indexName + "/", false);
218 HttpURLConnection conn = initializeConnection(fullUrl);
220 logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
223 conn.setRequestMethod("DELETE");
224 } catch (ProtocolException e) {
225 shutdownConnection(conn);
226 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
229 handleResponse(conn, opResult);
231 // Generate a metrics log so we can track how long the operation took.
233 .info(SearchDbMsgs.DELETE_INDEX_TIME,
234 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
235 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
236 override, indexName);
238 shutdownConnection(conn);
244 private OperationResult checkConnection() throws Exception {
246 String fullUrl = getFullUrl("/_cluster/health", false);
248 HttpURLConnection conn = null;
250 url = new URL(fullUrl);
251 conn = (HttpURLConnection) url.openConnection();
252 conn.setRequestMethod("GET");
253 conn.setDoOutput(true);
254 logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
256 int resultCode = conn.getResponseCode();
257 logger.debug("getClusterHealth() response Code : " + resultCode);
258 OperationResult opResult = new OperationResult();
259 opResult.setResultCode(resultCode);
261 shutdownConnection(conn);
266 private String getFullUrl(String resourceUrl, boolean isSecure) {
268 final String host = config.getIpAddress();
269 final String port = config.getHttpPort();
272 return String.format("https://%s:%s%s", host, port, resourceUrl);
274 return String.format("http://%s:%s%s", host, port, resourceUrl);
278 private void shutdownConnection(HttpURLConnection connection) {
279 if (connection == null) {
283 InputStream inputstream = null;
284 OutputStream outputstream = null;
287 inputstream = connection.getInputStream();
288 } catch (IOException e) {
289 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
291 if (inputstream != null) {
294 } catch (IOException e) {
295 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
296 e.getLocalizedMessage());
302 outputstream = connection.getOutputStream();
303 } catch (IOException e) {
304 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
306 if (outputstream != null) {
308 outputstream.close();
309 } catch (IOException e) {
310 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
311 e.getLocalizedMessage());
316 connection.disconnect();
320 protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings)
321 throws DocumentStoreOperationException {
323 if (indexSettings == null) {
324 logger.debug("No settings provided.");
327 if (indexMappings == null) {
328 logger.debug("No mappings provided.");
331 OperationResult opResult = new OperationResult();
333 // Initialize operation result with a failure codes / fault string
334 opResult.setResultCode(500);
335 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
337 // Grab the current time so we can use it to generate a metrics log.
338 MdcOverride override = getStartTime(new MdcOverride());
340 String fullUrl = getFullUrl("/" + indexName + "/", false);
341 HttpURLConnection conn = initializeConnection(fullUrl);
344 conn.setRequestMethod("PUT");
345 conn.setRequestProperty("Content-Type", "application/json");
346 } catch (ProtocolException e) {
347 shutdownConnection(conn);
348 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
351 StringBuilder sb = new StringBuilder(128);
352 sb.append("{ \"settings\" : ");
353 sb.append(indexSettings);
356 sb.append("\"mappings\" : {");
357 sb.append("\"" + typeName + "\" :");
358 sb.append(indexMappings);
362 attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
363 } catch (IOException e) {
364 logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
365 throw new DocumentStoreOperationException(e.getMessage(), e);
368 logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
369 logger.debug("Request content: " + sb.toString());
371 handleResponse(conn, opResult);
373 shutdownConnection(conn);
375 // Generate a metrics log so we can track how long the operation took.
377 .info(SearchDbMsgs.CREATE_INDEX_TIME,
378 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
379 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
380 override, indexName);
386 * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the
389 * @param indexName - The name of the index to be created
390 * @param settingsAndMappings - The actual JSON object that will define the index
391 * @return - The operation result of writing into Elasticsearch
392 * @throws DocumentStoreOperationException
394 protected OperationResult createTable(String indexName, String settingsAndMappings)
395 throws DocumentStoreOperationException {
396 OperationResult result = new OperationResult();
397 result.setResultCode(500);
398 result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
400 // Grab the current time so we can use it to generate a metrics log.
401 MdcOverride override = getStartTime(new MdcOverride());
403 String fullUrl = getFullUrl("/" + indexName + "/", false);
404 HttpURLConnection conn = initializeConnection(fullUrl);
407 conn.setRequestMethod("PUT");
408 conn.setRequestProperty("Content-Type", "application/json");
409 } catch (ProtocolException e) {
410 shutdownConnection(conn);
411 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
415 attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
416 } catch (IOException e) {
417 logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
418 throw new DocumentStoreOperationException(e.getMessage());
420 handleResponse(conn, result);
422 // Generate a metrics log so we can track how long the operation took.
424 .info(SearchDbMsgs.CREATE_INDEX_TIME,
425 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
426 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()),
427 override, indexName);
433 public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
434 boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
436 if (!allowImplicitIndexCreation) {
438 // Before we do anything, make sure that the specified index actually exists in the
439 // document store - we don't want to rely on ElasticSearch to fail the document
440 // create because it could be configured to implicitly create a non-existent index,
441 // which can lead to hard-to-debug behaviour with queries down the road.
442 OperationResult indexExistsResult = checkIndexExistence(indexName);
443 if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
445 DocumentOperationResult opResult = new DocumentOperationResult();
446 opResult.setResultCode(HttpStatus.NOT_FOUND.value());
447 opResult.setResult("Document Index '" + indexName + "' does not exist.");
448 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
453 if (document.getId() == null || document.getId().isEmpty()) {
454 return createDocumentWithoutId(indexName, document);
456 return createDocumentWithId(indexName, document);
460 private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
461 throws DocumentStoreOperationException {
462 // check if the document already exists
463 DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
466 if (opResult.getResultCode() != HttpStatus.NOT_FOUND.value()) {
467 if (opResult.getResultCode() == HttpStatus.CONFLICT.value()) {
468 opResult.setFailureCause("A document with the same id already exists.");
470 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
472 opResult.setResultCode(HttpStatus.CONFLICT.value());
476 opResult = new DocumentOperationResult();
477 // Initialize operation result with a failure codes / fault string
478 opResult.setResultCode(500);
479 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
481 // Grab the current time so we can use it to generate a metrics log.
482 MdcOverride override = getStartTime(new MdcOverride());
484 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
485 HttpURLConnection conn = initializeConnection(fullUrl);
488 conn.setRequestMethod("PUT");
489 } catch (ProtocolException e) {
490 shutdownConnection(conn);
491 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
494 attachDocument(conn, document);
496 logger.debug("Sending 'PUT' request to: " + conn.getURL());
498 handleResponse(conn, opResult);
499 buildDocumentResult(opResult, indexName);
501 // Generate a metrics log so we can track how long the operation took.
503 .info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
504 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
505 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
506 override, indexName);
508 shutdownConnection(conn);
514 private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
515 throws DocumentStoreOperationException {
517 DocumentOperationResult response = new DocumentOperationResult();
518 // Initialize operation result with a failure codes / fault string
519 response.setResultCode(500);
520 response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
522 // Grab the current time so we can use it to generate a metrics log.
523 MdcOverride override = getStartTime(new MdcOverride());
525 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
526 HttpURLConnection conn = initializeConnection(fullUrl);
529 conn.setRequestMethod("POST");
530 } catch (ProtocolException e) {
531 shutdownConnection(conn);
532 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
535 attachDocument(conn, document);
537 logger.debug("Sending 'POST' request to: " + conn.getURL());
539 handleResponse(conn, response);
540 buildDocumentResult(response, indexName);
542 // Generate a metrics log so we can track how long the operation took.
544 .info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
545 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
546 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
547 override, indexName);
549 shutdownConnection(conn);
554 private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
555 throws DocumentStoreOperationException {
556 conn.setRequestProperty("Content-Type", "application/json");
557 conn.setRequestProperty("Connection", "Close");
558 attachContent(conn, doc.getContentInJson());
561 private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
562 throws DocumentStoreOperationException {
563 DocumentOperationResult opResult = new DocumentOperationResult();
565 // Initialize operation result with a failure codes / fault string
566 opResult.setResultCode(500);
568 // Grab the current time so we can use it to generate a metrics log.
569 MdcOverride override = getStartTime(new MdcOverride());
571 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
572 HttpURLConnection conn = initializeConnection(fullUrl);
575 conn.setRequestMethod("HEAD");
576 } catch (ProtocolException e) {
577 shutdownConnection(conn);
578 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
581 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
585 resultCode = conn.getResponseCode();
586 } catch (IOException e) {
587 shutdownConnection(conn);
588 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
591 logger.debug("Response Code : " + resultCode);
593 opResult.setResultCode(resultCode);
595 // Generate a metrics log so we can track how long the operation took.
597 .info(SearchDbMsgs.GET_DOCUMENT_TIME,
598 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
599 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
600 override, indexName, docId);
602 shutdownConnection(conn);
608 public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
609 boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
611 if (!allowImplicitIndexCreation) {
612 // Before we do anything, make sure that the specified index actually exists in the
613 // document store - we don't want to rely on ElasticSearch to fail the document
614 // create because it could be configured to implicitly create a non-existent index,
615 // which can lead to hard-to-debug behaviour with queries down the road.
616 OperationResult indexExistsResult = checkIndexExistence(indexName);
617 if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
619 DocumentOperationResult opResult = new DocumentOperationResult();
620 opResult.setResultCode(HttpStatus.NOT_FOUND.value());
621 opResult.setResult("Document Index '" + indexName + "' does not exist.");
622 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
627 DocumentOperationResult opResult = new DocumentOperationResult();
629 // Initialize operation result with a failure codes / fault string
630 opResult.setResultCode(500);
631 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
633 // Grab the current time so we can use it to generate a metrics log.
634 MdcOverride override = getStartTime(new MdcOverride());
636 String fullUrl = getFullUrl(
637 "/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + "?version=" + document.getVersion(),
639 HttpURLConnection conn = initializeConnection(fullUrl);
642 conn.setRequestMethod("PUT");
643 conn.setRequestProperty("Content-Type", "application/json");
644 } catch (ProtocolException e) {
645 shutdownConnection(conn);
646 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
649 attachDocument(conn, document);
651 logger.debug("Sending 'PUT' request to: " + conn.getURL());
653 handleResponse(conn, opResult);
654 buildDocumentResult(opResult, indexName);
656 // Generate a metrics log so we can track how long the operation took.
657 metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
658 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
659 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
660 override, indexName, document.getId());
662 shutdownConnection(conn);
668 public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
669 throws DocumentStoreOperationException {
670 DocumentOperationResult opResult = new DocumentOperationResult();
672 // Initialize operation result with a failure codes / fault string
673 opResult.setResultCode(500);
674 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
676 // Grab the current time so we can use it to generate a metrics log.
677 MdcOverride override = getStartTime(new MdcOverride());
679 String fullUrl = getFullUrl(
680 "/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + "?version=" + document.getVersion(),
682 HttpURLConnection conn = initializeConnection(fullUrl);
685 conn.setRequestMethod("DELETE");
686 } catch (ProtocolException e) {
687 shutdownConnection(conn);
688 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
691 logger.debug("\nSending 'DELETE' request to " + conn.getURL());
693 handleResponse(conn, opResult);
694 buildDocumentResult(opResult, indexName);
695 // supress the etag and url in response for delete as they are not required
696 if (opResult.getDocument() != null) {
697 opResult.getDocument().setEtag(null);
698 opResult.getDocument().setUrl(null);
701 // Generate a metrics log so we can track how long the operation took.
702 metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
703 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
704 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
705 override, indexName, document.getId());
707 shutdownConnection(conn);
713 public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
714 throws DocumentStoreOperationException {
715 DocumentOperationResult opResult = new DocumentOperationResult();
717 // Initialize operation result with a failure codes / fault string
718 opResult.setResultCode(500);
719 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
721 // Grab the current time so we can use it to generate a metrics log.
722 MdcOverride override = getStartTime(new MdcOverride());
724 String fullUrl = null;
725 if (document.getVersion() == null) {
726 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
728 fullUrl = getFullUrl(
729 "/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + "?version=" + document.getVersion(),
732 HttpURLConnection conn = initializeConnection(fullUrl);
734 logger.debug("\nSending 'GET' request to: " + conn.getURL());
736 handleResponse(conn, opResult);
737 buildDocumentResult(opResult, indexName);
739 // Generate a metrics log so we can track how long the operation took.
740 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
741 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
742 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
743 override, indexName, document.getId());
745 shutdownConnection(conn);
751 public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
752 SearchOperationResult opResult = new SearchOperationResult();
754 // Initialize operation result with a failure codes / fault string
755 opResult.setResultCode(500);
756 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
758 String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
760 // Grab the current time so we can use it to generate a metrics log.
761 MdcOverride override = getStartTime(new MdcOverride());
763 HttpURLConnection conn = initializeConnection(fullUrl);
766 conn.setRequestMethod("GET");
767 } catch (ProtocolException e) {
768 shutdownConnection(conn);
769 throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
772 logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
774 handleResponse(conn, opResult);
775 buildSearchResult(opResult, indexName);
778 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
779 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
780 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
781 override, indexName, queryString);
787 public SearchOperationResult searchWithPayload(String indexName, String query)
788 throws DocumentStoreOperationException {
789 SearchOperationResult opResult = new SearchOperationResult();
791 if (logger.isDebugEnabled()) {
792 logger.debug("Querying index: " + indexName + " with query string: " + query);
795 // Initialize operation result with a failure codes / fault string
796 opResult.setResultCode(500);
797 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
799 String fullUrl = getFullUrl("/" + indexName + "/_search", false);
801 // Grab the current time so we can use it to generate a metrics log.
802 MdcOverride override = getStartTime(new MdcOverride());
804 HttpURLConnection conn = initializeConnection(fullUrl);
807 conn.setRequestMethod("POST");
808 conn.setRequestProperty("Content-Type", "application/json");
809 } catch (ProtocolException e) {
810 shutdownConnection(conn);
811 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
814 attachContent(conn, query);
816 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
817 logger.debug("Request body = Elasticsearch query = " + query);
819 handleResponse(conn, opResult);
820 buildSearchResult(opResult, indexName);
823 .info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
824 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
825 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
826 override, indexName, query);
828 shutdownConnection(conn);
835 public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
836 throws DocumentStoreOperationException {
838 SearchOperationResult opResult = new SearchOperationResult();
840 if (logger.isDebugEnabled()) {
841 logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
844 // Initialize operation result with a failure codes / fault string
845 opResult.setResultCode(500);
846 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
848 String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
850 // Grab the current time so we can use it to generate a metrics log.
851 MdcOverride override = getStartTime(new MdcOverride());
853 HttpURLConnection conn = initializeConnection(fullUrl);
856 conn.setRequestMethod("POST");
857 conn.setRequestProperty("Content-Type", "application/json");
858 } catch (ProtocolException e) {
859 shutdownConnection(conn);
860 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
863 attachContent(conn, query);
865 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
866 logger.debug("Request body = Elasticsearch query = " + query);
868 handleResponse(conn, opResult);
869 buildSuggestResult(opResult, indexName);
872 .info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
873 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
874 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
875 override, indexName, query);
877 shutdownConnection(conn);
882 private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException {
883 OutputStream outputStream = null;
884 OutputStreamWriter out = null;
887 outputStream = conn.getOutputStream();
888 } catch (IOException e) {
889 shutdownConnection(conn);
890 throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
893 out = new OutputStreamWriter(outputStream);
898 } catch (IOException e) {
899 shutdownConnection(conn);
900 throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
904 private HttpURLConnection initializeConnection(String fullUrl) throws DocumentStoreOperationException {
906 HttpURLConnection conn = null;
909 url = new URL(fullUrl);
910 } catch (MalformedURLException e) {
911 throw new DocumentStoreOperationException("Error building a URL with " + url, e);
915 conn = (HttpURLConnection) url.openConnection();
916 conn.setDoOutput(true);
917 } catch (IOException e) {
918 shutdownConnection(conn);
919 throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
925 private void handleResponse(HttpURLConnection conn, OperationResult opResult)
926 throws DocumentStoreOperationException {
927 int resultCode = 200;
930 resultCode = conn.getResponseCode();
931 } catch (IOException e) {
932 shutdownConnection(conn);
933 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
936 logger.debug("Response Code : " + resultCode);
938 InputStream inputStream = null;
940 if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
941 inputStream = conn.getErrorStream();
944 inputStream = conn.getInputStream();
945 } catch (IOException e) {
946 shutdownConnection(conn);
947 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
951 InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
952 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
954 StringBuilder result = new StringBuilder(128);
955 String string = null;
958 while ((string = bufferedreader.readLine()) != null) {
959 result.append(string).append("\n");
961 } catch (IOException e) {
962 shutdownConnection(conn);
963 throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
966 if (resultCode == HttpStatus.CONFLICT.value()) {
967 opResult.setResultCode(HttpStatus.PRECONDITION_FAILED.value());
969 opResult.setResultCode(resultCode);
971 if (logger.isDebugEnabled()) {
972 logger.debug("Raw result string from ElasticSearch = " + result.toString());
974 opResult.setResult(result.toString());
975 opResult.setResultVersion(extractVersion(result.toString()));
978 private String extractVersion(String result) {
979 JSONParser parser = new JSONParser();
980 String version = null;
982 JSONObject root = (JSONObject) parser.parse(result);
983 if (root.get("_version") != null) {
984 version = root.get("_version").toString();
986 } catch (ParseException e) {
987 // Not all responses from ElasticSearch include a version, so
988 // if we don't get one back, just return an empty string rather
989 // than trigger a false failure.
996 * This convenience method gets the current system time and stores it in an attribute in the supplied
997 * {@link MdcOverride} object so that it can be used later by the metrics logger.
999 * @param override - The {@link MdcOverride} object to update.
1000 * @return - The supplied {@link MdcOverride} object.
1002 private MdcOverride getStartTime(MdcOverride override) {
1004 // Grab the current time...
1005 long startTimeInMs = System.currentTimeMillis();
1007 // ...and add it as an attribute to the supplied MDC Override
1009 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
1010 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
1012 // Return the MdcOverride object that we were passed.
1013 // This looks odd, but it allows us to do stuff like:
1015 // MdcOverride ov = getStartTime(new MdcOverride())
1017 // which is quite handy, but also allows us to pass in an existing
1018 // MdcOverride object which already has some attributes set.
1022 private boolean isSuccess(OperationResult result) {
1024 return isSuccessCode(result.getResultCode());
1028 private boolean isSuccessCode(int statusCode) {
1029 return ((statusCode >= 200) && (statusCode < 300));
1034 public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
1036 if (logger.isDebugEnabled()) {
1037 String dbgString = "ESController: performBulkOperations - Operations: ";
1039 for (BulkRequest request : requests) {
1040 dbgString += "[" + request.toString() + "] ";
1043 logger.debug(dbgString);
1046 // Grab the current time so we can use it to generate a metrics log.
1047 MdcOverride override = getStartTime(new MdcOverride());
1049 // Parse the supplied set of operations.
1050 // Iterate over the list of operations which we were provided and
1051 // translate them into a format that ElasticSearh understands.
1053 StringBuilder esOperationSet = new StringBuilder(128);
1054 List<ElasticSearchResultItem> rejected = new ArrayList<>();
1055 for (BulkRequest request : requests) {
1057 // Convert the request to the syntax ElasticSearch likes.
1058 if (buildEsOperation(request, esOperationSet, rejected)) {
1063 ElasticSearchBulkOperationResult opResult = null;
1066 // Open an HTTP connection to the ElasticSearch back end.
1067 String fullUrl = getFullUrl("/_bulk", false);
1069 HttpURLConnection conn;
1072 url = new URL(fullUrl);
1073 conn = (HttpURLConnection) url.openConnection();
1074 conn.setRequestMethod("PUT");
1075 conn.setDoOutput(true);
1076 conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
1077 conn.setRequestProperty("Connection", "Close");
1079 } catch (IOException e) {
1081 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1082 if (logger.isDebugEnabled()) {
1083 logger.debug(Throwables.getStackTraceAsString(e));
1086 throw new DocumentStoreOperationException(
1087 "Failed to open connection to document store. Cause: " + e.getMessage(), e);
1090 StringBuilder bulkResult = new StringBuilder(128);
1092 // Create an output stream to write our request to.
1093 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1095 if (logger.isDebugEnabled()) {
1096 logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1097 logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
1100 // Write the resulting request string to our output stream. (this sends the request to ES?)
1101 out.write(esOperationSet.toString());
1104 // Open an input stream on our connection in order to read back the results.
1105 InputStream is = conn.getInputStream();
1106 InputStreamReader inputstreamreader = new InputStreamReader(is);
1107 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1109 // Read the contents of the input stream into our result string...
1110 String esResponseString = null;
1112 while ((esResponseString = bufferedreader.readLine()) != null) {
1113 bulkResult.append(esResponseString).append("\n");
1116 } catch (IOException e) {
1118 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1119 if (logger.isDebugEnabled()) {
1120 StringWriter sw = new StringWriter();
1121 e.printStackTrace(new PrintWriter(sw));
1122 logger.debug(sw.toString());
1125 throw new DocumentStoreOperationException(
1126 "Failure interacting with document store. Cause: " + e.getMessage(), e);
1129 if (logger.isDebugEnabled()) {
1130 logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
1133 // ...and marshal the resulting string into a Java object.
1135 opResult = marshallEsBulkResult(bulkResult.toString());
1137 } catch (IOException e) {
1139 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1140 if (logger.isDebugEnabled()) {
1141 logger.debug(Throwables.getStackTraceAsString(e));
1144 throw new DocumentStoreOperationException("Failed to marshal response body. Cause: " + e.getMessage(),
1149 // Finally, build the operation result and return it to the caller.
1150 OperationResult result = new OperationResult();
1151 result.setResultCode(207);
1152 result.setResult(buildGenericBulkResultSet(opResult, rejected));
1154 // In the success case we don't want the entire result string to be
1155 // dumped into the metrics log, so concatenate it.
1156 String resultStringForMetricsLog = result.getResult();
1157 if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1158 resultStringForMetricsLog =
1159 resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1162 metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1163 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1164 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1172 * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch.
1174 * @param request - The request to be performed.
1175 * @param sb - The string builder to append the json data to
1176 * @throws DocumentStoreOperationException
1178 private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails)
1179 throws DocumentStoreOperationException {
1181 boolean retVal = true;
1182 // What kind of operation are we performing?
1183 switch (request.getOperationType()) {
1185 // Create a new document.
1188 // Make sure that we were supplied a document payload.
1189 if (request.getOperation().getDocument() == null) {
1191 fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1192 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1196 // Make sure that the supplied document URL is formatted
1198 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1199 fails.add(generateRejectionEntry(request.getOperationType(),
1200 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1201 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1205 // Validate that the specified index actually exists before we
1206 // try to perform the create.
1207 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1209 fails.add(generateRejectionEntry(request.getOperationType(),
1210 "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1211 request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1215 // If we were supplied an id for the new document, then
1216 // include it in the bulk operation to Elastic Search
1217 if (request.getId() == null) {
1219 sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
1221 // Otherwise, we just leave that parameter off and ElasticSearch
1222 // will generate one for us.
1224 sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
1229 // Append the document that we want to create.
1230 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1231 } catch (JsonProcessingException e) {
1232 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1237 // Update an existing document.
1240 // Make sure that we were supplied a document payload.
1241 if (request.getOperation().getDocument() == null) {
1243 fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1244 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1248 // Make sure that the supplied document URL is formatted
1250 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1251 fails.add(generateRejectionEntry(request.getOperationType(),
1252 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1253 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1257 // Validate that the specified index actually exists before we
1258 // try to perform the update.
1259 if (!indexExists(request.getIndex())) {
1261 fails.add(generateRejectionEntry(request.getOperationType(),
1262 "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1263 request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1267 // Validate that the document we are trying to update actually
1268 // exists before we try to perform the update.
1269 if (!documentExists(request.getIndex(), request.getId())) {
1271 fails.add(generateRejectionEntry(request.getOperationType(),
1272 "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1273 request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1277 // It is mandatory that a version be supplied for an update operation,
1278 // so validate that now.
1279 if (request.getOperation().getMetaData().getEtag() == null) {
1281 fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1282 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1286 // Generate the update request...
1287 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1288 request.getOperation().getMetaData().getEtag()));
1290 // ...and append the document that we want to update.
1292 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1293 } catch (JsonProcessingException e) {
1294 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1298 // Delete an existing document.
1301 // Make sure that the supplied document URL is formatted
1303 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1304 fails.add(generateRejectionEntry(request.getOperationType(),
1305 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1306 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1310 // Validate that the specified index actually exists before we
1311 // try to perform the delete.
1312 if (!indexExists(request.getIndex())) {
1314 fails.add(generateRejectionEntry(request.getOperationType(),
1315 "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1316 request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1320 // Validate that the document we are trying to update actually
1321 // exists before we try to perform the delete.
1322 if (!documentExists(request.getIndex(), request.getId())) {
1324 fails.add(generateRejectionEntry(request.getOperationType(),
1325 "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1326 request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1330 // It is mandatory that a version be supplied for a delete operation,
1331 // so validate that now.
1332 if (request.getOperation().getMetaData().getEtag() == null) {
1334 fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1335 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1339 // Generate the delete request.
1340 sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1341 request.getOperation().getMetaData().getEtag()));
1349 private boolean indexExists(String index) throws DocumentStoreOperationException {
1351 OperationResult indexExistsResult = checkIndexExistence(index);
1353 return ((indexExistsResult.getResultCode() >= 200) && (indexExistsResult.getResultCode() < 300));
1356 private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1358 OperationResult docExistsResult = checkDocumentExistence(index, id);
1360 return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1364 * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the
1367 * @param rejectReason - A message describing why the operation was rejected.
1368 * @param anId - The identifier associated with the document being acted on.
1369 * @param statusCode - An HTTP status code.
1370 * @return - A result set item.
1372 private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index,
1373 String anId, int statusCode, String originalUrl) {
1375 ElasticSearchError err = new ElasticSearchError();
1376 err.setReason(rejectReason);
1378 ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1381 op.setStatus(statusCode);
1383 op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1385 ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1389 rejectionResult.setCreate(op);
1392 rejectionResult.setIndex(op);
1395 rejectionResult.setDelete(op);
1400 return rejectionResult;
1404 * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and
1405 * marshals it into a Java object.
1407 * @param jsonResult - The bulk operations response returned from ElasticSearch.
1408 * @return - The marshalled response.
1409 * @throws JsonParseException
1410 * @throws JsonMappingException
1411 * @throws IOException
1413 private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException {
1414 if (jsonResult != null) {
1415 if (logger.isDebugEnabled()) {
1416 logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", ""));
1419 ObjectMapper mapper = new ObjectMapper();
1420 mapper.setSerializationInclusion(Include.NON_EMPTY);
1422 return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1429 * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload.
1431 * @param esResult - ElasticSearch bulk operations response.
1432 * @return - A generic result set.
1434 private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1435 List<ElasticSearchResultItem> rejectedOps) {
1437 int totalSuccess = 0;
1440 if (logger.isDebugEnabled()) {
1442 logger.debug("ESController: Build generic result set. ES Results: "
1443 + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString());
1446 // Build a combined list of result items from the results returned
1447 // from ElasticSearch and the list of operations that we rejected
1448 // without sending to ElasticSearch.
1449 List<ElasticSearchResultItem> combinedResults = new ArrayList<>();
1450 if (esResult != null) {
1451 combinedResults.addAll(Arrays.asList(esResult.getItems()));
1453 combinedResults.addAll(rejectedOps);
1455 // Iterate over the individual results in the resulting result set.
1456 StringBuilder resultsBuilder = new StringBuilder();
1457 AtomicBoolean firstItem = new AtomicBoolean(true);
1458 for (ElasticSearchResultItem item : combinedResults) {
1460 // Increment the operation counts.
1462 if (isSuccessCode(item.operationStatus().getStatus())) {
1468 // Prepend a comma to our response string unless this it the
1469 // first result in the set.
1470 if (!firstItem.compareAndSet(true, false)) {
1471 resultsBuilder.append(", ");
1474 // Append the current result as a generic json structure.
1475 resultsBuilder.append(item.toJson());
1478 return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", "
1479 + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}";
1484 * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
1486 * @param indexName - The index to look for.
1487 * @return - An operation result indicating the success or failure of the check.
1488 * @throws DocumentStoreOperationException
1490 public OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
1492 // Initialize operation result with a failure codes / fault string
1493 OperationResult opResult = new OperationResult();
1494 opResult.setResultCode(500);
1496 // Grab the current time so we can use it to generate a metrics log.
1497 MdcOverride override = getStartTime(new MdcOverride());
1499 String fullUrl = getFullUrl("/" + indexName, false);
1500 HttpURLConnection conn = initializeConnection(fullUrl);
1503 conn.setRequestMethod("HEAD");
1505 } catch (ProtocolException e) {
1506 shutdownConnection(conn);
1507 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1510 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1514 resultCode = conn.getResponseCode();
1515 } catch (IOException e) {
1516 shutdownConnection(conn);
1517 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1519 logger.debug("Response Code : " + resultCode);
1521 opResult.setResultCode(resultCode);
1523 // Generate a metrics log so we can track how long the operation took.
1525 .info(SearchDbMsgs.CHECK_INDEX_TIME,
1526 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1527 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1528 override, indexName);
1530 shutdownConnection(conn);
1536 private void buildDocumentResult(DocumentOperationResult result, String index)
1537 throws DocumentStoreOperationException {
1539 JSONParser parser = new JSONParser();
1542 root = (JSONObject) parser.parse(result.getResult());
1544 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1545 // Success response object
1546 Document doc = new Document();
1547 doc.setEtag(result.getResultVersion());
1548 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1550 doc.setContent((JSONObject) root.get("_source"));
1551 result.setDocument(doc);
1554 // Error response object
1555 JSONObject error = (JSONObject) root.get("error");
1556 if (error != null) {
1557 result.setError(new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1561 } catch (Exception e) {
1562 throw new DocumentStoreOperationException("Failed to parse Elastic Search response." + result.getResult());
1568 private String buildDocumentResponseUrl(String index, String id) {
1569 return ApiUtils.buildDocumentUri(index, id);
1572 private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1574 JSONParser parser = new JSONParser();
1578 root = (JSONObject) parser.parse(result.getResult());
1579 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1580 JSONObject hits = (JSONObject) root.get("hits");
1581 JSONArray hitArray = (JSONArray) hits.get("hits");
1582 SearchHits searchHits = new SearchHits();
1583 searchHits.setTotalHits(hits.get("total").toString());
1584 ArrayList<SearchHit> searchHitArray = new ArrayList<>();
1586 for (int i = 0; i < hitArray.size(); i++) {
1587 JSONObject hit = (JSONObject) hitArray.get(i);
1588 SearchHit searchHit = new SearchHit();
1589 searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1590 Document doc = new Document();
1591 if (hit.get("_version") != null) {
1592 doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1596 buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1597 doc.setContent((JSONObject) hit.get("_source"));
1598 searchHit.setDocument(doc);
1599 searchHitArray.add(searchHit);
1601 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1602 result.setSearchResult(searchHits);
1604 JSONObject aggregations = (JSONObject) root.get("aggregations");
1605 if (aggregations != null) {
1606 AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1607 AggregationResults aggs = new AggregationResults();
1608 aggs.setAggregations(aggResults);
1609 result.setAggregationResult(aggs);
1614 JSONObject error = (JSONObject) root.get("error");
1615 if (error != null) {
1616 result.setError(new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1619 } catch (Exception e) {
1620 throw new DocumentStoreOperationException("Failed to parse Elastic Search response." + result.getResult());
1625 private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1626 JSONParser parser = new JSONParser();
1629 root = (JSONObject) parser.parse(result.getResult());
1630 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1631 JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
1632 JSONObject hitdata = (JSONObject) hitArray.get(0);
1633 JSONArray optionsArray = (JSONArray) hitdata.get("options");
1634 SuggestHits suggestHits = new SuggestHits();
1635 suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
1637 ArrayList<SuggestHit> suggestHitArray = new ArrayList<>();
1639 for (int i = 0; i < optionsArray.size(); i++) {
1640 JSONObject hit = (JSONObject) optionsArray.get(i);
1642 SuggestHit suggestHit = new SuggestHit();
1643 suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
1644 suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
1645 Document doc = new Document();
1646 if (hit.get("_version") != null) {
1647 doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1650 buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1652 doc.setContent((JSONObject) hit.get("payload"));
1653 suggestHit.setDocument(doc);
1654 suggestHitArray.add(suggestHit);
1656 suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
1657 result.setSuggestResult(suggestHits);
1659 JSONObject aggregations = (JSONObject) root.get("aggregations");
1660 if (aggregations != null) {
1661 AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1662 AggregationResults aggs = new AggregationResults();
1663 aggs.setAggregations(aggResults);
1664 result.setAggregationResult(aggs);
1669 JSONObject error = (JSONObject) root.get("error");
1670 if (error != null) {
1671 result.setError(new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1674 } catch (Exception e) {
1675 throw new DocumentStoreOperationException("Failed to parse Elastic Search response." + result.getResult());