2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
23 import com.google.common.base.Throwables;
24 import com.fasterxml.jackson.annotation.JsonInclude.Include;
25 import com.fasterxml.jackson.core.JsonParseException;
26 import com.fasterxml.jackson.core.JsonProcessingException;
27 import com.fasterxml.jackson.databind.JsonMappingException;
28 import com.fasterxml.jackson.databind.ObjectMapper;
29 import org.json.simple.JSONArray;
30 import org.json.simple.JSONObject;
31 import org.json.simple.parser.JSONParser;
32 import org.json.simple.parser.ParseException;
33 import org.onap.aai.sa.rest.AnalysisConfiguration;
34 import org.onap.aai.sa.rest.ApiUtils;
35 import org.onap.aai.sa.rest.BulkRequest;
36 import org.onap.aai.sa.rest.BulkRequest.OperationType;
37 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
38 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
39 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
40 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
41 import org.onap.aai.sa.searchdbabstraction.entity.Document;
42 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
43 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
44 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
45 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
46 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
47 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
48 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
49 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
50 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
51 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
52 import org.onap.aai.cl.api.LogFields;
53 import org.onap.aai.cl.api.LogLine;
54 import org.onap.aai.cl.api.Logger;
55 import org.onap.aai.cl.eelf.LoggerFactory;
56 import org.onap.aai.cl.mdc.MdcContext;
57 import org.onap.aai.cl.mdc.MdcOverride;
58 import org.onap.aai.sa.rest.DocumentSchema;
59 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
60 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
62 import java.io.BufferedReader;
64 import java.io.FileInputStream;
65 import java.io.IOException;
66 import java.io.InputStream;
67 import java.io.InputStreamReader;
68 import java.io.OutputStream;
69 import java.io.OutputStreamWriter;
70 import java.io.PrintWriter;
71 import java.io.StringWriter;
72 import java.net.HttpURLConnection;
73 import java.net.MalformedURLException;
74 import java.net.ProtocolException;
76 import java.text.SimpleDateFormat;
77 import java.util.ArrayList;
78 import java.util.Arrays;
79 import java.util.List;
80 import java.util.Properties;
81 import java.util.concurrent.atomic.AtomicBoolean;
83 import org.springframework.http.HttpStatus;
88 * This class has the Elasticsearch implementation of the
89 * DB operations defined in DocumentStoreInterface.
91 public class ElasticSearchHttpController implements DocumentStoreInterface {
93 private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
94 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
95 private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
96 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
97 private static final String BULK_IMPORT_INDEX_TEMPLATE =
98 "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
99 private static final String BULK_DELETE_TEMPLATE =
100 "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
102 private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
103 "Internal Error: ElasticSearch operation fault occurred";
104 private static final Logger logger = LoggerFactory.getInstance()
105 .getLogger(ElasticSearchHttpController.class.getName());
106 private static final Logger metricsLogger = LoggerFactory.getInstance()
107 .getMetricsLogger(ElasticSearchHttpController.class.getName());
108 private final ElasticSearchConfig config;
110 private static final String DEFAULT_TYPE = "default";
112 private static ElasticSearchHttpController instance = null;
114 protected AnalysisConfiguration analysisConfig;
116 public static ElasticSearchHttpController getInstance() {
118 synchronized (ElasticSearchHttpController.class) {
120 if (instance == null) {
122 Properties properties = new Properties();
123 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
125 properties.load(new FileInputStream(file));
126 } catch (Exception e) {
127 logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL,
128 "ElasticSearchHTTPController.getInstance",
129 e.getLocalizedMessage());
132 ElasticSearchConfig config = new ElasticSearchConfig(properties);
133 instance = new ElasticSearchHttpController(config);
/**
 * Builds a controller for the supplied configuration and probes the ElasticSearch
 * instance once. A failed probe is only logged; construction still completes.
 *
 * @param config connection settings for the ElasticSearch instance
 */
public ElasticSearchHttpController(ElasticSearchConfig config) {
    this.config = config;
    this.analysisConfig = new AnalysisConfiguration();

    try {
        logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
        checkConnection();
        logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
    } catch (Exception probeFailure) {
        logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE,
            null,
            probeFailure,
            getFullUrl("", false),
            probeFailure.getMessage());
    }
}
155 public AnalysisConfiguration getAnalysisConfig() {
156 return analysisConfig;
160 public OperationResult createIndex(String index, DocumentSchema documentSchema) {
162 OperationResult result = new OperationResult();
163 result.setResultCode(500);
167 // Submit the request to ElasticSearch to create the index using a
168 // default document type.
169 result = createTable(index,
171 analysisConfig.getEsIndexSettings(),
172 DocumentSchemaUtil.generateDocumentMappings(documentSchema));
174 // ElasticSearch will return us a 200 code on success when we
175 // want to report a 201, so translate the result here.
176 result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
177 if (isSuccess(result)) {
178 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
179 //result.setResult("{\"index\": \"" + index + ", \"type\": \"" + DEFAULT_TYPE + "\"}");
182 } catch (DocumentStoreOperationException e) {
184 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
191 public OperationResult createDynamicIndex(String index, String dynamicSchema) {
192 OperationResult result = new OperationResult();
193 result.setResultCode(500);
196 result = createTable(index, dynamicSchema);
198 // ElasticSearch will return us a 200 code on success when we
199 // want to report a 201, so translate the result here.
200 result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
201 if (isSuccess(result)) {
202 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
204 } catch (DocumentStoreOperationException e) {
205 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
213 public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
215 //Initialize operation result with a failure codes / fault string
216 OperationResult opResult = new OperationResult();
217 opResult.setResultCode(500);
218 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
220 // Grab the current time so we can use it to generate a metrics log.
221 MdcOverride override = getStartTime(new MdcOverride());
223 String fullUrl = getFullUrl("/" + indexName + "/", false);
224 HttpURLConnection conn = initializeConnection(fullUrl);
226 logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
229 conn.setRequestMethod("DELETE");
230 } catch (ProtocolException e) {
231 shutdownConnection(conn);
232 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
235 handleResponse(conn, opResult);
237 // Generate a metrics log so we can track how long the operation took.
238 metricsLogger.info(SearchDbMsgs.DELETE_INDEX_TIME,
240 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
241 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
245 shutdownConnection(conn);
251 private OperationResult checkConnection() throws Exception {
253 String fullUrl = getFullUrl("/_cluster/health", false);
255 HttpURLConnection conn = null;
257 url = new URL(fullUrl);
258 conn = (HttpURLConnection) url.openConnection();
259 conn.setRequestMethod("GET");
260 conn.setDoOutput(true);
261 logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
263 int resultCode = conn.getResponseCode();
264 logger.debug("getClusterHealth() response Code : " + resultCode);
265 OperationResult opResult = new OperationResult();
266 opResult.setResultCode(resultCode);
268 shutdownConnection(conn);
273 private String getFullUrl(String resourceUrl, boolean isSecure) {
275 final String host = config.getIpAddress();
276 final String port = config.getHttpPort();
279 return String.format("https://%s:%s%s", host, port, resourceUrl);
281 return String.format("http://%s:%s%s", host, port, resourceUrl);
285 private void shutdownConnection(HttpURLConnection connection) {
286 if (connection == null) {
290 InputStream inputstream = null;
291 OutputStream outputstream = null;
294 inputstream = connection.getInputStream();
295 } catch (IOException e) {
296 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
298 if (inputstream != null) {
301 } catch (IOException e) {
302 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
303 e.getLocalizedMessage());
309 outputstream = connection.getOutputStream();
310 } catch (IOException e) {
311 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
313 if (outputstream != null) {
315 outputstream.close();
316 } catch (IOException e) {
317 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
318 e.getLocalizedMessage());
323 connection.disconnect();
327 protected OperationResult createTable(String indexName, String typeName,
328 String indexSettings, String indexMappings)
329 throws DocumentStoreOperationException {
331 if (indexSettings == null) {
332 logger.debug("No settings provided.");
335 if (indexMappings == null) {
336 logger.debug("No mappings provided.");
339 OperationResult opResult = new OperationResult();
341 // Initialize operation result with a failure codes / fault string
342 opResult.setResultCode(500);
343 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
345 // Grab the current time so we can use it to generate a metrics log.
346 MdcOverride override = getStartTime(new MdcOverride());
348 String fullUrl = getFullUrl("/" + indexName + "/", false);
349 HttpURLConnection conn = initializeConnection(fullUrl);
352 conn.setRequestMethod("PUT");
353 } catch (ProtocolException e) {
354 shutdownConnection(conn);
355 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
358 StringBuilder sb = new StringBuilder(128);
359 sb.append("{ \"settings\" : ");
360 sb.append(indexSettings);
363 sb.append("\"mappings\" : {");
364 sb.append("\"" + typeName + "\" :");
365 sb.append(indexMappings);
368 attachContent(conn, sb.toString());
370 logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
371 logger.debug("Request content: " + sb.toString());
373 handleResponse(conn, opResult);
375 shutdownConnection(conn);
377 // Generate a metrics log so we can track how long the operation took.
378 metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
380 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
381 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
389 * Will send the passed in JSON payload to Elasticsearch using the
390 * provided index name in an attempt to create the index.
392 * @param indexName - The name of the index to be created
393 * @param settingsAndMappings - The actual JSON object that will define the index
394 * @return - The operation result of writing into Elasticsearch
395 * @throws DocumentStoreOperationException
397 protected OperationResult createTable(String indexName, String settingsAndMappings) throws DocumentStoreOperationException {
398 OperationResult result = new OperationResult();
399 result.setResultCode(500);
400 result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
402 // Grab the current time so we can use it to generate a metrics log.
403 MdcOverride override = getStartTime(new MdcOverride());
405 String fullUrl = getFullUrl("/" + indexName + "/", false);
406 HttpURLConnection conn = initializeConnection(fullUrl);
409 conn.setRequestMethod("PUT");
410 } catch (ProtocolException e) {
411 shutdownConnection(conn);
412 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
415 attachContent(conn, settingsAndMappings);
416 handleResponse(conn, result);
418 // Generate a metrics log so we can track how long the operation took.
419 metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
421 .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
422 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()),
430 public DocumentOperationResult createDocument(String indexName,
431 DocumentStoreDataEntity document,
432 boolean allowImplicitIndexCreation)
433 throws DocumentStoreOperationException {
435 if(!allowImplicitIndexCreation) {
437 // Before we do anything, make sure that the specified index actually exists in the
438 // document store - we don't want to rely on ElasticSearch to fail the document
439 // create because it could be configured to implicitly create a non-existent index,
440 // which can lead to hard-to-debug behaviour with queries down the road.
441 OperationResult indexExistsResult = checkIndexExistence(indexName);
442 if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
444 DocumentOperationResult opResult = new DocumentOperationResult();
445 opResult.setResultCode(HttpStatus.NOT_FOUND.value());
446 opResult.setResult("Document Index '" + indexName + "' does not exist.");
447 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
452 if (document.getId() == null || document.getId().isEmpty()) {
453 return createDocumentWithoutId(indexName, document);
455 return createDocumentWithId(indexName, document);
459 private DocumentOperationResult createDocumentWithId(String indexName,
460 DocumentStoreDataEntity document)
461 throws DocumentStoreOperationException {
462 // check if the document already exists
463 DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
466 if (opResult.getResultCode() != HttpStatus.NOT_FOUND.value()) {
467 if (opResult.getResultCode() == HttpStatus.CONFLICT.value()) {
468 opResult.setFailureCause("A document with the same id already exists.");
470 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
472 opResult.setResultCode(HttpStatus.CONFLICT.value());
476 opResult = new DocumentOperationResult();
477 // Initialize operation result with a failure codes / fault string
478 opResult.setResultCode(500);
479 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
481 // Grab the current time so we can use it to generate a metrics log.
482 MdcOverride override = getStartTime(new MdcOverride());
484 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE
485 + "/" + document.getId(), false);
486 HttpURLConnection conn = initializeConnection(fullUrl);
489 conn.setRequestMethod("PUT");
490 } catch (ProtocolException e) {
491 shutdownConnection(conn);
492 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
495 attachDocument(conn, document);
497 logger.debug("Sending 'PUT' request to: " + conn.getURL());
499 handleResponse(conn, opResult);
500 buildDocumentResult(opResult, indexName);
502 // Generate a metrics log so we can track how long the operation took.
503 metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
505 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
506 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
510 shutdownConnection(conn);
516 private DocumentOperationResult createDocumentWithoutId(String indexName,
517 DocumentStoreDataEntity document)
518 throws DocumentStoreOperationException {
520 DocumentOperationResult response = new DocumentOperationResult();
521 // Initialize operation result with a failure codes / fault string
522 response.setResultCode(500);
523 response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
525 // Grab the current time so we can use it to generate a metrics log.
526 MdcOverride override = getStartTime(new MdcOverride());
528 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
529 HttpURLConnection conn = initializeConnection(fullUrl);
532 conn.setRequestMethod("POST");
533 } catch (ProtocolException e) {
534 shutdownConnection(conn);
535 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
538 attachDocument(conn, document);
540 logger.debug("Sending 'POST' request to: " + conn.getURL());
542 handleResponse(conn, response);
543 buildDocumentResult(response, indexName);
545 // Generate a metrics log so we can track how long the operation took.
546 metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
548 .setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
549 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
553 shutdownConnection(conn);
558 private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
559 throws DocumentStoreOperationException {
560 conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
561 conn.setRequestProperty("Connection", "Close");
563 attachContent(conn, doc.getContentInJson());
566 private DocumentOperationResult checkDocumentExistence(String indexName,
568 throws DocumentStoreOperationException {
569 DocumentOperationResult opResult = new DocumentOperationResult();
571 // Initialize operation result with a failure codes / fault string
572 opResult.setResultCode(500);
574 // Grab the current time so we can use it to generate a metrics log.
575 MdcOverride override = getStartTime(new MdcOverride());
577 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
578 HttpURLConnection conn = initializeConnection(fullUrl);
581 conn.setRequestMethod("HEAD");
582 } catch (ProtocolException e) {
583 shutdownConnection(conn);
584 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
587 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
591 resultCode = conn.getResponseCode();
592 } catch (IOException e) {
593 shutdownConnection(conn);
594 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
597 logger.debug("Response Code : " + resultCode);
599 opResult.setResultCode(resultCode);
601 // Generate a metrics log so we can track how long the operation took.
602 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
604 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
605 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
610 shutdownConnection(conn);
616 public DocumentOperationResult updateDocument(String indexName,
617 DocumentStoreDataEntity document,
618 boolean allowImplicitIndexCreation)
619 throws DocumentStoreOperationException {
621 if(!allowImplicitIndexCreation) {
623 // Before we do anything, make sure that the specified index actually exists in the
624 // document store - we don't want to rely on ElasticSearch to fail the document
625 // create because it could be configured to implicitly create a non-existent index,
626 // which can lead to hard-to-debug behaviour with queries down the road.
627 OperationResult indexExistsResult = checkIndexExistence(indexName);
628 if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
630 DocumentOperationResult opResult = new DocumentOperationResult();
631 opResult.setResultCode(HttpStatus.NOT_FOUND.value());
632 opResult.setResult("Document Index '" + indexName + "' does not exist.");
633 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
638 DocumentOperationResult opResult = new DocumentOperationResult();
640 // Initialize operation result with a failure codes / fault string
641 opResult.setResultCode(500);
642 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
644 // Grab the current time so we can use it to generate a metrics log.
645 MdcOverride override = getStartTime(new MdcOverride());
647 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
648 + "?version=" + document.getVersion(), false);
649 HttpURLConnection conn = initializeConnection(fullUrl);
652 conn.setRequestMethod("PUT");
653 } catch (ProtocolException e) {
654 shutdownConnection(conn);
655 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
658 attachDocument(conn, document);
660 logger.debug("Sending 'PUT' request to: " + conn.getURL());
662 handleResponse(conn, opResult);
663 buildDocumentResult(opResult, indexName);
665 // Generate a metrics log so we can track how long the operation took.
666 metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
668 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
669 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
674 shutdownConnection(conn);
680 public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
681 throws DocumentStoreOperationException {
682 DocumentOperationResult opResult = new DocumentOperationResult();
684 // Initialize operation result with a failure codes / fault string
685 opResult.setResultCode(500);
686 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
688 // Grab the current time so we can use it to generate a metrics log.
689 MdcOverride override = getStartTime(new MdcOverride());
691 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
692 + "?version=" + document.getVersion(), false);
693 HttpURLConnection conn = initializeConnection(fullUrl);
696 conn.setRequestMethod("DELETE");
697 } catch (ProtocolException e) {
698 shutdownConnection(conn);
699 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
702 logger.debug("\nSending 'DELETE' request to " + conn.getURL());
704 handleResponse(conn, opResult);
705 buildDocumentResult(opResult, indexName);
706 //supress the etag and url in response for delete as they are not required
707 if (opResult.getDocument() != null) {
708 opResult.getDocument().setEtag(null);
709 opResult.getDocument().setUrl(null);
712 // Generate a metrics log so we can track how long the operation took.
713 metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
715 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
716 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
721 shutdownConnection(conn);
727 public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
728 throws DocumentStoreOperationException {
729 DocumentOperationResult opResult = new DocumentOperationResult();
731 // Initialize operation result with a failure codes / fault string
732 opResult.setResultCode(500);
733 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
735 // Grab the current time so we can use it to generate a metrics log.
736 MdcOverride override = getStartTime(new MdcOverride());
738 String fullUrl = null;
739 if (document.getVersion() == null) {
740 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
742 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
743 + "?version=" + document.getVersion(), false);
745 HttpURLConnection conn = initializeConnection(fullUrl);
747 logger.debug("\nSending 'GET' request to: " + conn.getURL());
749 handleResponse(conn, opResult);
750 buildDocumentResult(opResult, indexName);
752 // Generate a metrics log so we can track how long the operation took.
753 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
755 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
756 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
761 shutdownConnection(conn);
766 public SearchOperationResult search(String indexName, String queryString)
767 throws DocumentStoreOperationException {
768 SearchOperationResult opResult = new SearchOperationResult();
770 // Initialize operation result with a failure codes / fault string
771 opResult.setResultCode(500);
772 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
774 String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
776 // Grab the current time so we can use it to generate a metrics log.
777 MdcOverride override = getStartTime(new MdcOverride());
779 HttpURLConnection conn = initializeConnection(fullUrl);
782 conn.setRequestMethod("GET");
783 } catch (ProtocolException e) {
784 shutdownConnection(conn);
785 throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
788 logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
790 handleResponse(conn, opResult);
791 buildSearchResult(opResult, indexName);
794 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
796 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
797 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
805 public SearchOperationResult searchWithPayload(String indexName, String query)
806 throws DocumentStoreOperationException {
807 SearchOperationResult opResult = new SearchOperationResult();
809 if (logger.isDebugEnabled()) {
810 logger.debug("Querying index: " + indexName + " with query string: " + query);
813 // Initialize operation result with a failure codes / fault string
814 opResult.setResultCode(500);
815 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
817 String fullUrl = getFullUrl("/" + indexName + "/_search", false);
819 // Grab the current time so we can use it to generate a metrics log.
820 MdcOverride override = getStartTime(new MdcOverride());
822 HttpURLConnection conn = initializeConnection(fullUrl);
825 conn.setRequestMethod("POST");
826 } catch (ProtocolException e) {
827 shutdownConnection(conn);
828 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
831 attachContent(conn, query);
833 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
834 logger.debug("Request body = Elasticsearch query = " + query);
836 handleResponse(conn, opResult);
837 buildSearchResult(opResult, indexName);
839 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
841 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
842 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
847 shutdownConnection(conn);
853 public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
854 throws DocumentStoreOperationException {
856 SearchOperationResult opResult = new SearchOperationResult();
858 if (logger.isDebugEnabled()) {
859 logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
862 // Initialize operation result with a failure codes / fault string
863 opResult.setResultCode(500);
864 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
866 String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
868 // Grab the current time so we can use it to generate a metrics log.
869 MdcOverride override = getStartTime(new MdcOverride());
871 HttpURLConnection conn = initializeConnection(fullUrl);
874 conn.setRequestMethod("POST");
875 } catch (ProtocolException e) {
876 shutdownConnection(conn);
877 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
880 attachContent(conn, query);
882 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
883 logger.debug("Request body = Elasticsearch query = " + query);
885 handleResponse(conn, opResult);
886 buildSuggestResult(opResult, indexName);
888 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
889 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
890 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
891 override, indexName, query);
893 shutdownConnection(conn);
898 private void attachContent(HttpURLConnection conn, String content)
899 throws DocumentStoreOperationException {
900 OutputStream outputStream = null;
901 OutputStreamWriter out = null;
904 outputStream = conn.getOutputStream();
905 } catch (IOException e) {
906 shutdownConnection(conn);
907 throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
910 out = new OutputStreamWriter(outputStream);
915 } catch (IOException e) {
916 shutdownConnection(conn);
917 throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
921 private HttpURLConnection initializeConnection(String fullUrl)
922 throws DocumentStoreOperationException {
924 HttpURLConnection conn = null;
927 url = new URL(fullUrl);
928 } catch (MalformedURLException e) {
929 throw new DocumentStoreOperationException("Error building a URL with " + url, e);
933 conn = (HttpURLConnection) url.openConnection();
934 conn.setDoOutput(true);
935 } catch (IOException e) {
936 shutdownConnection(conn);
937 throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
943 private void handleResponse(HttpURLConnection conn, OperationResult opResult)
944 throws DocumentStoreOperationException {
945 int resultCode = 200;
948 resultCode = conn.getResponseCode();
949 } catch (IOException e) {
950 shutdownConnection(conn);
951 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
954 logger.debug("Response Code : " + resultCode);
956 InputStream inputStream = null;
958 if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
959 inputStream = conn.getErrorStream();
962 inputStream = conn.getInputStream();
963 } catch (IOException e) {
964 shutdownConnection(conn);
965 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
969 InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
970 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
972 StringBuilder result = new StringBuilder(128);
973 String string = null;
976 while ((string = bufferedreader.readLine()) != null) {
977 result.append(string).append("\n");
979 } catch (IOException e) {
980 shutdownConnection(conn);
981 throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
984 if (resultCode == HttpStatus.CONFLICT.value()) {
985 opResult.setResultCode(HttpStatus.PRECONDITION_FAILED.value());
987 opResult.setResultCode(resultCode);
989 if (logger.isDebugEnabled()) {
990 logger.debug("Raw result string from ElasticSearch = " + result.toString());
992 opResult.setResult(result.toString());
993 opResult.setResultVersion(extractVersion(result.toString()));
996 private String extractVersion(String result) throws DocumentStoreOperationException {
998 JSONParser parser = new JSONParser();
999 String version = null;
1001 JSONObject root = (JSONObject) parser.parse(result);
1002 if (root.get("_version") != null) {
1003 version = root.get("_version").toString();
1006 } catch (ParseException e) {
1008 // Not all responses from ElasticSearch include a version, so
1009 // if we don't get one back, just return an empty string rather
1010 // than trigger a false failure.
1017 * This convenience method gets the current system time and stores
1018 * it in an attribute in the supplied {@link MdcOverride} object so
1019 * that it can be used later by the metrics logger.
1021 * @param override - The {@link MdcOverride} object to update.
1022 * @return - The supplied {@link MdcOverride} object.
1024 private MdcOverride getStartTime(MdcOverride override) {
1026 // Grab the current time...
1027 long startTimeInMs = System.currentTimeMillis();
1029 // ...and add it as an attribute to the supplied MDC Override
1031 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
1032 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
1034 // Return the MdcOverride object that we were passed.
1035 // This looks odd, but it allows us to do stuff like:
1037 // MdcOverride ov = getStartTime(new MdcOverride())
1039 // which is quite handy, but also allows us to pass in an existing
1040 // MdcOverride object which already has some attributes set.
1044 private boolean isSuccess(OperationResult result) {
1046 return isSuccessCode(result.getResultCode());
1050 private boolean isSuccessCode(int statusCode) {
1051 return ((statusCode >= 200) && (statusCode < 300));
1056 public OperationResult performBulkOperations(BulkRequest[] requests)
1057 throws DocumentStoreOperationException {
1059 if (logger.isDebugEnabled()) {
1060 String dbgString = "ESController: performBulkOperations - Operations: ";
1062 for (BulkRequest request : requests) {
1063 dbgString += "[" + request.toString() + "] ";
1066 logger.debug(dbgString);
1069 // Grab the current time so we can use it to generate a metrics log.
1070 MdcOverride override = getStartTime(new MdcOverride());
1072 // Parse the supplied set of operations.
1073 // Iterate over the list of operations which we were provided and
1074 // translate them into a format that ElasticSearh understands.
1076 StringBuilder esOperationSet = new StringBuilder(128);
1077 List<ElasticSearchResultItem> rejected = new ArrayList<ElasticSearchResultItem>();
1078 for (BulkRequest request : requests) {
1080 // Convert the request to the syntax ElasticSearch likes.
1081 if (buildEsOperation(request, esOperationSet, rejected)) {
1086 ElasticSearchBulkOperationResult opResult = null;
1089 // Open an HTTP connection to the ElasticSearch back end.
1090 String fullUrl = getFullUrl("/_bulk", false);
1092 HttpURLConnection conn;
1095 url = new URL(fullUrl);
1096 conn = (HttpURLConnection) url.openConnection();
1097 conn.setRequestMethod("PUT");
1098 conn.setDoOutput(true);
1099 conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
1100 conn.setRequestProperty("Connection", "Close");
1102 } catch (IOException e) {
1104 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1105 if (logger.isDebugEnabled()) {
1106 logger.debug(Throwables.getStackTraceAsString(e));
1109 throw new DocumentStoreOperationException("Failed to open connection to document store. Cause: "
1110 + e.getMessage(), e);
1113 StringBuilder bulkResult = new StringBuilder(128);
1115 // Create an output stream to write our request to.
1116 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1119 if (logger.isDebugEnabled()) {
1120 logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1121 logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n",
1125 // Write the resulting request string to our output stream. (this sends the request to ES?)
1126 out.write(esOperationSet.toString());
1129 // Open an input stream on our connection in order to read back the results.
1130 InputStream is = conn.getInputStream();
1131 InputStreamReader inputstreamreader = new InputStreamReader(is);
1132 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1134 // Read the contents of the input stream into our result string...
1135 String esResponseString = null;
1137 while ((esResponseString = bufferedreader.readLine()) != null) {
1138 bulkResult.append(esResponseString).append("\n");
1141 } catch (IOException e) {
1143 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1144 if (logger.isDebugEnabled()) {
1145 StringWriter sw = new StringWriter();
1146 e.printStackTrace(new PrintWriter(sw));
1147 logger.debug(sw.toString());
1150 throw new DocumentStoreOperationException("Failure interacting with document store. Cause: "
1151 + e.getMessage(), e);
1154 if (logger.isDebugEnabled()) {
1155 logger.debug("ESController: Received result string from ElasticSearch: = "
1156 + bulkResult.toString());
1159 // ...and marshal the resulting string into a Java object.
1161 opResult = marshallEsBulkResult(bulkResult.toString());
1163 } catch (IOException e) {
1165 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1166 if (logger.isDebugEnabled()) {
1167 logger.debug(Throwables.getStackTraceAsString(e));
1170 throw new DocumentStoreOperationException("Failed to marshal response body. Cause: "
1171 + e.getMessage(), e);
1175 // Finally, build the operation result and return it to the caller.
1176 OperationResult result = new OperationResult();
1177 result.setResultCode(207);
1178 result.setResult(buildGenericBulkResultSet(opResult, rejected));
1180 // In the success case we don't want the entire result string to be
1181 // dumped into the metrics log, so concatenate it.
1182 String resultStringForMetricsLog = result.getResult();
1183 if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1184 resultStringForMetricsLog = resultStringForMetricsLog.substring(0,
1185 Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1188 metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1190 .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1191 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1199 * This method converts a {@link BulkRequest} object into a json structure
1200 * which can be understood by ElasticSearch.
1202 * @param request - The request to be performed.
1203 * @param sb - The string builder to append the json data to
1204 * @throws DocumentStoreOperationException
1206 private boolean buildEsOperation(BulkRequest request, StringBuilder sb,
1207 List<ElasticSearchResultItem> fails)
1208 throws DocumentStoreOperationException {
1210 boolean retVal = true;
1211 OperationResult indexExistsResult = null;
1213 // What kind of operation are we performing?
1214 switch (request.getOperationType()) {
1216 // Create a new document.
1219 // Make sure that we were supplied a document payload.
1220 if (request.getOperation().getDocument() == null) {
1222 fails.add(generateRejectionEntry(request.getOperationType(),
1223 "Missing document payload",
1227 request.getOperation().getMetaData().getUrl()));
1231 // Make sure that the supplied document URL is formatted
1233 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1234 fails.add(generateRejectionEntry(request.getOperationType(),
1235 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1239 request.getOperation().getMetaData().getUrl()));
1243 // Validate that the specified index actually exists before we
1244 // try to perform the create.
1245 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1247 fails.add(generateRejectionEntry(request.getOperationType(),
1248 "Specified resource does not exist: "
1249 + request.getOperation().getMetaData().getUrl(),
1253 request.getOperation().getMetaData().getUrl()));
1257 // If we were supplied an id for the new document, then
1258 // include it in the bulk operation to Elastic Search
1259 if (request.getId() == null) {
1261 sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE,
1265 // Otherwise, we just leave that parameter off and ElasticSearch
1266 // will generate one for us.
1268 sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE,
1275 // Append the document that we want to create.
1276 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1277 } catch (JsonProcessingException e) {
1278 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1283 // Update an existing document.
1286 // Make sure that we were supplied a document payload.
1287 if (request.getOperation().getDocument() == null) {
1289 fails.add(generateRejectionEntry(request.getOperationType(),
1290 "Missing document payload",
1294 request.getOperation().getMetaData().getUrl()));
1298 // Make sure that the supplied document URL is formatted
1300 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1301 fails.add(generateRejectionEntry(request.getOperationType(),
1302 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1306 request.getOperation().getMetaData().getUrl()));
1310 // Validate that the specified index actually exists before we
1311 // try to perform the update.
1312 if (!indexExists(request.getIndex())) {
1314 fails.add(generateRejectionEntry(request.getOperationType(),
1315 "Specified resource does not exist: "
1316 + request.getOperation().getMetaData().getUrl(),
1320 request.getOperation().getMetaData().getUrl()));
1324 // Validate that the document we are trying to update actually
1325 // exists before we try to perform the update.
1326 if (!documentExists(request.getIndex(), request.getId())) {
1328 fails.add(generateRejectionEntry(request.getOperationType(),
1329 "Specified resource does not exist: "
1330 + request.getOperation().getMetaData().getUrl(),
1334 request.getOperation().getMetaData().getUrl()));
1338 // It is mandatory that a version be supplied for an update operation,
1339 // so validate that now.
1340 if (request.getOperation().getMetaData().getEtag() == null) {
1342 fails.add(generateRejectionEntry(request.getOperationType(),
1343 "Missing mandatory ETag field",
1347 request.getOperation().getMetaData().getUrl()));
1351 // Generate the update request...
1352 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE,
1356 request.getOperation().getMetaData().getEtag()));
1358 // ...and append the document that we want to update.
1360 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1361 } catch (JsonProcessingException e) {
1362 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1366 // Delete an existing document.
1369 // Make sure that the supplied document URL is formatted
1371 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1372 fails.add(generateRejectionEntry(request.getOperationType(),
1373 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1377 request.getOperation().getMetaData().getUrl()));
1381 // Validate that the specified index actually exists before we
1382 // try to perform the delete.
1383 if (!indexExists(request.getIndex())) {
1385 fails.add(generateRejectionEntry(request.getOperationType(),
1386 "Specified resource does not exist: "
1387 + request.getOperation().getMetaData().getUrl(),
1391 request.getOperation().getMetaData().getUrl()));
1395 // Validate that the document we are trying to update actually
1396 // exists before we try to perform the delete.
1397 if (!documentExists(request.getIndex(), request.getId())) {
1399 fails.add(generateRejectionEntry(request.getOperationType(),
1400 "Specified resource does not exist: "
1401 + request.getOperation().getMetaData().getUrl(),
1405 request.getOperation().getMetaData().getUrl()));
1409 // It is mandatory that a version be supplied for a delete operation,
1410 // so validate that now.
1411 if (request.getOperation().getMetaData().getEtag() == null) {
1413 fails.add(generateRejectionEntry(request.getOperationType(),
1414 "Missing mandatory ETag field",
1418 request.getOperation().getMetaData().getUrl()));
1422 // Generate the delete request.
1423 sb.append(String.format(BULK_DELETE_TEMPLATE,
1427 request.getOperation().getMetaData().getEtag()));
1435 private boolean indexExists(String index) throws DocumentStoreOperationException {
1437 OperationResult indexExistsResult = checkIndexExistence(index);
1439 return ((indexExistsResult.getResultCode() >= 200)
1440 && (indexExistsResult.getResultCode() < 300));
1443 private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1445 OperationResult docExistsResult = checkDocumentExistence(index, id);
1447 return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1451 * This method constructs a status entry for a bulk operation which has
1452 * been rejected before even sending it to the document store.
1454 * @param rejectReason - A message describing why the operation was rejected.
1455 * @param anId - The identifier associated with the document being
1457 * @param statusCode - An HTTP status code.
1458 * @return - A result set item.
1460 private ElasticSearchResultItem generateRejectionEntry(OperationType opType,
1461 String rejectReason,
1465 String originalUrl) {
1467 ElasticSearchError err = new ElasticSearchError();
1468 err.setReason(rejectReason);
1470 ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1473 op.setStatus(statusCode);
1475 op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1477 ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1481 rejectionResult.setCreate(op);
1484 rejectionResult.setIndex(op);
1487 rejectionResult.setDelete(op);
1492 return rejectionResult;
1497 * This method takes the json structure returned from ElasticSearch in
1498 * response to a bulk operations request and marshals it into a Java
1501 * @param jsonResult - The bulk operations response returned from
1503 * @return - The marshalled response.
1504 * @throws JsonParseException
1505 * @throws JsonMappingException
1506 * @throws IOException
1508 private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult)
1509 throws JsonParseException, JsonMappingException, IOException {
1511 if (jsonResult != null) {
1512 if (logger.isDebugEnabled()) {
1513 logger.debug("ESController: Marshalling ES result set from json: "
1514 + jsonResult.replaceAll("\n", ""));
1517 ObjectMapper mapper = new ObjectMapper();
1518 mapper.setSerializationInclusion(Include.NON_EMPTY);
1520 return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1528 * This method takes the marshalled ElasticSearch bulk response and
1529 * converts it into a generic response payload.
1531 * @param esResult - ElasticSearch bulk operations response.
1532 * @return - A generic result set.
1534 private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1535 List<ElasticSearchResultItem> rejectedOps) {
1538 int totalSuccess = 0;
1541 if (logger.isDebugEnabled()) {
1543 logger.debug("ESController: Build generic result set. ES Results: "
1544 + ((esResult != null) ? esResult.toString() : "[]")
1545 + " Rejected Ops: " + rejectedOps.toString());
1548 // Build a combined list of result items from the results returned
1549 // from ElasticSearch and the list of operations that we rejected
1550 // without sending to ElasticSearch.
1551 List<ElasticSearchResultItem> combinedResults = new ArrayList<ElasticSearchResultItem>();
1552 if (esResult != null) {
1553 combinedResults.addAll(Arrays.asList(esResult.getItems()));
1555 combinedResults.addAll(rejectedOps);
1557 // Iterate over the individual results in the resulting result set.
1558 StringBuilder resultsBuilder = new StringBuilder();
1559 AtomicBoolean firstItem = new AtomicBoolean(true);
1560 for (ElasticSearchResultItem item : combinedResults) {
1562 // Increment the operation counts.
1564 if (isSuccessCode(item.operationStatus().getStatus())) {
1570 // Prepend a comma to our response string unless this it the
1571 // first result in the set.
1572 if (!firstItem.compareAndSet(true, false)) {
1573 resultsBuilder.append(", ");
1576 // Append the current result as a generic json structure.
1577 resultsBuilder.append(item.toJson());
1580 // Now, build the result string and return it.
1581 String responseBody = "{ \"total_operations\": " + totalOps + ", "
1582 + "\"total_success\": " + totalSuccess + ", "
1583 + "\"total_fails\": " + totalFails + ", "
1585 + resultsBuilder.toString()
1588 return responseBody;
1593 * This method queryies ElasticSearch to determine if the supplied
1594 * index is present in the document store.
1596 * @param indexName - The index to look for.
1597 * @return - An operation result indicating the success or failure of
1599 * @throws DocumentStoreOperationException
1601 public OperationResult checkIndexExistence(String indexName)
1602 throws DocumentStoreOperationException {
1604 // Initialize operation result with a failure codes / fault string
1605 OperationResult opResult = new OperationResult();
1606 opResult.setResultCode(500);
1608 // Grab the current time so we can use it to generate a metrics log.
1609 MdcOverride override = getStartTime(new MdcOverride());
1611 String fullUrl = getFullUrl("/" + indexName, false);
1612 HttpURLConnection conn = initializeConnection(fullUrl);
1615 conn.setRequestMethod("HEAD");
1617 } catch (ProtocolException e) {
1618 shutdownConnection(conn);
1619 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1622 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1626 resultCode = conn.getResponseCode();
1627 } catch (IOException e) {
1628 shutdownConnection(conn);
1629 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1631 logger.debug("Response Code : " + resultCode);
1633 opResult.setResultCode(resultCode);
1635 // Generate a metrics log so we can track how long the operation took.
1636 metricsLogger.info(SearchDbMsgs.CHECK_INDEX_TIME,
1638 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1639 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1643 shutdownConnection(conn);
1649 private void buildDocumentResult(DocumentOperationResult result, String index)
1650 throws DocumentStoreOperationException {
1652 JSONParser parser = new JSONParser();
1655 root = (JSONObject) parser.parse(result.getResult());
1657 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1658 // Success response object
1659 Document doc = new Document();
1660 doc.setEtag(result.getResultVersion());
1661 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1663 doc.setContent((JSONObject) root.get("_source"));
1664 result.setDocument(doc);
1667 // Error response object
1668 JSONObject error = (JSONObject) root.get("error");
1669 if (error != null) {
1670 result.setError(new ErrorResult(error.get("type").toString(),
1671 error.get("reason").toString()));
1675 } catch (Exception e) {
1676 throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1677 + result.getResult());
1683 private String buildDocumentResponseUrl(String index, String id) {
1684 return ApiUtils.buildDocumentUri(index, id);
1687 private void buildSearchResult(SearchOperationResult result, String index)
1688 throws DocumentStoreOperationException {
1690 JSONParser parser = new JSONParser();
1694 root = (JSONObject) parser.parse(result.getResult());
1695 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1696 JSONObject hits = (JSONObject) root.get("hits");
1697 JSONArray hitArray = (JSONArray) hits.get("hits");
1698 SearchHits searchHits = new SearchHits();
1699 searchHits.setTotalHits(hits.get("total").toString());
1700 ArrayList<SearchHit> searchHitArray = new ArrayList<SearchHit>();
1702 for (int i = 0; i < hitArray.size(); i++) {
1703 JSONObject hit = (JSONObject) hitArray.get(i);
1704 SearchHit searchHit = new SearchHit();
1705 searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1706 Document doc = new Document();
1707 if (hit.get("_version") != null) {
1708 doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1711 doc.setUrl(buildDocumentResponseUrl(index, (hit.get("_id") != null)
1712 ? hit.get("_id").toString() : ""));
1713 doc.setContent((JSONObject) hit.get("_source"));
1714 searchHit.setDocument(doc);
1715 searchHitArray.add(searchHit);
1717 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1718 result.setSearchResult(searchHits);
1720 JSONObject aggregations = (JSONObject) root.get("aggregations");
1721 if (aggregations != null) {
1722 AggregationResult[] aggResults =
1723 AggregationParsingUtil.parseAggregationResults(aggregations);
1724 AggregationResults aggs = new AggregationResults();
1725 aggs.setAggregations(aggResults);
1726 result.setAggregationResult(aggs);
1731 JSONObject error = (JSONObject) root.get("error");
1732 if (error != null) {
1733 result.setError(new ErrorResult(error.get("type").toString(),
1734 error.get("reason").toString()));
1737 } catch (Exception e) {
1738 throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1739 + result.getResult());
1744 private void buildSuggestResult(SearchOperationResult result, String index)
1745 throws DocumentStoreOperationException {
1747 JSONParser parser = new JSONParser ();
1750 root = (JSONObject) parser.parse ( result.getResult () );
1751 if (result.getResultCode () >= 200 && result.getResultCode () <= 299) {
1752 JSONArray hitArray = (JSONArray) root.get ( "suggest-vnf" );
1753 JSONObject hitdata = (JSONObject) hitArray.get ( 0 );
1754 JSONArray optionsArray = (JSONArray) hitdata.get ( "options" );
1755 SuggestHits suggestHits = new SuggestHits ();
1756 suggestHits.setTotalHits ( String.valueOf ( optionsArray.size () ) );
1758 ArrayList<SuggestHit> suggestHitArray = new ArrayList<SuggestHit> ();
1760 for (int i = 0; i < optionsArray.size (); i++) {
1761 JSONObject hit = (JSONObject) optionsArray.get ( i );
1763 SuggestHit suggestHit = new SuggestHit ();
1764 suggestHit.setScore ( (hit.get ( "score" ) != null) ? hit.get ( "score" ).toString () : "" );
1765 suggestHit.setText ( (hit.get ( "text" ) != null) ? hit.get ( "text" ).toString () : "" );
1766 Document doc = new Document ();
1767 if (hit.get ( "_version" ) != null) {
1768 doc.setEtag ( (hit.get ( "_version" ) != null) ? hit.get ( "_version" ).toString () : "" );
1770 doc.setUrl ( buildDocumentResponseUrl ( index,
1771 (hit.get ( "_id" ) != null) ? hit.get ( "_id" ).toString () : "" ) );
1773 doc.setContent ( (JSONObject) hit.get ( "payload" ) );
1774 suggestHit.setDocument ( doc );
1775 suggestHitArray.add ( suggestHit );
1777 suggestHits.setHits ( suggestHitArray.toArray ( new SuggestHit[suggestHitArray.size ()] ) );
1778 result.setSuggestResult ( suggestHits );
1780 JSONObject aggregations = (JSONObject) root.get ( "aggregations" );
1781 if (aggregations != null) {
1782 AggregationResult[] aggResults =
1783 AggregationParsingUtil.parseAggregationResults ( aggregations );
1784 AggregationResults aggs = new AggregationResults ();
1785 aggs.setAggregations ( aggResults );
1786 result.setAggregationResult ( aggs );
1791 JSONObject error = (JSONObject) root.get ( "error" );
1792 if (error != null) {
1794 new ErrorResult ( error.get ( "type" ).toString (), error.get ( "reason" ).toString () ) );
1797 } catch (Exception e) {
1798 throw new DocumentStoreOperationException (
1799 "Failed to parse Elastic Search response." + result.getResult () );