2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
23 import com.google.common.base.Throwables;
24 import com.fasterxml.jackson.annotation.JsonInclude.Include;
25 import com.fasterxml.jackson.core.JsonParseException;
26 import com.fasterxml.jackson.core.JsonProcessingException;
27 import com.fasterxml.jackson.databind.JsonMappingException;
28 import com.fasterxml.jackson.databind.ObjectMapper;
29 import org.json.simple.JSONArray;
30 import org.json.simple.JSONObject;
31 import org.json.simple.parser.JSONParser;
32 import org.json.simple.parser.ParseException;
33 import org.onap.aai.sa.rest.AnalysisConfiguration;
34 import org.onap.aai.sa.rest.ApiUtils;
35 import org.onap.aai.sa.rest.BulkRequest;
36 import org.onap.aai.sa.rest.BulkRequest.OperationType;
37 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
38 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
39 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
40 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
41 import org.onap.aai.sa.searchdbabstraction.entity.Document;
42 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
43 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
44 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
45 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
46 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
47 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
48 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
49 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
50 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
51 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
52 import org.onap.aai.cl.api.LogFields;
53 import org.onap.aai.cl.api.LogLine;
54 import org.onap.aai.cl.api.Logger;
55 import org.onap.aai.cl.eelf.LoggerFactory;
56 import org.onap.aai.cl.mdc.MdcContext;
57 import org.onap.aai.cl.mdc.MdcOverride;
58 import org.onap.aai.sa.rest.DocumentSchema;
59 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
60 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
62 import java.io.BufferedReader;
64 import java.io.FileInputStream;
65 import java.io.IOException;
66 import java.io.InputStream;
67 import java.io.InputStreamReader;
68 import java.io.OutputStream;
69 import java.io.OutputStreamWriter;
70 import java.io.PrintWriter;
71 import java.io.StringWriter;
72 import java.net.HttpURLConnection;
73 import java.net.MalformedURLException;
74 import java.net.ProtocolException;
76 import java.text.SimpleDateFormat;
77 import java.util.ArrayList;
78 import java.util.Arrays;
79 import java.util.List;
80 import java.util.Properties;
81 import java.util.concurrent.atomic.AtomicBoolean;
82 import javax.ws.rs.core.Response.Status;
86 * This class has the Elasticsearch implementation of the
87 * DB operations defined in DocumentStoreInterface.
89 public class ElasticSearchHttpController implements DocumentStoreInterface {
91 private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
92 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
93 private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
94 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
95 private static final String BULK_IMPORT_INDEX_TEMPLATE =
96 "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
97 private static final String BULK_DELETE_TEMPLATE =
98 "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
100 private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
101 "Internal Error: ElasticSearch operation fault occurred";
102 private static final Logger logger = LoggerFactory.getInstance()
103 .getLogger(ElasticSearchHttpController.class.getName());
104 private static final Logger metricsLogger = LoggerFactory.getInstance()
105 .getMetricsLogger(ElasticSearchHttpController.class.getName());
106 private final ElasticSearchConfig config;
108 private static final String DEFAULT_TYPE = "default";
110 private static ElasticSearchHttpController instance = null;
112 protected AnalysisConfiguration analysisConfig;
114 public static ElasticSearchHttpController getInstance() {
116 synchronized (ElasticSearchHttpController.class) {
118 if (instance == null) {
120 Properties properties = new Properties();
121 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
123 properties.load(new FileInputStream(file));
124 } catch (Exception e) {
125 logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL,
126 "ElasticSearchHTTPController.getInstance",
127 e.getLocalizedMessage());
130 ElasticSearchConfig config = new ElasticSearchConfig(properties);
131 instance = new ElasticSearchHttpController(config);
138 public ElasticSearchHttpController(ElasticSearchConfig config) {
139 this.config = config;
140 analysisConfig = new AnalysisConfiguration();
143 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
145 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
146 } catch (Exception e) {
147 logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e,
148 getFullUrl("", false), e.getMessage());
153 public AnalysisConfiguration getAnalysisConfig() {
154 return analysisConfig;
158 public OperationResult createIndex(String index, DocumentSchema documentSchema) {
160 OperationResult result = new OperationResult();
161 result.setResultCode(500);
165 // Submit the request to ElasticSearch to create the index using a
166 // default document type.
167 result = createTable(index,
169 analysisConfig.getEsIndexSettings(),
170 DocumentSchemaUtil.generateDocumentMappings(documentSchema));
172 // ElasticSearch will return us a 200 code on success when we
173 // want to report a 201, so translate the result here.
174 result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
175 if (isSuccess(result)) {
176 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
177 //result.setResult("{\"index\": \"" + index + ", \"type\": \"" + DEFAULT_TYPE + "\"}");
180 } catch (DocumentStoreOperationException e) {
182 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
189 public OperationResult createDynamicIndex(String index, String dynamicSchema) {
190 OperationResult result = new OperationResult();
191 result.setResultCode(500);
194 result = createTable(index, dynamicSchema);
196 // ElasticSearch will return us a 200 code on success when we
197 // want to report a 201, so translate the result here.
198 result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
199 if (isSuccess(result)) {
200 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
202 } catch (DocumentStoreOperationException e) {
203 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
211 public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
213 //Initialize operation result with a failure codes / fault string
214 OperationResult opResult = new OperationResult();
215 opResult.setResultCode(500);
216 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
218 // Grab the current time so we can use it to generate a metrics log.
219 MdcOverride override = getStartTime(new MdcOverride());
221 String fullUrl = getFullUrl("/" + indexName + "/", false);
222 HttpURLConnection conn = initializeConnection(fullUrl);
224 logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
227 conn.setRequestMethod("DELETE");
228 } catch (ProtocolException e) {
229 shutdownConnection(conn);
230 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
233 handleResponse(conn, opResult);
235 // Generate a metrics log so we can track how long the operation took.
236 metricsLogger.info(SearchDbMsgs.DELETE_INDEX_TIME,
238 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
239 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
243 shutdownConnection(conn);
249 private OperationResult checkConnection() throws Exception {
251 String fullUrl = getFullUrl("/_cluster/health", false);
253 HttpURLConnection conn = null;
255 url = new URL(fullUrl);
256 conn = (HttpURLConnection) url.openConnection();
257 conn.setRequestMethod("GET");
258 conn.setDoOutput(true);
259 logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
261 int resultCode = conn.getResponseCode();
262 logger.debug("getClusterHealth() response Code : " + resultCode);
263 OperationResult opResult = new OperationResult();
264 opResult.setResultCode(resultCode);
266 shutdownConnection(conn);
271 private String getFullUrl(String resourceUrl, boolean isSecure) {
273 final String host = config.getIpAddress();
274 final String port = config.getHttpPort();
277 return String.format("https://%s:%s%s", host, port, resourceUrl);
279 return String.format("http://%s:%s%s", host, port, resourceUrl);
283 private void shutdownConnection(HttpURLConnection connection) {
284 if (connection == null) {
288 InputStream inputstream = null;
289 OutputStream outputstream = null;
292 inputstream = connection.getInputStream();
293 } catch (IOException e) {
294 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
296 if (inputstream != null) {
299 } catch (IOException e) {
300 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
301 e.getLocalizedMessage());
307 outputstream = connection.getOutputStream();
308 } catch (IOException e) {
309 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
311 if (outputstream != null) {
313 outputstream.close();
314 } catch (IOException e) {
315 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
316 e.getLocalizedMessage());
321 connection.disconnect();
325 protected OperationResult createTable(String indexName, String typeName,
326 String indexSettings, String indexMappings)
327 throws DocumentStoreOperationException {
329 if (indexSettings == null) {
330 logger.debug("No settings provided.");
333 if (indexMappings == null) {
334 logger.debug("No mappings provided.");
337 OperationResult opResult = new OperationResult();
339 // Initialize operation result with a failure codes / fault string
340 opResult.setResultCode(500);
341 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
343 // Grab the current time so we can use it to generate a metrics log.
344 MdcOverride override = getStartTime(new MdcOverride());
346 String fullUrl = getFullUrl("/" + indexName + "/", false);
347 HttpURLConnection conn = initializeConnection(fullUrl);
350 conn.setRequestMethod("PUT");
351 } catch (ProtocolException e) {
352 shutdownConnection(conn);
353 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
356 StringBuilder sb = new StringBuilder(128);
357 sb.append("{ \"settings\" : ");
358 sb.append(indexSettings);
361 sb.append("\"mappings\" : {");
362 sb.append("\"" + typeName + "\" :");
363 sb.append(indexMappings);
366 attachContent(conn, sb.toString());
368 logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
369 logger.debug("Request content: " + sb.toString());
371 handleResponse(conn, opResult);
373 shutdownConnection(conn);
375 // Generate a metrics log so we can track how long the operation took.
376 metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
378 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
379 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
387 * Will send the passed in JSON payload to Elasticsearch using the
388 * provided index name in an attempt to create the index.
390 * @param indexName - The name of the index to be created
391 * @param settingsAndMappings - The actual JSON object that will define the index
392 * @return - The operation result of writing into Elasticsearch
393 * @throws DocumentStoreOperationException
395 protected OperationResult createTable(String indexName, String settingsAndMappings) throws DocumentStoreOperationException {
396 OperationResult result = new OperationResult();
397 result.setResultCode(500);
398 result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
400 // Grab the current time so we can use it to generate a metrics log.
401 MdcOverride override = getStartTime(new MdcOverride());
403 String fullUrl = getFullUrl("/" + indexName + "/", false);
404 HttpURLConnection conn = initializeConnection(fullUrl);
407 conn.setRequestMethod("PUT");
408 } catch (ProtocolException e) {
409 shutdownConnection(conn);
410 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
413 attachContent(conn, settingsAndMappings);
414 handleResponse(conn, result);
416 // Generate a metrics log so we can track how long the operation took.
417 metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
419 .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
420 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()),
428 public DocumentOperationResult createDocument(String indexName,
429 DocumentStoreDataEntity document,
430 boolean allowImplicitIndexCreation)
431 throws DocumentStoreOperationException {
433 if(!allowImplicitIndexCreation) {
435 // Before we do anything, make sure that the specified index actually exists in the
436 // document store - we don't want to rely on ElasticSearch to fail the document
437 // create because it could be configured to implicitly create a non-existent index,
438 // which can lead to hard-to-debug behaviour with queries down the road.
439 OperationResult indexExistsResult = checkIndexExistence(indexName);
440 if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
442 DocumentOperationResult opResult = new DocumentOperationResult();
443 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
444 opResult.setResult("Document Index '" + indexName + "' does not exist.");
445 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
450 if (document.getId() == null || document.getId().isEmpty()) {
451 return createDocumentWithoutId(indexName, document);
453 return createDocumentWithId(indexName, document);
457 private DocumentOperationResult createDocumentWithId(String indexName,
458 DocumentStoreDataEntity document)
459 throws DocumentStoreOperationException {
460 // check if the document already exists
461 DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
464 if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
465 if (opResult.getResultCode() == Status.OK.getStatusCode()) {
466 opResult.setFailureCause("A document with the same id already exists.");
468 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
470 opResult.setResultCode(Status.CONFLICT.getStatusCode());
474 opResult = new DocumentOperationResult();
475 // Initialize operation result with a failure codes / fault string
476 opResult.setResultCode(500);
477 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
479 // Grab the current time so we can use it to generate a metrics log.
480 MdcOverride override = getStartTime(new MdcOverride());
482 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE
483 + "/" + document.getId(), false);
484 HttpURLConnection conn = initializeConnection(fullUrl);
487 conn.setRequestMethod("PUT");
488 } catch (ProtocolException e) {
489 shutdownConnection(conn);
490 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
493 attachDocument(conn, document);
495 logger.debug("Sending 'PUT' request to: " + conn.getURL());
497 handleResponse(conn, opResult);
498 buildDocumentResult(opResult, indexName);
500 // Generate a metrics log so we can track how long the operation took.
501 metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
503 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
504 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
508 shutdownConnection(conn);
514 private DocumentOperationResult createDocumentWithoutId(String indexName,
515 DocumentStoreDataEntity document)
516 throws DocumentStoreOperationException {
518 DocumentOperationResult response = new DocumentOperationResult();
519 // Initialize operation result with a failure codes / fault string
520 response.setResultCode(500);
521 response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
523 // Grab the current time so we can use it to generate a metrics log.
524 MdcOverride override = getStartTime(new MdcOverride());
526 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
527 HttpURLConnection conn = initializeConnection(fullUrl);
530 conn.setRequestMethod("POST");
531 } catch (ProtocolException e) {
532 shutdownConnection(conn);
533 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
536 attachDocument(conn, document);
538 logger.debug("Sending 'POST' request to: " + conn.getURL());
540 handleResponse(conn, response);
541 buildDocumentResult(response, indexName);
543 // Generate a metrics log so we can track how long the operation took.
544 metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
546 .setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
547 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
551 shutdownConnection(conn);
556 private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
557 throws DocumentStoreOperationException {
558 conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
559 conn.setRequestProperty("Connection", "Close");
561 attachContent(conn, doc.getContentInJson());
564 private DocumentOperationResult checkDocumentExistence(String indexName,
566 throws DocumentStoreOperationException {
567 DocumentOperationResult opResult = new DocumentOperationResult();
569 // Initialize operation result with a failure codes / fault string
570 opResult.setResultCode(500);
572 // Grab the current time so we can use it to generate a metrics log.
573 MdcOverride override = getStartTime(new MdcOverride());
575 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
576 HttpURLConnection conn = initializeConnection(fullUrl);
579 conn.setRequestMethod("HEAD");
580 } catch (ProtocolException e) {
581 shutdownConnection(conn);
582 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
585 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
589 resultCode = conn.getResponseCode();
590 } catch (IOException e) {
591 shutdownConnection(conn);
592 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
595 logger.debug("Response Code : " + resultCode);
597 opResult.setResultCode(resultCode);
599 // Generate a metrics log so we can track how long the operation took.
600 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
602 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
603 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
608 shutdownConnection(conn);
614 public DocumentOperationResult updateDocument(String indexName,
615 DocumentStoreDataEntity document,
616 boolean allowImplicitIndexCreation)
617 throws DocumentStoreOperationException {
619 if(!allowImplicitIndexCreation) {
621 // Before we do anything, make sure that the specified index actually exists in the
622 // document store - we don't want to rely on ElasticSearch to fail the document
623 // create because it could be configured to implicitly create a non-existent index,
624 // which can lead to hard-to-debug behaviour with queries down the road.
625 OperationResult indexExistsResult = checkIndexExistence(indexName);
626 if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
628 DocumentOperationResult opResult = new DocumentOperationResult();
629 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
630 opResult.setResult("Document Index '" + indexName + "' does not exist.");
631 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
636 DocumentOperationResult opResult = new DocumentOperationResult();
638 // Initialize operation result with a failure codes / fault string
639 opResult.setResultCode(500);
640 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
642 // Grab the current time so we can use it to generate a metrics log.
643 MdcOverride override = getStartTime(new MdcOverride());
645 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
646 + "?version=" + document.getVersion(), false);
647 HttpURLConnection conn = initializeConnection(fullUrl);
650 conn.setRequestMethod("PUT");
651 } catch (ProtocolException e) {
652 shutdownConnection(conn);
653 throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
656 attachDocument(conn, document);
658 logger.debug("Sending 'PUT' request to: " + conn.getURL());
660 handleResponse(conn, opResult);
661 buildDocumentResult(opResult, indexName);
663 // Generate a metrics log so we can track how long the operation took.
664 metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
666 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
667 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
672 shutdownConnection(conn);
678 public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
679 throws DocumentStoreOperationException {
680 DocumentOperationResult opResult = new DocumentOperationResult();
682 // Initialize operation result with a failure codes / fault string
683 opResult.setResultCode(500);
684 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
686 // Grab the current time so we can use it to generate a metrics log.
687 MdcOverride override = getStartTime(new MdcOverride());
689 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
690 + "?version=" + document.getVersion(), false);
691 HttpURLConnection conn = initializeConnection(fullUrl);
694 conn.setRequestMethod("DELETE");
695 } catch (ProtocolException e) {
696 shutdownConnection(conn);
697 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
700 logger.debug("\nSending 'DELETE' request to " + conn.getURL());
702 handleResponse(conn, opResult);
703 buildDocumentResult(opResult, indexName);
704 //supress the etag and url in response for delete as they are not required
705 if (opResult.getDocument() != null) {
706 opResult.getDocument().setEtag(null);
707 opResult.getDocument().setUrl(null);
710 // Generate a metrics log so we can track how long the operation took.
711 metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
713 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
714 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
719 shutdownConnection(conn);
725 public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
726 throws DocumentStoreOperationException {
727 DocumentOperationResult opResult = new DocumentOperationResult();
729 // Initialize operation result with a failure codes / fault string
730 opResult.setResultCode(500);
731 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
733 // Grab the current time so we can use it to generate a metrics log.
734 MdcOverride override = getStartTime(new MdcOverride());
736 String fullUrl = null;
737 if (document.getVersion() == null) {
738 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
740 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
741 + "?version=" + document.getVersion(), false);
743 HttpURLConnection conn = initializeConnection(fullUrl);
745 logger.debug("\nSending 'GET' request to: " + conn.getURL());
747 handleResponse(conn, opResult);
748 buildDocumentResult(opResult, indexName);
750 // Generate a metrics log so we can track how long the operation took.
751 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
753 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
754 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
759 shutdownConnection(conn);
764 public SearchOperationResult search(String indexName, String queryString)
765 throws DocumentStoreOperationException {
766 SearchOperationResult opResult = new SearchOperationResult();
768 // Initialize operation result with a failure codes / fault string
769 opResult.setResultCode(500);
770 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
772 String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
774 // Grab the current time so we can use it to generate a metrics log.
775 MdcOverride override = getStartTime(new MdcOverride());
777 HttpURLConnection conn = initializeConnection(fullUrl);
780 conn.setRequestMethod("GET");
781 } catch (ProtocolException e) {
782 shutdownConnection(conn);
783 throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
786 logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
788 handleResponse(conn, opResult);
789 buildSearchResult(opResult, indexName);
792 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
794 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
795 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
803 public SearchOperationResult searchWithPayload(String indexName, String query)
804 throws DocumentStoreOperationException {
805 SearchOperationResult opResult = new SearchOperationResult();
807 if (logger.isDebugEnabled()) {
808 logger.debug("Querying index: " + indexName + " with query string: " + query);
811 // Initialize operation result with a failure codes / fault string
812 opResult.setResultCode(500);
813 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
815 String fullUrl = getFullUrl("/" + indexName + "/_search", false);
817 // Grab the current time so we can use it to generate a metrics log.
818 MdcOverride override = getStartTime(new MdcOverride());
820 HttpURLConnection conn = initializeConnection(fullUrl);
823 conn.setRequestMethod("POST");
824 } catch (ProtocolException e) {
825 shutdownConnection(conn);
826 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
829 attachContent(conn, query);
831 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
832 logger.debug("Request body = Elasticsearch query = " + query);
834 handleResponse(conn, opResult);
835 buildSearchResult(opResult, indexName);
837 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
839 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
840 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
845 shutdownConnection(conn);
851 public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
852 throws DocumentStoreOperationException {
854 SearchOperationResult opResult = new SearchOperationResult();
856 if (logger.isDebugEnabled()) {
857 logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
860 // Initialize operation result with a failure codes / fault string
861 opResult.setResultCode(500);
862 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
864 String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
866 // Grab the current time so we can use it to generate a metrics log.
867 MdcOverride override = getStartTime(new MdcOverride());
869 HttpURLConnection conn = initializeConnection(fullUrl);
872 conn.setRequestMethod("POST");
873 } catch (ProtocolException e) {
874 shutdownConnection(conn);
875 throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
878 attachContent(conn, query);
880 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
881 logger.debug("Request body = Elasticsearch query = " + query);
883 handleResponse(conn, opResult);
884 buildSuggestResult(opResult, indexName);
886 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
887 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
888 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
889 override, indexName, query);
891 shutdownConnection(conn);
896 private void attachContent(HttpURLConnection conn, String content)
897 throws DocumentStoreOperationException {
898 OutputStream outputStream = null;
899 OutputStreamWriter out = null;
902 outputStream = conn.getOutputStream();
903 } catch (IOException e) {
904 shutdownConnection(conn);
905 throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
908 out = new OutputStreamWriter(outputStream);
913 } catch (IOException e) {
914 shutdownConnection(conn);
915 throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
919 private HttpURLConnection initializeConnection(String fullUrl)
920 throws DocumentStoreOperationException {
922 HttpURLConnection conn = null;
925 url = new URL(fullUrl);
926 } catch (MalformedURLException e) {
927 throw new DocumentStoreOperationException("Error building a URL with " + url, e);
931 conn = (HttpURLConnection) url.openConnection();
932 conn.setDoOutput(true);
933 } catch (IOException e) {
934 shutdownConnection(conn);
935 throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
941 private void handleResponse(HttpURLConnection conn, OperationResult opResult)
942 throws DocumentStoreOperationException {
943 int resultCode = 200;
946 resultCode = conn.getResponseCode();
947 } catch (IOException e) {
948 shutdownConnection(conn);
949 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
952 logger.debug("Response Code : " + resultCode);
954 InputStream inputStream = null;
956 if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
957 inputStream = conn.getErrorStream();
960 inputStream = conn.getInputStream();
961 } catch (IOException e) {
962 shutdownConnection(conn);
963 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
967 InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
968 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
970 StringBuilder result = new StringBuilder(128);
971 String string = null;
974 while ((string = bufferedreader.readLine()) != null) {
975 result.append(string).append("\n");
977 } catch (IOException e) {
978 shutdownConnection(conn);
979 throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
982 if (resultCode == Status.CONFLICT.getStatusCode()) {
983 opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode());
985 opResult.setResultCode(resultCode);
987 if (logger.isDebugEnabled()) {
988 logger.debug("Raw result string from ElasticSearch = " + result.toString());
990 opResult.setResult(result.toString());
991 opResult.setResultVersion(extractVersion(result.toString()));
994 private String extractVersion(String result) throws DocumentStoreOperationException {
996 JSONParser parser = new JSONParser();
997 String version = null;
999 JSONObject root = (JSONObject) parser.parse(result);
1000 if (root.get("_version") != null) {
1001 version = root.get("_version").toString();
1004 } catch (ParseException e) {
1006 // Not all responses from ElasticSearch include a version, so
1007 // if we don't get one back, just return an empty string rather
1008 // than trigger a false failure.
1015 * This convenience method gets the current system time and stores
1016 * it in an attribute in the supplied {@link MdcOverride} object so
1017 * that it can be used later by the metrics logger.
1019 * @param override - The {@link MdcOverride} object to update.
1020 * @return - The supplied {@link MdcOverride} object.
1022 private MdcOverride getStartTime(MdcOverride override) {
1024 // Grab the current time...
1025 long startTimeInMs = System.currentTimeMillis();
1027 // ...and add it as an attribute to the supplied MDC Override
1029 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
1030 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
1032 // Return the MdcOverride object that we were passed.
1033 // This looks odd, but it allows us to do stuff like:
1035 // MdcOverride ov = getStartTime(new MdcOverride())
1037 // which is quite handy, but also allows us to pass in an existing
1038 // MdcOverride object which already has some attributes set.
1042 private boolean isSuccess(OperationResult result) {
1044 return isSuccessCode(result.getResultCode());
1048 private boolean isSuccessCode(int statusCode) {
1049 return ((statusCode >= 200) && (statusCode < 300));
1054 public OperationResult performBulkOperations(BulkRequest[] requests)
1055 throws DocumentStoreOperationException {
1057 if (logger.isDebugEnabled()) {
1058 String dbgString = "ESController: performBulkOperations - Operations: ";
1060 for (BulkRequest request : requests) {
1061 dbgString += "[" + request.toString() + "] ";
1064 logger.debug(dbgString);
1067 // Grab the current time so we can use it to generate a metrics log.
1068 MdcOverride override = getStartTime(new MdcOverride());
1070 // Parse the supplied set of operations.
1071 // Iterate over the list of operations which we were provided and
1072 // translate them into a format that ElasticSearh understands.
1074 StringBuilder esOperationSet = new StringBuilder(128);
1075 List<ElasticSearchResultItem> rejected = new ArrayList<ElasticSearchResultItem>();
1076 for (BulkRequest request : requests) {
1078 // Convert the request to the syntax ElasticSearch likes.
1079 if (buildEsOperation(request, esOperationSet, rejected)) {
1084 ElasticSearchBulkOperationResult opResult = null;
1087 // Open an HTTP connection to the ElasticSearch back end.
1088 String fullUrl = getFullUrl("/_bulk", false);
1090 HttpURLConnection conn;
1093 url = new URL(fullUrl);
1094 conn = (HttpURLConnection) url.openConnection();
1095 conn.setRequestMethod("PUT");
1096 conn.setDoOutput(true);
1097 conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
1098 conn.setRequestProperty("Connection", "Close");
1100 } catch (IOException e) {
1102 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1103 if (logger.isDebugEnabled()) {
1104 logger.debug(Throwables.getStackTraceAsString(e));
1107 throw new DocumentStoreOperationException("Failed to open connection to document store. Cause: "
1108 + e.getMessage(), e);
1111 StringBuilder bulkResult = new StringBuilder(128);
1113 // Create an output stream to write our request to.
1114 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1117 if (logger.isDebugEnabled()) {
1118 logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1119 logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n",
1123 // Write the resulting request string to our output stream. (this sends the request to ES?)
1124 out.write(esOperationSet.toString());
1127 // Open an input stream on our connection in order to read back the results.
1128 InputStream is = conn.getInputStream();
1129 InputStreamReader inputstreamreader = new InputStreamReader(is);
1130 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1132 // Read the contents of the input stream into our result string...
1133 String esResponseString = null;
1135 while ((esResponseString = bufferedreader.readLine()) != null) {
1136 bulkResult.append(esResponseString).append("\n");
1139 } catch (IOException e) {
1141 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1142 if (logger.isDebugEnabled()) {
1143 StringWriter sw = new StringWriter();
1144 e.printStackTrace(new PrintWriter(sw));
1145 logger.debug(sw.toString());
1148 throw new DocumentStoreOperationException("Failure interacting with document store. Cause: "
1149 + e.getMessage(), e);
1152 if (logger.isDebugEnabled()) {
1153 logger.debug("ESController: Received result string from ElasticSearch: = "
1154 + bulkResult.toString());
1157 // ...and marshal the resulting string into a Java object.
1159 opResult = marshallEsBulkResult(bulkResult.toString());
1161 } catch (IOException e) {
1163 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1164 if (logger.isDebugEnabled()) {
1165 logger.debug(Throwables.getStackTraceAsString(e));
1168 throw new DocumentStoreOperationException("Failed to marshal response body. Cause: "
1169 + e.getMessage(), e);
1173 // Finally, build the operation result and return it to the caller.
1174 OperationResult result = new OperationResult();
1175 result.setResultCode(207);
1176 result.setResult(buildGenericBulkResultSet(opResult, rejected));
1178 // In the success case we don't want the entire result string to be
1179 // dumped into the metrics log, so concatenate it.
1180 String resultStringForMetricsLog = result.getResult();
1181 if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1182 resultStringForMetricsLog = resultStringForMetricsLog.substring(0,
1183 Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1186 metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1188 .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1189 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1197 * This method converts a {@link BulkRequest} object into a json structure
1198 * which can be understood by ElasticSearch.
1200 * @param request - The request to be performed.
1201 * @param sb - The string builder to append the json data to
1202 * @throws DocumentStoreOperationException
1204 private boolean buildEsOperation(BulkRequest request, StringBuilder sb,
1205 List<ElasticSearchResultItem> fails)
1206 throws DocumentStoreOperationException {
1208 boolean retVal = true;
1209 OperationResult indexExistsResult = null;
1211 // What kind of operation are we performing?
1212 switch (request.getOperationType()) {
1214 // Create a new document.
1217 // Make sure that we were supplied a document payload.
1218 if (request.getOperation().getDocument() == null) {
1220 fails.add(generateRejectionEntry(request.getOperationType(),
1221 "Missing document payload",
1225 request.getOperation().getMetaData().getUrl()));
1229 // Make sure that the supplied document URL is formatted
1231 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1232 fails.add(generateRejectionEntry(request.getOperationType(),
1233 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1237 request.getOperation().getMetaData().getUrl()));
1241 // Validate that the specified index actually exists before we
1242 // try to perform the create.
1243 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1245 fails.add(generateRejectionEntry(request.getOperationType(),
1246 "Specified resource does not exist: "
1247 + request.getOperation().getMetaData().getUrl(),
1251 request.getOperation().getMetaData().getUrl()));
1255 // If we were supplied an id for the new document, then
1256 // include it in the bulk operation to Elastic Search
1257 if (request.getId() == null) {
1259 sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE,
1263 // Otherwise, we just leave that parameter off and ElasticSearch
1264 // will generate one for us.
1266 sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE,
1273 // Append the document that we want to create.
1274 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1275 } catch (JsonProcessingException e) {
1276 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1281 // Update an existing document.
1284 // Make sure that we were supplied a document payload.
1285 if (request.getOperation().getDocument() == null) {
1287 fails.add(generateRejectionEntry(request.getOperationType(),
1288 "Missing document payload",
1292 request.getOperation().getMetaData().getUrl()));
1296 // Make sure that the supplied document URL is formatted
1298 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1299 fails.add(generateRejectionEntry(request.getOperationType(),
1300 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1304 request.getOperation().getMetaData().getUrl()));
1308 // Validate that the specified index actually exists before we
1309 // try to perform the update.
1310 if (!indexExists(request.getIndex())) {
1312 fails.add(generateRejectionEntry(request.getOperationType(),
1313 "Specified resource does not exist: "
1314 + request.getOperation().getMetaData().getUrl(),
1318 request.getOperation().getMetaData().getUrl()));
1322 // Validate that the document we are trying to update actually
1323 // exists before we try to perform the update.
1324 if (!documentExists(request.getIndex(), request.getId())) {
1326 fails.add(generateRejectionEntry(request.getOperationType(),
1327 "Specified resource does not exist: "
1328 + request.getOperation().getMetaData().getUrl(),
1332 request.getOperation().getMetaData().getUrl()));
1336 // It is mandatory that a version be supplied for an update operation,
1337 // so validate that now.
1338 if (request.getOperation().getMetaData().getEtag() == null) {
1340 fails.add(generateRejectionEntry(request.getOperationType(),
1341 "Missing mandatory ETag field",
1345 request.getOperation().getMetaData().getUrl()));
1349 // Generate the update request...
1350 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE,
1354 request.getOperation().getMetaData().getEtag()));
1356 // ...and append the document that we want to update.
1358 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1359 } catch (JsonProcessingException e) {
1360 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1364 // Delete an existing document.
1367 // Make sure that the supplied document URL is formatted
1369 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1370 fails.add(generateRejectionEntry(request.getOperationType(),
1371 "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1375 request.getOperation().getMetaData().getUrl()));
1379 // Validate that the specified index actually exists before we
1380 // try to perform the delete.
1381 if (!indexExists(request.getIndex())) {
1383 fails.add(generateRejectionEntry(request.getOperationType(),
1384 "Specified resource does not exist: "
1385 + request.getOperation().getMetaData().getUrl(),
1389 request.getOperation().getMetaData().getUrl()));
1393 // Validate that the document we are trying to update actually
1394 // exists before we try to perform the delete.
1395 if (!documentExists(request.getIndex(), request.getId())) {
1397 fails.add(generateRejectionEntry(request.getOperationType(),
1398 "Specified resource does not exist: "
1399 + request.getOperation().getMetaData().getUrl(),
1403 request.getOperation().getMetaData().getUrl()));
1407 // It is mandatory that a version be supplied for a delete operation,
1408 // so validate that now.
1409 if (request.getOperation().getMetaData().getEtag() == null) {
1411 fails.add(generateRejectionEntry(request.getOperationType(),
1412 "Missing mandatory ETag field",
1416 request.getOperation().getMetaData().getUrl()));
1420 // Generate the delete request.
1421 sb.append(String.format(BULK_DELETE_TEMPLATE,
1425 request.getOperation().getMetaData().getEtag()));
1433 private boolean indexExists(String index) throws DocumentStoreOperationException {
1435 OperationResult indexExistsResult = checkIndexExistence(index);
1437 return ((indexExistsResult.getResultCode() >= 200)
1438 && (indexExistsResult.getResultCode() < 300));
1441 private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1443 OperationResult docExistsResult = checkDocumentExistence(index, id);
1445 return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1449 * This method constructs a status entry for a bulk operation which has
1450 * been rejected before even sending it to the document store.
1452 * @param rejectReason - A message describing why the operation was rejected.
1453 * @param anId - The identifier associated with the document being
1455 * @param statusCode - An HTTP status code.
1456 * @return - A result set item.
1458 private ElasticSearchResultItem generateRejectionEntry(OperationType opType,
1459 String rejectReason,
1463 String originalUrl) {
1465 ElasticSearchError err = new ElasticSearchError();
1466 err.setReason(rejectReason);
1468 ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1471 op.setStatus(statusCode);
1473 op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1475 ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1479 rejectionResult.setCreate(op);
1482 rejectionResult.setIndex(op);
1485 rejectionResult.setDelete(op);
1490 return rejectionResult;
1495 * This method takes the json structure returned from ElasticSearch in
1496 * response to a bulk operations request and marshals it into a Java
1499 * @param jsonResult - The bulk operations response returned from
1501 * @return - The marshalled response.
1502 * @throws JsonParseException
1503 * @throws JsonMappingException
1504 * @throws IOException
1506 private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult)
1507 throws JsonParseException, JsonMappingException, IOException {
1509 if (jsonResult != null) {
1510 if (logger.isDebugEnabled()) {
1511 logger.debug("ESController: Marshalling ES result set from json: "
1512 + jsonResult.replaceAll("\n", ""));
1515 ObjectMapper mapper = new ObjectMapper();
1516 mapper.setSerializationInclusion(Include.NON_EMPTY);
1518 return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1526 * This method takes the marshalled ElasticSearch bulk response and
1527 * converts it into a generic response payload.
1529 * @param esResult - ElasticSearch bulk operations response.
1530 * @return - A generic result set.
1532 private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1533 List<ElasticSearchResultItem> rejectedOps) {
1536 int totalSuccess = 0;
1539 if (logger.isDebugEnabled()) {
1541 logger.debug("ESController: Build generic result set. ES Results: "
1542 + ((esResult != null) ? esResult.toString() : "[]")
1543 + " Rejected Ops: " + rejectedOps.toString());
1546 // Build a combined list of result items from the results returned
1547 // from ElasticSearch and the list of operations that we rejected
1548 // without sending to ElasticSearch.
1549 List<ElasticSearchResultItem> combinedResults = new ArrayList<ElasticSearchResultItem>();
1550 if (esResult != null) {
1551 combinedResults.addAll(Arrays.asList(esResult.getItems()));
1553 combinedResults.addAll(rejectedOps);
1555 // Iterate over the individual results in the resulting result set.
1556 StringBuilder resultsBuilder = new StringBuilder();
1557 AtomicBoolean firstItem = new AtomicBoolean(true);
1558 for (ElasticSearchResultItem item : combinedResults) {
1560 // Increment the operation counts.
1562 if (isSuccessCode(item.operationStatus().getStatus())) {
1568 // Prepend a comma to our response string unless this it the
1569 // first result in the set.
1570 if (!firstItem.compareAndSet(true, false)) {
1571 resultsBuilder.append(", ");
1574 // Append the current result as a generic json structure.
1575 resultsBuilder.append(item.toJson());
1578 // Now, build the result string and return it.
1579 String responseBody = "{ \"total_operations\": " + totalOps + ", "
1580 + "\"total_success\": " + totalSuccess + ", "
1581 + "\"total_fails\": " + totalFails + ", "
1583 + resultsBuilder.toString()
1586 return responseBody;
1591 * This method queryies ElasticSearch to determine if the supplied
1592 * index is present in the document store.
1594 * @param indexName - The index to look for.
1595 * @return - An operation result indicating the success or failure of
1597 * @throws DocumentStoreOperationException
1599 public OperationResult checkIndexExistence(String indexName)
1600 throws DocumentStoreOperationException {
1602 // Initialize operation result with a failure codes / fault string
1603 OperationResult opResult = new OperationResult();
1604 opResult.setResultCode(500);
1606 // Grab the current time so we can use it to generate a metrics log.
1607 MdcOverride override = getStartTime(new MdcOverride());
1609 String fullUrl = getFullUrl("/" + indexName, false);
1610 HttpURLConnection conn = initializeConnection(fullUrl);
1613 conn.setRequestMethod("HEAD");
1615 } catch (ProtocolException e) {
1616 shutdownConnection(conn);
1617 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1620 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1624 resultCode = conn.getResponseCode();
1625 } catch (IOException e) {
1626 shutdownConnection(conn);
1627 throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1629 logger.debug("Response Code : " + resultCode);
1631 opResult.setResultCode(resultCode);
1633 // Generate a metrics log so we can track how long the operation took.
1634 metricsLogger.info(SearchDbMsgs.CHECK_INDEX_TIME,
1636 .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1637 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1641 shutdownConnection(conn);
1647 private void buildDocumentResult(DocumentOperationResult result, String index)
1648 throws DocumentStoreOperationException {
1650 JSONParser parser = new JSONParser();
1653 root = (JSONObject) parser.parse(result.getResult());
1655 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1656 // Success response object
1657 Document doc = new Document();
1658 doc.setEtag(result.getResultVersion());
1659 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1661 doc.setContent((JSONObject) root.get("_source"));
1662 result.setDocument(doc);
1665 // Error response object
1666 JSONObject error = (JSONObject) root.get("error");
1667 if (error != null) {
1668 result.setError(new ErrorResult(error.get("type").toString(),
1669 error.get("reason").toString()));
1673 } catch (Exception e) {
1674 throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1675 + result.getResult());
1681 private String buildDocumentResponseUrl(String index, String id) {
1682 return ApiUtils.buildDocumentUri(index, id);
1685 private void buildSearchResult(SearchOperationResult result, String index)
1686 throws DocumentStoreOperationException {
1688 JSONParser parser = new JSONParser();
1692 root = (JSONObject) parser.parse(result.getResult());
1693 if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1694 JSONObject hits = (JSONObject) root.get("hits");
1695 JSONArray hitArray = (JSONArray) hits.get("hits");
1696 SearchHits searchHits = new SearchHits();
1697 searchHits.setTotalHits(hits.get("total").toString());
1698 ArrayList<SearchHit> searchHitArray = new ArrayList<SearchHit>();
1700 for (int i = 0; i < hitArray.size(); i++) {
1701 JSONObject hit = (JSONObject) hitArray.get(i);
1702 SearchHit searchHit = new SearchHit();
1703 searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1704 Document doc = new Document();
1705 if (hit.get("_version") != null) {
1706 doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1709 doc.setUrl(buildDocumentResponseUrl(index, (hit.get("_id") != null)
1710 ? hit.get("_id").toString() : ""));
1711 doc.setContent((JSONObject) hit.get("_source"));
1712 searchHit.setDocument(doc);
1713 searchHitArray.add(searchHit);
1715 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1716 result.setSearchResult(searchHits);
1718 JSONObject aggregations = (JSONObject) root.get("aggregations");
1719 if (aggregations != null) {
1720 AggregationResult[] aggResults =
1721 AggregationParsingUtil.parseAggregationResults(aggregations);
1722 AggregationResults aggs = new AggregationResults();
1723 aggs.setAggregations(aggResults);
1724 result.setAggregationResult(aggs);
1729 JSONObject error = (JSONObject) root.get("error");
1730 if (error != null) {
1731 result.setError(new ErrorResult(error.get("type").toString(),
1732 error.get("reason").toString()));
1735 } catch (Exception e) {
1736 throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1737 + result.getResult());
1742 private void buildSuggestResult(SearchOperationResult result, String index)
1743 throws DocumentStoreOperationException {
1745 JSONParser parser = new JSONParser ();
1748 root = (JSONObject) parser.parse ( result.getResult () );
1749 if (result.getResultCode () >= 200 && result.getResultCode () <= 299) {
1750 JSONArray hitArray = (JSONArray) root.get ( "suggest-vnf" );
1751 JSONObject hitdata = (JSONObject) hitArray.get ( 0 );
1752 JSONArray optionsArray = (JSONArray) hitdata.get ( "options" );
1753 SuggestHits suggestHits = new SuggestHits ();
1754 suggestHits.setTotalHits ( String.valueOf ( optionsArray.size () ) );
1756 ArrayList<SuggestHit> suggestHitArray = new ArrayList<SuggestHit> ();
1758 for (int i = 0; i < optionsArray.size (); i++) {
1759 JSONObject hit = (JSONObject) optionsArray.get ( i );
1761 SuggestHit suggestHit = new SuggestHit ();
1762 suggestHit.setScore ( (hit.get ( "score" ) != null) ? hit.get ( "score" ).toString () : "" );
1763 suggestHit.setText ( (hit.get ( "text" ) != null) ? hit.get ( "text" ).toString () : "" );
1764 Document doc = new Document ();
1765 if (hit.get ( "_version" ) != null) {
1766 doc.setEtag ( (hit.get ( "_version" ) != null) ? hit.get ( "_version" ).toString () : "" );
1768 doc.setUrl ( buildDocumentResponseUrl ( index,
1769 (hit.get ( "_id" ) != null) ? hit.get ( "_id" ).toString () : "" ) );
1771 doc.setContent ( (JSONObject) hit.get ( "payload" ) );
1772 suggestHit.setDocument ( doc );
1773 suggestHitArray.add ( suggestHit );
1775 suggestHits.setHits ( suggestHitArray.toArray ( new SuggestHit[suggestHitArray.size ()] ) );
1776 result.setSuggestResult ( suggestHits );
1778 JSONObject aggregations = (JSONObject) root.get ( "aggregations" );
1779 if (aggregations != null) {
1780 AggregationResult[] aggResults =
1781 AggregationParsingUtil.parseAggregationResults ( aggregations );
1782 AggregationResults aggs = new AggregationResults ();
1783 aggs.setAggregations ( aggResults );
1784 result.setAggregationResult ( aggs );
1789 JSONObject error = (JSONObject) root.get ( "error" );
1790 if (error != null) {
1792 new ErrorResult ( error.get ( "type" ).toString (), error.get ( "reason" ).toString () ) );
1795 } catch (Exception e) {
1796 throw new DocumentStoreOperationException (
1797 "Failed to parse Elastic Search response." + result.getResult () );