2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
23 import com.google.common.base.Throwables;
24 import com.fasterxml.jackson.annotation.JsonInclude.Include;
25 import com.fasterxml.jackson.core.JsonParseException;
26 import com.fasterxml.jackson.core.JsonProcessingException;
27 import com.fasterxml.jackson.databind.JsonMappingException;
28 import com.fasterxml.jackson.databind.ObjectMapper;
29 import org.json.simple.JSONArray;
30 import org.json.simple.JSONObject;
31 import org.json.simple.parser.JSONParser;
32 import org.json.simple.parser.ParseException;
33 import org.onap.aai.sa.rest.AnalysisConfiguration;
34 import org.onap.aai.sa.rest.ApiUtils;
35 import org.onap.aai.sa.rest.BulkRequest;
36 import org.onap.aai.sa.rest.BulkRequest.OperationType;
37 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
38 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
39 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
40 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
41 import org.onap.aai.sa.searchdbabstraction.entity.Document;
42 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
43 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
44 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
45 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
46 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
47 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
48 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
49 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
50 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
51 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
52 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
53 import org.onap.aai.cl.api.LogFields;
54 import org.onap.aai.cl.api.LogLine;
55 import org.onap.aai.cl.api.Logger;
56 import org.onap.aai.cl.eelf.LoggerFactory;
57 import org.onap.aai.cl.mdc.MdcContext;
58 import org.onap.aai.cl.mdc.MdcOverride;
59 import org.onap.aai.sa.rest.DocumentSchema;
60 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
61 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
63 import java.io.BufferedReader;
65 import java.io.FileInputStream;
66 import java.io.IOException;
67 import java.io.InputStream;
68 import java.io.InputStreamReader;
69 import java.io.OutputStream;
70 import java.io.OutputStreamWriter;
71 import java.io.PrintWriter;
72 import java.io.StringWriter;
73 import java.net.HttpURLConnection;
74 import java.net.MalformedURLException;
75 import java.net.ProtocolException;
77 import java.text.SimpleDateFormat;
78 import java.util.ArrayList;
79 import java.util.Arrays;
80 import java.util.List;
81 import java.util.Properties;
82 import java.util.concurrent.atomic.AtomicBoolean;
84 import org.springframework.http.HttpStatus;
89 * This class has the Elasticsearch implementation of the
90 * DB operations defined in DocumentStoreInterface.
92 public class ElasticSearchHttpController implements DocumentStoreInterface {
94 private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
95 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
96 private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
97 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
98 private static final String BULK_IMPORT_INDEX_TEMPLATE =
99 "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
100 private static final String BULK_DELETE_TEMPLATE =
101 "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
103 private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
104 "Internal Error: ElasticSearch operation fault occurred";
105 private static final Logger logger = LoggerFactory.getInstance()
106 .getLogger(ElasticSearchHttpController.class.getName());
107 private static final Logger metricsLogger = LoggerFactory.getInstance()
108 .getMetricsLogger(ElasticSearchHttpController.class.getName());
109 private final ElasticSearchConfig config;
111 private static final String DEFAULT_TYPE = "default";
113 private static ElasticSearchHttpController instance = null;
115 protected AnalysisConfiguration analysisConfig;
117 public static ElasticSearchHttpController getInstance() {
119 synchronized (ElasticSearchHttpController.class) {
121 if (instance == null) {
123 Properties properties = new Properties();
124 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
126 properties.load(new FileInputStream(file));
127 } catch (Exception e) {
128 logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL,
129 "ElasticSearchHTTPController.getInstance",
130 e.getLocalizedMessage());
133 ElasticSearchConfig config = new ElasticSearchConfig(properties);
134 instance = new ElasticSearchHttpController(config);
141 public ElasticSearchHttpController(ElasticSearchConfig config) {
142 this.config = config;
143 analysisConfig = new AnalysisConfiguration();
146 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
148 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
149 } catch (Exception e) {
150 logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e,
151 getFullUrl("", false), e.getMessage());
156 public AnalysisConfiguration getAnalysisConfig() {
157 return analysisConfig;
161 public OperationResult createIndex(String index, DocumentSchema documentSchema) {
163 OperationResult result = new OperationResult();
164 result.setResultCode(500);
168 // Submit the request to ElasticSearch to create the index using a
169 // default document type.
170 result = createTable(index,
172 analysisConfig.getEsIndexSettings(),
173 DocumentSchemaUtil.generateDocumentMappings(documentSchema));
175 // ElasticSearch will return us a 200 code on success when we
176 // want to report a 201, so translate the result here.
177 result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
178 if (isSuccess(result)) {
179 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
180 //result.setResult("{\"index\": \"" + index + ", \"type\": \"" + DEFAULT_TYPE + "\"}");
183 } catch (DocumentStoreOperationException | IOException e) {
185 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
192 public OperationResult createDynamicIndex(String index, String dynamicSchema) {
193 OperationResult result = new OperationResult();
194 result.setResultCode(500);
197 result = createTable(index, dynamicSchema);
199 // ElasticSearch will return us a 200 code on success when we
200 // want to report a 201, so translate the result here.
201 result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
202 if (isSuccess(result)) {
203 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
205 } catch (DocumentStoreOperationException e) {
206 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
214 public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
216 //Initialize operation result with a failure codes / fault string
217 OperationResult opResult = new OperationResult();
218 opResult.setResultCode(500);
219 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
221 // Grab the current time so we can use it to generate a metrics log.
222 MdcOverride override = getStartTime(new MdcOverride());
224 String fullUrl = getFullUrl("/" + indexName + "/", false);
225 HttpURLConnection conn = initializeConnection(fullUrl);
227 logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
230 conn.setRequestMethod("DELETE");
231 } catch (ProtocolException e) {
232 shutdownConnection(conn);
233 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
236 handleResponse(conn, opResult);
238 // Generate a metrics log so we can track how long the operation took.
239 metricsLogger.info(SearchDbMsgs.DELETE_INDEX_TIME,
241 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
242 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
246 shutdownConnection(conn);
252 private OperationResult checkConnection() throws Exception {
254 String fullUrl = getFullUrl("/_cluster/health", false);
256 HttpURLConnection conn = null;
258 url = new URL(fullUrl);
259 conn = (HttpURLConnection) url.openConnection();
260 conn.setRequestMethod("GET");
261 conn.setDoOutput(true);
262 logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
264 int resultCode = conn.getResponseCode();
265 logger.debug("getClusterHealth() response Code : " + resultCode);
266 OperationResult opResult = new OperationResult();
267 opResult.setResultCode(resultCode);
269 shutdownConnection(conn);
274 private String getFullUrl(String resourceUrl, boolean isSecure) {
276 final String host = config.getIpAddress();
277 final String port = config.getHttpPort();
280 return String.format("https://%s:%s%s", host, port, resourceUrl);
282 return String.format("http://%s:%s%s", host, port, resourceUrl);
286 private void shutdownConnection(HttpURLConnection connection) {
287 if (connection == null) {
291 InputStream inputstream = null;
292 OutputStream outputstream = null;
295 inputstream = connection.getInputStream();
296 } catch (IOException e) {
297 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
299 if (inputstream != null) {
302 } catch (IOException e) {
303 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
304 e.getLocalizedMessage());
310 outputstream = connection.getOutputStream();
311 } catch (IOException e) {
312 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
314 if (outputstream != null) {
316 outputstream.close();
317 } catch (IOException e) {
318 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
319 e.getLocalizedMessage());
324 connection.disconnect();
328 protected OperationResult createTable(String indexName, String typeName,
329 String indexSettings, String indexMappings)
330 throws DocumentStoreOperationException {
332 if (indexSettings == null) {
333 logger.debug("No settings provided.");
336 if (indexMappings == null) {
337 logger.debug("No mappings provided.");
340 OperationResult opResult = new OperationResult();
342 // Initialize operation result with a failure codes / fault string
343 opResult.setResultCode(500);
344 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
346 // Grab the current time so we can use it to generate a metrics log.
347 MdcOverride override = getStartTime(new MdcOverride());
349 String fullUrl = getFullUrl("/" + indexName + "/", false);
350 HttpURLConnection conn = initializeConnection(fullUrl);
353 conn.setRequestMethod("PUT");
354 conn.setRequestProperty("Content-Type", "application/json");
355 } catch (ProtocolException e) {
356 shutdownConnection(conn);
357 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
360 StringBuilder sb = new StringBuilder(128);
361 sb.append("{ \"settings\" : ");
362 sb.append(indexSettings);
365 sb.append("\"mappings\" : {");
366 sb.append("\"" + typeName + "\" :");
367 sb.append(indexMappings);
371 attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
372 } catch(IOException e) {
373 throw new DocumentStoreOperationException("Error in translating Index payload to make it ES compliant.", e);
376 logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
377 logger.debug("Request content: " + sb.toString());
379 handleResponse(conn, opResult);
381 shutdownConnection(conn);
383 // Generate a metrics log so we can track how long the operation took.
384 metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
386 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
387 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
395 * Will send the passed in JSON payload to Elasticsearch using the
396 * provided index name in an attempt to create the index.
398 * @param indexName - The name of the index to be created
399 * @param settingsAndMappings - The actual JSON object that will define the index
400 * @return - The operation result of writing into Elasticsearch
401 * @throws DocumentStoreOperationException
403 protected OperationResult createTable(String indexName, String settingsAndMappings) throws DocumentStoreOperationException {
404 OperationResult result = new OperationResult();
405 result.setResultCode(500);
406 result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
408 // Grab the current time so we can use it to generate a metrics log.
409 MdcOverride override = getStartTime(new MdcOverride());
411 String fullUrl = getFullUrl("/" + indexName + "/", false);
412 HttpURLConnection conn = initializeConnection(fullUrl);
415 conn.setRequestMethod("PUT");
416 conn.setRequestProperty("Content-Type", "application/json");
417 } catch (ProtocolException e) {
418 shutdownConnection(conn);
419 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
423 attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
424 } catch(IOException e) {
425 throw new DocumentStoreOperationException("Error in translating DynamicIndex payload to make it ES compliant.", e);
427 handleResponse(conn, result);
429 // Generate a metrics log so we can track how long the operation took.
430 metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
432 .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
433 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()),
441 public DocumentOperationResult createDocument(String indexName,
442 DocumentStoreDataEntity document,
443 boolean allowImplicitIndexCreation)
444 throws DocumentStoreOperationException {
446 if(!allowImplicitIndexCreation) {
448 // Before we do anything, make sure that the specified index actually exists in the
449 // document store - we don't want to rely on ElasticSearch to fail the document
450 // create because it could be configured to implicitly create a non-existent index,
451 // which can lead to hard-to-debug behaviour with queries down the road.
452 OperationResult indexExistsResult = checkIndexExistence(indexName);
453 if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
455 DocumentOperationResult opResult = new DocumentOperationResult();
456 opResult.setResultCode(HttpStatus.NOT_FOUND.value());
457 opResult.setResult("Document Index '" + indexName + "' does not exist.");
458 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
463 if (document.getId() == null || document.getId().isEmpty()) {
464 return createDocumentWithoutId(indexName, document);
466 return createDocumentWithId(indexName, document);
470 private DocumentOperationResult createDocumentWithId(String indexName,
471 DocumentStoreDataEntity document)
472 throws DocumentStoreOperationException {
473 // check if the document already exists
474 DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
477 if (opResult.getResultCode() != HttpStatus.NOT_FOUND.value()) {
478 if (opResult.getResultCode() == HttpStatus.CONFLICT.value()) {
479 opResult.setFailureCause("A document with the same id already exists.");
481 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
483 opResult.setResultCode(HttpStatus.CONFLICT.value());
487 opResult = new DocumentOperationResult();
488 // Initialize operation result with a failure codes / fault string
489 opResult.setResultCode(500);
490 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
492 // Grab the current time so we can use it to generate a metrics log.
493 MdcOverride override = getStartTime(new MdcOverride());
495 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE
496 + "/" + document.getId(), false);
497 HttpURLConnection conn = initializeConnection(fullUrl);
500 conn.setRequestMethod("PUT");
501 } catch (ProtocolException e) {
502 shutdownConnection(conn);
503 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
506 attachDocument(conn, document);
508 logger.debug("Sending 'PUT' request to: " + conn.getURL());
510 handleResponse(conn, opResult);
511 buildDocumentResult(opResult, indexName);
513 // Generate a metrics log so we can track how long the operation took.
514 metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
516 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
517 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
521 shutdownConnection(conn);
527 private DocumentOperationResult createDocumentWithoutId(String indexName,
528 DocumentStoreDataEntity document)
529 throws DocumentStoreOperationException {
531 DocumentOperationResult response = new DocumentOperationResult();
532 // Initialize operation result with a failure codes / fault string
533 response.setResultCode(500);
534 response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
536 // Grab the current time so we can use it to generate a metrics log.
537 MdcOverride override = getStartTime(new MdcOverride());
539 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
540 HttpURLConnection conn = initializeConnection(fullUrl);
543 conn.setRequestMethod("POST");
544 } catch (ProtocolException e) {
545 shutdownConnection(conn);
546 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
549 attachDocument(conn, document);
551 logger.debug("Sending 'POST' request to: " + conn.getURL());
553 handleResponse(conn, response);
554 buildDocumentResult(response, indexName);
556 // Generate a metrics log so we can track how long the operation took.
557 metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
559 .setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
560 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
564 shutdownConnection(conn);
569 private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
570 throws DocumentStoreOperationException {
571 // conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
572 conn.setRequestProperty("Content-Type", "application/json");
573 conn.setRequestProperty("Connection", "Close");
576 attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(doc.getContentInJson()));
577 } catch(IOException e) {
578 throw new DocumentStoreOperationException("Error in translating Document payload to make it ES compliant.", e);
582 private DocumentOperationResult checkDocumentExistence(String indexName,
584 throws DocumentStoreOperationException {
585 DocumentOperationResult opResult = new DocumentOperationResult();
587 // Initialize operation result with a failure codes / fault string
588 opResult.setResultCode(500);
590 // Grab the current time so we can use it to generate a metrics log.
591 MdcOverride override = getStartTime(new MdcOverride());
593 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
594 HttpURLConnection conn = initializeConnection(fullUrl);
597 conn.setRequestMethod("HEAD");
598 } catch (ProtocolException e) {
599 shutdownConnection(conn);
600 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
603 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
607 resultCode = conn.getResponseCode();
608 } catch (IOException e) {
609 shutdownConnection(conn);
610 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
613 logger.debug("Response Code : " + resultCode);
615 opResult.setResultCode(resultCode);
617 // Generate a metrics log so we can track how long the operation took.
618 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
620 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
621 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
626 shutdownConnection(conn);
632 public DocumentOperationResult updateDocument(String indexName,
633 DocumentStoreDataEntity document,
634 boolean allowImplicitIndexCreation)
635 throws DocumentStoreOperationException {
637 if(!allowImplicitIndexCreation) {
639 // Before we do anything, make sure that the specified index actually exists in the
640 // document store - we don't want to rely on ElasticSearch to fail the document
641 // create because it could be configured to implicitly create a non-existent index,
642 // which can lead to hard-to-debug behaviour with queries down the road.
643 OperationResult indexExistsResult = checkIndexExistence(indexName);
644 if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
646 DocumentOperationResult opResult = new DocumentOperationResult();
647 opResult.setResultCode(HttpStatus.NOT_FOUND.value());
648 opResult.setResult("Document Index '" + indexName + "' does not exist.");
649 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
654 DocumentOperationResult opResult = new DocumentOperationResult();
656 // Initialize operation result with a failure codes / fault string
657 opResult.setResultCode(500);
658 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
660 // Grab the current time so we can use it to generate a metrics log.
661 MdcOverride override = getStartTime(new MdcOverride());
663 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
664 + "?version=" + document.getVersion(), false);
665 HttpURLConnection conn = initializeConnection(fullUrl);
668 conn.setRequestMethod("PUT");
669 conn.setRequestProperty("Content-Type", "application/json");
670 } catch (ProtocolException e) {
671 shutdownConnection(conn);
672 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
675 attachDocument(conn, document);
677 logger.debug("Sending 'PUT' request to: " + conn.getURL());
679 handleResponse(conn, opResult);
680 buildDocumentResult(opResult, indexName);
682 // Generate a metrics log so we can track how long the operation took.
683 metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
685 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
686 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
691 shutdownConnection(conn);
697 public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
698 throws DocumentStoreOperationException {
699 DocumentOperationResult opResult = new DocumentOperationResult();
701 // Initialize operation result with a failure codes / fault string
702 opResult.setResultCode(500);
703 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
705 // Grab the current time so we can use it to generate a metrics log.
706 MdcOverride override = getStartTime(new MdcOverride());
708 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
709 + "?version=" + document.getVersion(), false);
710 HttpURLConnection conn = initializeConnection(fullUrl);
713 conn.setRequestMethod("DELETE");
714 } catch (ProtocolException e) {
715 shutdownConnection(conn);
716 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
719 logger.debug("\nSending 'DELETE' request to " + conn.getURL());
721 handleResponse(conn, opResult);
722 buildDocumentResult(opResult, indexName);
723 //supress the etag and url in response for delete as they are not required
724 if (opResult.getDocument() != null) {
725 opResult.getDocument().setEtag(null);
726 opResult.getDocument().setUrl(null);
729 // Generate a metrics log so we can track how long the operation took.
730 metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
732 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
733 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
738 shutdownConnection(conn);
744 public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
745 throws DocumentStoreOperationException {
746 DocumentOperationResult opResult = new DocumentOperationResult();
748 // Initialize operation result with a failure codes / fault string
749 opResult.setResultCode(500);
750 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
752 // Grab the current time so we can use it to generate a metrics log.
753 MdcOverride override = getStartTime(new MdcOverride());
755 String fullUrl = null;
756 if (document.getVersion() == null) {
757 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
759 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
760 + "?version=" + document.getVersion(), false);
762 HttpURLConnection conn = initializeConnection(fullUrl);
764 logger.debug("\nSending 'GET' request to: " + conn.getURL());
766 handleResponse(conn, opResult);
767 buildDocumentResult(opResult, indexName);
769 // Generate a metrics log so we can track how long the operation took.
770 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
772 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
773 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
778 shutdownConnection(conn);
783 public SearchOperationResult search(String indexName, String queryString)
784 throws DocumentStoreOperationException {
785 SearchOperationResult opResult = new SearchOperationResult();
787 // Initialize operation result with a failure codes / fault string
788 opResult.setResultCode(500);
789 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
791 String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
793 // Grab the current time so we can use it to generate a metrics log.
794 MdcOverride override = getStartTime(new MdcOverride());
796 HttpURLConnection conn = initializeConnection(fullUrl);
799 conn.setRequestMethod("GET");
800 } catch (ProtocolException e) {
801 shutdownConnection(conn);
802 throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
805 logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
807 handleResponse(conn, opResult);
808 buildSearchResult(opResult, indexName);
811 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
813 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
814 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
822 public SearchOperationResult searchWithPayload(String indexName, String query)
823 throws DocumentStoreOperationException {
824 SearchOperationResult opResult = new SearchOperationResult();
826 if (logger.isDebugEnabled()) {
827 logger.debug("Querying index: " + indexName + " with query string: " + query);
830 // Initialize operation result with a failure codes / fault string
831 opResult.setResultCode(500);
832 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
834 String fullUrl = getFullUrl("/" + indexName + "/_search", false);
836 // Grab the current time so we can use it to generate a metrics log.
837 MdcOverride override = getStartTime(new MdcOverride());
839 HttpURLConnection conn = initializeConnection(fullUrl);
842 conn.setRequestMethod("POST");
843 conn.setRequestProperty("Content-Type", "application/json");
844 } catch (ProtocolException e) {
845 shutdownConnection(conn);
846 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
849 attachContent(conn, query);
851 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
852 logger.debug("Request body = Elasticsearch query = " + query);
854 handleResponse(conn, opResult);
855 buildSearchResult(opResult, indexName);
857 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
859 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
860 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
865 shutdownConnection(conn);
871 public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
872 throws DocumentStoreOperationException {
874 SearchOperationResult opResult = new SearchOperationResult();
876 if (logger.isDebugEnabled()) {
877 logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
880 // Initialize operation result with a failure codes / fault string
881 opResult.setResultCode(500);
882 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
884 String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
886 // Grab the current time so we can use it to generate a metrics log.
887 MdcOverride override = getStartTime(new MdcOverride());
889 HttpURLConnection conn = initializeConnection(fullUrl);
892 conn.setRequestMethod("POST");
893 conn.setRequestProperty("Content-Type", "application/json");
894 } catch (ProtocolException e) {
895 shutdownConnection(conn);
896 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
899 attachContent(conn, query);
901 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
902 logger.debug("Request body = Elasticsearch query = " + query);
904 handleResponse(conn, opResult);
905 buildSuggestResult(opResult, indexName);
907 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
908 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
909 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
910 override, indexName, query);
912 shutdownConnection(conn);
917 private void attachContent(HttpURLConnection conn, String content)
918 throws DocumentStoreOperationException {
919 OutputStream outputStream = null;
920 OutputStreamWriter out = null;
923 outputStream = conn.getOutputStream();
924 } catch (IOException e) {
925 shutdownConnection(conn);
926 throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
929 out = new OutputStreamWriter(outputStream);
934 } catch (IOException e) {
935 shutdownConnection(conn);
936 throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
940 private HttpURLConnection initializeConnection(String fullUrl)
941 throws DocumentStoreOperationException {
943 HttpURLConnection conn = null;
946 url = new URL(fullUrl);
947 } catch (MalformedURLException e) {
948 throw new DocumentStoreOperationException("Error building a URL with " + url, e);
952 conn = (HttpURLConnection) url.openConnection();
953 conn.setDoOutput(true);
954 } catch (IOException e) {
955 shutdownConnection(conn);
956 throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
962 private void handleResponse(HttpURLConnection conn, OperationResult opResult)
963 throws DocumentStoreOperationException {
964 int resultCode = 200;
967 resultCode = conn.getResponseCode();
968 } catch (IOException e) {
969 shutdownConnection(conn);
970 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
973 logger.debug("Response Code : " + resultCode);
975 InputStream inputStream = null;
977 if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
978 inputStream = conn.getErrorStream();
981 inputStream = conn.getInputStream();
982 } catch (IOException e) {
983 shutdownConnection(conn);
984 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
988 InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
989 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
991 StringBuilder result = new StringBuilder(128);
992 String string = null;
995 while ((string = bufferedreader.readLine()) != null) {
996 result.append(string).append("\n");
998 } catch (IOException e) {
999 shutdownConnection(conn);
1000 throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
1003 if (resultCode == HttpStatus.CONFLICT.value()) {
1004 opResult.setResultCode(HttpStatus.PRECONDITION_FAILED.value());
1006 opResult.setResultCode(resultCode);
1008 if (logger.isDebugEnabled()) {
1009 logger.debug("Raw result string from ElasticSearch = " + result.toString());
1011 opResult.setResult(result.toString());
1012 opResult.setResultVersion(extractVersion(result.toString()));
1015 private String extractVersion(String result) throws DocumentStoreOperationException {
1017 JSONParser parser = new JSONParser();
1018 String version = null;
1020 JSONObject root = (JSONObject) parser.parse(result);
1021 if (root.get("_version") != null) {
1022 version = root.get("_version").toString();
1025 } catch (ParseException e) {
1027 // Not all responses from ElasticSearch include a version, so
1028 // if we don't get one back, just return an empty string rather
1029 // than trigger a false failure.
1036 * This convenience method gets the current system time and stores
1037 * it in an attribute in the supplied {@link MdcOverride} object so
1038 * that it can be used later by the metrics logger.
1040 * @param override - The {@link MdcOverride} object to update.
1041 * @return - The supplied {@link MdcOverride} object.
1043 private MdcOverride getStartTime(MdcOverride override) {
1045 // Grab the current time...
1046 long startTimeInMs = System.currentTimeMillis();
1048 // ...and add it as an attribute to the supplied MDC Override
1050 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
1051 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
1053 // Return the MdcOverride object that we were passed.
1054 // This looks odd, but it allows us to do stuff like:
1056 // MdcOverride ov = getStartTime(new MdcOverride())
1058 // which is quite handy, but also allows us to pass in an existing
1059 // MdcOverride object which already has some attributes set.
1063 private boolean isSuccess(OperationResult result) {
1065 return isSuccessCode(result.getResultCode());
1069 private boolean isSuccessCode(int statusCode) {
1070 return ((statusCode >= 200) && (statusCode < 300));
1075 public OperationResult performBulkOperations(BulkRequest[] requests)
1076 throws DocumentStoreOperationException {
1078 if (logger.isDebugEnabled()) {
1079 String dbgString = "ESController: performBulkOperations - Operations: ";
1081 for (BulkRequest request : requests) {
1082 dbgString += "[" + request.toString() + "] ";
1085 logger.debug(dbgString);
1088 // Grab the current time so we can use it to generate a metrics log.
1089 MdcOverride override = getStartTime(new MdcOverride());
1091 // Parse the supplied set of operations.
1092 // Iterate over the list of operations which we were provided and
1093 // translate them into a format that ElasticSearh understands.
1095 StringBuilder esOperationSet = new StringBuilder(128);
1096 List<ElasticSearchResultItem> rejected = new ArrayList<ElasticSearchResultItem>();
1097 for (BulkRequest request : requests) {
1099 // Convert the request to the syntax ElasticSearch likes.
1100 if (buildEsOperation(request, esOperationSet, rejected)) {
1105 ElasticSearchBulkOperationResult opResult = null;
1108 // Open an HTTP connection to the ElasticSearch back end.
1109 String fullUrl = getFullUrl("/_bulk", false);
1111 HttpURLConnection conn;
1114 url = new URL(fullUrl);
1115 conn = (HttpURLConnection) url.openConnection();
1116 conn.setRequestMethod("PUT");
1117 conn.setDoOutput(true);
1118 conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
1119 conn.setRequestProperty("Connection", "Close");
1121 } catch (IOException e) {
1123 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1124 if (logger.isDebugEnabled()) {
1125 logger.debug(Throwables.getStackTraceAsString(e));
1128 throw new DocumentStoreOperationException("Failed to open connection to document store. Cause: "
1129 + e.getMessage(), e);
1132 StringBuilder bulkResult = new StringBuilder(128);
1134 // Create an output stream to write our request to.
1135 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1138 if (logger.isDebugEnabled()) {
1139 logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1140 logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n",
1144 // Write the resulting request string to our output stream. (this sends the request to ES?)
1145 out.write(esOperationSet.toString());
1148 // Open an input stream on our connection in order to read back the results.
1149 InputStream is = conn.getInputStream();
1150 InputStreamReader inputstreamreader = new InputStreamReader(is);
1151 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1153 // Read the contents of the input stream into our result string...
1154 String esResponseString = null;
1156 while ((esResponseString = bufferedreader.readLine()) != null) {
1157 bulkResult.append(esResponseString).append("\n");
1160 } catch (IOException e) {
1162 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1163 if (logger.isDebugEnabled()) {
1164 StringWriter sw = new StringWriter();
1165 e.printStackTrace(new PrintWriter(sw));
1166 logger.debug(sw.toString());
1169 throw new DocumentStoreOperationException("Failure interacting with document store. Cause: "
1170 + e.getMessage(), e);
1173 if (logger.isDebugEnabled()) {
1174 logger.debug("ESController: Received result string from ElasticSearch: = "
1175 + bulkResult.toString());
1178 // ...and marshal the resulting string into a Java object.
1180 opResult = marshallEsBulkResult(bulkResult.toString());
1182 } catch (IOException e) {
1184 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1185 if (logger.isDebugEnabled()) {
1186 logger.debug(Throwables.getStackTraceAsString(e));
1189 throw new DocumentStoreOperationException("Failed to marshal response body. Cause: "
1190 + e.getMessage(), e);
1194 // Finally, build the operation result and return it to the caller.
1195 OperationResult result = new OperationResult();
1196 result.setResultCode(207);
1197 result.setResult(buildGenericBulkResultSet(opResult, rejected));
1199 // In the success case we don't want the entire result string to be
1200 // dumped into the metrics log, so concatenate it.
1201 String resultStringForMetricsLog = result.getResult();
1202 if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1203 resultStringForMetricsLog = resultStringForMetricsLog.substring(0,
1204 Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1207 metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1209 .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1210 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1218 * This method converts a {@link BulkRequest} object into a json structure
1219 * which can be understood by ElasticSearch.
1221 * @param request - The request to be performed.
1222 * @param sb - The string builder to append the json data to
1223 * @throws DocumentStoreOperationException
1225 private boolean buildEsOperation(BulkRequest request, StringBuilder sb,
1226 List<ElasticSearchResultItem> fails)
1227 throws DocumentStoreOperationException {
// The 'fails' list receives one rejection entry (built by
// generateRejectionEntry) for every request that does not pass the
// validation steps below; such requests are not appended to 'sb'.
// performBulkOperations tests the boolean result of this method for each
// request. NOTE(review): confirm every rejection path returns false.
1229 boolean retVal = true;
// NOTE(review): indexExistsResult does not appear to be referenced in the
// rest of this method - candidate for removal once confirmed.
1230 OperationResult indexExistsResult = null;
1232 // What kind of operation are we performing?
1233 switch (request.getOperationType()) {
1235 // Create a new document.
1238 // Make sure that we were supplied a document payload.
1239 if (request.getOperation().getDocument() == null) {
1241 fails.add(generateRejectionEntry(request.getOperationType(),
1242 "Missing document payload",
1246 request.getOperation().getMetaData().getUrl()));
1250 // Make sure that the supplied document URL is formatted
1252 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1253 fails.add(generateRejectionEntry(request.getOperationType(),
1254 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1258 request.getOperation().getMetaData().getUrl()));
1262 // Validate that the specified index actually exists before we
1263 // try to perform the create.
// Unlike the update and delete cases, the index name is taken from the
// document URL here rather than from request.getIndex().
1264 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1266 fails.add(generateRejectionEntry(request.getOperationType(),
1267 "Specified resource does not exist: "
1268 + request.getOperation().getMetaData().getUrl(),
1272 request.getOperation().getMetaData().getUrl()));
1276 // If we were NOT supplied an id for the new document, use the
1277 // template that leaves it out and ElasticSearch will generate one for us.
1278 if (request.getId() == null) {
1280 sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE,
1284 // Otherwise, an id was supplied with the request, so use the
1285 // template that includes it in the bulk operation.
1287 sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE,
1294 // Append the document that we want to create.
1295 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1296 } catch (JsonProcessingException e) {
1297 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1302 // Update an existing document.
1305 // Make sure that we were supplied a document payload.
1306 if (request.getOperation().getDocument() == null) {
1308 fails.add(generateRejectionEntry(request.getOperationType(),
1309 "Missing document payload",
1313 request.getOperation().getMetaData().getUrl()));
1317 // Make sure that the supplied document URL is formatted
1319 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1320 fails.add(generateRejectionEntry(request.getOperationType(),
1321 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1325 request.getOperation().getMetaData().getUrl()));
1329 // Validate that the specified index actually exists before we
1330 // try to perform the update.
1331 if (!indexExists(request.getIndex())) {
1333 fails.add(generateRejectionEntry(request.getOperationType(),
1334 "Specified resource does not exist: "
1335 + request.getOperation().getMetaData().getUrl(),
1339 request.getOperation().getMetaData().getUrl()));
1343 // Validate that the document we are trying to update actually
1344 // exists before we try to perform the update.
1345 if (!documentExists(request.getIndex(), request.getId())) {
1347 fails.add(generateRejectionEntry(request.getOperationType(),
1348 "Specified resource does not exist: "
1349 + request.getOperation().getMetaData().getUrl(),
1353 request.getOperation().getMetaData().getUrl()));
1357 // It is mandatory that a version be supplied for an update operation,
1358 // so validate that now.
1359 if (request.getOperation().getMetaData().getEtag() == null) {
1361 fails.add(generateRejectionEntry(request.getOperationType(),
1362 "Missing mandatory ETag field",
1366 request.getOperation().getMetaData().getUrl()));
1370 // Generate the update request...
1371 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE,
1375 request.getOperation().getMetaData().getEtag()));
1377 // ...and append the document that we want to update.
1379 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1380 } catch (JsonProcessingException e) {
1381 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1385 // Delete an existing document.
1388 // Make sure that the supplied document URL is formatted
1390 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1391 fails.add(generateRejectionEntry(request.getOperationType(),
1392 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1396 request.getOperation().getMetaData().getUrl()));
1400 // Validate that the specified index actually exists before we
1401 // try to perform the delete.
1402 if (!indexExists(request.getIndex())) {
1404 fails.add(generateRejectionEntry(request.getOperationType(),
1405 "Specified resource does not exist: "
1406 + request.getOperation().getMetaData().getUrl(),
1410 request.getOperation().getMetaData().getUrl()));
1414 // Validate that the document we are trying to delete actually
1415 // exists before we try to perform the delete.
1416 if (!documentExists(request.getIndex(), request.getId())) {
1418 fails.add(generateRejectionEntry(request.getOperationType(),
1419 "Specified resource does not exist: "
1420 + request.getOperation().getMetaData().getUrl(),
1424 request.getOperation().getMetaData().getUrl()));
1428 // It is mandatory that a version be supplied for a delete operation,
1429 // so validate that now.
1430 if (request.getOperation().getMetaData().getEtag() == null) {
1432 fails.add(generateRejectionEntry(request.getOperationType(),
1433 "Missing mandatory ETag field",
1437 request.getOperation().getMetaData().getUrl()));
1441 // Generate the delete request.
// No document body is appended for a delete; only the action line is written.
1442 sb.append(String.format(BULK_DELETE_TEMPLATE,
1446 request.getOperation().getMetaData().getEtag()));
1454 private boolean indexExists(String index) throws DocumentStoreOperationException {
1456 OperationResult indexExistsResult = checkIndexExistence(index);
1458 return ((indexExistsResult.getResultCode() >= 200)
1459 && (indexExistsResult.getResultCode() < 300));
1462 private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1464 OperationResult docExistsResult = checkDocumentExistence(index, id);
1466 return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1470 * This method constructs a status entry for a bulk operation which has
1471 * been rejected before even sending it to the document store.
1473 * @param rejectReason - A message describing why the operation was rejected.
1474 * @param anId - The identifier associated with the document being
1476 * @param statusCode - An HTTP status code.
1477 * @return - A result set item.
1479 private ElasticSearchResultItem generateRejectionEntry(OperationType opType,
1480 String rejectReason,
1484 String originalUrl) {
// opType and originalUrl are not described in the Javadoc above:
// opType decides which slot of the returned item receives the status
// object, and originalUrl is recorded on the status under
// ElasticSearchResultItem.REQUEST_URL.
1486 ElasticSearchError err = new ElasticSearchError();
1487 err.setReason(rejectReason);
1489 ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1492 op.setStatus(statusCode);
1494 op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1496 ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
// The status object is stored as the item's create, index or delete entry
// depending on the operation type.
// NOTE(review): setIndex is presumably the UPDATE case - confirm.
1500 rejectionResult.setCreate(op);
1503 rejectionResult.setIndex(op);
1506 rejectionResult.setDelete(op);
1511 return rejectionResult;
1516 * This method takes the json structure returned from ElasticSearch in
1517 * response to a bulk operations request and marshals it into a Java
1520 * @param jsonResult - The bulk operations response returned from
1522 * @return - The marshalled response.
1523 * @throws JsonParseException
1524 * @throws JsonMappingException
1525 * @throws IOException
1527 private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult)
1528 throws JsonParseException, JsonMappingException, IOException {
1530 if (jsonResult != null) {
1531 if (logger.isDebugEnabled()) {
1532 logger.debug("ESController: Marshalling ES result set from json: "
1533 + jsonResult.replaceAll("\n", ""));
1536 ObjectMapper mapper = new ObjectMapper();
1537 mapper.setSerializationInclusion(Include.NON_EMPTY);
1539 return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1547 * This method takes the marshalled ElasticSearch bulk response and
1548 * converts it into a generic response payload.
1550 * @param esResult - ElasticSearch bulk operations response.
1551 * @return - A generic result set.
1553 private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1554 List<ElasticSearchResultItem> rejectedOps) {
// rejectedOps (not described in the Javadoc above) holds the entries for
// operations that were rejected locally; they are reported after the items
// returned by ElasticSearch. esResult may be null, in which case only the
// rejected operations are reported.
1557 int totalSuccess = 0;
1560 if (logger.isDebugEnabled()) {
1562 logger.debug("ESController: Build generic result set. ES Results: "
1563 + ((esResult != null) ? esResult.toString() : "[]")
1564 + " Rejected Ops: " + rejectedOps.toString());
1567 // Build a combined list of result items from the results returned
1568 // from ElasticSearch and the list of operations that we rejected
1569 // without sending to ElasticSearch.
1570 List<ElasticSearchResultItem> combinedResults = new ArrayList<ElasticSearchResultItem>();
1571 if (esResult != null) {
1572 combinedResults.addAll(Arrays.asList(esResult.getItems()));
1574 combinedResults.addAll(rejectedOps);
1576 // Iterate over the individual results in the resulting result set.
1577 StringBuilder resultsBuilder = new StringBuilder();
1578 AtomicBoolean firstItem = new AtomicBoolean(true);
1579 for (ElasticSearchResultItem item : combinedResults) {
1581 // Increment the operation counts.
// An item counts as a success when its status is in the 2xx range.
1583 if (isSuccessCode(item.operationStatus().getStatus())) {
1589 // Prepend a comma to our response string unless this is the
1590 // first result in the set.
// compareAndSet(true, false) succeeds only for the first item, so the
// separator is skipped exactly once.
1591 if (!firstItem.compareAndSet(true, false)) {
1592 resultsBuilder.append(", ");
1595 // Append the current result as a generic json structure.
1596 resultsBuilder.append(item.toJson());
1599 // Now, build the result string and return it.
1600 String responseBody = "{ \"total_operations\": " + totalOps + ", "
1601 + "\"total_success\": " + totalSuccess + ", "
1602 + "\"total_fails\": " + totalFails + ", "
1604 + resultsBuilder.toString()
1607 return responseBody;
1612 * This method queryies ElasticSearch to determine if the supplied
1613 * index is present in the document store.
1615 * @param indexName - The index to look for.
1616 * @return - An operation result indicating the success or failure of
1618 * @throws DocumentStoreOperationException
1620 public OperationResult checkIndexExistence(String indexName)
1621 throws DocumentStoreOperationException {
1623 // Initialize operation result with a failure codes / fault string
1624 OperationResult opResult = new OperationResult();
1625 opResult.setResultCode(500);
1627 // Grab the current time so we can use it to generate a metrics log.
1628 MdcOverride override = getStartTime(new MdcOverride());
1630 String fullUrl = getFullUrl("/" + indexName, false);
1631 HttpURLConnection conn = initializeConnection(fullUrl);
1634 conn.setRequestMethod("HEAD");
1636 } catch (ProtocolException e) {
1637 shutdownConnection(conn);
1638 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1641 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1645 resultCode = conn.getResponseCode();
1646 } catch (IOException e) {
1647 shutdownConnection(conn);
1648 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1650 logger.debug("Response Code : " + resultCode);
1652 opResult.setResultCode(resultCode);
1654 // Generate a metrics log so we can track how long the operation took.
1655 metricsLogger.info(SearchDbMsgs.CHECK_INDEX_TIME,
1657 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1658 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1662 shutdownConnection(conn);
1668 private void buildDocumentResult(DocumentOperationResult result, String index)
1669 throws DocumentStoreOperationException {
1671 JSONParser parser = new JSONParser();
1674 root = (JSONObject) parser.parse(result.getResult());
1676 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1677 // Success response object
1678 Document doc = new Document();
1679 doc.setEtag(result.getResultVersion());
1680 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1682 doc.setContent((JSONObject) root.get("_source"));
1683 result.setDocument(doc);
1686 // Error response object
1687 JSONObject error = (JSONObject) root.get("error");
1688 if (error != null) {
1689 result.setError(new ErrorResult(error.get("type").toString(),
1690 error.get("reason").toString()));
1694 } catch (Exception e) {
1695 throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1696 + result.getResult());
1702 private String buildDocumentResponseUrl(String index, String id) {
1703 return ApiUtils.buildDocumentUri(index, id);
1706 private void buildSearchResult(SearchOperationResult result, String index)
1707 throws DocumentStoreOperationException {
1709 JSONParser parser = new JSONParser();
1713 root = (JSONObject) parser.parse(result.getResult());
1714 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1715 JSONObject hits = (JSONObject) root.get("hits");
1716 JSONArray hitArray = (JSONArray) hits.get("hits");
1717 SearchHits searchHits = new SearchHits();
1718 searchHits.setTotalHits(hits.get("total").toString());
1719 ArrayList<SearchHit> searchHitArray = new ArrayList<SearchHit>();
1721 for (int i = 0; i < hitArray.size(); i++) {
1722 JSONObject hit = (JSONObject) hitArray.get(i);
1723 SearchHit searchHit = new SearchHit();
1724 searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1725 Document doc = new Document();
1726 if (hit.get("_version") != null) {
1727 doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1730 doc.setUrl(buildDocumentResponseUrl(index, (hit.get("_id") != null)
1731 ? hit.get("_id").toString() : ""));
1732 doc.setContent((JSONObject) hit.get("_source"));
1733 searchHit.setDocument(doc);
1734 searchHitArray.add(searchHit);
1736 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1737 result.setSearchResult(searchHits);
1739 JSONObject aggregations = (JSONObject) root.get("aggregations");
1740 if (aggregations != null) {
1741 AggregationResult[] aggResults =
1742 AggregationParsingUtil.parseAggregationResults(aggregations);
1743 AggregationResults aggs = new AggregationResults();
1744 aggs.setAggregations(aggResults);
1745 result.setAggregationResult(aggs);
1750 JSONObject error = (JSONObject) root.get("error");
1751 if (error != null) {
1752 result.setError(new ErrorResult(error.get("type").toString(),
1753 error.get("reason").toString()));
1756 } catch (Exception e) {
1757 throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1758 + result.getResult());
1763 private void buildSuggestResult(SearchOperationResult result, String index)
1764 throws DocumentStoreOperationException {
1766 JSONParser parser = new JSONParser ();
1769 root = (JSONObject) parser.parse ( result.getResult () );
1770 if (result.getResultCode () >= 200 && result.getResultCode () <= 299) {
1771 JSONArray hitArray = (JSONArray) root.get ( "suggest-vnf" );
1772 JSONObject hitdata = (JSONObject) hitArray.get ( 0 );
1773 JSONArray optionsArray = (JSONArray) hitdata.get ( "options" );
1774 SuggestHits suggestHits = new SuggestHits ();
1775 suggestHits.setTotalHits ( String.valueOf ( optionsArray.size () ) );
1777 ArrayList<SuggestHit> suggestHitArray = new ArrayList<SuggestHit> ();
1779 for (int i = 0; i < optionsArray.size (); i++) {
1780 JSONObject hit = (JSONObject) optionsArray.get ( i );
1782 SuggestHit suggestHit = new SuggestHit ();
1783 suggestHit.setScore ( (hit.get ( "score" ) != null) ? hit.get ( "score" ).toString () : "" );
1784 suggestHit.setText ( (hit.get ( "text" ) != null) ? hit.get ( "text" ).toString () : "" );
1785 Document doc = new Document ();
1786 if (hit.get ( "_version" ) != null) {
1787 doc.setEtag ( (hit.get ( "_version" ) != null) ? hit.get ( "_version" ).toString () : "" );
1789 doc.setUrl ( buildDocumentResponseUrl ( index,
1790 (hit.get ( "_id" ) != null) ? hit.get ( "_id" ).toString () : "" ) );
1792 doc.setContent ( (JSONObject) hit.get ( "payload" ) );
1793 suggestHit.setDocument ( doc );
1794 suggestHitArray.add ( suggestHit );
1796 suggestHits.setHits ( suggestHitArray.toArray ( new SuggestHit[suggestHitArray.size ()] ) );
1797 result.setSuggestResult ( suggestHits );
1799 JSONObject aggregations = (JSONObject) root.get ( "aggregations" );
1800 if (aggregations != null) {
1801 AggregationResult[] aggResults =
1802 AggregationParsingUtil.parseAggregationResults ( aggregations );
1803 AggregationResults aggs = new AggregationResults ();
1804 aggs.setAggregations ( aggResults );
1805 result.setAggregationResult ( aggs );
1810 JSONObject error = (JSONObject) root.get ( "error" );
1811 if (error != null) {
1813 new ErrorResult ( error.get ( "type" ).toString (), error.get ( "reason" ).toString () ) );
1816 } catch (Exception e) {
1817 throw new DocumentStoreOperationException (
1818 "Failed to parse Elastic Search response." + result.getResult () );