Search service configurable index settings
[aai/search-data-service.git] / src / main / java / org / onap / aai / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
23
24 import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
25 import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
26
27 import com.fasterxml.jackson.annotation.JsonInclude.Include;
28 import com.fasterxml.jackson.core.JsonParseException;
29 import com.fasterxml.jackson.core.JsonProcessingException;
30 import com.fasterxml.jackson.databind.JsonMappingException;
31 import com.fasterxml.jackson.databind.ObjectMapper;
32 import com.google.common.base.Throwables;
33 import java.io.BufferedReader;
34 import java.io.File;
35 import java.io.FileInputStream;
36 import java.io.IOException;
37 import java.io.InputStream;
38 import java.io.InputStreamReader;
39 import java.io.OutputStream;
40 import java.io.OutputStreamWriter;
41 import java.io.PrintWriter;
42 import java.io.StringWriter;
43 import java.net.HttpURLConnection;
44 import java.net.MalformedURLException;
45 import java.net.ProtocolException;
46 import java.net.URL;
47 import java.text.SimpleDateFormat;
48 import java.util.ArrayList;
49 import java.util.Arrays;
50 import java.util.List;
51 import java.util.Optional;
52 import java.util.Properties;
53 import java.util.concurrent.atomic.AtomicBoolean;
54 import javax.ws.rs.HttpMethod;
55 import javax.ws.rs.core.MediaType;
56 import javax.ws.rs.core.Response.Status;
57 import javax.ws.rs.core.UriBuilder;
58 import org.eclipse.jetty.http.HttpStatus;
59 import org.json.simple.JSONArray;
60 import org.json.simple.JSONObject;
61 import org.json.simple.parser.JSONParser;
62 import org.json.simple.parser.ParseException;
63 import org.onap.aai.cl.api.LogFields;
64 import org.onap.aai.cl.api.LogLine;
65 import org.onap.aai.cl.api.Logger;
66 import org.onap.aai.cl.eelf.LoggerFactory;
67 import org.onap.aai.cl.mdc.MdcContext;
68 import org.onap.aai.cl.mdc.MdcOverride;
69 import org.onap.aai.sa.rest.AnalysisConfiguration;
70 import org.onap.aai.sa.rest.ApiUtils;
71 import org.onap.aai.sa.rest.BulkRequest;
72 import org.onap.aai.sa.rest.BulkRequest.OperationType;
73 import org.onap.aai.sa.rest.DocumentSchema;
74 import org.onap.aai.sa.rest.SettingConfiguration;
75 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
76 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
77 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException.ErrorMessage;
78 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
79 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
80 import org.onap.aai.sa.searchdbabstraction.entity.Document;
81 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
82 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
83 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
84 import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder;
85 import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder.Type;
86 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
87 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
88 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
89 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
90 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
91 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
92 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
93 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
94 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
95 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
96
97 /**
98  * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface.
99  */
100 public class ElasticSearchHttpController implements DocumentStoreInterface {
101
102     private static ElasticSearchHttpController instance = null;
103
104     private static final Logger logger =
105             LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
106     private static final Logger metricsLogger =
107             LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
108
109     private static final String URL_QUERY_VERSION = "version=";
110
111     private static final String JSON_ATTR_VERSION = "_version";
112     private static final String JSON_ATTR_ERROR = "error";
113     private static final String JSON_ATTR_REASON = "reason";
114
115     private static final String DEFAULT_TYPE = "default";
116
117     private static final String MSG_RESOURCE_MISSING = "Specified resource does not exist: ";
118     private static final String MSG_RESPONSE_CODE = "Response Code : ";
119     private static final String MSG_INVALID_DOCUMENT_URL = "Invalid document URL: ";
120
121     private static final String FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE = "Failed to parse Elastic Search response.";
122
123     private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
124             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
125     private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
126             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
127     private static final String BULK_IMPORT_INDEX_TEMPLATE =
128             "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
129     private static final String BULK_DELETE_TEMPLATE =
130             "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
131     public final static String APPLICATION_XND_JSON_TYPE = new MediaType("application", "x-ndjson").toString();
132
133     private final ElasticSearchConfig config;
134
135     protected AnalysisConfiguration analysisConfig;
136     protected SettingConfiguration settingConfig;
137
138     public ElasticSearchHttpController(ElasticSearchConfig config) {
139         this.config = config;
140         analysisConfig = new AnalysisConfiguration();
141         settingConfig = new SettingConfiguration();
142
143         String rootUrl = null;
144         try {
145             if ("https".equals(config.getUriScheme())) {
146                 new ElasticSearchHttpsController(config);
147             }
148             rootUrl = buildUrl(createUriBuilder("")).toString();
149             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, rootUrl);
150             checkConnection();
151             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, rootUrl);
152         } catch (Exception e) {
153             logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, rootUrl, e.getMessage());
154         }
155     }
156
157     public static ElasticSearchHttpController getInstance() {
158         synchronized (ElasticSearchHttpController.class) {
159             if (instance == null) {
160                 Properties properties = new Properties();
161                 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
162                 try {
163                     properties.load(new FileInputStream(file));
164                 } catch (Exception e) {
165                     logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance",
166                             e.getLocalizedMessage());
167                 }
168
169                 ElasticSearchConfig config = new ElasticSearchConfig(properties);
170                 instance = new ElasticSearchHttpController(config);
171             }
172         }
173
174         return instance;
175     }
176
177     public AnalysisConfiguration getAnalysisConfig() {
178         return analysisConfig;
179     }
180
181     public ElasticSearchConfig getElasticSearchConfig() {
182         return config;
183     }
184
185     @Override
186     public OperationResult createIndex(String index, DocumentSchema documentSchema) {
187         try {
188             // Submit the request to ElasticSearch to create the index using a default document type.
189             OperationResult result = createTable(index, DEFAULT_TYPE, analysisConfig,
190                     DocumentSchemaUtil.generateDocumentMappings(documentSchema), settingConfig);
191
192             // ElasticSearch will return us a 200 code on success when we
193             // want to report a 201, so translate the result here.
194             if (result.getResultCode() == Status.OK.getStatusCode()) {
195                 result.setResultCode(Status.CREATED.getStatusCode());
196             }
197
198             if (isSuccess(result)) {
199                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
200             }
201             return result;
202         } catch (DocumentStoreOperationException | IOException e) {
203             return new OperationResultBuilder().useDefaults()
204                     .failureCause("Document store operation failure.  Cause: " + e.getMessage()).build();
205         }
206     }
207
208     @Override
209     public OperationResult createDynamicIndex(String index, String dynamicSchema) {
210         try {
211             OperationResult result = createTable(index, dynamicSchema);
212
213             // ElasticSearch will return us a 200 code on success when we
214             // want to report a 201, so translate the result here.
215             if (result.getResultCode() == Status.OK.getStatusCode()) {
216                 result.setResultCode(Status.CREATED.getStatusCode());
217             }
218             if (isSuccess(result)) {
219                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
220             }
221             return result;
222         } catch (DocumentStoreOperationException e) {
223             return new OperationResultBuilder().useDefaults()
224                     .failureCause("Document store operation failure.  Cause: " + e.getMessage()).build();
225         }
226     }
227
228     @Override
229     public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
230         MdcOverride override = getStartTime(new MdcOverride());
231
232         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.DELETE);
233         OperationResult opResult = handleResponse(conn);
234         logMetricsInfo(override, SearchDbMsgs.DELETE_INDEX_TIME, opResult, indexName);
235         shutdownConnection(conn);
236
237         return opResult;
238     }
239
240     // @Override
241     protected OperationResult createTable(String indexName, String typeName, AnalysisConfiguration ac, 
242             String indexMappings, SettingConfiguration sc) throws DocumentStoreOperationException {
243         if (ac.getEsIndexSettings() == null) {
244             logger.debug("No analysis settings provided.");
245         }
246
247         if (indexMappings == null) {
248             logger.debug("No mappings provided.");
249         }
250
251         MdcOverride override = getStartTime(new MdcOverride());
252
253         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.PUT);
254         
255         StringBuilder sb = new StringBuilder(128);
256         sb.append("{ \"settings\" : ");
257         sb.append(sc.getSettingsWithAnalysis(ac));
258         sb.append(",");
259
260         sb.append("\"mappings\" : {");
261         sb.append("\"" + typeName + "\" :");
262         sb.append(indexMappings);
263         sb.append("}}");
264
265         try {
266             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
267         } catch (IOException e) {
268             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
269             throw new DocumentStoreOperationException(e.getMessage(), e);
270         }
271
272         logger.debug("Request content: " + sb);
273
274         OperationResult opResult = handleResponse(conn);
275         shutdownConnection(conn);
276         logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, opResult, indexName);
277
278         return opResult;
279     }
280
281     /**
282      * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the
283      * index.
284      *
285      * @param indexName - The name of the index to be created
286      * @param settingsAndMappings - The actual JSON object that will define the index
287      * @return - The operation result of writing into Elasticsearch
288      * @throws DocumentStoreOperationException
289      */
290     protected OperationResult createTable(String indexName, String settingsAndMappings)
291             throws DocumentStoreOperationException {
292         MdcOverride override = getStartTime(new MdcOverride());
293
294         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.PUT);
295         try {
296             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
297         } catch (IOException e) {
298             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
299             throw new DocumentStoreOperationException(e.getMessage());
300         }
301
302         OperationResult result = handleResponse(conn);
303         logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, result, indexName);
304
305         return result;
306     }
307
308     @Override
309     public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
310             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
311
312         if (!allowImplicitIndexCreation) {
313             // Before we do anything, make sure that the specified index actually exists in the
314             // document store - we don't want to rely on ElasticSearch to fail the document
315             // create because it could be configured to implicitly create a non-existent index,
316             // which can lead to hard-to-debug behaviour with queries down the road.
317             OperationResult indexExistsResult = checkIndexExistence(indexName);
318             if (!isSuccess(indexExistsResult)) {
319                 String resultMsg = "Document Index '" + indexName + "' does not exist.";
320                 return (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT).status(Status.NOT_FOUND)
321                         .result(resultMsg).failureCause(resultMsg).build();
322             }
323         }
324
325         if (document.getId() == null || document.getId().isEmpty()) {
326             return createDocumentWithoutId(indexName, document);
327         } else {
328             return createDocumentWithId(indexName, document);
329         }
330     }
331
332     @Override
333     public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
334             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
335         if (!allowImplicitIndexCreation) {
336             // Before we do anything, make sure that the specified index actually exists in the
337             // document store - we don't want to rely on ElasticSearch to fail the document
338             // create because it could be configured to implicitly create a non-existent index,
339             // which can lead to hard-to-debug behaviour with queries down the road.
340             OperationResult indexExistsResult = checkIndexExistence(indexName);
341             if (!isSuccess(indexExistsResult)) {
342                 DocumentOperationResult opResult = new DocumentOperationResult();
343                 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
344                 String resultMsg = "Document Index '" + indexName + "' does not exist.";
345                 opResult.setResult(resultMsg);
346                 opResult.setFailureCause(resultMsg);
347                 return opResult;
348             }
349         }
350
351         MdcOverride override = getStartTime(new MdcOverride());
352
353         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId())
354                 .replaceQuery(URL_QUERY_VERSION + document.getVersion()));
355
356         HttpURLConnection conn = createConnection(url, HttpMethod.PUT);
357         attachDocument(conn, document);
358
359         DocumentOperationResult opResult = getOperationResult(conn);
360         buildDocumentResult(opResult, indexName);
361
362         logMetricsInfo(override, SearchDbMsgs.UPDATE_DOCUMENT_TIME, opResult, indexName, document.getId());
363
364         shutdownConnection(conn);
365
366         return opResult;
367     }
368
369     @Override
370     public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
371             throws DocumentStoreOperationException {
372         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId())
373                 .replaceQuery(URL_QUERY_VERSION + document.getVersion()));
374
375         MdcOverride override = getStartTime(new MdcOverride());
376
377         HttpURLConnection conn = createConnection(url, HttpMethod.DELETE);
378
379         DocumentOperationResult opResult = getOperationResult(conn);
380         buildDocumentResult(opResult, indexName);
381         // supress the etag and url in response for delete as they are not required
382         if (opResult.getDocument() != null) {
383             opResult.getDocument().setEtag(null);
384             opResult.getDocument().setUrl(null);
385         }
386
387         logMetricsInfo(override, SearchDbMsgs.DELETE_DOCUMENT_TIME, opResult, indexName, document.getId());
388
389         shutdownConnection(conn);
390
391         return opResult;
392     }
393
394     @Override
395     public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
396             throws DocumentStoreOperationException {
397         final UriBuilder uriBuilder = createUriBuilder(indexName, DEFAULT_TYPE, document.getId());
398         if (document.getVersion() != null) {
399             uriBuilder.replaceQuery(URL_QUERY_VERSION + document.getVersion());
400         }
401
402         MdcOverride override = getStartTime(new MdcOverride());
403         HttpURLConnection conn = createConnection(buildUrl(uriBuilder), "GET");
404
405         DocumentOperationResult opResult = getOperationResult(conn);
406         buildDocumentResult(opResult, indexName);
407
408         logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, document.getId());
409
410         shutdownConnection(conn);
411
412         return opResult;
413     }
414
415     @Override
416     public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
417         final URL url = buildUrl(createUriBuilder(indexName, "_search").replaceQuery(queryString));
418
419         MdcOverride override = getStartTime(new MdcOverride());
420
421         HttpURLConnection conn = createConnection(url, "GET");
422         SearchOperationResult opResult = getSearchOperationResult(conn);
423         buildSearchResult(opResult, indexName);
424
425         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, queryString);
426
427         return opResult;
428     }
429
430     @Override
431     public SearchOperationResult searchWithPayload(String indexName, String query)
432             throws DocumentStoreOperationException {
433         if (logger.isDebugEnabled()) {
434             logger.debug("Querying index: " + indexName + " with query string: " + query);
435         }
436         final URL url = buildUrl(createUriBuilder(indexName, "_search"));
437
438         MdcOverride override = getStartTime(new MdcOverride());
439
440         HttpURLConnection conn = createConnection(url, HttpMethod.POST);
441         attachContent(conn, query);
442         logger.debug("Request body =  Elasticsearch query = " + query);
443
444         SearchOperationResult opResult = getSearchOperationResult(conn);
445         buildSearchResult(opResult, indexName);
446
447         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
448
449         shutdownConnection(conn);
450
451         return opResult;
452     }
453
454     @Override
455     public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
456             throws DocumentStoreOperationException {
457         if (logger.isDebugEnabled()) {
458             logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
459         }
460
461         MdcOverride override = getStartTime(new MdcOverride());
462         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName, "_suggest")), HttpMethod.POST);
463         attachContent(conn, query);
464
465         logger.debug("Request body =  Elasticsearch query = " + query);
466
467         SearchOperationResult opResult = getSearchOperationResult(conn);
468         buildSuggestResult(opResult, indexName);
469
470         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
471
472         shutdownConnection(conn);
473
474         return opResult;
475     }
476
477     @Override
478     public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
479         if (logger.isDebugEnabled()) {
480             StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: ");
481
482             for (BulkRequest request : requests) {
483                 dbgString.append("[").append(request).append("] ");
484             }
485
486             logger.debug(dbgString.toString());
487         }
488
489         MdcOverride override = getStartTime(new MdcOverride());
490
491         // Parse the supplied set of operations.
492         // Iterate over the list of operations which we were provided and
493         // translate them into a format that ElasticSearh understands.
494         int opCount = 0;
495         StringBuilder esOperationSet = new StringBuilder(128);
496         List<ElasticSearchResultItem> rejected = new ArrayList<>();
497         for (BulkRequest request : requests) {
498
499             // Convert the request to the syntax ElasticSearch likes.
500             if (buildEsOperation(request, esOperationSet, rejected)) {
501                 opCount++;
502             }
503         }
504
505         ElasticSearchBulkOperationResult opResult = null;
506         if (opCount > 0) {
507             HttpURLConnection conn;
508             try {
509                 conn = (HttpURLConnection) buildUrl(createUriBuilder("_bulk")).openConnection();
510                 conn.setRequestMethod(HttpMethod.PUT);
511                 conn.setDoOutput(true);
512                 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_XND_JSON_TYPE);
513                 if(config.useAuth()){
514                         conn.setRequestProperty("Authorization", config.getAuthValue());
515                 }
516                 conn.setRequestProperty("Connection", "Close");
517
518             } catch (IOException e) {
519
520                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
521                 if (logger.isDebugEnabled()) {
522                     logger.debug(Throwables.getStackTraceAsString(e));
523                 }
524
525                 throw new DocumentStoreOperationException(
526                         "Failed to open connection to document store.  Cause: " + e.getMessage(), e);
527             }
528
529             StringBuilder bulkResult = new StringBuilder(128);
530             try {
531                 // Create an output stream to write our request to.
532                 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
533
534                 if (logger.isDebugEnabled()) {
535                     logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
536                     logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
537                 }
538
539                 // Write the resulting request string to our output stream. (this sends the request to ES?)
540                 out.write(esOperationSet.toString());
541                 out.close();
542
543                 // Open an input stream on our connection in order to read back the results.
544                 InputStream is = conn.getInputStream();
545                 InputStreamReader inputstreamreader = new InputStreamReader(is);
546                 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
547
548                 // Read the contents of the input stream into our result string...
549                 String esResponseString = null;
550
551                 while ((esResponseString = bufferedreader.readLine()) != null) {
552                     bulkResult.append(esResponseString).append("\n");
553                 }
554
555             } catch (IOException e) {
556
557                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
558                 if (logger.isDebugEnabled()) {
559                     StringWriter sw = new StringWriter();
560                     e.printStackTrace(new PrintWriter(sw));
561                     logger.debug(sw.toString());
562                 }
563
564                 throw new DocumentStoreOperationException(
565                         "Failure interacting with document store.  Cause: " + e.getMessage(), e);
566             }
567
568             if (logger.isDebugEnabled()) {
569                 logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
570             }
571
572             // ...and marshal the resulting string into a Java object.
573             try {
574                 opResult = marshallEsBulkResult(bulkResult.toString());
575
576             } catch (IOException e) {
577                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
578                 if (logger.isDebugEnabled()) {
579                     logger.debug(Throwables.getStackTraceAsString(e));
580                 }
581
582                 throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: " + e.getMessage(),
583                         e);
584             }
585         }
586
587         OperationResult result = new OperationResultBuilder() //
588                 .resultCode(HttpStatus.MULTI_STATUS_207) //
589                 .result(buildGenericBulkResultSet(opResult, rejected)) //
590                 .build();
591
592         // In the success case we don't want the entire result string to be dumped into the metrics log, so concatenate
593         // it.
594         String resultStringForMetricsLog = result.getResult();
595         if (isSuccess(result)) {
596             resultStringForMetricsLog =
597                     resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
598         }
599
600         metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
601                 new LogFields() //
602                         .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
603                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
604                 override);
605
606         return result;
607     }
608
609
610     /**
611      * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
612      *
613      * @param indexName - The index to look for.
614      * @return - An operation result indicating the success or failure of the check.
615      * @throws DocumentStoreOperationException
616      */
617     private OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
618         MdcOverride override = getStartTime(new MdcOverride());
619
620         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.HEAD);
621         int resultCode;
622         try {
623             resultCode = conn.getResponseCode();
624         } catch (IOException ex) {
625             shutdownConnection(conn);
626             throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex);
627         }
628         logger.debug(MSG_RESPONSE_CODE + resultCode);
629
630         OperationResult opResult = new OperationResultBuilder().useDefaults().resultCode(resultCode).build();
631         logMetricsInfo(override, SearchDbMsgs.CHECK_INDEX_TIME, opResult, indexName);
632         shutdownConnection(conn);
633
634         return opResult;
635     }
636
637     private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
638             throws DocumentStoreOperationException {
639         // check if the document already exists
640         DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
641
642         if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
643             if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) {
644                 opResult.setFailureCause("A document with the same id already exists.");
645             } else {
646                 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
647             }
648             opResult.setResultCode(Status.CONFLICT.getStatusCode());
649             return opResult;
650         }
651
652         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId()));
653         MdcOverride override = getStartTime(new MdcOverride());
654
655         HttpURLConnection conn = createConnection(url, HttpMethod.PUT);
656         attachDocument(conn, document);
657
658         opResult = getOperationResult(conn);
659         buildDocumentResult(opResult, indexName);
660
661         logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, opResult, indexName);
662
663         shutdownConnection(conn);
664
665         return opResult;
666     }
667
668     private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
669             throws DocumentStoreOperationException {
670         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE));
671
672         MdcOverride override = getStartTime(new MdcOverride());
673         HttpURLConnection conn = createConnection(url, HttpMethod.POST);
674         attachDocument(conn, document);
675
676         DocumentOperationResult response = getOperationResult(conn);
677         buildDocumentResult(response, indexName);
678
679         logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, response, indexName);
680
681         shutdownConnection(conn);
682
683         return response;
684     }
685
686     private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
687             throws DocumentStoreOperationException {
688         conn.setRequestProperty("Connection", "Close");
689         attachContent(conn, doc.getContentInJson());
690     }
691
692     private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
693             throws DocumentStoreOperationException {
694         MdcOverride override = getStartTime(new MdcOverride());
695         HttpURLConnection conn =
696                 createConnection(buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, docId)), HttpMethod.HEAD);
697         int resultCode;
698         try {
699             resultCode = conn.getResponseCode();
700         } catch (IOException ex) {
701             shutdownConnection(conn);
702             throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex);
703         }
704
705         logger.debug(MSG_RESPONSE_CODE + resultCode);
706
707         DocumentOperationResult opResult = (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT)
708                 .useDefaults().resultCode(resultCode).build();
709
710         logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, docId);
711         shutdownConnection(conn);
712
713         return opResult;
714     }
715
716     private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException {
717         OutputStream outputStream = null;
718         OutputStreamWriter out = null;
719
720         try {
721             outputStream = conn.getOutputStream();
722         } catch (IOException e) {
723             shutdownConnection(conn);
724             throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
725         }
726
727         out = new OutputStreamWriter(outputStream);
728
729         try {
730             out.write(content);
731             out.close();
732         } catch (IOException e) {
733             shutdownConnection(conn);
734             throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
735         }
736     }
737
738     private HttpURLConnection initializeConnection(URL url) throws DocumentStoreOperationException {
739         HttpURLConnection conn = null;
740         try {
741             conn = (HttpURLConnection) url.openConnection();
742             conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON);
743             conn.setDoOutput(true);
744             if (config.useAuth()) {
745                 conn.setRequestProperty("Authorization", config.getAuthValue());
746             }
747         } catch (IOException e) {
748             shutdownConnection(conn);
749             throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
750         }
751
752         return conn;
753     }
754
755     private OperationResult handleResponse(HttpURLConnection conn) throws DocumentStoreOperationException {
756         return handleResponse(conn, new OperationResultBuilder().useDefaults());
757     }
758
759     private OperationResult handleResponse(HttpURLConnection conn, OperationResultBuilder rb)
760             throws DocumentStoreOperationException {
761         int resultCode;
762
763         try {
764             resultCode = conn.getResponseCode();
765         } catch (IOException ex) {
766             shutdownConnection(conn);
767             throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex);
768         }
769
770         logger.debug(MSG_RESPONSE_CODE + resultCode);
771
772         InputStream inputStream = null;
773
774         if (!ApiUtils.isSuccessStatusCode(resultCode)) {
775             inputStream = conn.getErrorStream();
776         } else {
777             try {
778                 inputStream = conn.getInputStream();
779             } catch (IOException e) {
780                 shutdownConnection(conn);
781                 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
782             }
783         }
784
785         InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
786         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
787
788         StringBuilder result = new StringBuilder(128);
789         String string = null;
790
791         try {
792             while ((string = bufferedreader.readLine()) != null) {
793                 result.append(string).append("\n");
794             }
795         } catch (IOException e) {
796             shutdownConnection(conn);
797             throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
798         }
799
800         if (resultCode == Status.CONFLICT.getStatusCode()) {
801             rb.resultCode(Status.PRECONDITION_FAILED.getStatusCode());
802         } else {
803             rb.resultCode(resultCode);
804         }
805         if (logger.isDebugEnabled()) {
806             logger.debug("Raw result string from ElasticSearch = " + result.toString());
807         }
808         rb.result(result.toString());
809         rb.resultVersion(extractVersion(result.toString()));
810         return rb.build();
811     }
812
813     private String extractVersion(String result) {
814         JSONParser parser = new JSONParser();
815         String version = null;
816         try {
817             JSONObject root = (JSONObject) parser.parse(result);
818             if (root.get(JSON_ATTR_VERSION) != null) {
819                 version = root.get(JSON_ATTR_VERSION).toString();
820             }
821         } catch (ParseException e) {
822             // Not all responses from ElasticSearch include a version, so
823             // if we don't get one back, just return an empty string rather
824             // than trigger a false failure.
825             version = "";
826         }
827         return version;
828     }
829
830     /**
831      * This convenience method gets the current system time and stores it in an attribute in the supplied
832      * {@link MdcOverride} object so that it can be used later by the metrics logger.
833      *
834      * @param override - The {@link MdcOverride} object to update.
835      * @return - The supplied {@link MdcOverride} object.
836      */
837     private MdcOverride getStartTime(MdcOverride override) {
838
839         // Grab the current time...
840         long startTimeInMs = System.currentTimeMillis();
841
842         // ...and add it as an attribute to the supplied MDC Override object.
843         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
844         override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
845
846         // Return the MdcOverride object that we were passed.
847         // This looks odd, but it allows us to do stuff like:
848         //
849         // MdcOverride ov = getStartTime(new MdcOverride())
850         //
851         // which is quite handy, but also allows us to pass in an existing
852         // MdcOverride object which already has some attributes set.
853         return override;
854     }
855
856     private boolean isSuccess(OperationResult result) {
857         return ApiUtils.isSuccessStatusCode(result.getResultCode());
858     }
859
860     private UriBuilder createUriBuilder(String path, String... paths) {
861         UriBuilder builder = UriBuilder.fromPath(path);
862         for (String other : paths) {
863             builder.path(other);
864         }
865         builder.host(config.getIpAddress());
866         String port = Optional.ofNullable(config.getHttpPort()).orElse("0");
867         builder.port(Integer.valueOf(port));
868         builder.scheme(config.getUriScheme());
869         return builder;
870     }
871
872     private URL buildUrl(UriBuilder builder) throws DocumentStoreOperationException {
873         try {
874             return builder.build().toURL();
875         } catch (MalformedURLException e) {
876             logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "buildUrl", e.getLocalizedMessage());
877             throw new DocumentStoreOperationException("Error building a URL with " + builder.toString(), e);
878         }
879     }
880
881     private HttpURLConnection createConnection(final URL url, final String method)
882             throws DocumentStoreOperationException {
883         HttpURLConnection conn = initializeConnection(url);
884         try {
885             logger.debug("\nSending '" + method + "' request to URL : " + conn.getURL());
886             conn.setRequestMethod(method);
887         } catch (ProtocolException e) {
888             shutdownConnection(conn);
889             throw new DocumentStoreOperationException(ErrorMessage.SET_REQUEST_METHOD_FAILED, e, method);
890         }
891         return conn;
892     }
893
894     private OperationResult checkConnection() throws IOException, DocumentStoreOperationException {
895         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder("_cluster/health")), HttpMethod.GET);
896         int resultCode = conn.getResponseCode();
897         logger.debug("getClusterHealth() response Code : " + resultCode);
898         shutdownConnection(conn);
899         return new OperationResultBuilder().resultCode(resultCode).build();
900     }
901
902     private void shutdownConnection(HttpURLConnection connection) {
903         if (connection == null) {
904             return;
905         }
906
907         final String methodName = "shutdownConnection";
908         InputStream inputstream = null;
909         OutputStream outputstream = null;
910
911         try {
912             inputstream = connection.getInputStream();
913         } catch (IOException e) {
914             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
915         } finally {
916             if (inputstream != null) {
917                 try {
918                     inputstream.close();
919                 } catch (IOException e) {
920                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
921                 }
922             }
923         }
924
925         try {
926             outputstream = connection.getOutputStream();
927         } catch (IOException e) {
928             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
929         } finally {
930             if (outputstream != null) {
931                 try {
932                     outputstream.close();
933                 } catch (IOException e) {
934                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
935                 }
936             }
937         }
938
939         connection.disconnect();
940     }
941
942     /**
943      * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch.
944      *
945      * @param request - The request to be performed.
946      * @param sb - The string builder to append the json data to
947      * @throws DocumentStoreOperationException
948      */
949     private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails)
950             throws DocumentStoreOperationException {
951
952         boolean retVal = true;
953         // What kind of operation are we performing?
954         switch (request.getOperationType()) {
955
956             // Create a new document.
957             case CREATE:
958
959                 // Make sure that we were supplied a document payload.
960                 if (request.getOperation().getDocument() == null) {
961
962                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
963                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
964                     return false;
965                 }
966
967                 // Make sure that the supplied document URL is formatted
968                 // correctly.
969                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
970                     fails.add(generateRejectionEntry(request.getOperationType(),
971                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
972                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
973                     return false;
974                 }
975
976                 // Validate that the specified index actually exists before we
977                 // try to perform the create.
978                 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
979
980                     fails.add(generateRejectionEntry(request.getOperationType(),
981                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
982                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
983                     return false;
984                 }
985
986                 // If we were supplied an id for the new document, then
987                 // include it in the bulk operation to Elastic Search
988                 if (request.getId() == null) {
989
990                     sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
991
992                     // Otherwise, we just leave that parameter off and ElasticSearch
993                     // will generate one for us.
994                 } else {
995                     sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
996                             request.getId()));
997                 }
998
999                 try {
1000                     // Append the document that we want to create.
1001                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
1002                 } catch (JsonProcessingException e) {
1003                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
1004                 }
1005
1006                 break;
1007
1008             // Update an existing document.
1009             case UPDATE:
1010
1011                 // Make sure that we were supplied a document payload.
1012                 if (request.getOperation().getDocument() == null) {
1013
1014                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1015                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1016                     return false;
1017                 }
1018
1019                 // Make sure that the supplied document URL is formatted
1020                 // correctly.
1021                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1022                     fails.add(generateRejectionEntry(request.getOperationType(),
1023                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1024                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1025                     return false;
1026                 }
1027
1028                 // Validate that the specified index actually exists before we
1029                 // try to perform the update.
1030                 if (!indexExists(request.getIndex())) {
1031
1032                     fails.add(generateRejectionEntry(request.getOperationType(),
1033                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1034                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1035                     return false;
1036                 }
1037
1038                 // Validate that the document we are trying to update actually
1039                 // exists before we try to perform the update.
1040                 if (!documentExists(request.getIndex(), request.getId())) {
1041
1042                     fails.add(generateRejectionEntry(request.getOperationType(),
1043                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1044                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1045                     return false;
1046                 }
1047
1048                 // It is mandatory that a version be supplied for an update operation,
1049                 // so validate that now.
1050                 if (request.getOperation().getMetaData().getEtag() == null) {
1051
1052                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1053                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1054                     return false;
1055                 }
1056
1057                 // Generate the update request...
1058                 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1059                         request.getOperation().getMetaData().getEtag()));
1060
1061                 // ...and append the document that we want to update.
1062                 try {
1063                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
1064                 } catch (JsonProcessingException e) {
1065                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
1066                 }
1067                 break;
1068
1069             // Delete an existing document.
1070             case DELETE:
1071
1072                 // Make sure that the supplied document URL is formatted
1073                 // correctly.
1074                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1075                     fails.add(generateRejectionEntry(request.getOperationType(),
1076                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1077                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1078                     return false;
1079                 }
1080
1081                 // Validate that the specified index actually exists before we
1082                 // try to perform the delete.
1083                 if (!indexExists(request.getIndex())) {
1084
1085                     fails.add(generateRejectionEntry(request.getOperationType(),
1086                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1087                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1088                     return false;
1089                 }
1090
1091                 // Validate that the document we are trying to update actually
1092                 // exists before we try to perform the delete.
1093                 if (!documentExists(request.getIndex(), request.getId())) {
1094
1095                     fails.add(generateRejectionEntry(request.getOperationType(),
1096                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1097                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1098                     return false;
1099                 }
1100
1101                 // It is mandatory that a version be supplied for a delete operation,
1102                 // so validate that now.
1103                 if (request.getOperation().getMetaData().getEtag() == null) {
1104
1105                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1106                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1107                     return false;
1108                 }
1109
1110                 // Generate the delete request.
1111                 sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1112                         request.getOperation().getMetaData().getEtag()));
1113                 break;
1114             default:
1115         }
1116
1117         return retVal;
1118     }
1119
1120     private boolean indexExists(String index) throws DocumentStoreOperationException {
1121         return isSuccess(checkIndexExistence(index));
1122     }
1123
1124     private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1125         return isSuccess(checkDocumentExistence(index, id));
1126     }
1127
1128     /**
1129      * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the
1130      * document store.
1131      *
1132      * @param rejectReason - A message describing why the operation was rejected.
1133      * @param anId - The identifier associated with the document being acted on.
1134      * @param statusCode - An HTTP status code.
1135      * @return - A result set item.
1136      */
1137     private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index,
1138             String anId, int statusCode, String originalUrl) {
1139
1140         ElasticSearchError err = new ElasticSearchError();
1141         err.setReason(rejectReason);
1142
1143         ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1144         op.setIndex(index);
1145         op.setId(anId);
1146         op.setStatus(statusCode);
1147         op.setError(err);
1148         op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1149
1150         ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1151
1152         switch (opType) {
1153             case CREATE:
1154                 rejectionResult.setCreate(op);
1155                 break;
1156             case UPDATE:
1157                 rejectionResult.setIndex(op);
1158                 break;
1159             case DELETE:
1160                 rejectionResult.setDelete(op);
1161                 break;
1162             default:
1163         }
1164
1165         return rejectionResult;
1166     }
1167
1168     /**
1169      * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and
1170      * marshals it into a Java object.
1171      *
1172      * @param jsonResult - The bulk operations response returned from ElasticSearch.
1173      * @return - The marshalled response.
1174      * @throws JsonParseException
1175      * @throws JsonMappingException
1176      * @throws IOException
1177      */
1178     private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException {
1179         if (jsonResult != null) {
1180             if (logger.isDebugEnabled()) {
1181                 logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", ""));
1182             }
1183
1184             ObjectMapper mapper = new ObjectMapper();
1185             mapper.setSerializationInclusion(Include.NON_EMPTY);
1186
1187             return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1188         }
1189
1190         return null;
1191     }
1192
1193     /**
1194      * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload.
1195      *
1196      * @param esResult - ElasticSearch bulk operations response.
1197      * @return - A generic result set.
1198      */
1199     private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1200             List<ElasticSearchResultItem> rejectedOps) {
1201         int totalOps = 0;
1202         int totalSuccess = 0;
1203         int totalFails = 0;
1204
1205         if (logger.isDebugEnabled()) {
1206
1207             logger.debug("ESController: Build generic result set.  ES Results: "
1208                     + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString());
1209         }
1210
1211         // Build a combined list of result items from the results returned
1212         // from ElasticSearch and the list of operations that we rejected
1213         // without sending to ElasticSearch.
1214         List<ElasticSearchResultItem> combinedResults = new ArrayList<>();
1215         if (esResult != null) {
1216             combinedResults.addAll(Arrays.asList(esResult.getItems()));
1217         }
1218         combinedResults.addAll(rejectedOps);
1219
1220         // Iterate over the individual results in the resulting result set.
1221         StringBuilder resultsBuilder = new StringBuilder();
1222         AtomicBoolean firstItem = new AtomicBoolean(true);
1223         for (ElasticSearchResultItem item : combinedResults) {
1224
1225             // Increment the operation counts.
1226             totalOps++;
1227             if (ApiUtils.isSuccessStatusCode(item.operationStatus().getStatus())) {
1228                 totalSuccess++;
1229             } else {
1230                 totalFails++;
1231             }
1232
1233             // Prepend a comma to our response string unless this it the
1234             // first result in the set.
1235             if (!firstItem.compareAndSet(true, false)) {
1236                 resultsBuilder.append(", ");
1237             }
1238
1239             // Append the current result as a generic json structure.
1240             resultsBuilder.append(item.toJson());
1241         }
1242
1243         return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", "
1244                 + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}";
1245     }
1246
1247     private DocumentOperationResult getOperationResult(HttpURLConnection conn) throws DocumentStoreOperationException {
1248         return (DocumentOperationResult) handleResponse(conn, new OperationResultBuilder(Type.DOCUMENT).useDefaults());
1249     }
1250
1251     private SearchOperationResult getSearchOperationResult(HttpURLConnection conn)
1252             throws DocumentStoreOperationException {
1253         return (SearchOperationResult) handleResponse(conn, new OperationResultBuilder(Type.SEARCH).useDefaults());
1254     }
1255
1256     private void buildDocumentResult(DocumentOperationResult result, String index)
1257             throws DocumentStoreOperationException {
1258
1259         JSONParser parser = new JSONParser();
1260         JSONObject root;
1261         try {
1262             root = (JSONObject) parser.parse(result.getResult());
1263             if (isSuccess(result)) {
1264                 // Success response object
1265                 Document doc = new Document();
1266                 doc.setEtag(result.getResultVersion());
1267                 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1268
1269                 doc.setContent((JSONObject) root.get("_source"));
1270                 result.setDocument(doc);
1271
1272             } else {
1273                 // Error response object
1274                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1275                 if (error != null) {
1276                     result.setError(
1277                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1278                 }
1279
1280             }
1281         } catch (Exception e) {
1282             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1283         }
1284     }
1285
1286     private String buildDocumentResponseUrl(String index, String id) {
1287         return ApiUtils.buildDocumentUri(index, id);
1288     }
1289
1290     private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1291         JSONParser parser = new JSONParser();
1292         JSONObject root;
1293
1294         try {
1295             root = (JSONObject) parser.parse(result.getResult());
1296             if (isSuccess(result)) {
1297                 JSONObject hits = (JSONObject) root.get("hits");
1298                 JSONArray hitArray = (JSONArray) hits.get("hits");
1299                 SearchHits searchHits = new SearchHits();
1300                 searchHits.setTotalHits(hits.get("total").toString());
1301                 ArrayList<SearchHit> searchHitArray = new ArrayList<>();
1302
1303                 for (int i = 0; i < hitArray.size(); i++) {
1304                     JSONObject hit = (JSONObject) hitArray.get(i);
1305                     SearchHit searchHit = new SearchHit();
1306                     searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1307                     Document doc = new Document();
1308                     if (hit.get(JSON_ATTR_VERSION) != null) {
1309                         doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1310                     }
1311
1312                     doc.setUrl(
1313                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1314                     doc.setContent((JSONObject) hit.get("_source"));
1315                     searchHit.setDocument(doc);
1316                     searchHitArray.add(searchHit);
1317                 }
1318                 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1319                 result.setSearchResult(searchHits);
1320
1321                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1322                 if (aggregations != null) {
1323                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1324                     AggregationResults aggs = new AggregationResults();
1325                     aggs.setAggregations(aggResults);
1326                     result.setAggregationResult(aggs);
1327                 }
1328
1329                 // success
1330             } else {
1331                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1332                 if (error != null) {
1333                     result.setError(
1334                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1335                 }
1336             }
1337         } catch (Exception e) {
1338             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1339         }
1340     }
1341
1342     private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1343         JSONParser parser = new JSONParser();
1344         JSONObject root;
1345         try {
1346             root = (JSONObject) parser.parse(result.getResult());
1347             if (isSuccess(result)) {
1348                 JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
1349                 JSONObject hitdata = (JSONObject) hitArray.get(0);
1350                 JSONArray optionsArray = (JSONArray) hitdata.get("options");
1351                 SuggestHits suggestHits = new SuggestHits();
1352                 suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
1353
1354                 ArrayList<SuggestHit> suggestHitArray = new ArrayList<>();
1355
1356                 for (int i = 0; i < optionsArray.size(); i++) {
1357                     JSONObject hit = (JSONObject) optionsArray.get(i);
1358
1359                     SuggestHit suggestHit = new SuggestHit();
1360                     suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
1361                     suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
1362                     Document doc = new Document();
1363                     if (hit.get(JSON_ATTR_VERSION) != null) {
1364                         doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1365                     }
1366                     doc.setUrl(
1367                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1368
1369                     doc.setContent((JSONObject) hit.get("payload"));
1370                     suggestHit.setDocument(doc);
1371                     suggestHitArray.add(suggestHit);
1372                 }
1373                 suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
1374                 result.setSuggestResult(suggestHits);
1375
1376                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1377                 if (aggregations != null) {
1378                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1379                     AggregationResults aggs = new AggregationResults();
1380                     aggs.setAggregations(aggResults);
1381                     result.setAggregationResult(aggs);
1382                 }
1383
1384                 // success
1385             } else {
1386                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1387                 if (error != null) {
1388                     result.setError(
1389                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1390                 }
1391             }
1392         } catch (Exception e) {
1393             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1394         }
1395     }
1396
1397     /**
1398      * Record the timing of the operation in the metrics log.
1399      *
1400      */
1401     private void logMetricsInfo(MdcOverride override, SearchDbMsgs message, OperationResult operationResult,
1402             String... args) {
1403         metricsLogger.info(message,
1404                 new LogFields() //
1405                         .setField(LogLine.DefinedFields.RESPONSE_CODE, operationResult.getResultCode())
1406                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, operationResult.getResult())
1407                         .setField(LogLine.DefinedFields.SERVER_IP, "ElasticHost-"+config.getIpAddress()),
1408                 override, args);
1409     }
1410 }