2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
24 import com.fasterxml.jackson.annotation.JsonInclude.Include;
25 import com.fasterxml.jackson.core.JsonParseException;
26 import com.fasterxml.jackson.core.JsonProcessingException;
27 import com.fasterxml.jackson.databind.JsonMappingException;
28 import com.fasterxml.jackson.databind.ObjectMapper;
29 import com.google.common.base.Throwables;
30 import java.io.BufferedReader;
32 import java.io.FileInputStream;
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.io.InputStreamReader;
36 import java.io.OutputStream;
37 import java.io.OutputStreamWriter;
38 import java.io.PrintWriter;
39 import java.io.StringWriter;
40 import java.net.HttpURLConnection;
41 import java.net.MalformedURLException;
42 import java.net.ProtocolException;
44 import java.text.SimpleDateFormat;
45 import java.util.ArrayList;
46 import java.util.Arrays;
47 import java.util.List;
48 import java.util.Properties;
49 import java.util.concurrent.atomic.AtomicBoolean;
50 import org.json.simple.JSONArray;
51 import org.json.simple.JSONObject;
52 import org.json.simple.parser.JSONParser;
53 import org.json.simple.parser.ParseException;
54 import org.onap.aai.cl.api.LogFields;
55 import org.onap.aai.cl.api.LogLine;
56 import org.onap.aai.cl.api.Logger;
57 import org.onap.aai.cl.eelf.LoggerFactory;
58 import org.onap.aai.cl.mdc.MdcContext;
59 import org.onap.aai.cl.mdc.MdcOverride;
60 import org.onap.aai.sa.rest.AnalysisConfiguration;
61 import org.onap.aai.sa.rest.ApiUtils;
62 import org.onap.aai.sa.rest.BulkRequest;
63 import org.onap.aai.sa.rest.BulkRequest.OperationType;
64 import org.onap.aai.sa.rest.DocumentSchema;
65 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
66 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
67 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
68 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
69 import org.onap.aai.sa.searchdbabstraction.entity.Document;
70 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
71 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
72 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
73 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
74 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
75 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
76 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
77 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
78 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
79 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
80 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
81 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
82 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
83 import org.springframework.http.HttpStatus;
86 * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface.
88 public class ElasticSearchHttpController implements DocumentStoreInterface {
90 private static ElasticSearchHttpController instance = null;
92 private static final Logger logger =
93 LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
94 private static final Logger metricsLogger =
95 LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
97 private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
98 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
99 private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
100 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
101 private static final String BULK_IMPORT_INDEX_TEMPLATE =
102 "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
103 private static final String BULK_DELETE_TEMPLATE =
104 "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
106 private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
107 "Internal Error: ElasticSearch operation fault occurred";
108 private final ElasticSearchConfig config;
110 private static final String DEFAULT_TYPE = "default";
112 protected AnalysisConfiguration analysisConfig;
114 public static ElasticSearchHttpController getInstance() {
116 synchronized (ElasticSearchHttpController.class) {
118 if (instance == null) {
120 Properties properties = new Properties();
121 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
123 properties.load(new FileInputStream(file));
124 } catch (Exception e) {
125 logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance",
126 e.getLocalizedMessage());
129 ElasticSearchConfig config = new ElasticSearchConfig(properties);
130 instance = new ElasticSearchHttpController(config);
137 public ElasticSearchHttpController(ElasticSearchConfig config) {
138 this.config = config;
139 analysisConfig = new AnalysisConfiguration();
142 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
144 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
145 } catch (Exception e) {
146 logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, getFullUrl("", false),
152 public AnalysisConfiguration getAnalysisConfig() {
153 return analysisConfig;
157 public OperationResult createIndex(String index, DocumentSchema documentSchema) {
159 OperationResult result = new OperationResult();
160 result.setResultCode(500);
164 // Submit the request to ElasticSearch to create the index using a
165 // default document type.
166 result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
167 DocumentSchemaUtil.generateDocumentMappings(documentSchema));
169 // ElasticSearch will return us a 200 code on success when we
170 // want to report a 201, so translate the result here.
171 result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
172 if (isSuccess(result)) {
173 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
176 } catch (DocumentStoreOperationException | IOException e) {
178 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
185 public OperationResult createDynamicIndex(String index, String dynamicSchema) {
186 OperationResult result = new OperationResult();
187 result.setResultCode(500);
190 result = createTable(index, dynamicSchema);
192 // ElasticSearch will return us a 200 code on success when we
193 // want to report a 201, so translate the result here.
194 result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
195 if (isSuccess(result)) {
196 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
198 } catch (DocumentStoreOperationException e) {
199 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
207 public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
209 // Initialize operation result with a failure codes / fault string
210 OperationResult opResult = new OperationResult();
211 opResult.setResultCode(500);
212 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
214 // Grab the current time so we can use it to generate a metrics log.
215 MdcOverride override = getStartTime(new MdcOverride());
217 String fullUrl = getFullUrl("/" + indexName + "/", false);
218 HttpURLConnection conn = initializeConnection(fullUrl);
220 logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
223 conn.setRequestMethod("DELETE");
224 } catch (ProtocolException e) {
225 shutdownConnection(conn);
226 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
229 handleResponse(conn, opResult);
231 // Generate a metrics log so we can track how long the operation took.
233 .info(SearchDbMsgs.DELETE_INDEX_TIME,
234 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
235 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
236 override, indexName);
238 shutdownConnection(conn);
244 private OperationResult checkConnection() throws Exception {
246 String fullUrl = getFullUrl("/_cluster/health", false);
248 HttpURLConnection conn = null;
250 url = new URL(fullUrl);
251 conn = (HttpURLConnection) url.openConnection();
252 conn.setRequestMethod("GET");
253 conn.setDoOutput(true);
254 logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
256 int resultCode = conn.getResponseCode();
257 logger.debug("getClusterHealth() response Code : " + resultCode);
258 OperationResult opResult = new OperationResult();
259 opResult.setResultCode(resultCode);
261 shutdownConnection(conn);
266 private String getFullUrl(String resourceUrl, boolean isSecure) {
268 final String host = config.getIpAddress();
269 final String port = config.getHttpPort();
272 return String.format("https://%s:%s%s", host, port, resourceUrl);
274 return String.format("http://%s:%s%s", host, port, resourceUrl);
278 private void shutdownConnection(HttpURLConnection connection) {
279 if (connection == null) {
283 InputStream inputstream = null;
284 OutputStream outputstream = null;
287 inputstream = connection.getInputStream();
288 } catch (IOException e) {
289 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
291 if (inputstream != null) {
294 } catch (IOException e) {
295 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
296 e.getLocalizedMessage());
302 outputstream = connection.getOutputStream();
303 } catch (IOException e) {
304 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
306 if (outputstream != null) {
308 outputstream.close();
309 } catch (IOException e) {
310 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
311 e.getLocalizedMessage());
316 connection.disconnect();
320 protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings)
321 throws DocumentStoreOperationException {
323 if (indexSettings == null) {
324 logger.debug("No settings provided.");
327 if (indexMappings == null) {
328 logger.debug("No mappings provided.");
331 OperationResult opResult = new OperationResult();
333 // Initialize operation result with a failure codes / fault string
334 opResult.setResultCode(500);
335 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
337 // Grab the current time so we can use it to generate a metrics log.
338 MdcOverride override = getStartTime(new MdcOverride());
340 String fullUrl = getFullUrl("/" + indexName + "/", false);
341 HttpURLConnection conn = initializeConnection(fullUrl);
344 conn.setRequestMethod("PUT");
345 } catch (ProtocolException e) {
346 shutdownConnection(conn);
347 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
350 StringBuilder sb = new StringBuilder(128);
351 sb.append("{ \"settings\" : ");
352 sb.append(indexSettings);
355 sb.append("\"mappings\" : {");
356 sb.append("\"" + typeName + "\" :");
357 sb.append(indexMappings);
361 attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
362 } catch (IOException e) {
363 logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
364 throw new DocumentStoreOperationException(e.getMessage(), e);
367 logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
368 logger.debug("Request content: " + sb.toString());
370 handleResponse(conn, opResult);
372 shutdownConnection(conn);
374 // Generate a metrics log so we can track how long the operation took.
376 .info(SearchDbMsgs.CREATE_INDEX_TIME,
377 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
378 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
379 override, indexName);
385 * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the
388 * @param indexName - The name of the index to be created
389 * @param settingsAndMappings - The actual JSON object that will define the index
390 * @return - The operation result of writing into Elasticsearch
391 * @throws DocumentStoreOperationException
393 protected OperationResult createTable(String indexName, String settingsAndMappings)
394 throws DocumentStoreOperationException {
395 OperationResult result = new OperationResult();
396 result.setResultCode(500);
397 result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
399 // Grab the current time so we can use it to generate a metrics log.
400 MdcOverride override = getStartTime(new MdcOverride());
402 String fullUrl = getFullUrl("/" + indexName + "/", false);
403 HttpURLConnection conn = initializeConnection(fullUrl);
406 conn.setRequestMethod("PUT");
407 } catch (ProtocolException e) {
408 shutdownConnection(conn);
409 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
413 attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
414 } catch (IOException e) {
415 logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
416 throw new DocumentStoreOperationException(e.getMessage());
418 handleResponse(conn, result);
420 // Generate a metrics log so we can track how long the operation took.
422 .info(SearchDbMsgs.CREATE_INDEX_TIME,
423 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
424 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()),
425 override, indexName);
431 public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
432 boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
434 if (!allowImplicitIndexCreation) {
436 // Before we do anything, make sure that the specified index actually exists in the
437 // document store - we don't want to rely on ElasticSearch to fail the document
438 // create because it could be configured to implicitly create a non-existent index,
439 // which can lead to hard-to-debug behaviour with queries down the road.
440 OperationResult indexExistsResult = checkIndexExistence(indexName);
441 if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
443 DocumentOperationResult opResult = new DocumentOperationResult();
444 opResult.setResultCode(HttpStatus.NOT_FOUND.value());
445 opResult.setResult("Document Index '" + indexName + "' does not exist.");
446 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
451 if (document.getId() == null || document.getId().isEmpty()) {
452 return createDocumentWithoutId(indexName, document);
454 return createDocumentWithId(indexName, document);
458 private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
459 throws DocumentStoreOperationException {
460 // check if the document already exists
461 DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
464 if (opResult.getResultCode() != HttpStatus.NOT_FOUND.value()) {
465 if (opResult.getResultCode() == HttpStatus.CONFLICT.value()) {
466 opResult.setFailureCause("A document with the same id already exists.");
468 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
470 opResult.setResultCode(HttpStatus.CONFLICT.value());
474 opResult = new DocumentOperationResult();
475 // Initialize operation result with a failure codes / fault string
476 opResult.setResultCode(500);
477 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
479 // Grab the current time so we can use it to generate a metrics log.
480 MdcOverride override = getStartTime(new MdcOverride());
482 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
483 HttpURLConnection conn = initializeConnection(fullUrl);
486 conn.setRequestMethod("PUT");
487 } catch (ProtocolException e) {
488 shutdownConnection(conn);
489 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
492 attachDocument(conn, document);
494 logger.debug("Sending 'PUT' request to: " + conn.getURL());
496 handleResponse(conn, opResult);
497 buildDocumentResult(opResult, indexName);
499 // Generate a metrics log so we can track how long the operation took.
501 .info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
502 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
503 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
504 override, indexName);
506 shutdownConnection(conn);
512 private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
513 throws DocumentStoreOperationException {
515 DocumentOperationResult response = new DocumentOperationResult();
516 // Initialize operation result with a failure codes / fault string
517 response.setResultCode(500);
518 response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
520 // Grab the current time so we can use it to generate a metrics log.
521 MdcOverride override = getStartTime(new MdcOverride());
523 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
524 HttpURLConnection conn = initializeConnection(fullUrl);
527 conn.setRequestMethod("POST");
528 } catch (ProtocolException e) {
529 shutdownConnection(conn);
530 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
533 attachDocument(conn, document);
535 logger.debug("Sending 'POST' request to: " + conn.getURL());
537 handleResponse(conn, response);
538 buildDocumentResult(response, indexName);
540 // Generate a metrics log so we can track how long the operation took.
542 .info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
543 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
544 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
545 override, indexName);
547 shutdownConnection(conn);
552 private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
553 throws DocumentStoreOperationException {
554 conn.setRequestProperty("Connection", "Close");
555 attachContent(conn, doc.getContentInJson());
558 private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
559 throws DocumentStoreOperationException {
560 DocumentOperationResult opResult = new DocumentOperationResult();
562 // Initialize operation result with a failure codes / fault string
563 opResult.setResultCode(500);
565 // Grab the current time so we can use it to generate a metrics log.
566 MdcOverride override = getStartTime(new MdcOverride());
568 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
569 HttpURLConnection conn = initializeConnection(fullUrl);
572 conn.setRequestMethod("HEAD");
573 } catch (ProtocolException e) {
574 shutdownConnection(conn);
575 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
578 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
582 resultCode = conn.getResponseCode();
583 } catch (IOException e) {
584 shutdownConnection(conn);
585 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
588 logger.debug("Response Code : " + resultCode);
590 opResult.setResultCode(resultCode);
592 // Generate a metrics log so we can track how long the operation took.
594 .info(SearchDbMsgs.GET_DOCUMENT_TIME,
595 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
596 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
597 override, indexName, docId);
599 shutdownConnection(conn);
605 public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
606 boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
608 if (!allowImplicitIndexCreation) {
609 // Before we do anything, make sure that the specified index actually exists in the
610 // document store - we don't want to rely on ElasticSearch to fail the document
611 // create because it could be configured to implicitly create a non-existent index,
612 // which can lead to hard-to-debug behaviour with queries down the road.
613 OperationResult indexExistsResult = checkIndexExistence(indexName);
614 if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
616 DocumentOperationResult opResult = new DocumentOperationResult();
617 opResult.setResultCode(HttpStatus.NOT_FOUND.value());
618 opResult.setResult("Document Index '" + indexName + "' does not exist.");
619 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
624 DocumentOperationResult opResult = new DocumentOperationResult();
626 // Initialize operation result with a failure codes / fault string
627 opResult.setResultCode(500);
628 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
630 // Grab the current time so we can use it to generate a metrics log.
631 MdcOverride override = getStartTime(new MdcOverride());
633 String fullUrl = getFullUrl(
634 "/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + "?version=" + document.getVersion(),
636 HttpURLConnection conn = initializeConnection(fullUrl);
639 conn.setRequestMethod("PUT");
640 } catch (ProtocolException e) {
641 shutdownConnection(conn);
642 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
645 attachDocument(conn, document);
647 logger.debug("Sending 'PUT' request to: " + conn.getURL());
649 handleResponse(conn, opResult);
650 buildDocumentResult(opResult, indexName);
652 // Generate a metrics log so we can track how long the operation took.
653 metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
654 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
655 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
656 override, indexName, document.getId());
658 shutdownConnection(conn);
664 public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
665 throws DocumentStoreOperationException {
666 DocumentOperationResult opResult = new DocumentOperationResult();
668 // Initialize operation result with a failure codes / fault string
669 opResult.setResultCode(500);
670 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
672 // Grab the current time so we can use it to generate a metrics log.
673 MdcOverride override = getStartTime(new MdcOverride());
675 String fullUrl = getFullUrl(
676 "/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + "?version=" + document.getVersion(),
678 HttpURLConnection conn = initializeConnection(fullUrl);
681 conn.setRequestMethod("DELETE");
682 } catch (ProtocolException e) {
683 shutdownConnection(conn);
684 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
687 logger.debug("\nSending 'DELETE' request to " + conn.getURL());
689 handleResponse(conn, opResult);
690 buildDocumentResult(opResult, indexName);
691 // supress the etag and url in response for delete as they are not required
692 if (opResult.getDocument() != null) {
693 opResult.getDocument().setEtag(null);
694 opResult.getDocument().setUrl(null);
697 // Generate a metrics log so we can track how long the operation took.
698 metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
699 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
700 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
701 override, indexName, document.getId());
703 shutdownConnection(conn);
709 public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
710 throws DocumentStoreOperationException {
711 DocumentOperationResult opResult = new DocumentOperationResult();
713 // Initialize operation result with a failure codes / fault string
714 opResult.setResultCode(500);
715 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
717 // Grab the current time so we can use it to generate a metrics log.
718 MdcOverride override = getStartTime(new MdcOverride());
720 String fullUrl = null;
721 if (document.getVersion() == null) {
722 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
724 fullUrl = getFullUrl(
725 "/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + "?version=" + document.getVersion(),
728 HttpURLConnection conn = initializeConnection(fullUrl);
730 logger.debug("\nSending 'GET' request to: " + conn.getURL());
732 handleResponse(conn, opResult);
733 buildDocumentResult(opResult, indexName);
735 // Generate a metrics log so we can track how long the operation took.
736 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
737 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
738 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
739 override, indexName, document.getId());
741 shutdownConnection(conn);
747 public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
748 SearchOperationResult opResult = new SearchOperationResult();
750 // Initialize operation result with a failure codes / fault string
751 opResult.setResultCode(500);
752 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
754 String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
756 // Grab the current time so we can use it to generate a metrics log.
757 MdcOverride override = getStartTime(new MdcOverride());
759 HttpURLConnection conn = initializeConnection(fullUrl);
762 conn.setRequestMethod("GET");
763 } catch (ProtocolException e) {
764 shutdownConnection(conn);
765 throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
768 logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
770 handleResponse(conn, opResult);
771 buildSearchResult(opResult, indexName);
774 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
775 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
776 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
777 override, indexName, queryString);
783 public SearchOperationResult searchWithPayload(String indexName, String query)
784 throws DocumentStoreOperationException {
785 SearchOperationResult opResult = new SearchOperationResult();
787 if (logger.isDebugEnabled()) {
788 logger.debug("Querying index: " + indexName + " with query string: " + query);
791 // Initialize operation result with a failure codes / fault string
792 opResult.setResultCode(500);
793 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
795 String fullUrl = getFullUrl("/" + indexName + "/_search", false);
797 // Grab the current time so we can use it to generate a metrics log.
798 MdcOverride override = getStartTime(new MdcOverride());
800 HttpURLConnection conn = initializeConnection(fullUrl);
803 conn.setRequestMethod("POST");
804 } catch (ProtocolException e) {
805 shutdownConnection(conn);
806 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
809 attachContent(conn, query);
811 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
812 logger.debug("Request body = Elasticsearch query = " + query);
814 handleResponse(conn, opResult);
815 buildSearchResult(opResult, indexName);
818 .info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
819 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
820 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
821 override, indexName, query);
823 shutdownConnection(conn);
830 public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
831 throws DocumentStoreOperationException {
833 SearchOperationResult opResult = new SearchOperationResult();
835 if (logger.isDebugEnabled()) {
836 logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
839 // Initialize operation result with a failure codes / fault string
840 opResult.setResultCode(500);
841 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
843 String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
845 // Grab the current time so we can use it to generate a metrics log.
846 MdcOverride override = getStartTime(new MdcOverride());
848 HttpURLConnection conn = initializeConnection(fullUrl);
851 conn.setRequestMethod("POST");
852 } catch (ProtocolException e) {
853 shutdownConnection(conn);
854 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
857 attachContent(conn, query);
859 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
860 logger.debug("Request body = Elasticsearch query = " + query);
862 handleResponse(conn, opResult);
863 buildSuggestResult(opResult, indexName);
866 .info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
867 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
868 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
869 override, indexName, query);
871 shutdownConnection(conn);
876 private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException {
877 OutputStream outputStream = null;
878 OutputStreamWriter out = null;
881 outputStream = conn.getOutputStream();
882 } catch (IOException e) {
883 shutdownConnection(conn);
884 throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
887 out = new OutputStreamWriter(outputStream);
892 } catch (IOException e) {
893 shutdownConnection(conn);
894 throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
898 private HttpURLConnection initializeConnection(String fullUrl) throws DocumentStoreOperationException {
900 HttpURLConnection conn = null;
903 url = new URL(fullUrl);
904 } catch (MalformedURLException e) {
905 throw new DocumentStoreOperationException("Error building a URL with " + url, e);
909 conn = (HttpURLConnection) url.openConnection();
910 conn.setRequestProperty("Content-Type", "application/json");
911 conn.setDoOutput(true);
912 } catch (IOException e) {
913 shutdownConnection(conn);
914 throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
920 private void handleResponse(HttpURLConnection conn, OperationResult opResult)
921 throws DocumentStoreOperationException {
922 int resultCode = 200;
925 resultCode = conn.getResponseCode();
926 } catch (IOException e) {
927 shutdownConnection(conn);
928 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
931 logger.debug("Response Code : " + resultCode);
933 InputStream inputStream = null;
935 if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
936 inputStream = conn.getErrorStream();
939 inputStream = conn.getInputStream();
940 } catch (IOException e) {
941 shutdownConnection(conn);
942 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
946 InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
947 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
949 StringBuilder result = new StringBuilder(128);
950 String string = null;
953 while ((string = bufferedreader.readLine()) != null) {
954 result.append(string).append("\n");
956 } catch (IOException e) {
957 shutdownConnection(conn);
958 throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
961 if (resultCode == HttpStatus.CONFLICT.value()) {
962 opResult.setResultCode(HttpStatus.PRECONDITION_FAILED.value());
964 opResult.setResultCode(resultCode);
966 if (logger.isDebugEnabled()) {
967 logger.debug("Raw result string from ElasticSearch = " + result.toString());
969 opResult.setResult(result.toString());
970 opResult.setResultVersion(extractVersion(result.toString()));
973 private String extractVersion(String result) {
974 JSONParser parser = new JSONParser();
975 String version = null;
977 JSONObject root = (JSONObject) parser.parse(result);
978 if (root.get("_version") != null) {
979 version = root.get("_version").toString();
981 } catch (ParseException e) {
982 // Not all responses from ElasticSearch include a version, so
983 // if we don't get one back, just return an empty string rather
984 // than trigger a false failure.
991 * This convenience method gets the current system time and stores it in an attribute in the supplied
992 * {@link MdcOverride} object so that it can be used later by the metrics logger.
994 * @param override - The {@link MdcOverride} object to update.
995 * @return - The supplied {@link MdcOverride} object.
997 private MdcOverride getStartTime(MdcOverride override) {
999 // Grab the current time...
1000 long startTimeInMs = System.currentTimeMillis();
1002 // ...and add it as an attribute to the supplied MDC Override
1004 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
1005 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
1007 // Return the MdcOverride object that we were passed.
1008 // This looks odd, but it allows us to do stuff like:
1010 // MdcOverride ov = getStartTime(new MdcOverride())
1012 // which is quite handy, but also allows us to pass in an existing
1013 // MdcOverride object which already has some attributes set.
1017 private boolean isSuccess(OperationResult result) {
1019 return isSuccessCode(result.getResultCode());
1023 private boolean isSuccessCode(int statusCode) {
1024 return ((statusCode >= 200) && (statusCode < 300));
1029 public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
1031 if (logger.isDebugEnabled()) {
1032 String dbgString = "ESController: performBulkOperations - Operations: ";
1034 for (BulkRequest request : requests) {
1035 dbgString += "[" + request.toString() + "] ";
1038 logger.debug(dbgString);
1041 // Grab the current time so we can use it to generate a metrics log.
1042 MdcOverride override = getStartTime(new MdcOverride());
1044 // Parse the supplied set of operations.
1045 // Iterate over the list of operations which we were provided and
1046 // translate them into a format that ElasticSearh understands.
1048 StringBuilder esOperationSet = new StringBuilder(128);
1049 List<ElasticSearchResultItem> rejected = new ArrayList<>();
1050 for (BulkRequest request : requests) {
1052 // Convert the request to the syntax ElasticSearch likes.
1053 if (buildEsOperation(request, esOperationSet, rejected)) {
1058 ElasticSearchBulkOperationResult opResult = null;
1061 // Open an HTTP connection to the ElasticSearch back end.
1062 String fullUrl = getFullUrl("/_bulk", false);
1064 HttpURLConnection conn;
1067 url = new URL(fullUrl);
1068 conn = (HttpURLConnection) url.openConnection();
1069 conn.setRequestMethod("PUT");
1070 conn.setDoOutput(true);
1071 conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
1072 conn.setRequestProperty("Connection", "Close");
1074 } catch (IOException e) {
1076 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1077 if (logger.isDebugEnabled()) {
1078 logger.debug(Throwables.getStackTraceAsString(e));
1081 throw new DocumentStoreOperationException(
1082 "Failed to open connection to document store. Cause: " + e.getMessage(), e);
1085 StringBuilder bulkResult = new StringBuilder(128);
1087 // Create an output stream to write our request to.
1088 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1090 if (logger.isDebugEnabled()) {
1091 logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1092 logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
1095 // Write the resulting request string to our output stream. (this sends the request to ES?)
1096 out.write(esOperationSet.toString());
1099 // Open an input stream on our connection in order to read back the results.
1100 InputStream is = conn.getInputStream();
1101 InputStreamReader inputstreamreader = new InputStreamReader(is);
1102 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1104 // Read the contents of the input stream into our result string...
1105 String esResponseString = null;
1107 while ((esResponseString = bufferedreader.readLine()) != null) {
1108 bulkResult.append(esResponseString).append("\n");
1111 } catch (IOException e) {
1113 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1114 if (logger.isDebugEnabled()) {
1115 StringWriter sw = new StringWriter();
1116 e.printStackTrace(new PrintWriter(sw));
1117 logger.debug(sw.toString());
1120 throw new DocumentStoreOperationException(
1121 "Failure interacting with document store. Cause: " + e.getMessage(), e);
1124 if (logger.isDebugEnabled()) {
1125 logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
1128 // ...and marshal the resulting string into a Java object.
1130 opResult = marshallEsBulkResult(bulkResult.toString());
1132 } catch (IOException e) {
1134 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1135 if (logger.isDebugEnabled()) {
1136 logger.debug(Throwables.getStackTraceAsString(e));
1139 throw new DocumentStoreOperationException("Failed to marshal response body. Cause: " + e.getMessage(),
1144 // Finally, build the operation result and return it to the caller.
1145 OperationResult result = new OperationResult();
1146 result.setResultCode(207);
1147 result.setResult(buildGenericBulkResultSet(opResult, rejected));
1149 // In the success case we don't want the entire result string to be
1150 // dumped into the metrics log, so concatenate it.
1151 String resultStringForMetricsLog = result.getResult();
1152 if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1153 resultStringForMetricsLog =
1154 resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1157 metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1158 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1159 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1167 * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch.
1169 * @param request - The request to be performed.
1170 * @param sb - The string builder to append the json data to
1171 * @throws DocumentStoreOperationException
1173 private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails)
1174 throws DocumentStoreOperationException {
1176 boolean retVal = true;
1177 // What kind of operation are we performing?
1178 switch (request.getOperationType()) {
1180 // Create a new document.
1183 // Make sure that we were supplied a document payload.
1184 if (request.getOperation().getDocument() == null) {
1186 fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1187 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1191 // Make sure that the supplied document URL is formatted
1193 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1194 fails.add(generateRejectionEntry(request.getOperationType(),
1195 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1196 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1200 // Validate that the specified index actually exists before we
1201 // try to perform the create.
1202 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1204 fails.add(generateRejectionEntry(request.getOperationType(),
1205 "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1206 request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1210 // If we were supplied an id for the new document, then
1211 // include it in the bulk operation to Elastic Search
1212 if (request.getId() == null) {
1214 sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
1216 // Otherwise, we just leave that parameter off and ElasticSearch
1217 // will generate one for us.
1219 sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
1224 // Append the document that we want to create.
1225 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1226 } catch (JsonProcessingException e) {
1227 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1232 // Update an existing document.
1235 // Make sure that we were supplied a document payload.
1236 if (request.getOperation().getDocument() == null) {
1238 fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1239 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1243 // Make sure that the supplied document URL is formatted
1245 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1246 fails.add(generateRejectionEntry(request.getOperationType(),
1247 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1248 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1252 // Validate that the specified index actually exists before we
1253 // try to perform the update.
1254 if (!indexExists(request.getIndex())) {
1256 fails.add(generateRejectionEntry(request.getOperationType(),
1257 "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1258 request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1262 // Validate that the document we are trying to update actually
1263 // exists before we try to perform the update.
1264 if (!documentExists(request.getIndex(), request.getId())) {
1266 fails.add(generateRejectionEntry(request.getOperationType(),
1267 "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1268 request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1272 // It is mandatory that a version be supplied for an update operation,
1273 // so validate that now.
1274 if (request.getOperation().getMetaData().getEtag() == null) {
1276 fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1277 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1281 // Generate the update request...
1282 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1283 request.getOperation().getMetaData().getEtag()));
1285 // ...and append the document that we want to update.
1287 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1288 } catch (JsonProcessingException e) {
1289 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1293 // Delete an existing document.
1296 // Make sure that the supplied document URL is formatted
1298 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1299 fails.add(generateRejectionEntry(request.getOperationType(),
1300 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1301 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1305 // Validate that the specified index actually exists before we
1306 // try to perform the delete.
1307 if (!indexExists(request.getIndex())) {
1309 fails.add(generateRejectionEntry(request.getOperationType(),
1310 "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1311 request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1315 // Validate that the document we are trying to update actually
1316 // exists before we try to perform the delete.
1317 if (!documentExists(request.getIndex(), request.getId())) {
1319 fails.add(generateRejectionEntry(request.getOperationType(),
1320 "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1321 request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1325 // It is mandatory that a version be supplied for a delete operation,
1326 // so validate that now.
1327 if (request.getOperation().getMetaData().getEtag() == null) {
1329 fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1330 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1334 // Generate the delete request.
1335 sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1336 request.getOperation().getMetaData().getEtag()));
1344 private boolean indexExists(String index) throws DocumentStoreOperationException {
1346 OperationResult indexExistsResult = checkIndexExistence(index);
1348 return ((indexExistsResult.getResultCode() >= 200) && (indexExistsResult.getResultCode() < 300));
1351 private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1353 OperationResult docExistsResult = checkDocumentExistence(index, id);
1355 return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1359 * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the
1362 * @param rejectReason - A message describing why the operation was rejected.
1363 * @param anId - The identifier associated with the document being acted on.
1364 * @param statusCode - An HTTP status code.
1365 * @return - A result set item.
1367 private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index,
1368 String anId, int statusCode, String originalUrl) {
1370 ElasticSearchError err = new ElasticSearchError();
1371 err.setReason(rejectReason);
1373 ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1376 op.setStatus(statusCode);
1378 op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1380 ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1384 rejectionResult.setCreate(op);
1387 rejectionResult.setIndex(op);
1390 rejectionResult.setDelete(op);
1395 return rejectionResult;
1399 * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and
1400 * marshals it into a Java object.
1402 * @param jsonResult - The bulk operations response returned from ElasticSearch.
1403 * @return - The marshalled response.
1404 * @throws JsonParseException
1405 * @throws JsonMappingException
1406 * @throws IOException
1408 private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException {
1409 if (jsonResult != null) {
1410 if (logger.isDebugEnabled()) {
1411 logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", ""));
1414 ObjectMapper mapper = new ObjectMapper();
1415 mapper.setSerializationInclusion(Include.NON_EMPTY);
1417 return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1424 * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload.
1426 * @param esResult - ElasticSearch bulk operations response.
1427 * @return - A generic result set.
1429 private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1430 List<ElasticSearchResultItem> rejectedOps) {
1432 int totalSuccess = 0;
1435 if (logger.isDebugEnabled()) {
1437 logger.debug("ESController: Build generic result set. ES Results: "
1438 + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString());
1441 // Build a combined list of result items from the results returned
1442 // from ElasticSearch and the list of operations that we rejected
1443 // without sending to ElasticSearch.
1444 List<ElasticSearchResultItem> combinedResults = new ArrayList<>();
1445 if (esResult != null) {
1446 combinedResults.addAll(Arrays.asList(esResult.getItems()));
1448 combinedResults.addAll(rejectedOps);
1450 // Iterate over the individual results in the resulting result set.
1451 StringBuilder resultsBuilder = new StringBuilder();
1452 AtomicBoolean firstItem = new AtomicBoolean(true);
1453 for (ElasticSearchResultItem item : combinedResults) {
1455 // Increment the operation counts.
1457 if (isSuccessCode(item.operationStatus().getStatus())) {
1463 // Prepend a comma to our response string unless this it the
1464 // first result in the set.
1465 if (!firstItem.compareAndSet(true, false)) {
1466 resultsBuilder.append(", ");
1469 // Append the current result as a generic json structure.
1470 resultsBuilder.append(item.toJson());
1473 return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", "
1474 + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}";
1479 * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
1481 * @param indexName - The index to look for.
1482 * @return - An operation result indicating the success or failure of the check.
1483 * @throws DocumentStoreOperationException
1485 public OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
1487 // Initialize operation result with a failure codes / fault string
1488 OperationResult opResult = new OperationResult();
1489 opResult.setResultCode(500);
1491 // Grab the current time so we can use it to generate a metrics log.
1492 MdcOverride override = getStartTime(new MdcOverride());
1494 String fullUrl = getFullUrl("/" + indexName, false);
1495 HttpURLConnection conn = initializeConnection(fullUrl);
1498 conn.setRequestMethod("HEAD");
1500 } catch (ProtocolException e) {
1501 shutdownConnection(conn);
1502 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1505 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1509 resultCode = conn.getResponseCode();
1510 } catch (IOException e) {
1511 shutdownConnection(conn);
1512 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1514 logger.debug("Response Code : " + resultCode);
1516 opResult.setResultCode(resultCode);
1518 // Generate a metrics log so we can track how long the operation took.
1520 .info(SearchDbMsgs.CHECK_INDEX_TIME,
1521 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1522 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1523 override, indexName);
1525 shutdownConnection(conn);
1531 private void buildDocumentResult(DocumentOperationResult result, String index)
1532 throws DocumentStoreOperationException {
1534 JSONParser parser = new JSONParser();
1537 root = (JSONObject) parser.parse(result.getResult());
1539 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1540 // Success response object
1541 Document doc = new Document();
1542 doc.setEtag(result.getResultVersion());
1543 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1545 doc.setContent((JSONObject) root.get("_source"));
1546 result.setDocument(doc);
1549 // Error response object
1550 JSONObject error = (JSONObject) root.get("error");
1551 if (error != null) {
1552 result.setError(new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1556 } catch (Exception e) {
1557 throw new DocumentStoreOperationException("Failed to parse Elastic Search response." + result.getResult());
1563 private String buildDocumentResponseUrl(String index, String id) {
1564 return ApiUtils.buildDocumentUri(index, id);
1567 private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1569 JSONParser parser = new JSONParser();
1573 root = (JSONObject) parser.parse(result.getResult());
1574 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1575 JSONObject hits = (JSONObject) root.get("hits");
1576 JSONArray hitArray = (JSONArray) hits.get("hits");
1577 SearchHits searchHits = new SearchHits();
1578 searchHits.setTotalHits(hits.get("total").toString());
1579 ArrayList<SearchHit> searchHitArray = new ArrayList<>();
1581 for (int i = 0; i < hitArray.size(); i++) {
1582 JSONObject hit = (JSONObject) hitArray.get(i);
1583 SearchHit searchHit = new SearchHit();
1584 searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1585 Document doc = new Document();
1586 if (hit.get("_version") != null) {
1587 doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1591 buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1592 doc.setContent((JSONObject) hit.get("_source"));
1593 searchHit.setDocument(doc);
1594 searchHitArray.add(searchHit);
1596 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1597 result.setSearchResult(searchHits);
1599 JSONObject aggregations = (JSONObject) root.get("aggregations");
1600 if (aggregations != null) {
1601 AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1602 AggregationResults aggs = new AggregationResults();
1603 aggs.setAggregations(aggResults);
1604 result.setAggregationResult(aggs);
1609 JSONObject error = (JSONObject) root.get("error");
1610 if (error != null) {
1611 result.setError(new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1614 } catch (Exception e) {
1615 throw new DocumentStoreOperationException("Failed to parse Elastic Search response." + result.getResult());
1620 private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1621 JSONParser parser = new JSONParser();
1624 root = (JSONObject) parser.parse(result.getResult());
1625 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1626 JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
1627 JSONObject hitdata = (JSONObject) hitArray.get(0);
1628 JSONArray optionsArray = (JSONArray) hitdata.get("options");
1629 SuggestHits suggestHits = new SuggestHits();
1630 suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
1632 ArrayList<SuggestHit> suggestHitArray = new ArrayList<>();
1634 for (int i = 0; i < optionsArray.size(); i++) {
1635 JSONObject hit = (JSONObject) optionsArray.get(i);
1637 SuggestHit suggestHit = new SuggestHit();
1638 suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
1639 suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
1640 Document doc = new Document();
1641 if (hit.get("_version") != null) {
1642 doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1645 buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1647 doc.setContent((JSONObject) hit.get("payload"));
1648 suggestHit.setDocument(doc);
1649 suggestHitArray.add(suggestHit);
1651 suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
1652 result.setSuggestResult(suggestHits);
1654 JSONObject aggregations = (JSONObject) root.get("aggregations");
1655 if (aggregations != null) {
1656 AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1657 AggregationResults aggs = new AggregationResults();
1658 aggs.setAggregations(aggResults);
1659 result.setAggregationResult(aggs);
1664 JSONObject error = (JSONObject) root.get("error");
1665 if (error != null) {
1666 result.setError(new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1669 } catch (Exception e) {
1670 throw new DocumentStoreOperationException("Failed to parse Elastic Search response." + result.getResult());