2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
24 import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
25 import static javax.ws.rs.core.MediaType.APPLICATION_FORM_URLENCODED;
26 import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
28 import com.fasterxml.jackson.annotation.JsonInclude.Include;
29 import com.fasterxml.jackson.core.JsonParseException;
30 import com.fasterxml.jackson.core.JsonProcessingException;
31 import com.fasterxml.jackson.databind.JsonMappingException;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33 import com.google.common.base.Throwables;
34 import java.io.BufferedReader;
36 import java.io.FileInputStream;
37 import java.io.IOException;
38 import java.io.InputStream;
39 import java.io.InputStreamReader;
40 import java.io.OutputStream;
41 import java.io.OutputStreamWriter;
42 import java.io.PrintWriter;
43 import java.io.StringWriter;
44 import java.net.HttpURLConnection;
45 import java.net.MalformedURLException;
46 import java.net.ProtocolException;
48 import java.text.SimpleDateFormat;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.List;
52 import java.util.Properties;
53 import java.util.concurrent.atomic.AtomicBoolean;
54 import javax.ws.rs.core.Response.Status;
55 import javax.ws.rs.core.Response.Status.Family;
56 import org.json.simple.JSONArray;
57 import org.json.simple.JSONObject;
58 import org.json.simple.parser.JSONParser;
59 import org.json.simple.parser.ParseException;
60 import org.onap.aai.cl.api.LogFields;
61 import org.onap.aai.cl.api.LogLine;
62 import org.onap.aai.cl.api.Logger;
63 import org.onap.aai.cl.eelf.LoggerFactory;
64 import org.onap.aai.cl.mdc.MdcContext;
65 import org.onap.aai.cl.mdc.MdcOverride;
66 import org.onap.aai.sa.rest.AnalysisConfiguration;
67 import org.onap.aai.sa.rest.ApiUtils;
68 import org.onap.aai.sa.rest.BulkRequest;
69 import org.onap.aai.sa.rest.BulkRequest.OperationType;
70 import org.onap.aai.sa.rest.DocumentSchema;
71 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
72 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
73 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
74 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
75 import org.onap.aai.sa.searchdbabstraction.entity.Document;
76 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
77 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
78 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
79 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
80 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
81 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
82 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
83 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
84 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
85 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
86 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
87 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
88 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
91 * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface.
93 public class ElasticSearchHttpController implements DocumentStoreInterface {
95 private static ElasticSearchHttpController instance = null;
97 private static final Logger logger =
98 LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
99 private static final Logger metricsLogger =
100 LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
102 private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
103 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
104 private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
105 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
106 private static final String BULK_IMPORT_INDEX_TEMPLATE =
107 "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
108 private static final String BULK_DELETE_TEMPLATE =
109 "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
111 private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
112 "Internal Error: ElasticSearch operation fault occurred";
113 private final ElasticSearchConfig config;
115 private static final String DEFAULT_TYPE = "default";
117 protected AnalysisConfiguration analysisConfig;
119 public static ElasticSearchHttpController getInstance() {
121 synchronized (ElasticSearchHttpController.class) {
123 if (instance == null) {
124 Properties properties = new Properties();
125 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
127 properties.load(new FileInputStream(file));
128 } catch (Exception e) {
129 logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance",
130 e.getLocalizedMessage());
133 ElasticSearchConfig config = new ElasticSearchConfig(properties);
134 instance = new ElasticSearchHttpController(config);
141 public ElasticSearchHttpController(ElasticSearchConfig config) {
142 this.config = config;
143 analysisConfig = new AnalysisConfiguration();
146 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
148 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
149 } catch (Exception e) {
150 logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, getFullUrl("", false),
156 public AnalysisConfiguration getAnalysisConfig() {
157 return analysisConfig;
161 public OperationResult createIndex(String index, DocumentSchema documentSchema) {
162 OperationResult result = new OperationResult();
163 result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
167 // Submit the request to ElasticSearch to create the index using a
168 // default document type.
169 result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
170 DocumentSchemaUtil.generateDocumentMappings(documentSchema));
172 // ElasticSearch will return us a 200 code on success when we
173 // want to report a 201, so translate the result here.
174 if (result.getResultCode() == Status.OK.getStatusCode()) {
175 result.setResultCode(Status.CREATED.getStatusCode());
178 if (isSuccess(result)) {
179 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
181 } catch (DocumentStoreOperationException | IOException e) {
182 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
189 public OperationResult createDynamicIndex(String index, String dynamicSchema) {
190 OperationResult result = new OperationResult();
191 result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
194 result = createTable(index, dynamicSchema);
196 // ElasticSearch will return us a 200 code on success when we
197 // want to report a 201, so translate the result here.
198 if (result.getResultCode() == Status.OK.getStatusCode()) {
199 result.setResultCode(Status.CREATED.getStatusCode());
201 if (isSuccess(result)) {
202 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
204 } catch (DocumentStoreOperationException e) {
205 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
212 public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
214 // Initialize operation result with a failure codes / fault string
215 OperationResult opResult = new OperationResult();
216 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
217 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
219 // Grab the current time so we can use it to generate a metrics log.
220 MdcOverride override = getStartTime(new MdcOverride());
222 String fullUrl = getFullUrl("/" + indexName + "/", false);
223 HttpURLConnection conn = initializeConnection(fullUrl);
225 logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
228 conn.setRequestMethod("DELETE");
229 } catch (ProtocolException e) {
230 shutdownConnection(conn);
231 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
234 handleResponse(conn, opResult);
236 // Generate a metrics log so we can track how long the operation took.
238 .info(SearchDbMsgs.DELETE_INDEX_TIME,
239 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
240 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
241 override, indexName);
243 shutdownConnection(conn);
248 private OperationResult checkConnection() throws IOException {
249 String fullUrl = getFullUrl("/_cluster/health", false);
251 HttpURLConnection conn = null;
253 url = new URL(fullUrl);
254 conn = (HttpURLConnection) url.openConnection();
255 conn.setRequestMethod("GET");
256 conn.setDoOutput(true);
257 logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
259 int resultCode = conn.getResponseCode();
260 logger.debug("getClusterHealth() response Code : " + resultCode);
261 OperationResult opResult = new OperationResult();
262 opResult.setResultCode(resultCode);
264 shutdownConnection(conn);
269 private String getFullUrl(String resourceUrl, boolean isSecure) {
271 final String host = config.getIpAddress();
272 final String port = config.getHttpPort();
275 return String.format("https://%s:%s%s", host, port, resourceUrl);
277 return String.format("http://%s:%s%s", host, port, resourceUrl);
281 private void shutdownConnection(HttpURLConnection connection) {
282 if (connection == null) {
286 InputStream inputstream = null;
287 OutputStream outputstream = null;
290 inputstream = connection.getInputStream();
291 } catch (IOException e) {
292 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
294 if (inputstream != null) {
297 } catch (IOException e) {
298 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
299 e.getLocalizedMessage());
305 outputstream = connection.getOutputStream();
306 } catch (IOException e) {
307 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
309 if (outputstream != null) {
311 outputstream.close();
312 } catch (IOException e) {
313 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
314 e.getLocalizedMessage());
319 connection.disconnect();
323 protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings)
324 throws DocumentStoreOperationException {
326 if (indexSettings == null) {
327 logger.debug("No settings provided.");
330 if (indexMappings == null) {
331 logger.debug("No mappings provided.");
334 OperationResult opResult = new OperationResult();
336 // Initialize operation result with a failure codes / fault string
337 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
338 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
340 // Grab the current time so we can use it to generate a metrics log.
341 MdcOverride override = getStartTime(new MdcOverride());
343 String fullUrl = getFullUrl("/" + indexName + "/", false);
344 HttpURLConnection conn = initializeConnection(fullUrl);
347 conn.setRequestMethod("PUT");
348 } catch (ProtocolException e) {
349 shutdownConnection(conn);
350 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
353 StringBuilder sb = new StringBuilder(128);
354 sb.append("{ \"settings\" : ");
355 sb.append(indexSettings);
358 sb.append("\"mappings\" : {");
359 sb.append("\"" + typeName + "\" :");
360 sb.append(indexMappings);
364 attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
365 } catch (IOException e) {
366 logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
367 throw new DocumentStoreOperationException(e.getMessage(), e);
370 logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
371 logger.debug("Request content: " + sb.toString());
373 handleResponse(conn, opResult);
375 shutdownConnection(conn);
377 // Generate a metrics log so we can track how long the operation took.
379 .info(SearchDbMsgs.CREATE_INDEX_TIME,
380 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
381 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
382 override, indexName);
388 * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the
391 * @param indexName - The name of the index to be created
392 * @param settingsAndMappings - The actual JSON object that will define the index
393 * @return - The operation result of writing into Elasticsearch
394 * @throws DocumentStoreOperationException
396 protected OperationResult createTable(String indexName, String settingsAndMappings)
397 throws DocumentStoreOperationException {
398 OperationResult result = new OperationResult();
399 result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
400 result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
402 // Grab the current time so we can use it to generate a metrics log.
403 MdcOverride override = getStartTime(new MdcOverride());
405 String fullUrl = getFullUrl("/" + indexName + "/", false);
406 HttpURLConnection conn = initializeConnection(fullUrl);
409 conn.setRequestMethod("PUT");
410 } catch (ProtocolException e) {
411 shutdownConnection(conn);
412 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
416 attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
417 } catch (IOException e) {
418 logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
419 throw new DocumentStoreOperationException(e.getMessage());
421 handleResponse(conn, result);
423 // Generate a metrics log so we can track how long the operation took.
425 .info(SearchDbMsgs.CREATE_INDEX_TIME,
426 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
427 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()),
428 override, indexName);
434 public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
435 boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
437 if (!allowImplicitIndexCreation) {
438 // Before we do anything, make sure that the specified index actually exists in the
439 // document store - we don't want to rely on ElasticSearch to fail the document
440 // create because it could be configured to implicitly create a non-existent index,
441 // which can lead to hard-to-debug behaviour with queries down the road.
442 OperationResult indexExistsResult = checkIndexExistence(indexName);
443 if (!isSuccess(indexExistsResult)) {
444 DocumentOperationResult opResult = new DocumentOperationResult();
445 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
446 opResult.setResult("Document Index '" + indexName + "' does not exist.");
447 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
452 if (document.getId() == null || document.getId().isEmpty()) {
453 return createDocumentWithoutId(indexName, document);
455 return createDocumentWithId(indexName, document);
459 private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
460 throws DocumentStoreOperationException {
461 // check if the document already exists
462 DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
464 if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
465 if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) {
466 opResult.setFailureCause("A document with the same id already exists.");
468 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
470 opResult.setResultCode(Status.CONFLICT.getStatusCode());
474 opResult = new DocumentOperationResult();
475 // Initialize operation result with a failure codes / fault string
476 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
477 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
479 // Grab the current time so we can use it to generate a metrics log.
480 MdcOverride override = getStartTime(new MdcOverride());
482 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
483 HttpURLConnection conn = initializeConnection(fullUrl);
486 conn.setRequestMethod("PUT");
487 } catch (ProtocolException e) {
488 shutdownConnection(conn);
489 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
492 attachDocument(conn, document);
494 logger.debug("Sending 'PUT' request to: " + conn.getURL());
496 handleResponse(conn, opResult);
497 buildDocumentResult(opResult, indexName);
499 // Generate a metrics log so we can track how long the operation took.
501 .info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
502 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
503 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
504 override, indexName);
506 shutdownConnection(conn);
511 private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
512 throws DocumentStoreOperationException {
514 DocumentOperationResult response = new DocumentOperationResult();
515 // Initialize operation result with a failure codes / fault string
516 response.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
517 response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
519 // Grab the current time so we can use it to generate a metrics log.
520 MdcOverride override = getStartTime(new MdcOverride());
522 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
523 HttpURLConnection conn = initializeConnection(fullUrl);
526 conn.setRequestMethod("POST");
527 } catch (ProtocolException e) {
528 shutdownConnection(conn);
529 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
532 attachDocument(conn, document);
534 logger.debug("Sending 'POST' request to: " + conn.getURL());
536 handleResponse(conn, response);
537 buildDocumentResult(response, indexName);
539 // Generate a metrics log so we can track how long the operation took.
541 .info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
542 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
543 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
544 override, indexName);
546 shutdownConnection(conn);
551 private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
552 throws DocumentStoreOperationException {
553 conn.setRequestProperty("Connection", "Close");
554 attachContent(conn, doc.getContentInJson());
557 private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
558 throws DocumentStoreOperationException {
559 DocumentOperationResult opResult = new DocumentOperationResult();
561 // Initialize operation result with a failure codes / fault string
562 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
564 // Grab the current time so we can use it to generate a metrics log.
565 MdcOverride override = getStartTime(new MdcOverride());
567 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
568 HttpURLConnection conn = initializeConnection(fullUrl);
571 conn.setRequestMethod("HEAD");
572 } catch (ProtocolException e) {
573 shutdownConnection(conn);
574 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
577 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
581 resultCode = conn.getResponseCode();
582 } catch (IOException e) {
583 shutdownConnection(conn);
584 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
587 logger.debug("Response Code : " + resultCode);
589 opResult.setResultCode(resultCode);
591 // Generate a metrics log so we can track how long the operation took.
593 .info(SearchDbMsgs.GET_DOCUMENT_TIME,
594 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
595 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
596 override, indexName, docId);
598 shutdownConnection(conn);
604 public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
605 boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
607 if (!allowImplicitIndexCreation) {
608 // Before we do anything, make sure that the specified index actually exists in the
609 // document store - we don't want to rely on ElasticSearch to fail the document
610 // create because it could be configured to implicitly create a non-existent index,
611 // which can lead to hard-to-debug behaviour with queries down the road.
612 OperationResult indexExistsResult = checkIndexExistence(indexName);
613 if (!isSuccess(indexExistsResult)) {
614 DocumentOperationResult opResult = new DocumentOperationResult();
615 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
616 opResult.setResult("Document Index '" + indexName + "' does not exist.");
617 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
622 DocumentOperationResult opResult = new DocumentOperationResult();
624 // Initialize operation result with a failure codes / fault string
625 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
626 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
628 // Grab the current time so we can use it to generate a metrics log.
629 MdcOverride override = getStartTime(new MdcOverride());
631 String fullUrl = getFullUrl(
632 "/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + "?version=" + document.getVersion(),
634 HttpURLConnection conn = initializeConnection(fullUrl);
637 conn.setRequestMethod("PUT");
638 } catch (ProtocolException e) {
639 shutdownConnection(conn);
640 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
643 attachDocument(conn, document);
645 logger.debug("Sending 'PUT' request to: " + conn.getURL());
647 handleResponse(conn, opResult);
648 buildDocumentResult(opResult, indexName);
650 // Generate a metrics log so we can track how long the operation took.
651 metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
652 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
653 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
654 override, indexName, document.getId());
656 shutdownConnection(conn);
662 public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
663 throws DocumentStoreOperationException {
664 DocumentOperationResult opResult = new DocumentOperationResult();
666 // Initialize operation result with a failure codes / fault string
667 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
668 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
670 // Grab the current time so we can use it to generate a metrics log.
671 MdcOverride override = getStartTime(new MdcOverride());
673 String fullUrl = getFullUrl(
674 "/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + "?version=" + document.getVersion(),
676 HttpURLConnection conn = initializeConnection(fullUrl);
679 conn.setRequestMethod("DELETE");
680 } catch (ProtocolException e) {
681 shutdownConnection(conn);
682 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
685 logger.debug("\nSending 'DELETE' request to " + conn.getURL());
687 handleResponse(conn, opResult);
688 buildDocumentResult(opResult, indexName);
689 // supress the etag and url in response for delete as they are not required
690 if (opResult.getDocument() != null) {
691 opResult.getDocument().setEtag(null);
692 opResult.getDocument().setUrl(null);
695 // Generate a metrics log so we can track how long the operation took.
696 metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
697 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
698 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
699 override, indexName, document.getId());
701 shutdownConnection(conn);
707 public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
708 throws DocumentStoreOperationException {
709 DocumentOperationResult opResult = new DocumentOperationResult();
711 // Initialize operation result with a failure codes / fault string
712 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
713 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
715 // Grab the current time so we can use it to generate a metrics log.
716 MdcOverride override = getStartTime(new MdcOverride());
718 String fullUrl = null;
719 if (document.getVersion() == null) {
720 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
722 fullUrl = getFullUrl(
723 "/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + "?version=" + document.getVersion(),
726 HttpURLConnection conn = initializeConnection(fullUrl);
728 logger.debug("\nSending 'GET' request to: " + conn.getURL());
730 handleResponse(conn, opResult);
731 buildDocumentResult(opResult, indexName);
733 // Generate a metrics log so we can track how long the operation took.
734 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
735 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
736 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
737 override, indexName, document.getId());
739 shutdownConnection(conn);
745 public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
746 SearchOperationResult opResult = new SearchOperationResult();
748 // Initialize operation result with a failure codes / fault string
749 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
750 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
752 String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
754 // Grab the current time so we can use it to generate a metrics log.
755 MdcOverride override = getStartTime(new MdcOverride());
757 HttpURLConnection conn = initializeConnection(fullUrl);
760 conn.setRequestMethod("GET");
761 } catch (ProtocolException e) {
762 shutdownConnection(conn);
763 throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
766 logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
768 handleResponse(conn, opResult);
769 buildSearchResult(opResult, indexName);
772 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
773 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
774 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
775 override, indexName, queryString);
781 public SearchOperationResult searchWithPayload(String indexName, String query)
782 throws DocumentStoreOperationException {
783 SearchOperationResult opResult = new SearchOperationResult();
785 if (logger.isDebugEnabled()) {
786 logger.debug("Querying index: " + indexName + " with query string: " + query);
789 // Initialize operation result with a failure codes / fault string
790 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
791 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
793 String fullUrl = getFullUrl("/" + indexName + "/_search", false);
795 // Grab the current time so we can use it to generate a metrics log.
796 MdcOverride override = getStartTime(new MdcOverride());
798 HttpURLConnection conn = initializeConnection(fullUrl);
801 conn.setRequestMethod("POST");
802 } catch (ProtocolException e) {
803 shutdownConnection(conn);
804 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
807 attachContent(conn, query);
809 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
810 logger.debug("Request body = Elasticsearch query = " + query);
812 handleResponse(conn, opResult);
813 buildSearchResult(opResult, indexName);
816 .info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
817 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
818 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
819 override, indexName, query);
821 shutdownConnection(conn);
828 public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
829 throws DocumentStoreOperationException {
831 SearchOperationResult opResult = new SearchOperationResult();
833 if (logger.isDebugEnabled()) {
834 logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
837 // Initialize operation result with a failure codes / fault string
838 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
839 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
841 String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
843 // Grab the current time so we can use it to generate a metrics log.
844 MdcOverride override = getStartTime(new MdcOverride());
846 HttpURLConnection conn = initializeConnection(fullUrl);
849 conn.setRequestMethod("POST");
850 } catch (ProtocolException e) {
851 shutdownConnection(conn);
852 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
855 attachContent(conn, query);
857 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
858 logger.debug("Request body = Elasticsearch query = " + query);
860 handleResponse(conn, opResult);
861 buildSuggestResult(opResult, indexName);
864 .info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
865 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
866 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
867 override, indexName, query);
869 shutdownConnection(conn);
874 private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException {
875 OutputStream outputStream = null;
876 OutputStreamWriter out = null;
879 outputStream = conn.getOutputStream();
880 } catch (IOException e) {
881 shutdownConnection(conn);
882 throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
885 out = new OutputStreamWriter(outputStream);
890 } catch (IOException e) {
891 shutdownConnection(conn);
892 throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
896 private HttpURLConnection initializeConnection(String fullUrl) throws DocumentStoreOperationException {
898 HttpURLConnection conn = null;
901 url = new URL(fullUrl);
902 } catch (MalformedURLException e) {
903 throw new DocumentStoreOperationException("Error building a URL with " + url, e);
907 conn = (HttpURLConnection) url.openConnection();
908 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON);
909 conn.setDoOutput(true);
910 } catch (IOException e) {
911 shutdownConnection(conn);
912 throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
918 private void handleResponse(HttpURLConnection conn, OperationResult opResult)
919 throws DocumentStoreOperationException {
923 resultCode = conn.getResponseCode();
924 } catch (IOException e) {
925 shutdownConnection(conn);
926 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
929 logger.debug("Response Code : " + resultCode);
931 InputStream inputStream = null;
933 if (!isSuccessCode(resultCode)) {
934 inputStream = conn.getErrorStream();
937 inputStream = conn.getInputStream();
938 } catch (IOException e) {
939 shutdownConnection(conn);
940 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
944 InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
945 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
947 StringBuilder result = new StringBuilder(128);
948 String string = null;
951 while ((string = bufferedreader.readLine()) != null) {
952 result.append(string).append("\n");
954 } catch (IOException e) {
955 shutdownConnection(conn);
956 throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
959 if (resultCode == Status.CONFLICT.getStatusCode()) {
960 opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode());
962 opResult.setResultCode(resultCode);
964 if (logger.isDebugEnabled()) {
965 logger.debug("Raw result string from ElasticSearch = " + result.toString());
967 opResult.setResult(result.toString());
968 opResult.setResultVersion(extractVersion(result.toString()));
971 private String extractVersion(String result) {
972 JSONParser parser = new JSONParser();
973 String version = null;
975 JSONObject root = (JSONObject) parser.parse(result);
976 if (root.get("_version") != null) {
977 version = root.get("_version").toString();
979 } catch (ParseException e) {
980 // Not all responses from ElasticSearch include a version, so
981 // if we don't get one back, just return an empty string rather
982 // than trigger a false failure.
989 * This convenience method gets the current system time and stores it in an attribute in the supplied
990 * {@link MdcOverride} object so that it can be used later by the metrics logger.
992 * @param override - The {@link MdcOverride} object to update.
993 * @return - The supplied {@link MdcOverride} object.
995 private MdcOverride getStartTime(MdcOverride override) {
997 // Grab the current time...
998 long startTimeInMs = System.currentTimeMillis();
1000 // ...and add it as an attribute to the supplied MDC Override
1002 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
1003 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
1005 // Return the MdcOverride object that we were passed.
1006 // This looks odd, but it allows us to do stuff like:
1008 // MdcOverride ov = getStartTime(new MdcOverride())
1010 // which is quite handy, but also allows us to pass in an existing
1011 // MdcOverride object which already has some attributes set.
1015 private boolean isSuccess(OperationResult result) {
1016 return isSuccessCode(result.getResultCode());
1019 private boolean isSuccessCode(int statusCode) {
1020 return Family.familyOf(statusCode).equals(Family.SUCCESSFUL);
1024 public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
1026 if (logger.isDebugEnabled()) {
1027 String dbgString = "ESController: performBulkOperations - Operations: ";
1029 for (BulkRequest request : requests) {
1030 dbgString += "[" + request.toString() + "] ";
1033 logger.debug(dbgString);
1036 // Grab the current time so we can use it to generate a metrics log.
1037 MdcOverride override = getStartTime(new MdcOverride());
1039 // Parse the supplied set of operations.
1040 // Iterate over the list of operations which we were provided and
1041 // translate them into a format that ElasticSearh understands.
1043 StringBuilder esOperationSet = new StringBuilder(128);
1044 List<ElasticSearchResultItem> rejected = new ArrayList<>();
1045 for (BulkRequest request : requests) {
1047 // Convert the request to the syntax ElasticSearch likes.
1048 if (buildEsOperation(request, esOperationSet, rejected)) {
1053 ElasticSearchBulkOperationResult opResult = null;
1056 // Open an HTTP connection to the ElasticSearch back end.
1057 String fullUrl = getFullUrl("/_bulk", false);
1059 HttpURLConnection conn;
1062 url = new URL(fullUrl);
1063 conn = (HttpURLConnection) url.openConnection();
1064 conn.setRequestMethod("PUT");
1065 conn.setDoOutput(true);
1066 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_FORM_URLENCODED);
1067 conn.setRequestProperty("Connection", "Close");
1069 } catch (IOException e) {
1071 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1072 if (logger.isDebugEnabled()) {
1073 logger.debug(Throwables.getStackTraceAsString(e));
1076 throw new DocumentStoreOperationException(
1077 "Failed to open connection to document store. Cause: " + e.getMessage(), e);
1080 StringBuilder bulkResult = new StringBuilder(128);
1082 // Create an output stream to write our request to.
1083 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1085 if (logger.isDebugEnabled()) {
1086 logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1087 logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
1090 // Write the resulting request string to our output stream. (this sends the request to ES?)
1091 out.write(esOperationSet.toString());
1094 // Open an input stream on our connection in order to read back the results.
1095 InputStream is = conn.getInputStream();
1096 InputStreamReader inputstreamreader = new InputStreamReader(is);
1097 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1099 // Read the contents of the input stream into our result string...
1100 String esResponseString = null;
1102 while ((esResponseString = bufferedreader.readLine()) != null) {
1103 bulkResult.append(esResponseString).append("\n");
1106 } catch (IOException e) {
1108 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1109 if (logger.isDebugEnabled()) {
1110 StringWriter sw = new StringWriter();
1111 e.printStackTrace(new PrintWriter(sw));
1112 logger.debug(sw.toString());
1115 throw new DocumentStoreOperationException(
1116 "Failure interacting with document store. Cause: " + e.getMessage(), e);
1119 if (logger.isDebugEnabled()) {
1120 logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
1123 // ...and marshal the resulting string into a Java object.
1125 opResult = marshallEsBulkResult(bulkResult.toString());
1127 } catch (IOException e) {
1129 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1130 if (logger.isDebugEnabled()) {
1131 logger.debug(Throwables.getStackTraceAsString(e));
1134 throw new DocumentStoreOperationException("Failed to marshal response body. Cause: " + e.getMessage(),
1139 // Finally, build the operation result and return it to the caller.
1140 OperationResult result = new OperationResult();
1141 result.setResultCode(207);
1142 result.setResult(buildGenericBulkResultSet(opResult, rejected));
1144 // In the success case we don't want the entire result string to be
1145 // dumped into the metrics log, so concatenate it.
1146 String resultStringForMetricsLog = result.getResult();
1147 if (isSuccess(result)) {
1148 resultStringForMetricsLog =
1149 resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1152 metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1153 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1154 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1162 * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch.
1164 * @param request - The request to be performed.
1165 * @param sb - The string builder to append the json data to
1166 * @throws DocumentStoreOperationException
1168 private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails)
1169 throws DocumentStoreOperationException {
1171 boolean retVal = true;
1172 // What kind of operation are we performing?
1173 switch (request.getOperationType()) {
1175 // Create a new document.
1178 // Make sure that we were supplied a document payload.
1179 if (request.getOperation().getDocument() == null) {
1181 fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1182 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1186 // Make sure that the supplied document URL is formatted
1188 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1189 fails.add(generateRejectionEntry(request.getOperationType(),
1190 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1191 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1195 // Validate that the specified index actually exists before we
1196 // try to perform the create.
1197 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1199 fails.add(generateRejectionEntry(request.getOperationType(),
1200 "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1201 request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1205 // If we were supplied an id for the new document, then
1206 // include it in the bulk operation to Elastic Search
1207 if (request.getId() == null) {
1209 sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
1211 // Otherwise, we just leave that parameter off and ElasticSearch
1212 // will generate one for us.
1214 sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
1219 // Append the document that we want to create.
1220 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1221 } catch (JsonProcessingException e) {
1222 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1227 // Update an existing document.
1230 // Make sure that we were supplied a document payload.
1231 if (request.getOperation().getDocument() == null) {
1233 fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1234 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1238 // Make sure that the supplied document URL is formatted
1240 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1241 fails.add(generateRejectionEntry(request.getOperationType(),
1242 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1243 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1247 // Validate that the specified index actually exists before we
1248 // try to perform the update.
1249 if (!indexExists(request.getIndex())) {
1251 fails.add(generateRejectionEntry(request.getOperationType(),
1252 "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1253 request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1257 // Validate that the document we are trying to update actually
1258 // exists before we try to perform the update.
1259 if (!documentExists(request.getIndex(), request.getId())) {
1261 fails.add(generateRejectionEntry(request.getOperationType(),
1262 "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1263 request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1267 // It is mandatory that a version be supplied for an update operation,
1268 // so validate that now.
1269 if (request.getOperation().getMetaData().getEtag() == null) {
1271 fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1272 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1276 // Generate the update request...
1277 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1278 request.getOperation().getMetaData().getEtag()));
1280 // ...and append the document that we want to update.
1282 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1283 } catch (JsonProcessingException e) {
1284 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1288 // Delete an existing document.
1291 // Make sure that the supplied document URL is formatted
1293 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1294 fails.add(generateRejectionEntry(request.getOperationType(),
1295 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1296 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1300 // Validate that the specified index actually exists before we
1301 // try to perform the delete.
1302 if (!indexExists(request.getIndex())) {
1304 fails.add(generateRejectionEntry(request.getOperationType(),
1305 "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1306 request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1310 // Validate that the document we are trying to update actually
1311 // exists before we try to perform the delete.
1312 if (!documentExists(request.getIndex(), request.getId())) {
1314 fails.add(generateRejectionEntry(request.getOperationType(),
1315 "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1316 request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1320 // It is mandatory that a version be supplied for a delete operation,
1321 // so validate that now.
1322 if (request.getOperation().getMetaData().getEtag() == null) {
1324 fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1325 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1329 // Generate the delete request.
1330 sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1331 request.getOperation().getMetaData().getEtag()));
1339 private boolean indexExists(String index) throws DocumentStoreOperationException {
1340 return isSuccess(checkIndexExistence(index));
1343 private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1344 return isSuccess(checkDocumentExistence(index, id));
1348 * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the
1351 * @param rejectReason - A message describing why the operation was rejected.
1352 * @param anId - The identifier associated with the document being acted on.
1353 * @param statusCode - An HTTP status code.
1354 * @return - A result set item.
1356 private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index,
1357 String anId, int statusCode, String originalUrl) {
1359 ElasticSearchError err = new ElasticSearchError();
1360 err.setReason(rejectReason);
1362 ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1365 op.setStatus(statusCode);
1367 op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1369 ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1373 rejectionResult.setCreate(op);
1376 rejectionResult.setIndex(op);
1379 rejectionResult.setDelete(op);
1384 return rejectionResult;
1388 * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and
1389 * marshals it into a Java object.
1391 * @param jsonResult - The bulk operations response returned from ElasticSearch.
1392 * @return - The marshalled response.
1393 * @throws JsonParseException
1394 * @throws JsonMappingException
1395 * @throws IOException
1397 private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException {
1398 if (jsonResult != null) {
1399 if (logger.isDebugEnabled()) {
1400 logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", ""));
1403 ObjectMapper mapper = new ObjectMapper();
1404 mapper.setSerializationInclusion(Include.NON_EMPTY);
1406 return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1413 * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload.
1415 * @param esResult - ElasticSearch bulk operations response.
1416 * @return - A generic result set.
1418 private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1419 List<ElasticSearchResultItem> rejectedOps) {
1421 int totalSuccess = 0;
1424 if (logger.isDebugEnabled()) {
1426 logger.debug("ESController: Build generic result set. ES Results: "
1427 + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString());
1430 // Build a combined list of result items from the results returned
1431 // from ElasticSearch and the list of operations that we rejected
1432 // without sending to ElasticSearch.
1433 List<ElasticSearchResultItem> combinedResults = new ArrayList<>();
1434 if (esResult != null) {
1435 combinedResults.addAll(Arrays.asList(esResult.getItems()));
1437 combinedResults.addAll(rejectedOps);
1439 // Iterate over the individual results in the resulting result set.
1440 StringBuilder resultsBuilder = new StringBuilder();
1441 AtomicBoolean firstItem = new AtomicBoolean(true);
1442 for (ElasticSearchResultItem item : combinedResults) {
1444 // Increment the operation counts.
1446 if (isSuccessCode(item.operationStatus().getStatus())) {
1452 // Prepend a comma to our response string unless this it the
1453 // first result in the set.
1454 if (!firstItem.compareAndSet(true, false)) {
1455 resultsBuilder.append(", ");
1458 // Append the current result as a generic json structure.
1459 resultsBuilder.append(item.toJson());
1462 return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", "
1463 + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}";
1468 * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
1470 * @param indexName - The index to look for.
1471 * @return - An operation result indicating the success or failure of the check.
1472 * @throws DocumentStoreOperationException
1474 public OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
1476 // Initialize operation result with a failure codes / fault string
1477 OperationResult opResult = new OperationResult();
1478 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
1480 // Grab the current time so we can use it to generate a metrics log.
1481 MdcOverride override = getStartTime(new MdcOverride());
1483 String fullUrl = getFullUrl("/" + indexName, false);
1484 HttpURLConnection conn = initializeConnection(fullUrl);
1487 conn.setRequestMethod("HEAD");
1489 } catch (ProtocolException e) {
1490 shutdownConnection(conn);
1491 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1494 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1498 resultCode = conn.getResponseCode();
1499 } catch (IOException e) {
1500 shutdownConnection(conn);
1501 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1503 logger.debug("Response Code : " + resultCode);
1505 opResult.setResultCode(resultCode);
1507 // Generate a metrics log so we can track how long the operation took.
1509 .info(SearchDbMsgs.CHECK_INDEX_TIME,
1510 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1511 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1512 override, indexName);
1514 shutdownConnection(conn);
1520 private void buildDocumentResult(DocumentOperationResult result, String index)
1521 throws DocumentStoreOperationException {
1523 JSONParser parser = new JSONParser();
1526 root = (JSONObject) parser.parse(result.getResult());
1527 if (isSuccess(result)) {
1528 // Success response object
1529 Document doc = new Document();
1530 doc.setEtag(result.getResultVersion());
1531 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1533 doc.setContent((JSONObject) root.get("_source"));
1534 result.setDocument(doc);
1537 // Error response object
1538 JSONObject error = (JSONObject) root.get("error");
1539 if (error != null) {
1540 result.setError(new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1544 } catch (Exception e) {
1545 throw new DocumentStoreOperationException("Failed to parse Elastic Search response." + result.getResult());
1551 private String buildDocumentResponseUrl(String index, String id) {
1552 return ApiUtils.buildDocumentUri(index, id);
1555 private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1557 JSONParser parser = new JSONParser();
1561 root = (JSONObject) parser.parse(result.getResult());
1562 if (isSuccess(result)) {
1563 JSONObject hits = (JSONObject) root.get("hits");
1564 JSONArray hitArray = (JSONArray) hits.get("hits");
1565 SearchHits searchHits = new SearchHits();
1566 searchHits.setTotalHits(hits.get("total").toString());
1567 ArrayList<SearchHit> searchHitArray = new ArrayList<>();
1569 for (int i = 0; i < hitArray.size(); i++) {
1570 JSONObject hit = (JSONObject) hitArray.get(i);
1571 SearchHit searchHit = new SearchHit();
1572 searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1573 Document doc = new Document();
1574 if (hit.get("_version") != null) {
1575 doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1579 buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1580 doc.setContent((JSONObject) hit.get("_source"));
1581 searchHit.setDocument(doc);
1582 searchHitArray.add(searchHit);
1584 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1585 result.setSearchResult(searchHits);
1587 JSONObject aggregations = (JSONObject) root.get("aggregations");
1588 if (aggregations != null) {
1589 AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1590 AggregationResults aggs = new AggregationResults();
1591 aggs.setAggregations(aggResults);
1592 result.setAggregationResult(aggs);
1597 JSONObject error = (JSONObject) root.get("error");
1598 if (error != null) {
1599 result.setError(new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1602 } catch (Exception e) {
1603 throw new DocumentStoreOperationException("Failed to parse Elastic Search response." + result.getResult());
1608 private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1609 JSONParser parser = new JSONParser();
1612 root = (JSONObject) parser.parse(result.getResult());
1613 if (isSuccess(result)) {
1614 JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
1615 JSONObject hitdata = (JSONObject) hitArray.get(0);
1616 JSONArray optionsArray = (JSONArray) hitdata.get("options");
1617 SuggestHits suggestHits = new SuggestHits();
1618 suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
1620 ArrayList<SuggestHit> suggestHitArray = new ArrayList<>();
1622 for (int i = 0; i < optionsArray.size(); i++) {
1623 JSONObject hit = (JSONObject) optionsArray.get(i);
1625 SuggestHit suggestHit = new SuggestHit();
1626 suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
1627 suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
1628 Document doc = new Document();
1629 if (hit.get("_version") != null) {
1630 doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1633 buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1635 doc.setContent((JSONObject) hit.get("payload"));
1636 suggestHit.setDocument(doc);
1637 suggestHitArray.add(suggestHit);
1639 suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
1640 result.setSuggestResult(suggestHits);
1642 JSONObject aggregations = (JSONObject) root.get("aggregations");
1643 if (aggregations != null) {
1644 AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1645 AggregationResults aggs = new AggregationResults();
1646 aggs.setAggregations(aggResults);
1647 result.setAggregationResult(aggs);
1652 JSONObject error = (JSONObject) root.get("error");
1653 if (error != null) {
1654 result.setError(new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1657 } catch (Exception e) {
1658 throw new DocumentStoreOperationException("Failed to parse Elastic Search response." + result.getResult());