2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
23 import com.att.aft.dme2.internal.google.common.base.Throwables;
24 import com.fasterxml.jackson.annotation.JsonInclude.Include;
25 import com.fasterxml.jackson.core.JsonParseException;
26 import com.fasterxml.jackson.core.JsonProcessingException;
27 import com.fasterxml.jackson.databind.JsonMappingException;
28 import com.fasterxml.jackson.databind.ObjectMapper;
29 import edu.emory.mathcs.backport.java.util.Arrays;
30 import org.json.simple.JSONArray;
31 import org.json.simple.JSONObject;
32 import org.json.simple.parser.JSONParser;
33 import org.json.simple.parser.ParseException;
34 import org.onap.aai.sa.rest.AnalysisConfiguration;
35 import org.onap.aai.sa.rest.ApiUtils;
36 import org.onap.aai.sa.rest.BulkRequest;
37 import org.onap.aai.sa.rest.BulkRequest.OperationType;
38 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
39 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
40 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
41 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
42 import org.onap.aai.sa.searchdbabstraction.entity.Document;
43 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
44 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
45 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
46 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
47 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
48 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
49 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
50 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
51 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
52 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
53 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
54 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
55 import org.onap.aai.cl.api.LogFields;
56 import org.onap.aai.cl.api.LogLine;
57 import org.onap.aai.cl.api.Logger;
58 import org.onap.aai.cl.eelf.LoggerFactory;
59 import org.onap.aai.cl.mdc.MdcContext;
60 import org.onap.aai.cl.mdc.MdcOverride;
61 import org.onap.aai.sa.rest.DocumentSchema;
63 import java.io.BufferedReader;
65 import java.io.FileInputStream;
66 import java.io.IOException;
67 import java.io.InputStream;
68 import java.io.InputStreamReader;
69 import java.io.OutputStream;
70 import java.io.OutputStreamWriter;
71 import java.io.PrintWriter;
72 import java.io.StringWriter;
73 import java.net.HttpURLConnection;
74 import java.net.MalformedURLException;
75 import java.net.ProtocolException;
77 import java.text.SimpleDateFormat;
78 import java.util.ArrayList;
79 import java.util.List;
80 import java.util.Properties;
81 import java.util.concurrent.atomic.AtomicBoolean;
82 import javax.ws.rs.core.Response.Status;
85 * This class has the Elasticsearch implementation of the DB operations defined in
86 * DocumentStoreInterface.
88 public class ElasticSearchHttpController implements DocumentStoreInterface {
90 private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
91 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
92 private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
93 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
94 private static final String BULK_IMPORT_INDEX_TEMPLATE =
95 "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
96 private static final String BULK_DELETE_TEMPLATE =
97 "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
99 private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
100 "Internal Error: ElasticSearch operation fault occurred";
101 private static final Logger logger =
102 LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
103 private static final Logger metricsLogger =
104 LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
105 private final ElasticSearchConfig config;
107 private static final String DEFAULT_TYPE = "default";
109 private static ElasticSearchHttpController instance = null;
111 protected AnalysisConfiguration analysisConfig;
113 public static ElasticSearchHttpController getInstance() {
115 synchronized (ElasticSearchHttpController.class) {
117 if (instance == null) {
119 Properties properties = new Properties();
120 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
122 properties.load(new FileInputStream(file));
123 } catch (Exception e) {
124 logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL,
125 "ElasticSearchHTTPController.getInstance", e.getLocalizedMessage());
128 ElasticSearchConfig config = new ElasticSearchConfig(properties);
129 instance = new ElasticSearchHttpController(config);
136 public ElasticSearchHttpController(ElasticSearchConfig config) {
137 this.config = config;
138 analysisConfig = new AnalysisConfiguration();
141 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
143 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
144 } catch (Exception e) {
145 logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, getFullUrl("", false),
150 public AnalysisConfiguration getAnalysisConfig() {
151 return analysisConfig;
155 public OperationResult createIndex(String index, DocumentSchema documentSchema) {
157 OperationResult result = new OperationResult();
158 result.setResultCode(500);
162 // Submit the request to ElasticSearch to create the index using a
163 // default document type.
164 result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
165 DocumentSchemaUtil.generateDocumentMappings(documentSchema));
167 // ElasticSearch will return us a 200 code on success when we
168 // want to report a 201, so translate the result here.
169 result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
170 if (isSuccess(result)) {
171 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
172 // result.setResult("{\"index\": \"" + index + ", \"type\": \""
173 // + DEFAULT_TYPE + "\"}");
176 } catch (DocumentStoreOperationException e) {
178 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
185 public OperationResult createDynamicIndex(String index, String dynamicSchema) {
186 OperationResult result = new OperationResult();
187 result.setResultCode(500);
190 result = createTable(index, dynamicSchema);
192 // ElasticSearch will return us a 200 code on success when we
193 // want to report a 201, so translate the result here.
194 result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
195 if (isSuccess(result)) {
196 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
198 } catch (DocumentStoreOperationException e) {
199 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
206 public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
208 // Initialize operation result with a failure codes / fault string
209 OperationResult opResult = new OperationResult();
210 opResult.setResultCode(500);
211 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
213 // Grab the current time so we can use it to generate a metrics log.
214 MdcOverride override = getStartTime(new MdcOverride());
216 String fullUrl = getFullUrl("/" + indexName + "/", false);
217 HttpURLConnection conn = initializeConnection(fullUrl);
219 logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
222 conn.setRequestMethod("DELETE");
223 } catch (ProtocolException e) {
224 shutdownConnection(conn);
225 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
228 handleResponse(conn, opResult);
230 // Generate a metrics log so we can track how long the operation took.
231 metricsLogger.info(SearchDbMsgs.DELETE_INDEX_TIME,
232 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
233 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
234 override, indexName);
236 shutdownConnection(conn);
241 private OperationResult checkConnection() throws Exception {
243 String fullUrl = getFullUrl("/_cluster/health", false);
245 HttpURLConnection conn = null;
247 url = new URL(fullUrl);
248 conn = (HttpURLConnection) url.openConnection();
249 conn.setRequestMethod("GET");
250 conn.setDoOutput(true);
251 logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
253 int resultCode = conn.getResponseCode();
254 logger.debug("getClusterHealth() response Code : " + resultCode);
255 OperationResult opResult = new OperationResult();
256 opResult.setResultCode(resultCode);
258 shutdownConnection(conn);
263 private String getFullUrl(String resourceUrl, boolean isSecure) {
265 final String host = config.getIpAddress();
266 final String port = config.getHttpPort();
269 return String.format("https://%s:%s%s", host, port, resourceUrl);
271 return String.format("http://%s:%s%s", host, port, resourceUrl);
275 private void shutdownConnection(HttpURLConnection connection) {
276 if (connection == null) {
280 InputStream inputstream = null;
281 OutputStream outputstream = null;
284 inputstream = connection.getInputStream();
285 } catch (IOException e) {
286 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
287 e.getLocalizedMessage());
289 if (inputstream != null) {
292 } catch (IOException e) {
293 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
294 e.getLocalizedMessage());
300 outputstream = connection.getOutputStream();
301 } catch (IOException e) {
302 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
303 e.getLocalizedMessage());
305 if (outputstream != null) {
307 outputstream.close();
308 } catch (IOException e) {
309 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
310 e.getLocalizedMessage());
315 connection.disconnect();
319 protected OperationResult createTable(String indexName, String typeName, String indexSettings,
320 String indexMappings) throws DocumentStoreOperationException {
322 if (indexSettings == null) {
323 logger.debug("No settings provided.");
326 if (indexMappings == null) {
327 logger.debug("No mappings provided.");
330 OperationResult opResult = new OperationResult();
332 // Initialize operation result with a failure codes / fault string
333 opResult.setResultCode(500);
334 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
336 // Grab the current time so we can use it to generate a metrics log.
337 MdcOverride override = getStartTime(new MdcOverride());
339 String fullUrl = getFullUrl("/" + indexName + "/", false);
340 HttpURLConnection conn = initializeConnection(fullUrl);
343 conn.setRequestMethod("PUT");
344 } catch (ProtocolException e) {
345 shutdownConnection(conn);
346 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
349 StringBuilder sb = new StringBuilder(128);
350 sb.append("{ \"settings\" : ");
351 sb.append(indexSettings);
354 sb.append("\"mappings\" : {");
355 sb.append("\"" + typeName + "\" :");
356 sb.append(indexMappings);
359 attachContent(conn, sb.toString());
361 logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
362 logger.debug("Request content: " + sb.toString());
364 handleResponse(conn, opResult);
366 shutdownConnection(conn);
368 // Generate a metrics log so we can track how long the operation took.
369 metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
370 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
371 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
372 override, indexName);
378 * Will send the passed in JSON payload to Elasticsearch using the provided index name in an
379 * attempt to create the index.
381 * @param indexName - The name of the index to be created
382 * @param settingsAndMappings - The actual JSON object that will define the index
383 * @return - The operation result of writing into Elasticsearch
384 * @throws DocumentStoreOperationException
386 protected OperationResult createTable(String indexName, String settingsAndMappings)
387 throws DocumentStoreOperationException {
388 OperationResult result = new OperationResult();
389 result.setResultCode(500);
390 result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
392 // Grab the current time so we can use it to generate a metrics log.
393 MdcOverride override = getStartTime(new MdcOverride());
395 String fullUrl = getFullUrl("/" + indexName + "/", false);
396 HttpURLConnection conn = initializeConnection(fullUrl);
399 conn.setRequestMethod("PUT");
400 } catch (ProtocolException e) {
401 shutdownConnection(conn);
402 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
405 attachContent(conn, settingsAndMappings);
406 handleResponse(conn, result);
408 // Generate a metrics log so we can track how long the operation took.
409 metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
410 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
411 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()),
412 override, indexName);
414 shutdownConnection(conn);
420 public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
421 boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
423 if (!allowImplicitIndexCreation) {
425 // Before we do anything, make sure that the specified index
426 // actually exists in the
427 // document store - we don't want to rely on ElasticSearch to fail
429 // create because it could be configured to implicitly create a
430 // non-existent index,
431 // which can lead to hard-to-debug behaviour with queries down the
433 OperationResult indexExistsResult = checkIndexExistence(indexName);
434 if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
436 DocumentOperationResult opResult = new DocumentOperationResult();
437 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
438 opResult.setResult("Document Index '" + indexName + "' does not exist.");
439 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
444 if (document.getId() == null || document.getId().isEmpty()) {
445 return createDocumentWithoutId(indexName, document);
447 return createDocumentWithId(indexName, document);
451 private DocumentOperationResult createDocumentWithId(String indexName,
452 DocumentStoreDataEntity document) throws DocumentStoreOperationException {
453 // check if the document already exists
454 DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
456 if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
457 if (opResult.getResultCode() == Status.OK.getStatusCode()) {
458 opResult.setFailureCause("A document with the same id already exists.");
460 opResult.setFailureCause(
461 "Failed to verify a document with the specified id does not already exist.");
463 opResult.setResultCode(Status.CONFLICT.getStatusCode());
467 opResult = new DocumentOperationResult();
468 // Initialize operation result with a failure codes / fault string
469 opResult.setResultCode(500);
470 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
472 // Grab the current time so we can use it to generate a metrics log.
473 MdcOverride override = getStartTime(new MdcOverride());
476 getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
477 HttpURLConnection conn = initializeConnection(fullUrl);
480 conn.setRequestMethod("PUT");
481 } catch (ProtocolException e) {
482 shutdownConnection(conn);
483 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
486 attachDocument(conn, document);
488 logger.debug("Sending 'PUT' request to: " + conn.getURL());
490 handleResponse(conn, opResult);
491 buildDocumentResult(opResult, indexName);
493 // Generate a metrics log so we can track how long the operation took.
494 metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
495 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
496 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
497 override, indexName);
499 shutdownConnection(conn);
505 private DocumentOperationResult createDocumentWithoutId(String indexName,
506 DocumentStoreDataEntity document) throws DocumentStoreOperationException {
508 DocumentOperationResult response = new DocumentOperationResult();
509 // Initialize operation result with a failure codes / fault string
510 response.setResultCode(500);
511 response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
513 // Grab the current time so we can use it to generate a metrics log.
514 MdcOverride override = getStartTime(new MdcOverride());
516 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
517 HttpURLConnection conn = initializeConnection(fullUrl);
520 conn.setRequestMethod("POST");
521 } catch (ProtocolException e) {
522 shutdownConnection(conn);
523 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
526 attachDocument(conn, document);
528 logger.debug("Sending 'POST' request to: " + conn.getURL());
530 handleResponse(conn, response);
531 buildDocumentResult(response, indexName);
533 // Generate a metrics log so we can track how long the operation took.
534 metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
535 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
536 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
537 override, indexName);
539 shutdownConnection(conn);
544 private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
545 throws DocumentStoreOperationException {
546 conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
547 conn.setRequestProperty("Connection", "Close");
549 attachContent(conn, doc.getContentInJson());
552 private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
553 throws DocumentStoreOperationException {
554 DocumentOperationResult opResult = new DocumentOperationResult();
556 // Initialize operation result with a failure codes / fault string
557 opResult.setResultCode(500);
559 // Grab the current time so we can use it to generate a metrics log.
560 MdcOverride override = getStartTime(new MdcOverride());
562 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
563 HttpURLConnection conn = initializeConnection(fullUrl);
566 conn.setRequestMethod("HEAD");
567 } catch (ProtocolException e) {
568 shutdownConnection(conn);
569 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
572 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
576 resultCode = conn.getResponseCode();
577 } catch (IOException e) {
578 shutdownConnection(conn);
579 throw new DocumentStoreOperationException(
580 "Failed to get the response code from the connection.", e);
583 logger.debug("Response Code : " + resultCode);
585 opResult.setResultCode(resultCode);
587 // Generate a metrics log so we can track how long the operation took.
588 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
589 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
590 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
591 override, indexName, docId);
593 shutdownConnection(conn);
599 public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
600 boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
602 if (!allowImplicitIndexCreation) {
604 // Before we do anything, make sure that the specified index
605 // actually exists in the
606 // document store - we don't want to rely on ElasticSearch to fail
608 // create because it could be configured to implicitly create a
609 // non-existent index,
610 // which can lead to hard-to-debug behaviour with queries down the
612 OperationResult indexExistsResult = checkIndexExistence(indexName);
613 if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
615 DocumentOperationResult opResult = new DocumentOperationResult();
616 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
617 opResult.setResult("Document Index '" + indexName + "' does not exist.");
618 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
623 DocumentOperationResult opResult = new DocumentOperationResult();
625 // Initialize operation result with a failure codes / fault string
626 opResult.setResultCode(500);
627 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
629 // Grab the current time so we can use it to generate a metrics log.
630 MdcOverride override = getStartTime(new MdcOverride());
632 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
633 + "?version=" + document.getVersion(), false);
634 HttpURLConnection conn = initializeConnection(fullUrl);
637 conn.setRequestMethod("PUT");
638 } catch (ProtocolException e) {
639 shutdownConnection(conn);
640 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
643 attachDocument(conn, document);
645 logger.debug("Sending 'PUT' request to: " + conn.getURL());
647 handleResponse(conn, opResult);
648 buildDocumentResult(opResult, indexName);
650 // Generate a metrics log so we can track how long the operation took.
651 metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
652 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
653 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
654 override, indexName, document.getId());
656 shutdownConnection(conn);
662 public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
663 throws DocumentStoreOperationException {
664 DocumentOperationResult opResult = new DocumentOperationResult();
666 // Initialize operation result with a failure codes / fault string
667 opResult.setResultCode(500);
668 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
670 // Grab the current time so we can use it to generate a metrics log.
671 MdcOverride override = getStartTime(new MdcOverride());
673 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
674 + "?version=" + document.getVersion(), false);
675 HttpURLConnection conn = initializeConnection(fullUrl);
678 conn.setRequestMethod("DELETE");
679 } catch (ProtocolException e) {
680 shutdownConnection(conn);
681 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
684 logger.debug("\nSending 'DELETE' request to " + conn.getURL());
686 handleResponse(conn, opResult);
687 buildDocumentResult(opResult, indexName);
688 // supress the etag and url in response for delete as they are not
690 if (opResult.getDocument() != null) {
691 opResult.getDocument().setEtag(null);
692 opResult.getDocument().setUrl(null);
695 // Generate a metrics log so we can track how long the operation took.
696 metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
697 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
698 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
699 override, indexName, document.getId());
701 shutdownConnection(conn);
707 public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
708 throws DocumentStoreOperationException {
709 DocumentOperationResult opResult = new DocumentOperationResult();
711 // Initialize operation result with a failure codes / fault string
712 opResult.setResultCode(500);
713 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
715 // Grab the current time so we can use it to generate a metrics log.
716 MdcOverride override = getStartTime(new MdcOverride());
718 String fullUrl = null;
719 if (document.getVersion() == null) {
720 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
722 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
723 + "?version=" + document.getVersion(), false);
725 HttpURLConnection conn = initializeConnection(fullUrl);
727 logger.debug("\nSending 'GET' request to: " + conn.getURL());
729 handleResponse(conn, opResult);
730 buildDocumentResult(opResult, indexName);
732 // Generate a metrics log so we can track how long the operation took.
733 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
734 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
735 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
736 override, indexName, document.getId());
738 shutdownConnection(conn);
743 public SearchOperationResult search(String indexName, String queryString)
744 throws DocumentStoreOperationException {
745 SearchOperationResult opResult = new SearchOperationResult();
747 // Initialize operation result with a failure codes / fault string
748 opResult.setResultCode(500);
749 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
751 String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
753 // Grab the current time so we can use it to generate a metrics log.
754 MdcOverride override = getStartTime(new MdcOverride());
756 HttpURLConnection conn = initializeConnection(fullUrl);
759 conn.setRequestMethod("GET");
760 } catch (ProtocolException e) {
761 shutdownConnection(conn);
762 throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
765 logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
767 handleResponse(conn, opResult);
768 buildSearchResult(opResult, indexName);
770 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
771 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
772 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
773 override, indexName, queryString);
778 public SearchOperationResult searchWithPayload(String indexName, String query)
779 throws DocumentStoreOperationException {
780 SearchOperationResult opResult = new SearchOperationResult();
782 if (logger.isDebugEnabled()) {
783 logger.debug("Querying index: " + indexName + " with query string: " + query);
786 // Initialize operation result with a failure codes / fault string
787 opResult.setResultCode(500);
788 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
790 String fullUrl = getFullUrl("/" + indexName + "/_search", false);
792 // Grab the current time so we can use it to generate a metrics log.
793 MdcOverride override = getStartTime(new MdcOverride());
795 HttpURLConnection conn = initializeConnection(fullUrl);
798 conn.setRequestMethod("POST");
799 } catch (ProtocolException e) {
800 shutdownConnection(conn);
801 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
804 attachContent(conn, query);
806 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
807 logger.debug("Request body = Elasticsearch query = " + query);
809 handleResponse(conn, opResult);
810 buildSearchResult(opResult, indexName);
812 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
813 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
814 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
815 override, indexName, query);
817 shutdownConnection(conn);
822 public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
823 throws DocumentStoreOperationException {
825 SearchOperationResult opResult = new SearchOperationResult();
827 if (logger.isDebugEnabled()) {
828 logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
831 // Initialize operation result with a failure codes / fault string
832 opResult.setResultCode(500);
833 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
835 String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
837 // Grab the current time so we can use it to generate a metrics log.
838 MdcOverride override = getStartTime(new MdcOverride());
840 HttpURLConnection conn = initializeConnection(fullUrl);
843 conn.setRequestMethod("POST");
844 } catch (ProtocolException e) {
845 shutdownConnection(conn);
846 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
849 attachContent(conn, query);
851 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
852 logger.debug("Request body = Elasticsearch query = " + query);
854 handleResponse(conn, opResult);
855 buildSuggestResult(opResult, indexName);
857 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
858 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
859 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
860 override, indexName, query);
862 shutdownConnection(conn);
867 private void attachContent(HttpURLConnection conn, String content)
868 throws DocumentStoreOperationException {
869 OutputStream outputStream = null;
870 OutputStreamWriter out = null;
873 outputStream = conn.getOutputStream();
874 } catch (IOException e) {
875 shutdownConnection(conn);
876 throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
879 out = new OutputStreamWriter(outputStream);
884 } catch (IOException e) {
885 shutdownConnection(conn);
886 throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
890 private HttpURLConnection initializeConnection(String fullUrl)
891 throws DocumentStoreOperationException {
893 HttpURLConnection conn = null;
896 url = new URL(fullUrl);
897 } catch (MalformedURLException e) {
898 throw new DocumentStoreOperationException("Error building a URL with " + url, e);
902 conn = (HttpURLConnection) url.openConnection();
903 conn.setDoOutput(true);
904 } catch (IOException e) {
905 shutdownConnection(conn);
906 throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
912 private void handleResponse(HttpURLConnection conn, OperationResult opResult)
913 throws DocumentStoreOperationException {
914 int resultCode = 200;
917 resultCode = conn.getResponseCode();
918 } catch (IOException e) {
919 shutdownConnection(conn);
920 throw new DocumentStoreOperationException(
921 "Failed to get the response code from the connection.", e);
924 logger.debug("Response Code : " + resultCode);
926 InputStream inputStream = null;
928 if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response
930 inputStream = conn.getErrorStream();
933 inputStream = conn.getInputStream();
934 } catch (IOException e) {
935 shutdownConnection(conn);
936 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
940 InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
941 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
943 StringBuilder result = new StringBuilder(128);
944 String string = null;
947 while ((string = bufferedreader.readLine()) != null) {
948 result.append(string).append("\n");
950 } catch (IOException e) {
951 shutdownConnection(conn);
952 throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
955 if (resultCode == Status.CONFLICT.getStatusCode()) {
956 opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode());
958 opResult.setResultCode(resultCode);
960 if (logger.isDebugEnabled()) {
961 logger.debug("Raw result string from ElasticSearch = " + result.toString());
963 opResult.setResult(result.toString());
964 opResult.setResultVersion(extractVersion(result.toString()));
967 private String extractVersion(String result) throws DocumentStoreOperationException {
969 JSONParser parser = new JSONParser();
970 String version = null;
972 JSONObject root = (JSONObject) parser.parse(result);
973 if (root.get("_version") != null) {
974 version = root.get("_version").toString();
977 } catch (ParseException e) {
979 // Not all responses from ElasticSearch include a version, so
980 // if we don't get one back, just return an empty string rather
981 // than trigger a false failure.
988 * This convenience method gets the current system time and stores it in an attribute in the
989 * supplied {@link MdcOverride} object so that it can be used later by the metrics logger.
991 * @param override - The {@link MdcOverride} object to update.
992 * @return - The supplied {@link MdcOverride} object.
994 private MdcOverride getStartTime(MdcOverride override) {
996 // Grab the current time...
997 long startTimeInMs = System.currentTimeMillis();
999 // ...and add it as an attribute to the supplied MDC Override
1001 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
1002 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
1004 // Return the MdcOverride object that we were passed.
1005 // This looks odd, but it allows us to do stuff like:
1007 // MdcOverride ov = getStartTime(new MdcOverride())
1009 // which is quite handy, but also allows us to pass in an existing
1010 // MdcOverride object which already has some attributes set.
1014 private boolean isSuccess(OperationResult result) {
1016 return isSuccessCode(result.getResultCode());
1019 private boolean isSuccessCode(int statusCode) {
1020 return ((statusCode >= 200) && (statusCode < 300));
1024 public OperationResult performBulkOperations(BulkRequest[] requests)
1025 throws DocumentStoreOperationException {
1027 if (logger.isDebugEnabled()) {
1028 String dbgString = "ESController: performBulkOperations - Operations: ";
1030 for (BulkRequest request : requests) {
1031 dbgString += "[" + request.toString() + "] ";
1034 logger.debug(dbgString);
1037 // Grab the current time so we can use it to generate a metrics log.
1038 MdcOverride override = getStartTime(new MdcOverride());
1040 // Parse the supplied set of operations.
1041 // Iterate over the list of operations which we were provided and
1042 // translate them into a format that ElasticSearh understands.
1044 StringBuilder esOperationSet = new StringBuilder(128);
1045 List<ElasticSearchResultItem> rejected = new ArrayList<ElasticSearchResultItem>();
1046 for (BulkRequest request : requests) {
1048 // Convert the request to the syntax ElasticSearch likes.
1049 if (buildEsOperation(request, esOperationSet, rejected)) {
1054 ElasticSearchBulkOperationResult opResult = null;
1057 // Open an HTTP connection to the ElasticSearch back end.
1058 String fullUrl = getFullUrl("/_bulk", false);
1060 HttpURLConnection conn;
1063 url = new URL(fullUrl);
1064 conn = (HttpURLConnection) url.openConnection();
1065 conn.setRequestMethod("PUT");
1066 conn.setDoOutput(true);
1067 conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
1068 conn.setRequestProperty("Connection", "Close");
1070 } catch (IOException e) {
1072 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1073 if (logger.isDebugEnabled()) {
1074 logger.debug(Throwables.getStackTraceAsString(e));
1077 throw new DocumentStoreOperationException(
1078 "Failed to open connection to document store. Cause: " + e.getMessage(), e);
1081 StringBuilder bulkResult = new StringBuilder(128);
1083 // Create an output stream to write our request to.
1084 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());;
1086 if (logger.isDebugEnabled()) {
1087 logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1089 "ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
1092 // Write the resulting request string to our output stream.
1093 // (this sends the request to ES?)
1094 out.write(esOperationSet.toString());
1097 // Open an input stream on our connection in order to read back
1099 InputStream is = conn.getInputStream();
1100 InputStreamReader inputstreamreader = new InputStreamReader(is);
1101 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1103 // Read the contents of the input stream into our result
1105 String esResponseString = null;
1107 while ((esResponseString = bufferedreader.readLine()) != null) {
1108 bulkResult.append(esResponseString).append("\n");
1111 } catch (IOException e) {
1113 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1114 if (logger.isDebugEnabled()) {
1115 StringWriter sw = new StringWriter();
1116 e.printStackTrace(new PrintWriter(sw));
1117 logger.debug(sw.toString());
1120 throw new DocumentStoreOperationException(
1121 "Failure interacting with document store. Cause: " + e.getMessage(), e);
1124 if (logger.isDebugEnabled()) {
1126 "ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
1129 // ...and marshal the resulting string into a Java object.
1131 opResult = marshallEsBulkResult(bulkResult.toString());
1133 } catch (IOException e) {
1135 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1136 if (logger.isDebugEnabled()) {
1137 logger.debug(Throwables.getStackTraceAsString(e));
1140 throw new DocumentStoreOperationException(
1141 "Failed to marshal response body. Cause: " + e.getMessage(), e);
1145 // Finally, build the operation result and return it to the caller.
1146 OperationResult result = new OperationResult();
1147 result.setResultCode(207);
1148 result.setResult(buildGenericBulkResultSet(opResult, rejected));
1150 // In the success case we don't want the entire result string to be
1151 // dumped into the metrics log, so concatenate it.
1152 String resultStringForMetricsLog = result.getResult();
1153 if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1154 resultStringForMetricsLog =
1155 resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85))
1159 metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1160 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1161 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1168 * This method converts a {@link BulkRequest} object into a json structure which can be understood
1171 * @param request - The request to be performed.
1172 * @param sb - The string builder to append the json data to
1173 * @throws DocumentStoreOperationException
1175 private boolean buildEsOperation(BulkRequest request, StringBuilder sb,
1176 List<ElasticSearchResultItem> fails) throws DocumentStoreOperationException {
1178 boolean retVal = true;
1179 OperationResult indexExistsResult = null;
1181 // What kind of operation are we performing?
1182 switch (request.getOperationType()) {
1184 // Create a new document.
1187 // Make sure that we were supplied a document payload.
1188 if (request.getOperation().getDocument() == null) {
1190 fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1191 request.getIndex(), request.getId(), 400,
1192 request.getOperation().getMetaData().getUrl()));
1196 // Make sure that the supplied document URL is formatted
1198 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1199 fails.add(generateRejectionEntry(request.getOperationType(),
1200 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1201 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1205 // Validate that the specified index actually exists before we
1206 // try to perform the create.
1208 ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1211 .add(generateRejectionEntry(request.getOperationType(),
1212 "Specified resource does not exist: "
1213 + request.getOperation().getMetaData().getUrl(),
1214 request.getIndex(), request.getId(), 404,
1215 request.getOperation().getMetaData().getUrl()));
1219 // If we were supplied an id for the new document, then
1220 // include it in the bulk operation to Elastic Search
1221 if (request.getId() == null) {
1224 String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
1226 // Otherwise, we just leave that parameter off and ElasticSearch
1227 // will generate one for us.
1229 sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
1234 // Append the document that we want to create.
1235 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1236 } catch (JsonProcessingException e) {
1237 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1242 // Update an existing document.
1245 // Make sure that we were supplied a document payload.
1246 if (request.getOperation().getDocument() == null) {
1248 fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1249 request.getIndex(), request.getId(), 400,
1250 request.getOperation().getMetaData().getUrl()));
1254 // Make sure that the supplied document URL is formatted
1256 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1257 fails.add(generateRejectionEntry(request.getOperationType(),
1258 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1259 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1263 // Validate that the specified index actually exists before we
1264 // try to perform the update.
1265 if (!indexExists(request.getIndex())) {
1268 .add(generateRejectionEntry(request.getOperationType(),
1269 "Specified resource does not exist: "
1270 + request.getOperation().getMetaData().getUrl(),
1271 request.getIndex(), request.getId(), 404,
1272 request.getOperation().getMetaData().getUrl()));
1276 // Validate that the document we are trying to update actually
1277 // exists before we try to perform the update.
1278 if (!documentExists(request.getIndex(), request.getId())) {
1281 .add(generateRejectionEntry(request.getOperationType(),
1282 "Specified resource does not exist: "
1283 + request.getOperation().getMetaData().getUrl(),
1284 request.getIndex(), request.getId(), 404,
1285 request.getOperation().getMetaData().getUrl()));
1289 // It is mandatory that a version be supplied for an update
1291 // so validate that now.
1292 if (request.getOperation().getMetaData().getEtag() == null) {
1294 fails.add(generateRejectionEntry(request.getOperationType(),
1295 "Missing mandatory ETag field", request.getIndex(), request.getId(), 400,
1296 request.getOperation().getMetaData().getUrl()));
1300 // Generate the update request...
1301 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
1302 request.getId(), request.getOperation().getMetaData().getEtag()));
1304 // ...and append the document that we want to update.
1306 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1307 } catch (JsonProcessingException e) {
1308 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1312 // Delete an existing document.
1315 // Make sure that the supplied document URL is formatted
1317 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1318 fails.add(generateRejectionEntry(request.getOperationType(),
1319 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1320 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1324 // Validate that the specified index actually exists before we
1325 // try to perform the delete.
1326 if (!indexExists(request.getIndex())) {
1329 .add(generateRejectionEntry(request.getOperationType(),
1330 "Specified resource does not exist: "
1331 + request.getOperation().getMetaData().getUrl(),
1332 request.getIndex(), request.getId(), 404,
1333 request.getOperation().getMetaData().getUrl()));
1337 // Validate that the document we are trying to update actually
1338 // exists before we try to perform the delete.
1339 if (!documentExists(request.getIndex(), request.getId())) {
1342 .add(generateRejectionEntry(request.getOperationType(),
1343 "Specified resource does not exist: "
1344 + request.getOperation().getMetaData().getUrl(),
1345 request.getIndex(), request.getId(), 404,
1346 request.getOperation().getMetaData().getUrl()));
1350 // It is mandatory that a version be supplied for a delete
1352 // so validate that now.
1353 if (request.getOperation().getMetaData().getEtag() == null) {
1355 fails.add(generateRejectionEntry(request.getOperationType(),
1356 "Missing mandatory ETag field", request.getIndex(), request.getId(), 400,
1357 request.getOperation().getMetaData().getUrl()));
1361 // Generate the delete request.
1362 sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
1363 request.getId(), request.getOperation().getMetaData().getEtag()));
1371 private boolean indexExists(String index) throws DocumentStoreOperationException {
1373 OperationResult indexExistsResult = checkIndexExistence(index);
1375 return ((indexExistsResult.getResultCode() >= 200)
1376 && (indexExistsResult.getResultCode() < 300));
1379 private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1381 OperationResult docExistsResult = checkDocumentExistence(index, id);
1383 return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1387 * This method constructs a status entry for a bulk operation which has been rejected before even
1388 * sending it to the document store.
1390 * @param rejectReason - A message describing why the operation was rejected.
1391 * @param anId - The identifier associated with the document being acted on.
1392 * @param statusCode - An HTTP status code.
1393 * @return - A result set item.
1395 private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason,
1396 String index, String anId, int statusCode, String originalUrl) {
1398 ElasticSearchError err = new ElasticSearchError();
1399 err.setReason(rejectReason);
1401 ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1404 op.setStatus(statusCode);
1406 op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1408 ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1412 rejectionResult.setCreate(op);
1415 rejectionResult.setIndex(op);
1418 rejectionResult.setDelete(op);
1423 return rejectionResult;
1427 * This method takes the json structure returned from ElasticSearch in response to a bulk
1428 * operations request and marshals it into a Java object.
1430 * @param jsonResult - The bulk operations response returned from ElasticSearch.
1431 * @return - The marshalled response.
1432 * @throws JsonParseException
1433 * @throws JsonMappingException
1434 * @throws IOException
1436 private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult)
1437 throws JsonParseException, JsonMappingException, IOException {
1439 if (jsonResult != null) {
1440 if (logger.isDebugEnabled()) {
1441 logger.debug("ESController: Marshalling ES result set from json: "
1442 + jsonResult.replaceAll("\n", ""));
1445 ObjectMapper mapper = new ObjectMapper();
1446 mapper.setSerializationInclusion(Include.NON_EMPTY);
1448 return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1455 * This method takes the marshalled ElasticSearch bulk response and converts it into a generic
1458 * @param esResult - ElasticSearch bulk operations response.
1459 * @return - A generic result set.
1461 private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1462 List<ElasticSearchResultItem> rejectedOps) {
1465 int totalSuccess = 0;
1468 if (logger.isDebugEnabled()) {
1470 logger.debug("ESController: Build generic result set. ES Results: "
1471 + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: "
1472 + rejectedOps.toString());
1475 // Build a combined list of result items from the results returned
1476 // from ElasticSearch and the list of operations that we rejected
1477 // without sending to ElasticSearch.
1478 List<ElasticSearchResultItem> combinedResults = new ArrayList<ElasticSearchResultItem>();
1479 if (esResult != null) {
1480 combinedResults.addAll(Arrays.asList(esResult.getItems()));
1482 combinedResults.addAll(rejectedOps);
1484 // Iterate over the individual results in the resulting result set.
1485 StringBuilder resultsBuilder = new StringBuilder();
1486 AtomicBoolean firstItem = new AtomicBoolean(true);
1487 for (ElasticSearchResultItem item : combinedResults) {
1489 // Increment the operation counts.
1491 if (isSuccessCode(item.operationStatus().getStatus())) {
1497 // Prepend a comma to our response string unless this it the
1498 // first result in the set.
1499 if (!firstItem.compareAndSet(true, false)) {
1500 resultsBuilder.append(", ");
1503 // Append the current result as a generic json structure.
1504 resultsBuilder.append(item.toJson());
1507 // Now, build the result string and return it.
1508 String responseBody = "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": "
1509 + totalSuccess + ", " + "\"total_fails\": " + totalFails + ", " + "\"results\": ["
1510 + resultsBuilder.toString() + "]}";
1512 return responseBody;
1516 * This method queryies ElasticSearch to determine if the supplied index is present in the
1519 * @param indexName - The index to look for.
1520 * @return - An operation result indicating the success or failure of the check.
1521 * @throws DocumentStoreOperationException
1523 public OperationResult checkIndexExistence(String indexName)
1524 throws DocumentStoreOperationException {
1526 // Initialize operation result with a failure codes / fault string
1527 OperationResult opResult = new OperationResult();
1528 opResult.setResultCode(500);
1530 // Grab the current time so we can use it to generate a metrics log.
1531 MdcOverride override = getStartTime(new MdcOverride());
1533 String fullUrl = getFullUrl("/" + indexName, false);
1534 HttpURLConnection conn = initializeConnection(fullUrl);
1537 conn.setRequestMethod("HEAD");
1539 } catch (ProtocolException e) {
1540 shutdownConnection(conn);
1541 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1544 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1548 resultCode = conn.getResponseCode();
1549 } catch (IOException e) {
1550 shutdownConnection(conn);
1551 throw new DocumentStoreOperationException(
1552 "Failed to get the response code from the connection.", e);
1554 logger.debug("Response Code : " + resultCode);
1556 opResult.setResultCode(resultCode);
1558 // Generate a metrics log so we can track how long the operation took.
1559 metricsLogger.info(SearchDbMsgs.CHECK_INDEX_TIME,
1560 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1561 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1562 override, indexName);
1564 shutdownConnection(conn);
1569 private void buildDocumentResult(DocumentOperationResult result, String index)
1570 throws DocumentStoreOperationException {
1572 JSONParser parser = new JSONParser();
1575 root = (JSONObject) parser.parse(result.getResult());
1577 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1578 // Success response object
1579 Document doc = new Document();
1580 doc.setEtag(result.getResultVersion());
1581 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1583 doc.setContent((JSONObject) root.get("_source"));
1584 result.setDocument(doc);
1587 // Error response object
1588 JSONObject error = (JSONObject) root.get("error");
1589 if (error != null) {
1591 new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1595 } catch (Exception e) {
1596 throw new DocumentStoreOperationException(
1597 "Failed to parse Elastic Search response." + result.getResult());
1602 private String buildDocumentResponseUrl(String index, String id) {
1603 return ApiUtils.buildDocumentUri(index, id);
1606 private void buildSearchResult(SearchOperationResult result, String index)
1607 throws DocumentStoreOperationException {
1609 JSONParser parser = new JSONParser();
1613 root = (JSONObject) parser.parse(result.getResult());
1614 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1615 JSONObject hits = (JSONObject) root.get("hits");
1616 JSONArray hitArray = (JSONArray) hits.get("hits");
1617 SearchHits searchHits = new SearchHits();
1618 searchHits.setTotalHits(hits.get("total").toString());
1619 ArrayList<SearchHit> searchHitArray = new ArrayList<SearchHit>();
1621 for (int i = 0; i < hitArray.size(); i++) {
1622 JSONObject hit = (JSONObject) hitArray.get(i);
1623 SearchHit searchHit = new SearchHit();
1624 searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1625 Document doc = new Document();
1626 if (hit.get("_version") != null) {
1627 doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1630 doc.setUrl(buildDocumentResponseUrl(index,
1631 (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1632 doc.setContent((JSONObject) hit.get("_source"));
1633 searchHit.setDocument(doc);
1634 searchHitArray.add(searchHit);
1636 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1637 result.setSearchResult(searchHits);
1639 JSONObject aggregations = (JSONObject) root.get("aggregations");
1640 if (aggregations != null) {
1641 AggregationResult[] aggResults =
1642 AggregationParsingUtil.parseAggregationResults(aggregations);
1643 AggregationResults aggs = new AggregationResults();
1644 aggs.setAggregations(aggResults);
1645 result.setAggregationResult(aggs);
1650 JSONObject error = (JSONObject) root.get("error");
1651 if (error != null) {
1653 new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1656 } catch (Exception e) {
1657 throw new DocumentStoreOperationException(
1658 "Failed to parse Elastic Search response." + result.getResult());
1663 private void buildSuggestResult(SearchOperationResult result, String index)
1664 throws DocumentStoreOperationException {
1666 JSONParser parser = new JSONParser();
1669 root = (JSONObject) parser.parse(result.getResult());
1670 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1671 JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
1672 JSONObject hitdata = (JSONObject) hitArray.get(0);
1673 JSONArray optionsArray = (JSONArray) hitdata.get("options");
1674 SuggestHits suggestHits = new SuggestHits();
1675 suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
1677 ArrayList<SuggestHit> suggestHitArray = new ArrayList<SuggestHit>();
1679 for (int i = 0; i < optionsArray.size(); i++) {
1680 JSONObject hit = (JSONObject) optionsArray.get(i);
1682 SuggestHit suggestHit = new SuggestHit();
1683 suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
1684 suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
1685 Document doc = new Document();
1686 if (hit.get("_version") != null) {
1687 doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1689 doc.setUrl(buildDocumentResponseUrl(index,
1690 (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1692 doc.setContent((JSONObject) hit.get("payload"));
1693 suggestHit.setDocument(doc);
1694 suggestHitArray.add(suggestHit);
1696 suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
1697 result.setSuggestResult(suggestHits);
1699 JSONObject aggregations = (JSONObject) root.get("aggregations");
1700 if (aggregations != null) {
1701 AggregationResult[] aggResults =
1702 AggregationParsingUtil.parseAggregationResults(aggregations);
1703 AggregationResults aggs = new AggregationResults();
1704 aggs.setAggregations(aggResults);
1705 result.setAggregationResult(aggs);
1710 JSONObject error = (JSONObject) root.get("error");
1711 if (error != null) {
1713 new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1716 } catch (Exception e) {
1717 throw new DocumentStoreOperationException(
1718 "Failed to parse Elastic Search response." + result.getResult());