c4a52b44199a0134420212ecc934f4814631e873
[aai/search-data-service.git] / src / main / java / org / onap / aai / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
23
24 import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
25 import static javax.ws.rs.core.MediaType.APPLICATION_FORM_URLENCODED;
26 import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
27
28 import com.fasterxml.jackson.annotation.JsonInclude.Include;
29 import com.fasterxml.jackson.core.JsonParseException;
30 import com.fasterxml.jackson.core.JsonProcessingException;
31 import com.fasterxml.jackson.databind.JsonMappingException;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33 import com.google.common.base.Throwables;
34 import java.io.BufferedReader;
35 import java.io.File;
36 import java.io.FileInputStream;
37 import java.io.IOException;
38 import java.io.InputStream;
39 import java.io.InputStreamReader;
40 import java.io.OutputStream;
41 import java.io.OutputStreamWriter;
42 import java.io.PrintWriter;
43 import java.io.StringWriter;
44 import java.net.HttpURLConnection;
45 import java.net.MalformedURLException;
46 import java.net.ProtocolException;
47 import java.net.URL;
48 import java.text.SimpleDateFormat;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.List;
52 import java.util.Optional;
53 import java.util.Properties;
54 import java.util.concurrent.atomic.AtomicBoolean;
55 import javax.ws.rs.HttpMethod;
56 import javax.ws.rs.core.MediaType;
57 import javax.ws.rs.core.Response.Status;
58 import javax.ws.rs.core.UriBuilder;
59 import org.eclipse.jetty.http.HttpStatus;
60 import org.json.simple.JSONArray;
61 import org.json.simple.JSONObject;
62 import org.json.simple.parser.JSONParser;
63 import org.json.simple.parser.ParseException;
64 import org.onap.aai.cl.api.LogFields;
65 import org.onap.aai.cl.api.LogLine;
66 import org.onap.aai.cl.api.Logger;
67 import org.onap.aai.cl.eelf.LoggerFactory;
68 import org.onap.aai.cl.mdc.MdcContext;
69 import org.onap.aai.cl.mdc.MdcOverride;
70 import org.onap.aai.sa.rest.AnalysisConfiguration;
71 import org.onap.aai.sa.rest.ApiUtils;
72 import org.onap.aai.sa.rest.BulkRequest;
73 import org.onap.aai.sa.rest.BulkRequest.OperationType;
74 import org.onap.aai.sa.rest.DocumentSchema;
75 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
76 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
77 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException.ErrorMessage;
78 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
79 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
80 import org.onap.aai.sa.searchdbabstraction.entity.Document;
81 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
82 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
83 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
84 import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder;
85 import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder.Type;
86 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
87 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
88 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
89 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
90 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
91 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
92 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
93 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
94 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
95 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
96
97 /**
98  * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface.
99  */
100 public class ElasticSearchHttpController implements DocumentStoreInterface {
101
102     private static ElasticSearchHttpController instance = null;
103
104     private static final Logger logger =
105             LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
106     private static final Logger metricsLogger =
107             LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
108
109     private static final String URL_QUERY_VERSION = "version=";
110
111     private static final String JSON_ATTR_VERSION = "_version";
112     private static final String JSON_ATTR_ERROR = "error";
113     private static final String JSON_ATTR_REASON = "reason";
114
115     private static final String DEFAULT_TYPE = "default";
116
117     private static final String MSG_RESOURCE_MISSING = "Specified resource does not exist: ";
118     private static final String MSG_RESPONSE_CODE = "Response Code : ";
119     private static final String MSG_INVALID_DOCUMENT_URL = "Invalid document URL: ";
120
121     private static final String FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE = "Failed to parse Elastic Search response.";
122
123     private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
124             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
125     private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
126             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
127     private static final String BULK_IMPORT_INDEX_TEMPLATE =
128             "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
129     private static final String BULK_DELETE_TEMPLATE =
130             "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
131     public final static String APPLICATION_XND_JSON_TYPE = new MediaType("application", "x-ndjson").toString();
132
133     private final ElasticSearchConfig config;
134
135     protected AnalysisConfiguration analysisConfig;
136
137     public ElasticSearchHttpController(ElasticSearchConfig config) {
138         this.config = config;
139         analysisConfig = new AnalysisConfiguration();
140
141         String rootUrl = null;
142         try {
143             if ("https".equals(config.getUriScheme())) {
144                 new ElasticSearchHttpsController(config);
145             }
146             rootUrl = buildUrl(createUriBuilder("")).toString();
147             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, rootUrl);
148             checkConnection();
149             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, rootUrl);
150         } catch (Exception e) {
151             logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, rootUrl, e.getMessage());
152         }
153     }
154
155     public static ElasticSearchHttpController getInstance() {
156         synchronized (ElasticSearchHttpController.class) {
157             if (instance == null) {
158                 Properties properties = new Properties();
159                 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
160                 try {
161                     properties.load(new FileInputStream(file));
162                 } catch (Exception e) {
163                     logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance",
164                             e.getLocalizedMessage());
165                 }
166
167                 ElasticSearchConfig config = new ElasticSearchConfig(properties);
168                 instance = new ElasticSearchHttpController(config);
169             }
170         }
171
172         return instance;
173     }
174
175     public AnalysisConfiguration getAnalysisConfig() {
176         return analysisConfig;
177     }
178
179     public ElasticSearchConfig getElasticSearchConfig() {
180         return config;
181     }
182
183     @Override
184     public OperationResult createIndex(String index, DocumentSchema documentSchema) {
185         try {
186             // Submit the request to ElasticSearch to create the index using a default document type.
187             OperationResult result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
188                     DocumentSchemaUtil.generateDocumentMappings(documentSchema));
189
190             // ElasticSearch will return us a 200 code on success when we
191             // want to report a 201, so translate the result here.
192             if (result.getResultCode() == Status.OK.getStatusCode()) {
193                 result.setResultCode(Status.CREATED.getStatusCode());
194             }
195
196             if (isSuccess(result)) {
197                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
198             }
199             return result;
200         } catch (DocumentStoreOperationException | IOException e) {
201             return new OperationResultBuilder().useDefaults()
202                     .failureCause("Document store operation failure.  Cause: " + e.getMessage()).build();
203         }
204     }
205
206     @Override
207     public OperationResult createDynamicIndex(String index, String dynamicSchema) {
208         try {
209             OperationResult result = createTable(index, dynamicSchema);
210
211             // ElasticSearch will return us a 200 code on success when we
212             // want to report a 201, so translate the result here.
213             if (result.getResultCode() == Status.OK.getStatusCode()) {
214                 result.setResultCode(Status.CREATED.getStatusCode());
215             }
216             if (isSuccess(result)) {
217                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
218             }
219             return result;
220         } catch (DocumentStoreOperationException e) {
221             return new OperationResultBuilder().useDefaults()
222                     .failureCause("Document store operation failure.  Cause: " + e.getMessage()).build();
223         }
224     }
225
226     @Override
227     public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
228         MdcOverride override = getStartTime(new MdcOverride());
229
230         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.DELETE);
231         OperationResult opResult = handleResponse(conn);
232         logMetricsInfo(override, SearchDbMsgs.DELETE_INDEX_TIME, opResult, indexName);
233         shutdownConnection(conn);
234
235         return opResult;
236     }
237
238     // @Override
239     protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings)
240             throws DocumentStoreOperationException {
241         if (indexSettings == null) {
242             logger.debug("No settings provided.");
243         }
244
245         if (indexMappings == null) {
246             logger.debug("No mappings provided.");
247         }
248
249         MdcOverride override = getStartTime(new MdcOverride());
250
251         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.PUT);
252
253         StringBuilder sb = new StringBuilder(128);
254         sb.append("{ \"settings\" : ");
255         sb.append(indexSettings);
256         sb.append(",");
257
258         sb.append("\"mappings\" : {");
259         sb.append("\"" + typeName + "\" :");
260         sb.append(indexMappings);
261         sb.append("}}");
262
263         try {
264             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
265         } catch (IOException e) {
266             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
267             throw new DocumentStoreOperationException(e.getMessage(), e);
268         }
269
270         logger.debug("Request content: " + sb);
271
272         OperationResult opResult = handleResponse(conn);
273         shutdownConnection(conn);
274         logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, opResult, indexName);
275
276         return opResult;
277     }
278
279     /**
280      * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the
281      * index.
282      *
283      * @param indexName - The name of the index to be created
284      * @param settingsAndMappings - The actual JSON object that will define the index
285      * @return - The operation result of writing into Elasticsearch
286      * @throws DocumentStoreOperationException
287      */
288     protected OperationResult createTable(String indexName, String settingsAndMappings)
289             throws DocumentStoreOperationException {
290         MdcOverride override = getStartTime(new MdcOverride());
291
292         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.PUT);
293         try {
294             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
295         } catch (IOException e) {
296             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
297             throw new DocumentStoreOperationException(e.getMessage());
298         }
299
300         OperationResult result = handleResponse(conn);
301         logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, result, indexName);
302
303         return result;
304     }
305
306     @Override
307     public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
308             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
309
310         if (!allowImplicitIndexCreation) {
311             // Before we do anything, make sure that the specified index actually exists in the
312             // document store - we don't want to rely on ElasticSearch to fail the document
313             // create because it could be configured to implicitly create a non-existent index,
314             // which can lead to hard-to-debug behaviour with queries down the road.
315             OperationResult indexExistsResult = checkIndexExistence(indexName);
316             if (!isSuccess(indexExistsResult)) {
317                 String resultMsg = "Document Index '" + indexName + "' does not exist.";
318                 return (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT).status(Status.NOT_FOUND)
319                         .result(resultMsg).failureCause(resultMsg).build();
320             }
321         }
322
323         if (document.getId() == null || document.getId().isEmpty()) {
324             return createDocumentWithoutId(indexName, document);
325         } else {
326             return createDocumentWithId(indexName, document);
327         }
328     }
329
330     @Override
331     public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
332             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
333         if (!allowImplicitIndexCreation) {
334             // Before we do anything, make sure that the specified index actually exists in the
335             // document store - we don't want to rely on ElasticSearch to fail the document
336             // create because it could be configured to implicitly create a non-existent index,
337             // which can lead to hard-to-debug behaviour with queries down the road.
338             OperationResult indexExistsResult = checkIndexExistence(indexName);
339             if (!isSuccess(indexExistsResult)) {
340                 DocumentOperationResult opResult = new DocumentOperationResult();
341                 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
342                 String resultMsg = "Document Index '" + indexName + "' does not exist.";
343                 opResult.setResult(resultMsg);
344                 opResult.setFailureCause(resultMsg);
345                 return opResult;
346             }
347         }
348
349         MdcOverride override = getStartTime(new MdcOverride());
350
351         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId())
352                 .replaceQuery(URL_QUERY_VERSION + document.getVersion()));
353
354         HttpURLConnection conn = createConnection(url, HttpMethod.PUT);
355         attachDocument(conn, document);
356
357         DocumentOperationResult opResult = getOperationResult(conn);
358         buildDocumentResult(opResult, indexName);
359
360         logMetricsInfo(override, SearchDbMsgs.UPDATE_DOCUMENT_TIME, opResult, indexName, document.getId());
361
362         shutdownConnection(conn);
363
364         return opResult;
365     }
366
367     @Override
368     public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
369             throws DocumentStoreOperationException {
370         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId())
371                 .replaceQuery(URL_QUERY_VERSION + document.getVersion()));
372
373         MdcOverride override = getStartTime(new MdcOverride());
374
375         HttpURLConnection conn = createConnection(url, HttpMethod.DELETE);
376
377         DocumentOperationResult opResult = getOperationResult(conn);
378         buildDocumentResult(opResult, indexName);
379         // supress the etag and url in response for delete as they are not required
380         if (opResult.getDocument() != null) {
381             opResult.getDocument().setEtag(null);
382             opResult.getDocument().setUrl(null);
383         }
384
385         logMetricsInfo(override, SearchDbMsgs.DELETE_DOCUMENT_TIME, opResult, indexName, document.getId());
386
387         shutdownConnection(conn);
388
389         return opResult;
390     }
391
392     @Override
393     public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
394             throws DocumentStoreOperationException {
395         final UriBuilder uriBuilder = createUriBuilder(indexName, DEFAULT_TYPE, document.getId());
396         if (document.getVersion() != null) {
397             uriBuilder.replaceQuery(URL_QUERY_VERSION + document.getVersion());
398         }
399
400         MdcOverride override = getStartTime(new MdcOverride());
401         HttpURLConnection conn = createConnection(buildUrl(uriBuilder), "GET");
402
403         DocumentOperationResult opResult = getOperationResult(conn);
404         buildDocumentResult(opResult, indexName);
405
406         logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, document.getId());
407
408         shutdownConnection(conn);
409
410         return opResult;
411     }
412
413     @Override
414     public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
415         final URL url = buildUrl(createUriBuilder(indexName, "_search").replaceQuery(queryString));
416
417         MdcOverride override = getStartTime(new MdcOverride());
418
419         HttpURLConnection conn = createConnection(url, "GET");
420         SearchOperationResult opResult = getSearchOperationResult(conn);
421         buildSearchResult(opResult, indexName);
422
423         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, queryString);
424
425         return opResult;
426     }
427
428     @Override
429     public SearchOperationResult searchWithPayload(String indexName, String query)
430             throws DocumentStoreOperationException {
431         if (logger.isDebugEnabled()) {
432             logger.debug("Querying index: " + indexName + " with query string: " + query);
433         }
434         final URL url = buildUrl(createUriBuilder(indexName, "_search"));
435
436         MdcOverride override = getStartTime(new MdcOverride());
437
438         HttpURLConnection conn = createConnection(url, HttpMethod.POST);
439         attachContent(conn, query);
440         logger.debug("Request body =  Elasticsearch query = " + query);
441
442         SearchOperationResult opResult = getSearchOperationResult(conn);
443         buildSearchResult(opResult, indexName);
444
445         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
446
447         shutdownConnection(conn);
448
449         return opResult;
450     }
451
452     @Override
453     public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
454             throws DocumentStoreOperationException {
455         if (logger.isDebugEnabled()) {
456             logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
457         }
458
459         MdcOverride override = getStartTime(new MdcOverride());
460         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName, "_suggest")), HttpMethod.POST);
461         attachContent(conn, query);
462
463         logger.debug("Request body =  Elasticsearch query = " + query);
464
465         SearchOperationResult opResult = getSearchOperationResult(conn);
466         buildSuggestResult(opResult, indexName);
467
468         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
469
470         shutdownConnection(conn);
471
472         return opResult;
473     }
474
475     @Override
476     public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
477         if (logger.isDebugEnabled()) {
478             StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: ");
479
480             for (BulkRequest request : requests) {
481                 dbgString.append("[").append(request).append("] ");
482             }
483
484             logger.debug(dbgString.toString());
485         }
486
487         MdcOverride override = getStartTime(new MdcOverride());
488
489         // Parse the supplied set of operations.
490         // Iterate over the list of operations which we were provided and
491         // translate them into a format that ElasticSearh understands.
492         int opCount = 0;
493         StringBuilder esOperationSet = new StringBuilder(128);
494         List<ElasticSearchResultItem> rejected = new ArrayList<>();
495         for (BulkRequest request : requests) {
496
497             // Convert the request to the syntax ElasticSearch likes.
498             if (buildEsOperation(request, esOperationSet, rejected)) {
499                 opCount++;
500             }
501         }
502
503         ElasticSearchBulkOperationResult opResult = null;
504         if (opCount > 0) {
505             HttpURLConnection conn;
506             try {
507                 conn = (HttpURLConnection) buildUrl(createUriBuilder("_bulk")).openConnection();
508                 conn.setRequestMethod(HttpMethod.PUT);
509                 conn.setDoOutput(true);
510                 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_XND_JSON_TYPE);
511                 if(config.useAuth()){
512                         conn.setRequestProperty("Authorization", config.getAuthValue());
513                 }
514                 conn.setRequestProperty("Connection", "Close");
515
516             } catch (IOException e) {
517
518                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
519                 if (logger.isDebugEnabled()) {
520                     logger.debug(Throwables.getStackTraceAsString(e));
521                 }
522
523                 throw new DocumentStoreOperationException(
524                         "Failed to open connection to document store.  Cause: " + e.getMessage(), e);
525             }
526
527             StringBuilder bulkResult = new StringBuilder(128);
528             try {
529                 // Create an output stream to write our request to.
530                 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
531
532                 if (logger.isDebugEnabled()) {
533                     logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
534                     logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
535                 }
536
537                 // Write the resulting request string to our output stream. (this sends the request to ES?)
538                 out.write(esOperationSet.toString());
539                 out.close();
540
541                 // Open an input stream on our connection in order to read back the results.
542                 InputStream is = conn.getInputStream();
543                 InputStreamReader inputstreamreader = new InputStreamReader(is);
544                 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
545
546                 // Read the contents of the input stream into our result string...
547                 String esResponseString = null;
548
549                 while ((esResponseString = bufferedreader.readLine()) != null) {
550                     bulkResult.append(esResponseString).append("\n");
551                 }
552
553             } catch (IOException e) {
554
555                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
556                 if (logger.isDebugEnabled()) {
557                     StringWriter sw = new StringWriter();
558                     e.printStackTrace(new PrintWriter(sw));
559                     logger.debug(sw.toString());
560                 }
561
562                 throw new DocumentStoreOperationException(
563                         "Failure interacting with document store.  Cause: " + e.getMessage(), e);
564             }
565
566             if (logger.isDebugEnabled()) {
567                 logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
568             }
569
570             // ...and marshal the resulting string into a Java object.
571             try {
572                 opResult = marshallEsBulkResult(bulkResult.toString());
573
574             } catch (IOException e) {
575                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
576                 if (logger.isDebugEnabled()) {
577                     logger.debug(Throwables.getStackTraceAsString(e));
578                 }
579
580                 throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: " + e.getMessage(),
581                         e);
582             }
583         }
584
585         OperationResult result = new OperationResultBuilder() //
586                 .resultCode(HttpStatus.MULTI_STATUS_207) //
587                 .result(buildGenericBulkResultSet(opResult, rejected)) //
588                 .build();
589
590         // In the success case we don't want the entire result string to be dumped into the metrics log, so concatenate
591         // it.
592         String resultStringForMetricsLog = result.getResult();
593         if (isSuccess(result)) {
594             resultStringForMetricsLog =
595                     resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
596         }
597
598         metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
599                 new LogFields() //
600                         .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
601                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
602                 override);
603
604         return result;
605     }
606
607
608     /**
609      * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
610      *
611      * @param indexName - The index to look for.
612      * @return - An operation result indicating the success or failure of the check.
613      * @throws DocumentStoreOperationException
614      */
615     private OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
616         MdcOverride override = getStartTime(new MdcOverride());
617
618         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.HEAD);
619         int resultCode;
620         try {
621             resultCode = conn.getResponseCode();
622         } catch (IOException ex) {
623             shutdownConnection(conn);
624             throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex);
625         }
626         logger.debug(MSG_RESPONSE_CODE + resultCode);
627
628         OperationResult opResult = new OperationResultBuilder().useDefaults().resultCode(resultCode).build();
629         logMetricsInfo(override, SearchDbMsgs.CHECK_INDEX_TIME, opResult, indexName);
630         shutdownConnection(conn);
631
632         return opResult;
633     }
634
635     private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
636             throws DocumentStoreOperationException {
637         // check if the document already exists
638         DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
639
640         if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
641             if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) {
642                 opResult.setFailureCause("A document with the same id already exists.");
643             } else {
644                 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
645             }
646             opResult.setResultCode(Status.CONFLICT.getStatusCode());
647             return opResult;
648         }
649
650         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId()));
651         MdcOverride override = getStartTime(new MdcOverride());
652
653         HttpURLConnection conn = createConnection(url, HttpMethod.PUT);
654         attachDocument(conn, document);
655
656         opResult = getOperationResult(conn);
657         buildDocumentResult(opResult, indexName);
658
659         logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, opResult, indexName);
660
661         shutdownConnection(conn);
662
663         return opResult;
664     }
665
666     private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
667             throws DocumentStoreOperationException {
668         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE));
669
670         MdcOverride override = getStartTime(new MdcOverride());
671         HttpURLConnection conn = createConnection(url, HttpMethod.POST);
672         attachDocument(conn, document);
673
674         DocumentOperationResult response = getOperationResult(conn);
675         buildDocumentResult(response, indexName);
676
677         logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, response, indexName);
678
679         shutdownConnection(conn);
680
681         return response;
682     }
683
684     private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
685             throws DocumentStoreOperationException {
686         conn.setRequestProperty("Connection", "Close");
687         attachContent(conn, doc.getContentInJson());
688     }
689
690     private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
691             throws DocumentStoreOperationException {
692         MdcOverride override = getStartTime(new MdcOverride());
693         HttpURLConnection conn =
694                 createConnection(buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, docId)), HttpMethod.HEAD);
695         int resultCode;
696         try {
697             resultCode = conn.getResponseCode();
698         } catch (IOException ex) {
699             shutdownConnection(conn);
700             throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex);
701         }
702
703         logger.debug(MSG_RESPONSE_CODE + resultCode);
704
705         DocumentOperationResult opResult = (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT)
706                 .useDefaults().resultCode(resultCode).build();
707
708         logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, docId);
709         shutdownConnection(conn);
710
711         return opResult;
712     }
713
714     private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException {
715         OutputStream outputStream = null;
716         OutputStreamWriter out = null;
717
718         try {
719             outputStream = conn.getOutputStream();
720         } catch (IOException e) {
721             shutdownConnection(conn);
722             throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
723         }
724
725         out = new OutputStreamWriter(outputStream);
726
727         try {
728             out.write(content);
729             out.close();
730         } catch (IOException e) {
731             shutdownConnection(conn);
732             throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
733         }
734     }
735
736     private HttpURLConnection initializeConnection(URL url) throws DocumentStoreOperationException {
737         HttpURLConnection conn = null;
738         try {
739             conn = (HttpURLConnection) url.openConnection();
740             conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON);
741             conn.setDoOutput(true);
742             if (config.useAuth()) {
743                 conn.setRequestProperty("Authorization", config.getAuthValue());
744             }
745         } catch (IOException e) {
746             shutdownConnection(conn);
747             throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
748         }
749
750         return conn;
751     }
752
753     private OperationResult handleResponse(HttpURLConnection conn) throws DocumentStoreOperationException {
754         return handleResponse(conn, new OperationResultBuilder().useDefaults());
755     }
756
757     private OperationResult handleResponse(HttpURLConnection conn, OperationResultBuilder rb)
758             throws DocumentStoreOperationException {
759         int resultCode;
760
761         try {
762             resultCode = conn.getResponseCode();
763         } catch (IOException ex) {
764             shutdownConnection(conn);
765             throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex);
766         }
767
768         logger.debug(MSG_RESPONSE_CODE + resultCode);
769
770         InputStream inputStream = null;
771
772         if (!ApiUtils.isSuccessStatusCode(resultCode)) {
773             inputStream = conn.getErrorStream();
774         } else {
775             try {
776                 inputStream = conn.getInputStream();
777             } catch (IOException e) {
778                 shutdownConnection(conn);
779                 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
780             }
781         }
782
783         InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
784         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
785
786         StringBuilder result = new StringBuilder(128);
787         String string = null;
788
789         try {
790             while ((string = bufferedreader.readLine()) != null) {
791                 result.append(string).append("\n");
792             }
793         } catch (IOException e) {
794             shutdownConnection(conn);
795             throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
796         }
797
798         if (resultCode == Status.CONFLICT.getStatusCode()) {
799             rb.resultCode(Status.PRECONDITION_FAILED.getStatusCode());
800         } else {
801             rb.resultCode(resultCode);
802         }
803         if (logger.isDebugEnabled()) {
804             logger.debug("Raw result string from ElasticSearch = " + result.toString());
805         }
806         rb.result(result.toString());
807         rb.resultVersion(extractVersion(result.toString()));
808         return rb.build();
809     }
810
811     private String extractVersion(String result) {
812         JSONParser parser = new JSONParser();
813         String version = null;
814         try {
815             JSONObject root = (JSONObject) parser.parse(result);
816             if (root.get(JSON_ATTR_VERSION) != null) {
817                 version = root.get(JSON_ATTR_VERSION).toString();
818             }
819         } catch (ParseException e) {
820             // Not all responses from ElasticSearch include a version, so
821             // if we don't get one back, just return an empty string rather
822             // than trigger a false failure.
823             version = "";
824         }
825         return version;
826     }
827
828     /**
829      * This convenience method gets the current system time and stores it in an attribute in the supplied
830      * {@link MdcOverride} object so that it can be used later by the metrics logger.
831      *
832      * @param override - The {@link MdcOverride} object to update.
833      * @return - The supplied {@link MdcOverride} object.
834      */
835     private MdcOverride getStartTime(MdcOverride override) {
836
837         // Grab the current time...
838         long startTimeInMs = System.currentTimeMillis();
839
840         // ...and add it as an attribute to the supplied MDC Override object.
841         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
842         override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
843
844         // Return the MdcOverride object that we were passed.
845         // This looks odd, but it allows us to do stuff like:
846         //
847         // MdcOverride ov = getStartTime(new MdcOverride())
848         //
849         // which is quite handy, but also allows us to pass in an existing
850         // MdcOverride object which already has some attributes set.
851         return override;
852     }
853
854     private boolean isSuccess(OperationResult result) {
855         return ApiUtils.isSuccessStatusCode(result.getResultCode());
856     }
857
858     private UriBuilder createUriBuilder(String path, String... paths) {
859         UriBuilder builder = UriBuilder.fromPath(path);
860         for (String other : paths) {
861             builder.path(other);
862         }
863         builder.host(config.getIpAddress());
864         String port = Optional.ofNullable(config.getHttpPort()).orElse("0");
865         builder.port(Integer.valueOf(port));
866         builder.scheme(config.getUriScheme());
867         return builder;
868     }
869
870     private URL buildUrl(UriBuilder builder) throws DocumentStoreOperationException {
871         try {
872             return builder.build().toURL();
873         } catch (MalformedURLException e) {
874             logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "buildUrl", e.getLocalizedMessage());
875             throw new DocumentStoreOperationException("Error building a URL with " + builder.toString(), e);
876         }
877     }
878
879     private HttpURLConnection createConnection(final URL url, final String method)
880             throws DocumentStoreOperationException {
881         HttpURLConnection conn = initializeConnection(url);
882         try {
883             logger.debug("\nSending '" + method + "' request to URL : " + conn.getURL());
884             conn.setRequestMethod(method);
885         } catch (ProtocolException e) {
886             shutdownConnection(conn);
887             throw new DocumentStoreOperationException(ErrorMessage.SET_REQUEST_METHOD_FAILED, e, method);
888         }
889         return conn;
890     }
891
892     private OperationResult checkConnection() throws IOException, DocumentStoreOperationException {
893         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder("_cluster/health")), HttpMethod.GET);
894         int resultCode = conn.getResponseCode();
895         logger.debug("getClusterHealth() response Code : " + resultCode);
896         shutdownConnection(conn);
897         return new OperationResultBuilder().resultCode(resultCode).build();
898     }
899
900     private void shutdownConnection(HttpURLConnection connection) {
901         if (connection == null) {
902             return;
903         }
904
905         final String methodName = "shutdownConnection";
906         InputStream inputstream = null;
907         OutputStream outputstream = null;
908
909         try {
910             inputstream = connection.getInputStream();
911         } catch (IOException e) {
912             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
913         } finally {
914             if (inputstream != null) {
915                 try {
916                     inputstream.close();
917                 } catch (IOException e) {
918                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
919                 }
920             }
921         }
922
923         try {
924             outputstream = connection.getOutputStream();
925         } catch (IOException e) {
926             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
927         } finally {
928             if (outputstream != null) {
929                 try {
930                     outputstream.close();
931                 } catch (IOException e) {
932                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
933                 }
934             }
935         }
936
937         connection.disconnect();
938     }
939
940     /**
941      * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch.
942      *
943      * @param request - The request to be performed.
944      * @param sb - The string builder to append the json data to
945      * @throws DocumentStoreOperationException
946      */
947     private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails)
948             throws DocumentStoreOperationException {
949
950         boolean retVal = true;
951         // What kind of operation are we performing?
952         switch (request.getOperationType()) {
953
954             // Create a new document.
955             case CREATE:
956
957                 // Make sure that we were supplied a document payload.
958                 if (request.getOperation().getDocument() == null) {
959
960                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
961                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
962                     return false;
963                 }
964
965                 // Make sure that the supplied document URL is formatted
966                 // correctly.
967                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
968                     fails.add(generateRejectionEntry(request.getOperationType(),
969                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
970                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
971                     return false;
972                 }
973
974                 // Validate that the specified index actually exists before we
975                 // try to perform the create.
976                 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
977
978                     fails.add(generateRejectionEntry(request.getOperationType(),
979                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
980                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
981                     return false;
982                 }
983
984                 // If we were supplied an id for the new document, then
985                 // include it in the bulk operation to Elastic Search
986                 if (request.getId() == null) {
987
988                     sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
989
990                     // Otherwise, we just leave that parameter off and ElasticSearch
991                     // will generate one for us.
992                 } else {
993                     sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
994                             request.getId()));
995                 }
996
997                 try {
998                     // Append the document that we want to create.
999                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
1000                 } catch (JsonProcessingException e) {
1001                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
1002                 }
1003
1004                 break;
1005
1006             // Update an existing document.
1007             case UPDATE:
1008
1009                 // Make sure that we were supplied a document payload.
1010                 if (request.getOperation().getDocument() == null) {
1011
1012                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1013                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1014                     return false;
1015                 }
1016
1017                 // Make sure that the supplied document URL is formatted
1018                 // correctly.
1019                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1020                     fails.add(generateRejectionEntry(request.getOperationType(),
1021                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1022                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1023                     return false;
1024                 }
1025
1026                 // Validate that the specified index actually exists before we
1027                 // try to perform the update.
1028                 if (!indexExists(request.getIndex())) {
1029
1030                     fails.add(generateRejectionEntry(request.getOperationType(),
1031                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1032                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1033                     return false;
1034                 }
1035
1036                 // Validate that the document we are trying to update actually
1037                 // exists before we try to perform the update.
1038                 if (!documentExists(request.getIndex(), request.getId())) {
1039
1040                     fails.add(generateRejectionEntry(request.getOperationType(),
1041                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1042                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1043                     return false;
1044                 }
1045
1046                 // It is mandatory that a version be supplied for an update operation,
1047                 // so validate that now.
1048                 if (request.getOperation().getMetaData().getEtag() == null) {
1049
1050                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1051                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1052                     return false;
1053                 }
1054
1055                 // Generate the update request...
1056                 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1057                         request.getOperation().getMetaData().getEtag()));
1058
1059                 // ...and append the document that we want to update.
1060                 try {
1061                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
1062                 } catch (JsonProcessingException e) {
1063                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
1064                 }
1065                 break;
1066
1067             // Delete an existing document.
1068             case DELETE:
1069
1070                 // Make sure that the supplied document URL is formatted
1071                 // correctly.
1072                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1073                     fails.add(generateRejectionEntry(request.getOperationType(),
1074                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1075                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1076                     return false;
1077                 }
1078
1079                 // Validate that the specified index actually exists before we
1080                 // try to perform the delete.
1081                 if (!indexExists(request.getIndex())) {
1082
1083                     fails.add(generateRejectionEntry(request.getOperationType(),
1084                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1085                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1086                     return false;
1087                 }
1088
1089                 // Validate that the document we are trying to update actually
1090                 // exists before we try to perform the delete.
1091                 if (!documentExists(request.getIndex(), request.getId())) {
1092
1093                     fails.add(generateRejectionEntry(request.getOperationType(),
1094                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1095                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1096                     return false;
1097                 }
1098
1099                 // It is mandatory that a version be supplied for a delete operation,
1100                 // so validate that now.
1101                 if (request.getOperation().getMetaData().getEtag() == null) {
1102
1103                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1104                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1105                     return false;
1106                 }
1107
1108                 // Generate the delete request.
1109                 sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1110                         request.getOperation().getMetaData().getEtag()));
1111                 break;
1112             default:
1113         }
1114
1115         return retVal;
1116     }
1117
1118     private boolean indexExists(String index) throws DocumentStoreOperationException {
1119         return isSuccess(checkIndexExistence(index));
1120     }
1121
1122     private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1123         return isSuccess(checkDocumentExistence(index, id));
1124     }
1125
1126     /**
1127      * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the
1128      * document store.
1129      *
1130      * @param rejectReason - A message describing why the operation was rejected.
1131      * @param anId - The identifier associated with the document being acted on.
1132      * @param statusCode - An HTTP status code.
1133      * @return - A result set item.
1134      */
1135     private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index,
1136             String anId, int statusCode, String originalUrl) {
1137
1138         ElasticSearchError err = new ElasticSearchError();
1139         err.setReason(rejectReason);
1140
1141         ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1142         op.setIndex(index);
1143         op.setId(anId);
1144         op.setStatus(statusCode);
1145         op.setError(err);
1146         op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1147
1148         ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1149
1150         switch (opType) {
1151             case CREATE:
1152                 rejectionResult.setCreate(op);
1153                 break;
1154             case UPDATE:
1155                 rejectionResult.setIndex(op);
1156                 break;
1157             case DELETE:
1158                 rejectionResult.setDelete(op);
1159                 break;
1160             default:
1161         }
1162
1163         return rejectionResult;
1164     }
1165
1166     /**
1167      * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and
1168      * marshals it into a Java object.
1169      *
1170      * @param jsonResult - The bulk operations response returned from ElasticSearch.
1171      * @return - The marshalled response.
1172      * @throws JsonParseException
1173      * @throws JsonMappingException
1174      * @throws IOException
1175      */
1176     private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException {
1177         if (jsonResult != null) {
1178             if (logger.isDebugEnabled()) {
1179                 logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", ""));
1180             }
1181
1182             ObjectMapper mapper = new ObjectMapper();
1183             mapper.setSerializationInclusion(Include.NON_EMPTY);
1184
1185             return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1186         }
1187
1188         return null;
1189     }
1190
1191     /**
1192      * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload.
1193      *
1194      * @param esResult - ElasticSearch bulk operations response.
1195      * @return - A generic result set.
1196      */
1197     private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1198             List<ElasticSearchResultItem> rejectedOps) {
1199         int totalOps = 0;
1200         int totalSuccess = 0;
1201         int totalFails = 0;
1202
1203         if (logger.isDebugEnabled()) {
1204
1205             logger.debug("ESController: Build generic result set.  ES Results: "
1206                     + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString());
1207         }
1208
1209         // Build a combined list of result items from the results returned
1210         // from ElasticSearch and the list of operations that we rejected
1211         // without sending to ElasticSearch.
1212         List<ElasticSearchResultItem> combinedResults = new ArrayList<>();
1213         if (esResult != null) {
1214             combinedResults.addAll(Arrays.asList(esResult.getItems()));
1215         }
1216         combinedResults.addAll(rejectedOps);
1217
1218         // Iterate over the individual results in the resulting result set.
1219         StringBuilder resultsBuilder = new StringBuilder();
1220         AtomicBoolean firstItem = new AtomicBoolean(true);
1221         for (ElasticSearchResultItem item : combinedResults) {
1222
1223             // Increment the operation counts.
1224             totalOps++;
1225             if (ApiUtils.isSuccessStatusCode(item.operationStatus().getStatus())) {
1226                 totalSuccess++;
1227             } else {
1228                 totalFails++;
1229             }
1230
1231             // Prepend a comma to our response string unless this it the
1232             // first result in the set.
1233             if (!firstItem.compareAndSet(true, false)) {
1234                 resultsBuilder.append(", ");
1235             }
1236
1237             // Append the current result as a generic json structure.
1238             resultsBuilder.append(item.toJson());
1239         }
1240
1241         return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", "
1242                 + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}";
1243     }
1244
1245     private DocumentOperationResult getOperationResult(HttpURLConnection conn) throws DocumentStoreOperationException {
1246         return (DocumentOperationResult) handleResponse(conn, new OperationResultBuilder(Type.DOCUMENT).useDefaults());
1247     }
1248
1249     private SearchOperationResult getSearchOperationResult(HttpURLConnection conn)
1250             throws DocumentStoreOperationException {
1251         return (SearchOperationResult) handleResponse(conn, new OperationResultBuilder(Type.SEARCH).useDefaults());
1252     }
1253
1254     private void buildDocumentResult(DocumentOperationResult result, String index)
1255             throws DocumentStoreOperationException {
1256
1257         JSONParser parser = new JSONParser();
1258         JSONObject root;
1259         try {
1260             root = (JSONObject) parser.parse(result.getResult());
1261             if (isSuccess(result)) {
1262                 // Success response object
1263                 Document doc = new Document();
1264                 doc.setEtag(result.getResultVersion());
1265                 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1266
1267                 doc.setContent((JSONObject) root.get("_source"));
1268                 result.setDocument(doc);
1269
1270             } else {
1271                 // Error response object
1272                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1273                 if (error != null) {
1274                     result.setError(
1275                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1276                 }
1277
1278             }
1279         } catch (Exception e) {
1280             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1281         }
1282     }
1283
1284     private String buildDocumentResponseUrl(String index, String id) {
1285         return ApiUtils.buildDocumentUri(index, id);
1286     }
1287
1288     private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1289         JSONParser parser = new JSONParser();
1290         JSONObject root;
1291
1292         try {
1293             root = (JSONObject) parser.parse(result.getResult());
1294             if (isSuccess(result)) {
1295                 JSONObject hits = (JSONObject) root.get("hits");
1296                 JSONArray hitArray = (JSONArray) hits.get("hits");
1297                 SearchHits searchHits = new SearchHits();
1298                 searchHits.setTotalHits(hits.get("total").toString());
1299                 ArrayList<SearchHit> searchHitArray = new ArrayList<>();
1300
1301                 for (int i = 0; i < hitArray.size(); i++) {
1302                     JSONObject hit = (JSONObject) hitArray.get(i);
1303                     SearchHit searchHit = new SearchHit();
1304                     searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1305                     Document doc = new Document();
1306                     if (hit.get(JSON_ATTR_VERSION) != null) {
1307                         doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1308                     }
1309
1310                     doc.setUrl(
1311                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1312                     doc.setContent((JSONObject) hit.get("_source"));
1313                     searchHit.setDocument(doc);
1314                     searchHitArray.add(searchHit);
1315                 }
1316                 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1317                 result.setSearchResult(searchHits);
1318
1319                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1320                 if (aggregations != null) {
1321                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1322                     AggregationResults aggs = new AggregationResults();
1323                     aggs.setAggregations(aggResults);
1324                     result.setAggregationResult(aggs);
1325                 }
1326
1327                 // success
1328             } else {
1329                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1330                 if (error != null) {
1331                     result.setError(
1332                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1333                 }
1334             }
1335         } catch (Exception e) {
1336             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1337         }
1338     }
1339
1340     private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1341         JSONParser parser = new JSONParser();
1342         JSONObject root;
1343         try {
1344             root = (JSONObject) parser.parse(result.getResult());
1345             if (isSuccess(result)) {
1346                 JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
1347                 JSONObject hitdata = (JSONObject) hitArray.get(0);
1348                 JSONArray optionsArray = (JSONArray) hitdata.get("options");
1349                 SuggestHits suggestHits = new SuggestHits();
1350                 suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
1351
1352                 ArrayList<SuggestHit> suggestHitArray = new ArrayList<>();
1353
1354                 for (int i = 0; i < optionsArray.size(); i++) {
1355                     JSONObject hit = (JSONObject) optionsArray.get(i);
1356
1357                     SuggestHit suggestHit = new SuggestHit();
1358                     suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
1359                     suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
1360                     Document doc = new Document();
1361                     if (hit.get(JSON_ATTR_VERSION) != null) {
1362                         doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1363                     }
1364                     doc.setUrl(
1365                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1366
1367                     doc.setContent((JSONObject) hit.get("payload"));
1368                     suggestHit.setDocument(doc);
1369                     suggestHitArray.add(suggestHit);
1370                 }
1371                 suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
1372                 result.setSuggestResult(suggestHits);
1373
1374                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1375                 if (aggregations != null) {
1376                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1377                     AggregationResults aggs = new AggregationResults();
1378                     aggs.setAggregations(aggResults);
1379                     result.setAggregationResult(aggs);
1380                 }
1381
1382                 // success
1383             } else {
1384                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1385                 if (error != null) {
1386                     result.setError(
1387                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1388                 }
1389             }
1390         } catch (Exception e) {
1391             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1392         }
1393     }
1394
1395     /**
1396      * Record the timing of the operation in the metrics log.
1397      *
1398      */
1399     private void logMetricsInfo(MdcOverride override, SearchDbMsgs message, OperationResult operationResult,
1400             String... args) {
1401         metricsLogger.info(message,
1402                 new LogFields() //
1403                         .setField(LogLine.DefinedFields.RESPONSE_CODE, operationResult.getResultCode())
1404                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, operationResult.getResult())
1405                         .setField(LogLine.DefinedFields.SERVER_IP, "ElasticHost-"+config.getIpAddress()),
1406                 override, args);
1407     }
1408 }