2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
24 import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
25 import static javax.ws.rs.core.MediaType.APPLICATION_FORM_URLENCODED;
26 import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
28 import com.fasterxml.jackson.annotation.JsonInclude.Include;
29 import com.fasterxml.jackson.core.JsonParseException;
30 import com.fasterxml.jackson.core.JsonProcessingException;
31 import com.fasterxml.jackson.databind.JsonMappingException;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33 import com.google.common.base.Throwables;
34 import java.io.BufferedReader;
36 import java.io.FileInputStream;
37 import java.io.IOException;
38 import java.io.InputStream;
39 import java.io.InputStreamReader;
40 import java.io.OutputStream;
41 import java.io.OutputStreamWriter;
42 import java.io.PrintWriter;
43 import java.io.StringWriter;
44 import java.net.HttpURLConnection;
45 import java.net.MalformedURLException;
46 import java.net.ProtocolException;
48 import java.text.SimpleDateFormat;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.List;
52 import java.util.Properties;
53 import java.util.concurrent.atomic.AtomicBoolean;
54 import javax.ws.rs.core.Response.Status;
55 import javax.ws.rs.core.Response.Status.Family;
56 import org.json.simple.JSONArray;
57 import org.json.simple.JSONObject;
58 import org.json.simple.parser.JSONParser;
59 import org.json.simple.parser.ParseException;
60 import org.onap.aai.cl.api.LogFields;
61 import org.onap.aai.cl.api.LogLine;
62 import org.onap.aai.cl.api.Logger;
63 import org.onap.aai.cl.eelf.LoggerFactory;
64 import org.onap.aai.cl.mdc.MdcContext;
65 import org.onap.aai.cl.mdc.MdcOverride;
66 import org.onap.aai.sa.rest.AnalysisConfiguration;
67 import org.onap.aai.sa.rest.ApiUtils;
68 import org.onap.aai.sa.rest.BulkRequest;
69 import org.onap.aai.sa.rest.BulkRequest.OperationType;
70 import org.onap.aai.sa.rest.DocumentSchema;
71 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
72 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
73 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
74 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
75 import org.onap.aai.sa.searchdbabstraction.entity.Document;
76 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
77 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
78 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
79 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
80 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
81 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
82 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
83 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
84 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
85 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
86 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
87 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
88 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
91 * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface.
93 public class ElasticSearchHttpController implements DocumentStoreInterface {
95 private static ElasticSearchHttpController instance = null;
97 private static final Logger logger =
98 LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
99 private static final Logger metricsLogger =
100 LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
102 private static final String JSON_ATTR_VERSION = "_version";
103 private static final String JSON_ATTR_ERROR = "error";
104 private static final String JSON_ATTR_REASON = "reason";
106 private static final String QUERY_PARAM_VERSION = "?version=";
108 private static final String MSG_RESOURCE_MISSING = "Specified resource does not exist: ";
109 private static final String MSG_RESPONSE_CODE = "Response Code : ";
110 private static final String MSG_INVALID_DOCUMENT_URL = "Invalid document URL: ";
111 private static final String MSG_HTTP_PUT_FAILED = "Failed to set HTTP request method to PUT.";
112 private static final String MSG_HTTP_POST_FAILED = "Failed to set HTTP request method to POST.";
113 private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
114 "Internal Error: ElasticSearch operation fault occurred";
115 private static final String FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION =
116 "Failed to get the response code from the connection.";
117 private static final String FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE = "Failed to parse Elastic Search response.";
119 private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
120 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
121 private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
122 "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
123 private static final String BULK_IMPORT_INDEX_TEMPLATE =
124 "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
125 private static final String BULK_DELETE_TEMPLATE =
126 "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
128 private final ElasticSearchConfig config;
130 private static final String DEFAULT_TYPE = "default";
132 protected AnalysisConfiguration analysisConfig;
134 public static ElasticSearchHttpController getInstance() {
135 synchronized (ElasticSearchHttpController.class) {
136 if (instance == null) {
137 Properties properties = new Properties();
138 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
140 properties.load(new FileInputStream(file));
141 } catch (Exception e) {
142 logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance",
143 e.getLocalizedMessage());
146 ElasticSearchConfig config = new ElasticSearchConfig(properties);
147 instance = new ElasticSearchHttpController(config);
154 public ElasticSearchHttpController(ElasticSearchConfig config) {
155 this.config = config;
156 analysisConfig = new AnalysisConfiguration();
159 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
161 logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
162 } catch (Exception e) {
163 logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, getFullUrl("", false),
168 public AnalysisConfiguration getAnalysisConfig() {
169 return analysisConfig;
173 public OperationResult createIndex(String index, DocumentSchema documentSchema) {
174 OperationResult result = new OperationResult();
175 result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
179 // Submit the request to ElasticSearch to create the index using a
180 // default document type.
181 result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
182 DocumentSchemaUtil.generateDocumentMappings(documentSchema));
184 // ElasticSearch will return us a 200 code on success when we
185 // want to report a 201, so translate the result here.
186 if (result.getResultCode() == Status.OK.getStatusCode()) {
187 result.setResultCode(Status.CREATED.getStatusCode());
190 if (isSuccess(result)) {
191 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
193 } catch (DocumentStoreOperationException | IOException e) {
194 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
201 public OperationResult createDynamicIndex(String index, String dynamicSchema) {
202 OperationResult result = new OperationResult();
203 result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
206 result = createTable(index, dynamicSchema);
208 // ElasticSearch will return us a 200 code on success when we
209 // want to report a 201, so translate the result here.
210 if (result.getResultCode() == Status.OK.getStatusCode()) {
211 result.setResultCode(Status.CREATED.getStatusCode());
213 if (isSuccess(result)) {
214 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
216 } catch (DocumentStoreOperationException e) {
217 result.setFailureCause("Document store operation failure. Cause: " + e.getMessage());
224 public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
226 // Initialize operation result with a failure codes / fault string
227 OperationResult opResult = new OperationResult();
228 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
229 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
231 // Grab the current time so we can use it to generate a metrics log.
232 MdcOverride override = getStartTime(new MdcOverride());
234 String fullUrl = getFullUrl("/" + indexName + "/", false);
235 HttpURLConnection conn = initializeConnection(fullUrl);
237 logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
240 conn.setRequestMethod("DELETE");
241 } catch (ProtocolException e) {
242 shutdownConnection(conn);
243 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
246 handleResponse(conn, opResult);
248 // Generate a metrics log so we can track how long the operation took.
250 .info(SearchDbMsgs.DELETE_INDEX_TIME,
251 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
252 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
253 override, indexName);
255 shutdownConnection(conn);
260 private OperationResult checkConnection() throws IOException {
261 String fullUrl = getFullUrl("/_cluster/health", false);
263 HttpURLConnection conn = null;
265 url = new URL(fullUrl);
266 conn = (HttpURLConnection) url.openConnection();
267 conn.setRequestMethod("GET");
268 conn.setDoOutput(true);
269 logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
271 int resultCode = conn.getResponseCode();
272 logger.debug("getClusterHealth() response Code : " + resultCode);
273 OperationResult opResult = new OperationResult();
274 opResult.setResultCode(resultCode);
276 shutdownConnection(conn);
281 private String getFullUrl(String resourceUrl, boolean isSecure) {
283 final String host = config.getIpAddress();
284 final String port = config.getHttpPort();
287 return String.format("https://%s:%s%s", host, port, resourceUrl);
289 return String.format("http://%s:%s%s", host, port, resourceUrl);
293 private void shutdownConnection(HttpURLConnection connection) {
294 if (connection == null) {
298 final String methodName = "shutdownConnection";
299 InputStream inputstream = null;
300 OutputStream outputstream = null;
303 inputstream = connection.getInputStream();
304 } catch (IOException e) {
305 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
307 if (inputstream != null) {
310 } catch (IOException e) {
311 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
317 outputstream = connection.getOutputStream();
318 } catch (IOException e) {
319 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
321 if (outputstream != null) {
323 outputstream.close();
324 } catch (IOException e) {
325 logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
330 connection.disconnect();
334 protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings)
335 throws DocumentStoreOperationException {
337 if (indexSettings == null) {
338 logger.debug("No settings provided.");
341 if (indexMappings == null) {
342 logger.debug("No mappings provided.");
345 OperationResult opResult = new OperationResult();
347 // Initialize operation result with a failure codes / fault string
348 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
349 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
351 // Grab the current time so we can use it to generate a metrics log.
352 MdcOverride override = getStartTime(new MdcOverride());
354 String fullUrl = getFullUrl("/" + indexName + "/", false);
355 HttpURLConnection conn = initializeConnection(fullUrl);
358 conn.setRequestMethod("PUT");
359 } catch (ProtocolException e) {
360 shutdownConnection(conn);
361 throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
364 StringBuilder sb = new StringBuilder(128);
365 sb.append("{ \"settings\" : ");
366 sb.append(indexSettings);
369 sb.append("\"mappings\" : {");
370 sb.append("\"" + typeName + "\" :");
371 sb.append(indexMappings);
375 attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
376 } catch (IOException e) {
377 logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
378 throw new DocumentStoreOperationException(e.getMessage(), e);
381 logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
382 logger.debug("Request content: " + sb.toString());
384 handleResponse(conn, opResult);
386 shutdownConnection(conn);
388 // Generate a metrics log so we can track how long the operation took.
390 .info(SearchDbMsgs.CREATE_INDEX_TIME,
391 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
392 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
393 override, indexName);
399 * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the
402 * @param indexName - The name of the index to be created
403 * @param settingsAndMappings - The actual JSON object that will define the index
404 * @return - The operation result of writing into Elasticsearch
405 * @throws DocumentStoreOperationException
407 protected OperationResult createTable(String indexName, String settingsAndMappings)
408 throws DocumentStoreOperationException {
409 OperationResult result = new OperationResult();
410 result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
411 result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
413 // Grab the current time so we can use it to generate a metrics log.
414 MdcOverride override = getStartTime(new MdcOverride());
416 String fullUrl = getFullUrl("/" + indexName + "/", false);
417 HttpURLConnection conn = initializeConnection(fullUrl);
420 conn.setRequestMethod("PUT");
421 } catch (ProtocolException e) {
422 shutdownConnection(conn);
423 throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
427 attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
428 } catch (IOException e) {
429 logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
430 throw new DocumentStoreOperationException(e.getMessage());
432 handleResponse(conn, result);
434 // Generate a metrics log so we can track how long the operation took.
436 .info(SearchDbMsgs.CREATE_INDEX_TIME,
437 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
438 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()),
439 override, indexName);
445 public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
446 boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
448 if (!allowImplicitIndexCreation) {
449 // Before we do anything, make sure that the specified index actually exists in the
450 // document store - we don't want to rely on ElasticSearch to fail the document
451 // create because it could be configured to implicitly create a non-existent index,
452 // which can lead to hard-to-debug behaviour with queries down the road.
453 OperationResult indexExistsResult = checkIndexExistence(indexName);
454 if (!isSuccess(indexExistsResult)) {
455 DocumentOperationResult opResult = new DocumentOperationResult();
456 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
457 String resultMsg = "Document Index '" + indexName + "' does not exist.";
458 opResult.setResult(resultMsg);
459 opResult.setFailureCause(resultMsg);
464 if (document.getId() == null || document.getId().isEmpty()) {
465 return createDocumentWithoutId(indexName, document);
467 return createDocumentWithId(indexName, document);
471 private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
472 throws DocumentStoreOperationException {
473 // check if the document already exists
474 DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
476 if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
477 if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) {
478 opResult.setFailureCause("A document with the same id already exists.");
480 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
482 opResult.setResultCode(Status.CONFLICT.getStatusCode());
486 opResult = new DocumentOperationResult();
487 // Initialize operation result with a failure codes / fault string
488 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
489 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
491 // Grab the current time so we can use it to generate a metrics log.
492 MdcOverride override = getStartTime(new MdcOverride());
494 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
495 HttpURLConnection conn = initializeConnection(fullUrl);
498 conn.setRequestMethod("PUT");
499 } catch (ProtocolException e) {
500 shutdownConnection(conn);
501 throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
504 attachDocument(conn, document);
506 logger.debug("Sending 'PUT' request to: " + conn.getURL());
508 handleResponse(conn, opResult);
509 buildDocumentResult(opResult, indexName);
511 // Generate a metrics log so we can track how long the operation took.
513 .info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
514 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
515 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
516 override, indexName);
518 shutdownConnection(conn);
523 private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
524 throws DocumentStoreOperationException {
526 DocumentOperationResult response = new DocumentOperationResult();
527 // Initialize operation result with a failure codes / fault string
528 response.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
529 response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
531 // Grab the current time so we can use it to generate a metrics log.
532 MdcOverride override = getStartTime(new MdcOverride());
534 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
535 HttpURLConnection conn = initializeConnection(fullUrl);
538 conn.setRequestMethod("POST");
539 } catch (ProtocolException e) {
540 shutdownConnection(conn);
541 throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
544 attachDocument(conn, document);
546 logger.debug("Sending 'POST' request to: " + conn.getURL());
548 handleResponse(conn, response);
549 buildDocumentResult(response, indexName);
551 // Generate a metrics log so we can track how long the operation took.
553 .info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
554 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
555 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
556 override, indexName);
558 shutdownConnection(conn);
563 private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
564 throws DocumentStoreOperationException {
565 conn.setRequestProperty("Connection", "Close");
566 attachContent(conn, doc.getContentInJson());
569 private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
570 throws DocumentStoreOperationException {
571 DocumentOperationResult opResult = new DocumentOperationResult();
573 // Initialize operation result with a failure codes / fault string
574 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
576 // Grab the current time so we can use it to generate a metrics log.
577 MdcOverride override = getStartTime(new MdcOverride());
579 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
580 HttpURLConnection conn = initializeConnection(fullUrl);
583 conn.setRequestMethod("HEAD");
584 } catch (ProtocolException e) {
585 shutdownConnection(conn);
586 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
589 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
593 resultCode = conn.getResponseCode();
594 } catch (IOException e) {
595 shutdownConnection(conn);
596 throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
599 logger.debug(MSG_RESPONSE_CODE + resultCode);
601 opResult.setResultCode(resultCode);
603 // Generate a metrics log so we can track how long the operation took.
605 .info(SearchDbMsgs.GET_DOCUMENT_TIME,
606 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
607 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
608 override, indexName, docId);
610 shutdownConnection(conn);
616 public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
617 boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
619 if (!allowImplicitIndexCreation) {
620 // Before we do anything, make sure that the specified index actually exists in the
621 // document store - we don't want to rely on ElasticSearch to fail the document
622 // create because it could be configured to implicitly create a non-existent index,
623 // which can lead to hard-to-debug behaviour with queries down the road.
624 OperationResult indexExistsResult = checkIndexExistence(indexName);
625 if (!isSuccess(indexExistsResult)) {
626 DocumentOperationResult opResult = new DocumentOperationResult();
627 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
628 String resultMsg = "Document Index '" + indexName + "' does not exist.";
629 opResult.setResult(resultMsg);
630 opResult.setFailureCause(resultMsg);
635 DocumentOperationResult opResult = new DocumentOperationResult();
637 // Initialize operation result with a failure codes / fault string
638 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
639 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
641 // Grab the current time so we can use it to generate a metrics log.
642 MdcOverride override = getStartTime(new MdcOverride());
644 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
645 + document.getVersion(), false);
646 HttpURLConnection conn = initializeConnection(fullUrl);
649 conn.setRequestMethod("PUT");
650 } catch (ProtocolException e) {
651 shutdownConnection(conn);
652 throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
655 attachDocument(conn, document);
657 logger.debug("Sending 'PUT' request to: " + conn.getURL());
659 handleResponse(conn, opResult);
660 buildDocumentResult(opResult, indexName);
662 // Generate a metrics log so we can track how long the operation took.
663 metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
664 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
665 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
666 override, indexName, document.getId());
668 shutdownConnection(conn);
674 public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
675 throws DocumentStoreOperationException {
676 DocumentOperationResult opResult = new DocumentOperationResult();
678 // Initialize operation result with a failure codes / fault string
679 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
680 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
682 // Grab the current time so we can use it to generate a metrics log.
683 MdcOverride override = getStartTime(new MdcOverride());
685 String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
686 + document.getVersion(), false);
687 HttpURLConnection conn = initializeConnection(fullUrl);
690 conn.setRequestMethod("DELETE");
691 } catch (ProtocolException e) {
692 shutdownConnection(conn);
693 throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
696 logger.debug("\nSending 'DELETE' request to " + conn.getURL());
698 handleResponse(conn, opResult);
699 buildDocumentResult(opResult, indexName);
700 // supress the etag and url in response for delete as they are not required
701 if (opResult.getDocument() != null) {
702 opResult.getDocument().setEtag(null);
703 opResult.getDocument().setUrl(null);
706 // Generate a metrics log so we can track how long the operation took.
707 metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
708 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
709 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
710 override, indexName, document.getId());
712 shutdownConnection(conn);
718 public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
719 throws DocumentStoreOperationException {
720 DocumentOperationResult opResult = new DocumentOperationResult();
722 // Initialize operation result with a failure codes / fault string
723 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
724 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
726 // Grab the current time so we can use it to generate a metrics log.
727 MdcOverride override = getStartTime(new MdcOverride());
729 String fullUrl = null;
730 if (document.getVersion() == null) {
731 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
733 fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
734 + document.getVersion(), false);
736 HttpURLConnection conn = initializeConnection(fullUrl);
738 logger.debug("\nSending 'GET' request to: " + conn.getURL());
740 handleResponse(conn, opResult);
741 buildDocumentResult(opResult, indexName);
743 // Generate a metrics log so we can track how long the operation took.
744 metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
745 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
746 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
747 override, indexName, document.getId());
749 shutdownConnection(conn);
755 public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
756 SearchOperationResult opResult = new SearchOperationResult();
758 // Initialize operation result with a failure codes / fault string
759 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
760 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
762 String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
764 // Grab the current time so we can use it to generate a metrics log.
765 MdcOverride override = getStartTime(new MdcOverride());
767 HttpURLConnection conn = initializeConnection(fullUrl);
770 conn.setRequestMethod("GET");
771 } catch (ProtocolException e) {
772 shutdownConnection(conn);
773 throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
776 logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
778 handleResponse(conn, opResult);
779 buildSearchResult(opResult, indexName);
782 metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
783 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
784 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
785 override, indexName, queryString);
791 public SearchOperationResult searchWithPayload(String indexName, String query)
792 throws DocumentStoreOperationException {
793 SearchOperationResult opResult = new SearchOperationResult();
795 if (logger.isDebugEnabled()) {
796 logger.debug("Querying index: " + indexName + " with query string: " + query);
799 // Initialize operation result with a failure codes / fault string
800 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
801 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
803 String fullUrl = getFullUrl("/" + indexName + "/_search", false);
805 // Grab the current time so we can use it to generate a metrics log.
806 MdcOverride override = getStartTime(new MdcOverride());
808 HttpURLConnection conn = initializeConnection(fullUrl);
811 conn.setRequestMethod("POST");
812 } catch (ProtocolException e) {
813 shutdownConnection(conn);
814 throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
817 attachContent(conn, query);
819 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
820 logger.debug("Request body = Elasticsearch query = " + query);
822 handleResponse(conn, opResult);
823 buildSearchResult(opResult, indexName);
826 .info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
827 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
828 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
829 override, indexName, query);
831 shutdownConnection(conn);
838 public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
839 throws DocumentStoreOperationException {
841 SearchOperationResult opResult = new SearchOperationResult();
843 if (logger.isDebugEnabled()) {
844 logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
847 // Initialize operation result with a failure codes / fault string
848 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
849 opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
851 String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
853 // Grab the current time so we can use it to generate a metrics log.
854 MdcOverride override = getStartTime(new MdcOverride());
856 HttpURLConnection conn = initializeConnection(fullUrl);
859 conn.setRequestMethod("POST");
860 } catch (ProtocolException e) {
861 shutdownConnection(conn);
862 throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
865 attachContent(conn, query);
867 logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
868 logger.debug("Request body = Elasticsearch query = " + query);
870 handleResponse(conn, opResult);
871 buildSuggestResult(opResult, indexName);
874 .info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
875 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
876 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
877 override, indexName, query);
879 shutdownConnection(conn);
884 private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException {
885 OutputStream outputStream = null;
886 OutputStreamWriter out = null;
889 outputStream = conn.getOutputStream();
890 } catch (IOException e) {
891 shutdownConnection(conn);
892 throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
895 out = new OutputStreamWriter(outputStream);
900 } catch (IOException e) {
901 shutdownConnection(conn);
902 throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
906 private HttpURLConnection initializeConnection(String fullUrl) throws DocumentStoreOperationException {
908 HttpURLConnection conn = null;
911 url = new URL(fullUrl);
912 } catch (MalformedURLException e) {
913 throw new DocumentStoreOperationException("Error building a URL with " + url, e);
917 conn = (HttpURLConnection) url.openConnection();
918 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON);
919 conn.setDoOutput(true);
920 } catch (IOException e) {
921 shutdownConnection(conn);
922 throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
928 private void handleResponse(HttpURLConnection conn, OperationResult opResult)
929 throws DocumentStoreOperationException {
933 resultCode = conn.getResponseCode();
934 } catch (IOException e) {
935 shutdownConnection(conn);
936 throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
939 logger.debug(MSG_RESPONSE_CODE + resultCode);
941 InputStream inputStream = null;
943 if (!isSuccessCode(resultCode)) {
944 inputStream = conn.getErrorStream();
947 inputStream = conn.getInputStream();
948 } catch (IOException e) {
949 shutdownConnection(conn);
950 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
954 InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
955 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
957 StringBuilder result = new StringBuilder(128);
958 String string = null;
961 while ((string = bufferedreader.readLine()) != null) {
962 result.append(string).append("\n");
964 } catch (IOException e) {
965 shutdownConnection(conn);
966 throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
969 if (resultCode == Status.CONFLICT.getStatusCode()) {
970 opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode());
972 opResult.setResultCode(resultCode);
974 if (logger.isDebugEnabled()) {
975 logger.debug("Raw result string from ElasticSearch = " + result.toString());
977 opResult.setResult(result.toString());
978 opResult.setResultVersion(extractVersion(result.toString()));
981 private String extractVersion(String result) {
982 JSONParser parser = new JSONParser();
983 String version = null;
985 JSONObject root = (JSONObject) parser.parse(result);
986 if (root.get(JSON_ATTR_VERSION) != null) {
987 version = root.get(JSON_ATTR_VERSION).toString();
989 } catch (ParseException e) {
990 // Not all responses from ElasticSearch include a version, so
991 // if we don't get one back, just return an empty string rather
992 // than trigger a false failure.
999 * This convenience method gets the current system time and stores it in an attribute in the supplied
1000 * {@link MdcOverride} object so that it can be used later by the metrics logger.
1002 * @param override - The {@link MdcOverride} object to update.
1003 * @return - The supplied {@link MdcOverride} object.
1005 private MdcOverride getStartTime(MdcOverride override) {
1007 // Grab the current time...
1008 long startTimeInMs = System.currentTimeMillis();
1010 // ...and add it as an attribute to the supplied MDC Override
1012 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
1013 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
1015 // Return the MdcOverride object that we were passed.
1016 // This looks odd, but it allows us to do stuff like:
1018 // MdcOverride ov = getStartTime(new MdcOverride())
1020 // which is quite handy, but also allows us to pass in an existing
1021 // MdcOverride object which already has some attributes set.
1025 private boolean isSuccess(OperationResult result) {
1026 return isSuccessCode(result.getResultCode());
1029 private boolean isSuccessCode(int statusCode) {
1030 return Family.familyOf(statusCode).equals(Family.SUCCESSFUL);
1034 public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
1036 if (logger.isDebugEnabled()) {
1037 StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: ");
1039 for (BulkRequest request : requests) {
1040 dbgString.append("[").append(request).append("] ");
1043 logger.debug(dbgString.toString());
1046 // Grab the current time so we can use it to generate a metrics log.
1047 MdcOverride override = getStartTime(new MdcOverride());
1049 // Parse the supplied set of operations.
1050 // Iterate over the list of operations which we were provided and
1051 // translate them into a format that ElasticSearh understands.
1053 StringBuilder esOperationSet = new StringBuilder(128);
1054 List<ElasticSearchResultItem> rejected = new ArrayList<>();
1055 for (BulkRequest request : requests) {
1057 // Convert the request to the syntax ElasticSearch likes.
1058 if (buildEsOperation(request, esOperationSet, rejected)) {
1063 ElasticSearchBulkOperationResult opResult = null;
1066 // Open an HTTP connection to the ElasticSearch back end.
1067 String fullUrl = getFullUrl("/_bulk", false);
1069 HttpURLConnection conn;
1072 url = new URL(fullUrl);
1073 conn = (HttpURLConnection) url.openConnection();
1074 conn.setRequestMethod("PUT");
1075 conn.setDoOutput(true);
1076 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_FORM_URLENCODED);
1077 conn.setRequestProperty("Connection", "Close");
1079 } catch (IOException e) {
1081 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1082 if (logger.isDebugEnabled()) {
1083 logger.debug(Throwables.getStackTraceAsString(e));
1086 throw new DocumentStoreOperationException(
1087 "Failed to open connection to document store. Cause: " + e.getMessage(), e);
1090 StringBuilder bulkResult = new StringBuilder(128);
1092 // Create an output stream to write our request to.
1093 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1095 if (logger.isDebugEnabled()) {
1096 logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1097 logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
1100 // Write the resulting request string to our output stream. (this sends the request to ES?)
1101 out.write(esOperationSet.toString());
1104 // Open an input stream on our connection in order to read back the results.
1105 InputStream is = conn.getInputStream();
1106 InputStreamReader inputstreamreader = new InputStreamReader(is);
1107 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1109 // Read the contents of the input stream into our result string...
1110 String esResponseString = null;
1112 while ((esResponseString = bufferedreader.readLine()) != null) {
1113 bulkResult.append(esResponseString).append("\n");
1116 } catch (IOException e) {
1118 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1119 if (logger.isDebugEnabled()) {
1120 StringWriter sw = new StringWriter();
1121 e.printStackTrace(new PrintWriter(sw));
1122 logger.debug(sw.toString());
1125 throw new DocumentStoreOperationException(
1126 "Failure interacting with document store. Cause: " + e.getMessage(), e);
1129 if (logger.isDebugEnabled()) {
1130 logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
1133 // ...and marshal the resulting string into a Java object.
1135 opResult = marshallEsBulkResult(bulkResult.toString());
1137 } catch (IOException e) {
1139 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1140 if (logger.isDebugEnabled()) {
1141 logger.debug(Throwables.getStackTraceAsString(e));
1144 throw new DocumentStoreOperationException("Failed to marshal response body. Cause: " + e.getMessage(),
1149 // Finally, build the operation result and return it to the caller.
1150 OperationResult result = new OperationResult();
1151 result.setResultCode(207);
1152 result.setResult(buildGenericBulkResultSet(opResult, rejected));
1154 // In the success case we don't want the entire result string to be
1155 // dumped into the metrics log, so concatenate it.
1156 String resultStringForMetricsLog = result.getResult();
1157 if (isSuccess(result)) {
1158 resultStringForMetricsLog =
1159 resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1162 metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1163 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1164 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1172 * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch.
1174 * @param request - The request to be performed.
1175 * @param sb - The string builder to append the json data to
1176 * @throws DocumentStoreOperationException
1178 private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails)
1179 throws DocumentStoreOperationException {
1181 boolean retVal = true;
1182 // What kind of operation are we performing?
1183 switch (request.getOperationType()) {
1185 // Create a new document.
1188 // Make sure that we were supplied a document payload.
1189 if (request.getOperation().getDocument() == null) {
1191 fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1192 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1196 // Make sure that the supplied document URL is formatted
1198 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1199 fails.add(generateRejectionEntry(request.getOperationType(),
1200 MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1201 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1205 // Validate that the specified index actually exists before we
1206 // try to perform the create.
1207 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1209 fails.add(generateRejectionEntry(request.getOperationType(),
1210 MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1211 request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1215 // If we were supplied an id for the new document, then
1216 // include it in the bulk operation to Elastic Search
1217 if (request.getId() == null) {
1219 sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
1221 // Otherwise, we just leave that parameter off and ElasticSearch
1222 // will generate one for us.
1224 sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
1229 // Append the document that we want to create.
1230 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1231 } catch (JsonProcessingException e) {
1232 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1237 // Update an existing document.
1240 // Make sure that we were supplied a document payload.
1241 if (request.getOperation().getDocument() == null) {
1243 fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1244 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1248 // Make sure that the supplied document URL is formatted
1250 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1251 fails.add(generateRejectionEntry(request.getOperationType(),
1252 MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1253 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1257 // Validate that the specified index actually exists before we
1258 // try to perform the update.
1259 if (!indexExists(request.getIndex())) {
1261 fails.add(generateRejectionEntry(request.getOperationType(),
1262 MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1263 request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1267 // Validate that the document we are trying to update actually
1268 // exists before we try to perform the update.
1269 if (!documentExists(request.getIndex(), request.getId())) {
1271 fails.add(generateRejectionEntry(request.getOperationType(),
1272 MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1273 request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1277 // It is mandatory that a version be supplied for an update operation,
1278 // so validate that now.
1279 if (request.getOperation().getMetaData().getEtag() == null) {
1281 fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1282 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1286 // Generate the update request...
1287 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1288 request.getOperation().getMetaData().getEtag()));
1290 // ...and append the document that we want to update.
1292 sb.append(request.getOperation().getDocument().toJson()).append("\n");
1293 } catch (JsonProcessingException e) {
1294 throw new DocumentStoreOperationException("Failure parsing document to json", e);
1298 // Delete an existing document.
1301 // Make sure that the supplied document URL is formatted
1303 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1304 fails.add(generateRejectionEntry(request.getOperationType(),
1305 MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1306 request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1310 // Validate that the specified index actually exists before we
1311 // try to perform the delete.
1312 if (!indexExists(request.getIndex())) {
1314 fails.add(generateRejectionEntry(request.getOperationType(),
1315 MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1316 request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1320 // Validate that the document we are trying to update actually
1321 // exists before we try to perform the delete.
1322 if (!documentExists(request.getIndex(), request.getId())) {
1324 fails.add(generateRejectionEntry(request.getOperationType(),
1325 MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1326 request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1330 // It is mandatory that a version be supplied for a delete operation,
1331 // so validate that now.
1332 if (request.getOperation().getMetaData().getEtag() == null) {
1334 fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1335 request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1339 // Generate the delete request.
1340 sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1341 request.getOperation().getMetaData().getEtag()));
1349 private boolean indexExists(String index) throws DocumentStoreOperationException {
1350 return isSuccess(checkIndexExistence(index));
1353 private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1354 return isSuccess(checkDocumentExistence(index, id));
1358 * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the
1361 * @param rejectReason - A message describing why the operation was rejected.
1362 * @param anId - The identifier associated with the document being acted on.
1363 * @param statusCode - An HTTP status code.
1364 * @return - A result set item.
1366 private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index,
1367 String anId, int statusCode, String originalUrl) {
1369 ElasticSearchError err = new ElasticSearchError();
1370 err.setReason(rejectReason);
1372 ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1375 op.setStatus(statusCode);
1377 op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1379 ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1383 rejectionResult.setCreate(op);
1386 rejectionResult.setIndex(op);
1389 rejectionResult.setDelete(op);
1394 return rejectionResult;
1398 * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and
1399 * marshals it into a Java object.
1401 * @param jsonResult - The bulk operations response returned from ElasticSearch.
1402 * @return - The marshalled response.
1403 * @throws JsonParseException
1404 * @throws JsonMappingException
1405 * @throws IOException
1407 private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException {
1408 if (jsonResult != null) {
1409 if (logger.isDebugEnabled()) {
1410 logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", ""));
1413 ObjectMapper mapper = new ObjectMapper();
1414 mapper.setSerializationInclusion(Include.NON_EMPTY);
1416 return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1423 * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload.
1425 * @param esResult - ElasticSearch bulk operations response.
1426 * @return - A generic result set.
1428 private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1429 List<ElasticSearchResultItem> rejectedOps) {
1431 int totalSuccess = 0;
1434 if (logger.isDebugEnabled()) {
1436 logger.debug("ESController: Build generic result set. ES Results: "
1437 + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString());
1440 // Build a combined list of result items from the results returned
1441 // from ElasticSearch and the list of operations that we rejected
1442 // without sending to ElasticSearch.
1443 List<ElasticSearchResultItem> combinedResults = new ArrayList<>();
1444 if (esResult != null) {
1445 combinedResults.addAll(Arrays.asList(esResult.getItems()));
1447 combinedResults.addAll(rejectedOps);
1449 // Iterate over the individual results in the resulting result set.
1450 StringBuilder resultsBuilder = new StringBuilder();
1451 AtomicBoolean firstItem = new AtomicBoolean(true);
1452 for (ElasticSearchResultItem item : combinedResults) {
1454 // Increment the operation counts.
1456 if (isSuccessCode(item.operationStatus().getStatus())) {
1462 // Prepend a comma to our response string unless this it the
1463 // first result in the set.
1464 if (!firstItem.compareAndSet(true, false)) {
1465 resultsBuilder.append(", ");
1468 // Append the current result as a generic json structure.
1469 resultsBuilder.append(item.toJson());
1472 return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", "
1473 + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}";
1478 * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
1480 * @param indexName - The index to look for.
1481 * @return - An operation result indicating the success or failure of the check.
1482 * @throws DocumentStoreOperationException
1484 public OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
1486 // Initialize operation result with a failure codes / fault string
1487 OperationResult opResult = new OperationResult();
1488 opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
1490 // Grab the current time so we can use it to generate a metrics log.
1491 MdcOverride override = getStartTime(new MdcOverride());
1493 String fullUrl = getFullUrl("/" + indexName, false);
1494 HttpURLConnection conn = initializeConnection(fullUrl);
1497 conn.setRequestMethod("HEAD");
1499 } catch (ProtocolException e) {
1500 shutdownConnection(conn);
1501 throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1504 logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1508 resultCode = conn.getResponseCode();
1509 } catch (IOException e) {
1510 shutdownConnection(conn);
1511 throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
1513 logger.debug(MSG_RESPONSE_CODE + resultCode);
1515 opResult.setResultCode(resultCode);
1517 // Generate a metrics log so we can track how long the operation took.
1519 .info(SearchDbMsgs.CHECK_INDEX_TIME,
1520 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1521 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1522 override, indexName);
1524 shutdownConnection(conn);
1530 private void buildDocumentResult(DocumentOperationResult result, String index)
1531 throws DocumentStoreOperationException {
1533 JSONParser parser = new JSONParser();
1536 root = (JSONObject) parser.parse(result.getResult());
1537 if (isSuccess(result)) {
1538 // Success response object
1539 Document doc = new Document();
1540 doc.setEtag(result.getResultVersion());
1541 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1543 doc.setContent((JSONObject) root.get("_source"));
1544 result.setDocument(doc);
1547 // Error response object
1548 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1549 if (error != null) {
1551 new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1555 } catch (Exception e) {
1556 throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1560 private String buildDocumentResponseUrl(String index, String id) {
1561 return ApiUtils.buildDocumentUri(index, id);
1564 private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1566 JSONParser parser = new JSONParser();
1570 root = (JSONObject) parser.parse(result.getResult());
1571 if (isSuccess(result)) {
1572 JSONObject hits = (JSONObject) root.get("hits");
1573 JSONArray hitArray = (JSONArray) hits.get("hits");
1574 SearchHits searchHits = new SearchHits();
1575 searchHits.setTotalHits(hits.get("total").toString());
1576 ArrayList<SearchHit> searchHitArray = new ArrayList<>();
1578 for (int i = 0; i < hitArray.size(); i++) {
1579 JSONObject hit = (JSONObject) hitArray.get(i);
1580 SearchHit searchHit = new SearchHit();
1581 searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1582 Document doc = new Document();
1583 if (hit.get(JSON_ATTR_VERSION) != null) {
1584 doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1588 buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1589 doc.setContent((JSONObject) hit.get("_source"));
1590 searchHit.setDocument(doc);
1591 searchHitArray.add(searchHit);
1593 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1594 result.setSearchResult(searchHits);
1596 JSONObject aggregations = (JSONObject) root.get("aggregations");
1597 if (aggregations != null) {
1598 AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1599 AggregationResults aggs = new AggregationResults();
1600 aggs.setAggregations(aggResults);
1601 result.setAggregationResult(aggs);
1606 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1607 if (error != null) {
1609 new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1612 } catch (Exception e) {
1613 throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1618 private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1619 JSONParser parser = new JSONParser();
1622 root = (JSONObject) parser.parse(result.getResult());
1623 if (isSuccess(result)) {
1624 JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
1625 JSONObject hitdata = (JSONObject) hitArray.get(0);
1626 JSONArray optionsArray = (JSONArray) hitdata.get("options");
1627 SuggestHits suggestHits = new SuggestHits();
1628 suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
1630 ArrayList<SuggestHit> suggestHitArray = new ArrayList<>();
1632 for (int i = 0; i < optionsArray.size(); i++) {
1633 JSONObject hit = (JSONObject) optionsArray.get(i);
1635 SuggestHit suggestHit = new SuggestHit();
1636 suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
1637 suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
1638 Document doc = new Document();
1639 if (hit.get(JSON_ATTR_VERSION) != null) {
1640 doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1643 buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1645 doc.setContent((JSONObject) hit.get("payload"));
1646 suggestHit.setDocument(doc);
1647 suggestHitArray.add(suggestHit);
1649 suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
1650 result.setSuggestResult(suggestHits);
1652 JSONObject aggregations = (JSONObject) root.get("aggregations");
1653 if (aggregations != null) {
1654 AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1655 AggregationResults aggs = new AggregationResults();
1656 aggs.setAggregations(aggResults);
1657 result.setAggregationResult(aggs);
1662 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1663 if (error != null) {
1665 new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1668 } catch (Exception e) {
1669 throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());