2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
23 import com.google.common.base.Throwables;
24 import com.fasterxml.jackson.annotation.JsonInclude.Include;
25 import com.fasterxml.jackson.core.JsonParseException;
26 import com.fasterxml.jackson.core.JsonProcessingException;
27 import com.fasterxml.jackson.databind.JsonMappingException;
28 import com.fasterxml.jackson.databind.ObjectMapper;
29 import org.json.simple.JSONArray;
30 import org.json.simple.JSONObject;
31 import org.json.simple.parser.JSONParser;
32 import org.json.simple.parser.ParseException;
33 import org.onap.aai.sa.rest.AnalysisConfiguration;
34 import org.onap.aai.sa.rest.ApiUtils;
35 import org.onap.aai.sa.rest.BulkRequest;
36 import org.onap.aai.sa.rest.BulkRequest.OperationType;
37 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
38 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
39 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
40 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
41 import org.onap.aai.sa.searchdbabstraction.entity.Document;
42 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
43 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
44 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
45 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
46 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
47 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
48 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
49 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
50 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
51 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
52 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
53 import org.onap.aai.cl.api.LogFields;
54 import org.onap.aai.cl.api.LogLine;
55 import org.onap.aai.cl.api.Logger;
56 import org.onap.aai.cl.eelf.LoggerFactory;
57 import org.onap.aai.cl.mdc.MdcContext;
58 import org.onap.aai.cl.mdc.MdcOverride;
59 import org.onap.aai.sa.rest.DocumentSchema;
60 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
61 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
63 import java.io.BufferedReader;
65 import java.io.FileInputStream;
66 import java.io.IOException;
67 import java.io.InputStream;
68 import java.io.InputStreamReader;
69 import java.io.OutputStream;
70 import java.io.OutputStreamWriter;
71 import java.io.PrintWriter;
72 import java.io.StringWriter;
73 import java.net.HttpURLConnection;
74 import java.net.MalformedURLException;
75 import java.net.ProtocolException;
77 import java.text.SimpleDateFormat;
78 import java.util.ArrayList;
79 import java.util.Arrays;
80 import java.util.List;
81 import java.util.Properties;
82 import java.util.concurrent.atomic.AtomicBoolean;
84 import org.springframework.http.HttpStatus;
89 * This class has the Elasticsearch implementation of the
90 * DB operations defined in DocumentStoreInterface.
92 public class ElasticSearchHttpController implements DocumentStoreInterface {
94 private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
95 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
96 private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
97 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
98 private static final String BULK_IMPORT_INDEX_TEMPLATE =
99 "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
100 private static final String BULK_DELETE_TEMPLATE =
101 "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
103 private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
104 "Internal Error: ElasticSearch operation fault occurred";
105 private static final Logger logger = LoggerFactory.getInstance()
106 .getLogger(ElasticSearchHttpController.class.getName());
107 private static final Logger metricsLogger = LoggerFactory.getInstance()
108 .getMetricsLogger(ElasticSearchHttpController.class.getName());
109 private final ElasticSearchConfig config;
111 private static final String DEFAULT_TYPE = "default";
113 private static ElasticSearchHttpController instance = null;
115 protected AnalysisConfiguration analysisConfig;
117 public static ElasticSearchHttpController getInstance() {
119 synchronized (ElasticSearchHttpController.class) {
121 if (instance == null) {
123 Properties properties = new Properties();
124 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
126 properties.load(new FileInputStream(file));
127 } catch (Exception e) {
128 logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL,
129 "ElasticSearchHTTPController.getInstance",
130 e.getLocalizedMessage());
133 ElasticSearchConfig config = new ElasticSearchConfig(properties);
134 instance = new ElasticSearchHttpController(config);
141 public ElasticSearchHttpController(ElasticSearchConfig config) {
142 this.config = config;
143 analysisConfig = new AnalysisConfiguration();
146 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
148 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
149 } catch (Exception e) {
150 logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e,
151 getFullUrl("", false), e.getMessage());
156 public AnalysisConfiguration getAnalysisConfig() {
157 return analysisConfig;
161 public OperationResult createIndex(String index, DocumentSchema documentSchema) {
163 OperationResult result = new OperationResult();
164 result.setResultCode(500);
168 // Submit the request to ElasticSearch to create the index using a
169 // default document type.
170 result = createTable(index,
172 analysisConfig.getEsIndexSettings(),
173 DocumentSchemaUtil.generateDocumentMappings(documentSchema));
175 // ElasticSearch will return us a 200 code on success when we
176 // want to report a 201, so translate the result here.
177 result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
178 if (isSuccess(result)) {
179 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
180 //result.setResult("{\"index\": \"" + index + ", \"type\": \"" + DEFAULT_TYPE + "\"}");
183 } catch (DocumentStoreOperationException | IOException e) {
185 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
192 public OperationResult createDynamicIndex(String index, String dynamicSchema) {
193 OperationResult result = new OperationResult();
194 result.setResultCode(500);
197 result = createTable(index, dynamicSchema);
199 // ElasticSearch will return us a 200 code on success when we
200 // want to report a 201, so translate the result here.
201 result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
202 if (isSuccess(result)) {
203 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
205 } catch (DocumentStoreOperationException e) {
206 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
214 public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
216 //Initialize operation result with a failure codes / fault string
217 OperationResult opResult = new OperationResult();
218 opResult.setResultCode(500);
219 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
221 // Grab the current time so we can use it to generate a metrics log.
222 MdcOverride override = getStartTime(new MdcOverride());
224 String fullUrl = getFullUrl("/" + indexName + "/", false);
225 HttpURLConnection conn = initializeConnection(fullUrl);
227 logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
230 conn.setRequestMethod("DELETE");
231 } catch (ProtocolException e) {
232 shutdownConnection(conn);
233 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
236 handleResponse(conn, opResult);
238 // Generate a metrics log so we can track how long the operation took.
239 metricsLogger.info(SearchDbMsgs.DELETE_INDEX_TIME,
241 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
242 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
246 shutdownConnection(conn);
252 private OperationResult checkConnection() throws Exception {
254 String fullUrl = getFullUrl("/_cluster/health", false);
256 HttpURLConnection conn = null;
258 url = new URL(fullUrl);
259 conn = (HttpURLConnection) url.openConnection();
260 conn.setRequestMethod("GET");
261 conn.setDoOutput(true);
262 logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
264 int resultCode = conn.getResponseCode();
265 logger.debug("getClusterHealth() response Code : " + resultCode);
266 OperationResult opResult = new OperationResult();
267 opResult.setResultCode(resultCode);
269 shutdownConnection(conn);
274 private String getFullUrl(String resourceUrl, boolean isSecure) {
276 final String host = config.getIpAddress();
277 final String port = config.getHttpPort();
280 return String.format("https://%s:%s%s", host, port, resourceUrl);
282 return String.format("http://%s:%s%s", host, port, resourceUrl);
286 private void shutdownConnection(HttpURLConnection connection) {
287 if (connection == null) {
291 InputStream inputstream = null;
292 OutputStream outputstream = null;
295 inputstream = connection.getInputStream();
296 } catch (IOException e) {
297 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
299 if (inputstream != null) {
302 } catch (IOException e) {
303 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
304 e.getLocalizedMessage());
310 outputstream = connection.getOutputStream();
311 } catch (IOException e) {
312 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
314 if (outputstream != null) {
316 outputstream.close();
317 } catch (IOException e) {
318 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
319 e.getLocalizedMessage());
324 connection.disconnect();
328 protected OperationResult createTable(String indexName, String typeName,
329 String indexSettings, String indexMappings)
330 throws DocumentStoreOperationException {
332 if (indexSettings == null) {
333 logger.debug("No settings provided.");
336 if (indexMappings == null) {
337 logger.debug("No mappings provided.");
340 OperationResult opResult = new OperationResult();
342 // Initialize operation result with a failure codes / fault string
343 opResult.setResultCode(500);
344 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
346 // Grab the current time so we can use it to generate a metrics log.
347 MdcOverride override = getStartTime(new MdcOverride());
349 String fullUrl = getFullUrl("/" + indexName + "/", false);
350 HttpURLConnection conn = initializeConnection(fullUrl);
353 conn.setRequestMethod("PUT");
354 conn.setRequestProperty("Content-Type", "application/json");
355 } catch (ProtocolException e) {
356 shutdownConnection(conn);
357 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
360 StringBuilder sb = new StringBuilder(128);
361 sb.append("{ \"settings\" : ");
362 sb.append(indexSettings);
365 sb.append("\"mappings\" : {");
366 sb.append("\"" + typeName + "\" :");
367 sb.append(indexMappings);
371 attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
372 } catch(IOException e) {
373 logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
374 throw new DocumentStoreOperationException(e.getMessage(), e);
377 logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
378 logger.debug("Request content: " + sb.toString());
380 handleResponse(conn, opResult);
382 shutdownConnection(conn);
384 // Generate a metrics log so we can track how long the operation took.
385 metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
387 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
388 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
396 * Will send the passed in JSON payload to Elasticsearch using the
397 * provided index name in an attempt to create the index.
399 * @param indexName - The name of the index to be created
400 * @param settingsAndMappings - The actual JSON object that will define the index
401 * @return - The operation result of writing into Elasticsearch
402 * @throws DocumentStoreOperationException
404 protected OperationResult createTable(String indexName, String settingsAndMappings) throws DocumentStoreOperationException {
405 OperationResult result = new OperationResult();
406 result.setResultCode(500);
407 result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
409 // Grab the current time so we can use it to generate a metrics log.
410 MdcOverride override = getStartTime(new MdcOverride());
412 String fullUrl = getFullUrl("/" + indexName + "/", false);
413 HttpURLConnection conn = initializeConnection(fullUrl);
416 conn.setRequestMethod("PUT");
417 conn.setRequestProperty("Content-Type", "application/json");
418 } catch (ProtocolException e) {
419 shutdownConnection(conn);
420 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
424 attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
425 } catch(IOException e) {
426 logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
427 throw new DocumentStoreOperationException(e.getMessage());
429 handleResponse(conn, result);
431 // Generate a metrics log so we can track how long the operation took.
432 metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
434 .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
435 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()),
443 public DocumentOperationResult createDocument(String indexName,
444 DocumentStoreDataEntity document,
445 boolean allowImplicitIndexCreation)
446 throws DocumentStoreOperationException {
448 if(!allowImplicitIndexCreation) {
450 // Before we do anything, make sure that the specified index actually exists in the
451 // document store - we don't want to rely on ElasticSearch to fail the document
452 // create because it could be configured to implicitly create a non-existent index,
453 // which can lead to hard-to-debug behaviour with queries down the road.
454 OperationResult indexExistsResult = checkIndexExistence(indexName);
455 if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
457 DocumentOperationResult opResult = new DocumentOperationResult();
458 opResult.setResultCode(HttpStatus.NOT_FOUND.value());
459 opResult.setResult("Document Index '" + indexName + "' does not exist.");
460 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
465 if (document.getId() == null || document.getId().isEmpty()) {
466 return createDocumentWithoutId(indexName, document);
468 return createDocumentWithId(indexName, document);
472 private DocumentOperationResult createDocumentWithId(String indexName,
473 DocumentStoreDataEntity document)
474 throws DocumentStoreOperationException {
475 // check if the document already exists
476 DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
479 if (opResult.getResultCode() != HttpStatus.NOT_FOUND.value()) {
480 if (opResult.getResultCode() == HttpStatus.CONFLICT.value()) {
481 opResult.setFailureCause("A document with the same id already exists.");
483 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
485 opResult.setResultCode(HttpStatus.CONFLICT.value());
489 opResult = new DocumentOperationResult();
490 // Initialize operation result with a failure codes / fault string
491 opResult.setResultCode(500);
492 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
494 // Grab the current time so we can use it to generate a metrics log.
495 MdcOverride override = getStartTime(new MdcOverride());
497 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE
498 + "/" + document.getId(), false);
499 HttpURLConnection conn = initializeConnection(fullUrl);
502 conn.setRequestMethod("PUT");
503 } catch (ProtocolException e) {
504 shutdownConnection(conn);
505 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
508 attachDocument(conn, document);
510 logger.debug("Sending 'PUT' request to: " + conn.getURL());
512 handleResponse(conn, opResult);
513 buildDocumentResult(opResult, indexName);
515 // Generate a metrics log so we can track how long the operation took.
516 metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
518 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
519 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
523 shutdownConnection(conn);
529 private DocumentOperationResult createDocumentWithoutId(String indexName,
530 DocumentStoreDataEntity document)
531 throws DocumentStoreOperationException {
533 DocumentOperationResult response = new DocumentOperationResult();
534 // Initialize operation result with a failure codes / fault string
535 response.setResultCode(500);
536 response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
538 // Grab the current time so we can use it to generate a metrics log.
539 MdcOverride override = getStartTime(new MdcOverride());
541 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
542 HttpURLConnection conn = initializeConnection(fullUrl);
545 conn.setRequestMethod("POST");
546 } catch (ProtocolException e) {
547 shutdownConnection(conn);
548 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
551 attachDocument(conn, document);
553 logger.debug("Sending 'POST' request to: " + conn.getURL());
555 handleResponse(conn, response);
556 buildDocumentResult(response, indexName);
558 // Generate a metrics log so we can track how long the operation took.
559 metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
561 .setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
562 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
566 shutdownConnection(conn);
571 private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
572 throws DocumentStoreOperationException {
573 // conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
574 conn.setRequestProperty("Content-Type", "application/json");
575 conn.setRequestProperty("Connection", "Close");
576 attachContent(conn, doc.getContentInJson());
579 private DocumentOperationResult checkDocumentExistence(String indexName,
581 throws DocumentStoreOperationException {
582 DocumentOperationResult opResult = new DocumentOperationResult();
584 // Initialize operation result with a failure codes / fault string
585 opResult.setResultCode(500);
587 // Grab the current time so we can use it to generate a metrics log.
588 MdcOverride override = getStartTime(new MdcOverride());
590 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
591 HttpURLConnection conn = initializeConnection(fullUrl);
594 conn.setRequestMethod("HEAD");
595 } catch (ProtocolException e) {
596 shutdownConnection(conn);
597 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
600 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
604 resultCode = conn.getResponseCode();
605 } catch (IOException e) {
606 shutdownConnection(conn);
607 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
610 logger.debug("Response Code : " + resultCode);
612 opResult.setResultCode(resultCode);
614 // Generate a metrics log so we can track how long the operation took.
615 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
617 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
618 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
623 shutdownConnection(conn);
629 public DocumentOperationResult updateDocument(String indexName,
630 DocumentStoreDataEntity document,
631 boolean allowImplicitIndexCreation)
632 throws DocumentStoreOperationException {
634 if(!allowImplicitIndexCreation) {
636 // Before we do anything, make sure that the specified index actually exists in the
637 // document store - we don't want to rely on ElasticSearch to fail the document
638 // create because it could be configured to implicitly create a non-existent index,
639 // which can lead to hard-to-debug behaviour with queries down the road.
640 OperationResult indexExistsResult = checkIndexExistence(indexName);
641 if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
643 DocumentOperationResult opResult = new DocumentOperationResult();
644 opResult.setResultCode(HttpStatus.NOT_FOUND.value());
645 opResult.setResult("Document Index '" + indexName + "' does not exist.");
646 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
651 DocumentOperationResult opResult = new DocumentOperationResult();
653 // Initialize operation result with a failure codes / fault string
654 opResult.setResultCode(500);
655 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
657 // Grab the current time so we can use it to generate a metrics log.
658 MdcOverride override = getStartTime(new MdcOverride());
660 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
661 + "?version=" + document.getVersion(), false);
662 HttpURLConnection conn = initializeConnection(fullUrl);
665 conn.setRequestMethod("PUT");
666 conn.setRequestProperty("Content-Type", "application/json");
667 } catch (ProtocolException e) {
668 shutdownConnection(conn);
669 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
672 attachDocument(conn, document);
674 logger.debug("Sending 'PUT' request to: " + conn.getURL());
676 handleResponse(conn, opResult);
677 buildDocumentResult(opResult, indexName);
679 // Generate a metrics log so we can track how long the operation took.
680 metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
682 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
683 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
688 shutdownConnection(conn);
694 public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
695 throws DocumentStoreOperationException {
696 DocumentOperationResult opResult = new DocumentOperationResult();
698 // Initialize operation result with a failure codes / fault string
699 opResult.setResultCode(500);
700 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
702 // Grab the current time so we can use it to generate a metrics log.
703 MdcOverride override = getStartTime(new MdcOverride());
705 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
706 + "?version=" + document.getVersion(), false);
707 HttpURLConnection conn = initializeConnection(fullUrl);
710 conn.setRequestMethod("DELETE");
711 } catch (ProtocolException e) {
712 shutdownConnection(conn);
713 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
716 logger.debug("\nSending 'DELETE' request to " + conn.getURL());
718 handleResponse(conn, opResult);
719 buildDocumentResult(opResult, indexName);
720 //supress the etag and url in response for delete as they are not required
721 if (opResult.getDocument() != null) {
722 opResult.getDocument().setEtag(null);
723 opResult.getDocument().setUrl(null);
726 // Generate a metrics log so we can track how long the operation took.
727 metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
729 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
730 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
735 shutdownConnection(conn);
741 public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
742 throws DocumentStoreOperationException {
743 DocumentOperationResult opResult = new DocumentOperationResult();
745 // Initialize operation result with a failure codes / fault string
746 opResult.setResultCode(500);
747 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
749 // Grab the current time so we can use it to generate a metrics log.
750 MdcOverride override = getStartTime(new MdcOverride());
752 String fullUrl = null;
753 if (document.getVersion() == null) {
754 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
756 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
757 + "?version=" + document.getVersion(), false);
759 HttpURLConnection conn = initializeConnection(fullUrl);
761 logger.debug("\nSending 'GET' request to: " + conn.getURL());
763 handleResponse(conn, opResult);
764 buildDocumentResult(opResult, indexName);
766 // Generate a metrics log so we can track how long the operation took.
767 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
769 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
770 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
775 shutdownConnection(conn);
780 public SearchOperationResult search(String indexName, String queryString)
781 throws DocumentStoreOperationException {
782 SearchOperationResult opResult = new SearchOperationResult();
784 // Initialize operation result with a failure codes / fault string
785 opResult.setResultCode(500);
786 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
788 String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
790 // Grab the current time so we can use it to generate a metrics log.
791 MdcOverride override = getStartTime(new MdcOverride());
793 HttpURLConnection conn = initializeConnection(fullUrl);
796 conn.setRequestMethod("GET");
797 } catch (ProtocolException e) {
798 shutdownConnection(conn);
799 throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
802 logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
804 handleResponse(conn, opResult);
805 buildSearchResult(opResult, indexName);
808 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
810 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
811 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
819 public SearchOperationResult searchWithPayload(String indexName, String query)
820 throws DocumentStoreOperationException {
821 SearchOperationResult opResult = new SearchOperationResult();
823 if (logger.isDebugEnabled()) {
824 logger.debug("Querying index: " + indexName + " with query string: " + query);
827 // Initialize operation result with a failure codes / fault string
828 opResult.setResultCode(500);
829 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
831 String fullUrl = getFullUrl("/" + indexName + "/_search", false);
833 // Grab the current time so we can use it to generate a metrics log.
834 MdcOverride override = getStartTime(new MdcOverride());
836 HttpURLConnection conn = initializeConnection(fullUrl);
839 conn.setRequestMethod("POST");
840 conn.setRequestProperty("Content-Type", "application/json");
841 } catch (ProtocolException e) {
842 shutdownConnection(conn);
843 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
846 attachContent(conn, query);
848 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
849 logger.debug("Request body = Elasticsearch query = " + query);
851 handleResponse(conn, opResult);
852 buildSearchResult(opResult, indexName);
854 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
856 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
857 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
862 shutdownConnection(conn);
868 public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
869 throws DocumentStoreOperationException {
871 SearchOperationResult opResult = new SearchOperationResult();
873 if (logger.isDebugEnabled()) {
874 logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
877 // Initialize operation result with a failure codes / fault string
878 opResult.setResultCode(500);
879 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
881 String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
883 // Grab the current time so we can use it to generate a metrics log.
884 MdcOverride override = getStartTime(new MdcOverride());
886 HttpURLConnection conn = initializeConnection(fullUrl);
889 conn.setRequestMethod("POST");
890 conn.setRequestProperty("Content-Type", "application/json");
891 } catch (ProtocolException e) {
892 shutdownConnection(conn);
893 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
896 attachContent(conn, query);
898 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
899 logger.debug("Request body = Elasticsearch query = " + query);
901 handleResponse(conn, opResult);
902 buildSuggestResult(opResult, indexName);
904 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
905 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
906 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
907 override, indexName, query);
909 shutdownConnection(conn);
914 private void attachContent(HttpURLConnection conn, String content)
915 throws DocumentStoreOperationException {
916 OutputStream outputStream = null;
917 OutputStreamWriter out = null;
920 outputStream = conn.getOutputStream();
921 } catch (IOException e) {
922 shutdownConnection(conn);
923 throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
926 out = new OutputStreamWriter(outputStream);
931 } catch (IOException e) {
932 shutdownConnection(conn);
933 throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
937 private HttpURLConnection initializeConnection(String fullUrl)
938 throws DocumentStoreOperationException {
940 HttpURLConnection conn = null;
943 url = new URL(fullUrl);
944 } catch (MalformedURLException e) {
945 throw new DocumentStoreOperationException("Error building a URL with " + url, e);
949 conn = (HttpURLConnection) url.openConnection();
950 conn.setDoOutput(true);
951 } catch (IOException e) {
952 shutdownConnection(conn);
953 throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
959 private void handleResponse(HttpURLConnection conn, OperationResult opResult)
960 throws DocumentStoreOperationException {
961 int resultCode = 200;
964 resultCode = conn.getResponseCode();
965 } catch (IOException e) {
966 shutdownConnection(conn);
967 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
970 logger.debug("Response Code : " + resultCode);
972 InputStream inputStream = null;
974 if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
975 inputStream = conn.getErrorStream();
978 inputStream = conn.getInputStream();
979 } catch (IOException e) {
980 shutdownConnection(conn);
981 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
985 InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
986 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
988 StringBuilder result = new StringBuilder(128);
989 String string = null;
992 while ((string = bufferedreader.readLine()) != null) {
993 result.append(string).append("\n");
995 } catch (IOException e) {
996 shutdownConnection(conn);
997 throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
1000 if (resultCode == HttpStatus.CONFLICT.value()) {
1001 opResult.setResultCode(HttpStatus.PRECONDITION_FAILED.value());
1003 opResult.setResultCode(resultCode);
1005 if (logger.isDebugEnabled()) {
1006 logger.debug("Raw result string from ElasticSearch = " + result.toString());
1008 opResult.setResult(result.toString());
1009 opResult.setResultVersion(extractVersion(result.toString()));
1012 private String extractVersion(String result) throws DocumentStoreOperationException {
1014 JSONParser parser = new JSONParser();
1015 String version = null;
1017 JSONObject root = (JSONObject) parser.parse(result);
1018 if (root.get("_version") != null) {
1019 version = root.get("_version").toString();
1022 } catch (ParseException e) {
1024 // Not all responses from ElasticSearch include a version, so
1025 // if we don't get one back, just return an empty string rather
1026 // than trigger a false failure.
1033 * This convenience method gets the current system time and stores
1034 * it in an attribute in the supplied {@link MdcOverride} object so
1035 * that it can be used later by the metrics logger.
1037 * @param override - The {@link MdcOverride} object to update.
1038 * @return - The supplied {@link MdcOverride} object.
1040 private MdcOverride getStartTime(MdcOverride override) {
1042 // Grab the current time...
1043 long startTimeInMs = System.currentTimeMillis();
1045 // ...and add it as an attribute to the supplied MDC Override
1047 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
1048 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
1050 // Return the MdcOverride object that we were passed.
1051 // This looks odd, but it allows us to do stuff like:
1053 // MdcOverride ov = getStartTime(new MdcOverride())
1055 // which is quite handy, but also allows us to pass in an existing
1056 // MdcOverride object which already has some attributes set.
1060 private boolean isSuccess(OperationResult result) {
1062 return isSuccessCode(result.getResultCode());
1066 private boolean isSuccessCode(int statusCode) {
1067 return ((statusCode >= 200) && (statusCode < 300));
1072 public OperationResult performBulkOperations(BulkRequest[] requests)
1073 throws DocumentStoreOperationException {
1075 if (logger.isDebugEnabled()) {
1076 String dbgString = "ESController: performBulkOperations - Operations: ";
1078 for (BulkRequest request : requests) {
1079 dbgString += "[" + request.toString() + "] ";
1082 logger.debug(dbgString);
1085 // Grab the current time so we can use it to generate a metrics log.
1086 MdcOverride override = getStartTime(new MdcOverride());
1088 // Parse the supplied set of operations.
1089 // Iterate over the list of operations which we were provided and
1090 // translate them into a format that ElasticSearh understands.
1092 StringBuilder esOperationSet = new StringBuilder(128);
1093 List<ElasticSearchResultItem> rejected = new ArrayList<ElasticSearchResultItem>();
1094 for (BulkRequest request : requests) {
1096 // Convert the request to the syntax ElasticSearch likes.
1097 if (buildEsOperation(request, esOperationSet, rejected)) {
1102 ElasticSearchBulkOperationResult opResult = null;
1105 // Open an HTTP connection to the ElasticSearch back end.
1106 String fullUrl = getFullUrl("/_bulk", false);
1108 HttpURLConnection conn;
1111 url = new URL(fullUrl);
1112 conn = (HttpURLConnection) url.openConnection();
1113 conn.setRequestMethod("PUT");
1114 conn.setDoOutput(true);
1115 conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
1116 conn.setRequestProperty("Connection", "Close");
1118 } catch (IOException e) {
1120 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1121 if (logger.isDebugEnabled()) {
1122 logger.debug(Throwables.getStackTraceAsString(e));
1125 throw new DocumentStoreOperationException("Failed to open connection to document store. Cause: "
1126 + e.getMessage(), e);
1129 StringBuilder bulkResult = new StringBuilder(128);
1131 // Create an output stream to write our request to.
1132 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1135 if (logger.isDebugEnabled()) {
1136 logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1137 logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n",
1141 // Write the resulting request string to our output stream. (this sends the request to ES?)
1142 out.write(esOperationSet.toString());
1145 // Open an input stream on our connection in order to read back the results.
1146 InputStream is = conn.getInputStream();
1147 InputStreamReader inputstreamreader = new InputStreamReader(is);
1148 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1150 // Read the contents of the input stream into our result string...
1151 String esResponseString = null;
1153 while ((esResponseString = bufferedreader.readLine()) != null) {
1154 bulkResult.append(esResponseString).append("\n");
1157 } catch (IOException e) {
1159 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1160 if (logger.isDebugEnabled()) {
1161 StringWriter sw = new StringWriter();
1162 e.printStackTrace(new PrintWriter(sw));
1163 logger.debug(sw.toString());
1166 throw new DocumentStoreOperationException("Failure interacting with document store. Cause: "
1167 + e.getMessage(), e);
1170 if (logger.isDebugEnabled()) {
1171 logger.debug("ESController: Received result string from ElasticSearch: = "
1172 + bulkResult.toString());
1175 // ...and marshal the resulting string into a Java object.
1177 opResult = marshallEsBulkResult(bulkResult.toString());
1179 } catch (IOException e) {
1181 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1182 if (logger.isDebugEnabled()) {
1183 logger.debug(Throwables.getStackTraceAsString(e));
1186 throw new DocumentStoreOperationException("Failed to marshal response body. Cause: "
1187 + e.getMessage(), e);
1191 // Finally, build the operation result and return it to the caller.
1192 OperationResult result = new OperationResult();
1193 result.setResultCode(207);
1194 result.setResult(buildGenericBulkResultSet(opResult, rejected));
1196 // In the success case we don't want the entire result string to be
1197 // dumped into the metrics log, so concatenate it.
1198 String resultStringForMetricsLog = result.getResult();
1199 if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1200 resultStringForMetricsLog = resultStringForMetricsLog.substring(0,
1201 Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1204 metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1206 .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1207 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1215 * This method converts a {@link BulkRequest} object into a json structure
1216 * which can be understood by ElasticSearch.
1218 * @param request - The request to be performed.
1219 * @param sb - The string builder to append the json data to
1220 * @throws DocumentStoreOperationException
1222 private boolean buildEsOperation(BulkRequest request, StringBuilder sb,
1223 List<ElasticSearchResultItem> fails)
1224 throws DocumentStoreOperationException {
1226 boolean retVal = true;
1227 OperationResult indexExistsResult = null;
1229 // What kind of operation are we performing?
1230 switch (request.getOperationType()) {
1232 // Create a new document.
1235 // Make sure that we were supplied a document payload.
1236 if (request.getOperation().getDocument() == null) {
1238 fails.add(generateRejectionEntry(request.getOperationType(),
1239 "Missing document payload",
1243 request.getOperation().getMetaData().getUrl()));
1247 // Make sure that the supplied document URL is formatted
1249 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1250 fails.add(generateRejectionEntry(request.getOperationType(),
1251 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1255 request.getOperation().getMetaData().getUrl()));
1259 // Validate that the specified index actually exists before we
1260 // try to perform the create.
1261 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1263 fails.add(generateRejectionEntry(request.getOperationType(),
1264 "Specified resource does not exist: "
1265 + request.getOperation().getMetaData().getUrl(),
1269 request.getOperation().getMetaData().getUrl()));
1273 // If we were supplied an id for the new document, then
1274 // include it in the bulk operation to Elastic Search
1275 if (request.getId() == null) {
1277 sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE,
1281 // Otherwise, we just leave that parameter off and ElasticSearch
1282 // will generate one for us.
1284 sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE,
1291 // Append the document that we want to create.
1292 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1293 } catch (JsonProcessingException e) {
1294 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1299 // Update an existing document.
1302 // Make sure that we were supplied a document payload.
1303 if (request.getOperation().getDocument() == null) {
1305 fails.add(generateRejectionEntry(request.getOperationType(),
1306 "Missing document payload",
1310 request.getOperation().getMetaData().getUrl()));
1314 // Make sure that the supplied document URL is formatted
1316 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1317 fails.add(generateRejectionEntry(request.getOperationType(),
1318 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1322 request.getOperation().getMetaData().getUrl()));
1326 // Validate that the specified index actually exists before we
1327 // try to perform the update.
1328 if (!indexExists(request.getIndex())) {
1330 fails.add(generateRejectionEntry(request.getOperationType(),
1331 "Specified resource does not exist: "
1332 + request.getOperation().getMetaData().getUrl(),
1336 request.getOperation().getMetaData().getUrl()));
1340 // Validate that the document we are trying to update actually
1341 // exists before we try to perform the update.
1342 if (!documentExists(request.getIndex(), request.getId())) {
1344 fails.add(generateRejectionEntry(request.getOperationType(),
1345 "Specified resource does not exist: "
1346 + request.getOperation().getMetaData().getUrl(),
1350 request.getOperation().getMetaData().getUrl()));
1354 // It is mandatory that a version be supplied for an update operation,
1355 // so validate that now.
1356 if (request.getOperation().getMetaData().getEtag() == null) {
1358 fails.add(generateRejectionEntry(request.getOperationType(),
1359 "Missing mandatory ETag field",
1363 request.getOperation().getMetaData().getUrl()));
1367 // Generate the update request...
1368 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE,
1372 request.getOperation().getMetaData().getEtag()));
1374 // ...and append the document that we want to update.
1376 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1377 } catch (JsonProcessingException e) {
1378 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1382 // Delete an existing document.
1385 // Make sure that the supplied document URL is formatted
1387 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1388 fails.add(generateRejectionEntry(request.getOperationType(),
1389 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1393 request.getOperation().getMetaData().getUrl()));
1397 // Validate that the specified index actually exists before we
1398 // try to perform the delete.
1399 if (!indexExists(request.getIndex())) {
1401 fails.add(generateRejectionEntry(request.getOperationType(),
1402 "Specified resource does not exist: "
1403 + request.getOperation().getMetaData().getUrl(),
1407 request.getOperation().getMetaData().getUrl()));
1411 // Validate that the document we are trying to update actually
1412 // exists before we try to perform the delete.
1413 if (!documentExists(request.getIndex(), request.getId())) {
1415 fails.add(generateRejectionEntry(request.getOperationType(),
1416 "Specified resource does not exist: "
1417 + request.getOperation().getMetaData().getUrl(),
1421 request.getOperation().getMetaData().getUrl()));
1425 // It is mandatory that a version be supplied for a delete operation,
1426 // so validate that now.
1427 if (request.getOperation().getMetaData().getEtag() == null) {
1429 fails.add(generateRejectionEntry(request.getOperationType(),
1430 "Missing mandatory ETag field",
1434 request.getOperation().getMetaData().getUrl()));
1438 // Generate the delete request.
1439 sb.append(String.format(BULK_DELETE_TEMPLATE,
1443 request.getOperation().getMetaData().getEtag()));
1451 private boolean indexExists(String index) throws DocumentStoreOperationException {
1453 OperationResult indexExistsResult = checkIndexExistence(index);
1455 return ((indexExistsResult.getResultCode() >= 200)
1456 && (indexExistsResult.getResultCode() < 300));
1459 private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1461 OperationResult docExistsResult = checkDocumentExistence(index, id);
1463 return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1467 * This method constructs a status entry for a bulk operation which has
1468 * been rejected before even sending it to the document store.
1470 * @param rejectReason - A message describing why the operation was rejected.
1471 * @param anId - The identifier associated with the document being
1473 * @param statusCode - An HTTP status code.
1474 * @return - A result set item.
1476 private ElasticSearchResultItem generateRejectionEntry(OperationType opType,
1477 String rejectReason,
1481 String originalUrl) {
1483 ElasticSearchError err = new ElasticSearchError();
1484 err.setReason(rejectReason);
1486 ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1489 op.setStatus(statusCode);
1491 op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1493 ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1497 rejectionResult.setCreate(op);
1500 rejectionResult.setIndex(op);
1503 rejectionResult.setDelete(op);
1508 return rejectionResult;
1513 * This method takes the json structure returned from ElasticSearch in
1514 * response to a bulk operations request and marshals it into a Java
1517 * @param jsonResult - The bulk operations response returned from
1519 * @return - The marshalled response.
1520 * @throws JsonParseException
1521 * @throws JsonMappingException
1522 * @throws IOException
1524 private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult)
1525 throws JsonParseException, JsonMappingException, IOException {
1527 if (jsonResult != null) {
1528 if (logger.isDebugEnabled()) {
1529 logger.debug("ESController: Marshalling ES result set from json: "
1530 + jsonResult.replaceAll("\n", ""));
1533 ObjectMapper mapper = new ObjectMapper();
1534 mapper.setSerializationInclusion(Include.NON_EMPTY);
1536 return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1544 * This method takes the marshalled ElasticSearch bulk response and
1545 * converts it into a generic response payload.
1547 * @param esResult - ElasticSearch bulk operations response.
1548 * @return - A generic result set.
1550 private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1551 List<ElasticSearchResultItem> rejectedOps) {
1554 int totalSuccess = 0;
1557 if (logger.isDebugEnabled()) {
1559 logger.debug("ESController: Build generic result set. ES Results: "
1560 + ((esResult != null) ? esResult.toString() : "[]")
1561 + " Rejected Ops: " + rejectedOps.toString());
1564 // Build a combined list of result items from the results returned
1565 // from ElasticSearch and the list of operations that we rejected
1566 // without sending to ElasticSearch.
1567 List<ElasticSearchResultItem> combinedResults = new ArrayList<ElasticSearchResultItem>();
1568 if (esResult != null) {
1569 combinedResults.addAll(Arrays.asList(esResult.getItems()));
1571 combinedResults.addAll(rejectedOps);
1573 // Iterate over the individual results in the resulting result set.
1574 StringBuilder resultsBuilder = new StringBuilder();
1575 AtomicBoolean firstItem = new AtomicBoolean(true);
1576 for (ElasticSearchResultItem item : combinedResults) {
1578 // Increment the operation counts.
1580 if (isSuccessCode(item.operationStatus().getStatus())) {
1586 // Prepend a comma to our response string unless this it the
1587 // first result in the set.
1588 if (!firstItem.compareAndSet(true, false)) {
1589 resultsBuilder.append(", ");
1592 // Append the current result as a generic json structure.
1593 resultsBuilder.append(item.toJson());
1596 // Now, build the result string and return it.
1597 String responseBody = "{ \"total_operations\": " + totalOps + ", "
1598 + "\"total_success\": " + totalSuccess + ", "
1599 + "\"total_fails\": " + totalFails + ", "
1601 + resultsBuilder.toString()
1604 return responseBody;
1609 * This method queryies ElasticSearch to determine if the supplied
1610 * index is present in the document store.
1612 * @param indexName - The index to look for.
1613 * @return - An operation result indicating the success or failure of
1615 * @throws DocumentStoreOperationException
1617 public OperationResult checkIndexExistence(String indexName)
1618 throws DocumentStoreOperationException {
1620 // Initialize operation result with a failure codes / fault string
1621 OperationResult opResult = new OperationResult();
1622 opResult.setResultCode(500);
1624 // Grab the current time so we can use it to generate a metrics log.
1625 MdcOverride override = getStartTime(new MdcOverride());
1627 String fullUrl = getFullUrl("/" + indexName, false);
1628 HttpURLConnection conn = initializeConnection(fullUrl);
1631 conn.setRequestMethod("HEAD");
1633 } catch (ProtocolException e) {
1634 shutdownConnection(conn);
1635 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1638 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1642 resultCode = conn.getResponseCode();
1643 } catch (IOException e) {
1644 shutdownConnection(conn);
1645 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1647 logger.debug("Response Code : " + resultCode);
1649 opResult.setResultCode(resultCode);
1651 // Generate a metrics log so we can track how long the operation took.
1652 metricsLogger.info(SearchDbMsgs.CHECK_INDEX_TIME,
1654 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1655 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1659 shutdownConnection(conn);
1665 private void buildDocumentResult(DocumentOperationResult result, String index)
1666 throws DocumentStoreOperationException {
1668 JSONParser parser = new JSONParser();
1671 root = (JSONObject) parser.parse(result.getResult());
1673 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1674 // Success response object
1675 Document doc = new Document();
1676 doc.setEtag(result.getResultVersion());
1677 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1679 doc.setContent((JSONObject) root.get("_source"));
1680 result.setDocument(doc);
1683 // Error response object
1684 JSONObject error = (JSONObject) root.get("error");
1685 if (error != null) {
1686 result.setError(new ErrorResult(error.get("type").toString(),
1687 error.get("reason").toString()));
1691 } catch (Exception e) {
1692 throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1693 + result.getResult());
1699 private String buildDocumentResponseUrl(String index, String id) {
1700 return ApiUtils.buildDocumentUri(index, id);
1703 private void buildSearchResult(SearchOperationResult result, String index)
1704 throws DocumentStoreOperationException {
1706 JSONParser parser = new JSONParser();
1710 root = (JSONObject) parser.parse(result.getResult());
1711 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1712 JSONObject hits = (JSONObject) root.get("hits");
1713 JSONArray hitArray = (JSONArray) hits.get("hits");
1714 SearchHits searchHits = new SearchHits();
1715 searchHits.setTotalHits(hits.get("total").toString());
1716 ArrayList<SearchHit> searchHitArray = new ArrayList<SearchHit>();
1718 for (int i = 0; i < hitArray.size(); i++) {
1719 JSONObject hit = (JSONObject) hitArray.get(i);
1720 SearchHit searchHit = new SearchHit();
1721 searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1722 Document doc = new Document();
1723 if (hit.get("_version") != null) {
1724 doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1727 doc.setUrl(buildDocumentResponseUrl(index, (hit.get("_id") != null)
1728 ? hit.get("_id").toString() : ""));
1729 doc.setContent((JSONObject) hit.get("_source"));
1730 searchHit.setDocument(doc);
1731 searchHitArray.add(searchHit);
1733 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1734 result.setSearchResult(searchHits);
1736 JSONObject aggregations = (JSONObject) root.get("aggregations");
1737 if (aggregations != null) {
1738 AggregationResult[] aggResults =
1739 AggregationParsingUtil.parseAggregationResults(aggregations);
1740 AggregationResults aggs = new AggregationResults();
1741 aggs.setAggregations(aggResults);
1742 result.setAggregationResult(aggs);
1747 JSONObject error = (JSONObject) root.get("error");
1748 if (error != null) {
1749 result.setError(new ErrorResult(error.get("type").toString(),
1750 error.get("reason").toString()));
1753 } catch (Exception e) {
1754 throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1755 + result.getResult());
1760 private void buildSuggestResult(SearchOperationResult result, String index)
1761 throws DocumentStoreOperationException {
1763 JSONParser parser = new JSONParser ();
1766 root = (JSONObject) parser.parse ( result.getResult () );
1767 if (result.getResultCode () >= 200 && result.getResultCode () <= 299) {
1768 JSONArray hitArray = (JSONArray) root.get ( "suggest-vnf" );
1769 JSONObject hitdata = (JSONObject) hitArray.get ( 0 );
1770 JSONArray optionsArray = (JSONArray) hitdata.get ( "options" );
1771 SuggestHits suggestHits = new SuggestHits ();
1772 suggestHits.setTotalHits ( String.valueOf ( optionsArray.size () ) );
1774 ArrayList<SuggestHit> suggestHitArray = new ArrayList<SuggestHit> ();
1776 for (int i = 0; i < optionsArray.size (); i++) {
1777 JSONObject hit = (JSONObject) optionsArray.get ( i );
1779 SuggestHit suggestHit = new SuggestHit ();
1780 suggestHit.setScore ( (hit.get ( "score" ) != null) ? hit.get ( "score" ).toString () : "" );
1781 suggestHit.setText ( (hit.get ( "text" ) != null) ? hit.get ( "text" ).toString () : "" );
1782 Document doc = new Document ();
1783 if (hit.get ( "_version" ) != null) {
1784 doc.setEtag ( (hit.get ( "_version" ) != null) ? hit.get ( "_version" ).toString () : "" );
1786 doc.setUrl ( buildDocumentResponseUrl ( index,
1787 (hit.get ( "_id" ) != null) ? hit.get ( "_id" ).toString () : "" ) );
1789 doc.setContent ( (JSONObject) hit.get ( "payload" ) );
1790 suggestHit.setDocument ( doc );
1791 suggestHitArray.add ( suggestHit );
1793 suggestHits.setHits ( suggestHitArray.toArray ( new SuggestHit[suggestHitArray.size ()] ) );
1794 result.setSuggestResult ( suggestHits );
1796 JSONObject aggregations = (JSONObject) root.get ( "aggregations" );
1797 if (aggregations != null) {
1798 AggregationResult[] aggResults =
1799 AggregationParsingUtil.parseAggregationResults ( aggregations );
1800 AggregationResults aggs = new AggregationResults ();
1801 aggs.setAggregations ( aggResults );
1802 result.setAggregationResult ( aggs );
1807 JSONObject error = (JSONObject) root.get ( "error" );
1808 if (error != null) {
1810 new ErrorResult ( error.get ( "type" ).toString (), error.get ( "reason" ).toString () ) );
1813 } catch (Exception e) {
1814 throw new DocumentStoreOperationException (
1815 "Failed to parse Elastic Search response." + result.getResult () );