Use a URI builder to create URLs
[aai/search-data-service.git] / src / main / java / org / onap / aai / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
23
24 import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
25 import static javax.ws.rs.core.MediaType.APPLICATION_FORM_URLENCODED;
26 import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
27
28 import com.fasterxml.jackson.annotation.JsonInclude.Include;
29 import com.fasterxml.jackson.core.JsonParseException;
30 import com.fasterxml.jackson.core.JsonProcessingException;
31 import com.fasterxml.jackson.databind.JsonMappingException;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33 import com.google.common.base.Throwables;
34 import java.io.BufferedReader;
35 import java.io.File;
36 import java.io.FileInputStream;
37 import java.io.IOException;
38 import java.io.InputStream;
39 import java.io.InputStreamReader;
40 import java.io.OutputStream;
41 import java.io.OutputStreamWriter;
42 import java.io.PrintWriter;
43 import java.io.StringWriter;
44 import java.net.HttpURLConnection;
45 import java.net.MalformedURLException;
46 import java.net.ProtocolException;
47 import java.net.URL;
48 import java.text.SimpleDateFormat;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.List;
52 import java.util.Optional;
53 import java.util.Properties;
54 import java.util.concurrent.atomic.AtomicBoolean;
55 import javax.ws.rs.HttpMethod;
56 import javax.ws.rs.core.Response.Status;
57 import javax.ws.rs.core.Response.Status.Family;
58 import javax.ws.rs.core.UriBuilder;
59 import org.eclipse.jetty.http.HttpStatus;
60 import org.json.simple.JSONArray;
61 import org.json.simple.JSONObject;
62 import org.json.simple.parser.JSONParser;
63 import org.json.simple.parser.ParseException;
64 import org.onap.aai.cl.api.LogFields;
65 import org.onap.aai.cl.api.LogLine;
66 import org.onap.aai.cl.api.Logger;
67 import org.onap.aai.cl.eelf.LoggerFactory;
68 import org.onap.aai.cl.mdc.MdcContext;
69 import org.onap.aai.cl.mdc.MdcOverride;
70 import org.onap.aai.sa.rest.AnalysisConfiguration;
71 import org.onap.aai.sa.rest.ApiUtils;
72 import org.onap.aai.sa.rest.BulkRequest;
73 import org.onap.aai.sa.rest.BulkRequest.OperationType;
74 import org.onap.aai.sa.rest.DocumentSchema;
75 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
76 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
77 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException.ErrorMessage;
78 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
79 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
80 import org.onap.aai.sa.searchdbabstraction.entity.Document;
81 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
82 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
83 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
84 import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder;
85 import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder.Type;
86 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
87 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
88 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
89 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
90 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
91 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
92 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
93 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
94 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
95 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
96
97 /**
98  * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface.
99  */
100 public class ElasticSearchHttpController implements DocumentStoreInterface {
101
102     private static ElasticSearchHttpController instance = null;
103
104     private static final Logger logger =
105             LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
106     private static final Logger metricsLogger =
107             LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
108
109     private static final String URL_QUERY_VERSION = "version=";
110
111     private static final String JSON_ATTR_VERSION = "_version";
112     private static final String JSON_ATTR_ERROR = "error";
113     private static final String JSON_ATTR_REASON = "reason";
114
115     private static final String DEFAULT_TYPE = "default";
116
117     private static final String MSG_RESOURCE_MISSING = "Specified resource does not exist: ";
118     private static final String MSG_RESPONSE_CODE = "Response Code : ";
119     private static final String MSG_INVALID_DOCUMENT_URL = "Invalid document URL: ";
120
121     private static final String FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE = "Failed to parse Elastic Search response.";
122
123     private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
124             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
125     private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
126             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
127     private static final String BULK_IMPORT_INDEX_TEMPLATE =
128             "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
129     private static final String BULK_DELETE_TEMPLATE =
130             "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
131
132     private final ElasticSearchConfig config;
133
134     protected AnalysisConfiguration analysisConfig;
135
136
137     public ElasticSearchHttpController(ElasticSearchConfig config) {
138         this.config = config;
139         analysisConfig = new AnalysisConfiguration();
140
141         String rootUrl = null;
142         try {
143             rootUrl = buildUrl(createUriBuilder("")).toString();
144             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, rootUrl);
145             checkConnection();
146             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, rootUrl);
147         } catch (Exception e) {
148             logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, rootUrl, e.getMessage());
149         }
150     }
151
152     public static ElasticSearchHttpController getInstance() {
153         synchronized (ElasticSearchHttpController.class) {
154             if (instance == null) {
155                 Properties properties = new Properties();
156                 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
157                 try {
158                     properties.load(new FileInputStream(file));
159                 } catch (Exception e) {
160                     logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance",
161                             e.getLocalizedMessage());
162                 }
163
164                 ElasticSearchConfig config = new ElasticSearchConfig(properties);
165                 instance = new ElasticSearchHttpController(config);
166             }
167         }
168
169         return instance;
170     }
171
172     public AnalysisConfiguration getAnalysisConfig() {
173         return analysisConfig;
174     }
175
176     @Override
177     public OperationResult createIndex(String index, DocumentSchema documentSchema) {
178         try {
179             // Submit the request to ElasticSearch to create the index using a default document type.
180             OperationResult result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
181                     DocumentSchemaUtil.generateDocumentMappings(documentSchema));
182
183             // ElasticSearch will return us a 200 code on success when we
184             // want to report a 201, so translate the result here.
185             if (result.getResultCode() == Status.OK.getStatusCode()) {
186                 result.setResultCode(Status.CREATED.getStatusCode());
187             }
188
189             if (isSuccess(result)) {
190                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
191             }
192             return result;
193         } catch (DocumentStoreOperationException | IOException e) {
194             return new OperationResultBuilder().useDefaults()
195                     .failureCause("Document store operation failure.  Cause: " + e.getMessage()).build();
196         }
197     }
198
199     @Override
200     public OperationResult createDynamicIndex(String index, String dynamicSchema) {
201         try {
202             OperationResult result = createTable(index, dynamicSchema);
203
204             // ElasticSearch will return us a 200 code on success when we
205             // want to report a 201, so translate the result here.
206             if (result.getResultCode() == Status.OK.getStatusCode()) {
207                 result.setResultCode(Status.CREATED.getStatusCode());
208             }
209             if (isSuccess(result)) {
210                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
211             }
212             return result;
213         } catch (DocumentStoreOperationException e) {
214             return new OperationResultBuilder().useDefaults()
215                     .failureCause("Document store operation failure.  Cause: " + e.getMessage()).build();
216         }
217     }
218
219     @Override
220     public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
221         MdcOverride override = getStartTime(new MdcOverride());
222
223         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.DELETE);
224         OperationResult opResult = handleResponse(conn);
225         logMetricsInfo(override, SearchDbMsgs.DELETE_INDEX_TIME, opResult, indexName);
226         shutdownConnection(conn);
227
228         return opResult;
229     }
230
231     // @Override
232     protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings)
233             throws DocumentStoreOperationException {
234         if (indexSettings == null) {
235             logger.debug("No settings provided.");
236         }
237
238         if (indexMappings == null) {
239             logger.debug("No mappings provided.");
240         }
241
242         MdcOverride override = getStartTime(new MdcOverride());
243
244         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.PUT);
245
246         StringBuilder sb = new StringBuilder(128);
247         sb.append("{ \"settings\" : ");
248         sb.append(indexSettings);
249         sb.append(",");
250
251         sb.append("\"mappings\" : {");
252         sb.append("\"" + typeName + "\" :");
253         sb.append(indexMappings);
254         sb.append("}}");
255
256         try {
257             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
258         } catch (IOException e) {
259             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
260             throw new DocumentStoreOperationException(e.getMessage(), e);
261         }
262
263         logger.debug("Request content: " + sb);
264
265         OperationResult opResult = handleResponse(conn);
266         shutdownConnection(conn);
267         logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, opResult, indexName);
268
269         return opResult;
270     }
271
272     /**
273      * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the
274      * index.
275      *
276      * @param indexName - The name of the index to be created
277      * @param settingsAndMappings - The actual JSON object that will define the index
278      * @return - The operation result of writing into Elasticsearch
279      * @throws DocumentStoreOperationException
280      */
281     protected OperationResult createTable(String indexName, String settingsAndMappings)
282             throws DocumentStoreOperationException {
283         MdcOverride override = getStartTime(new MdcOverride());
284
285         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.PUT);
286         try {
287             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
288         } catch (IOException e) {
289             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
290             throw new DocumentStoreOperationException(e.getMessage());
291         }
292
293         OperationResult result = handleResponse(conn);
294         logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, result, indexName);
295
296         return result;
297     }
298
299     @Override
300     public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
301             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
302
303         if (!allowImplicitIndexCreation) {
304             // Before we do anything, make sure that the specified index actually exists in the
305             // document store - we don't want to rely on ElasticSearch to fail the document
306             // create because it could be configured to implicitly create a non-existent index,
307             // which can lead to hard-to-debug behaviour with queries down the road.
308             OperationResult indexExistsResult = checkIndexExistence(indexName);
309             if (!isSuccess(indexExistsResult)) {
310                 String resultMsg = "Document Index '" + indexName + "' does not exist.";
311                 return (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT).status(Status.NOT_FOUND)
312                         .result(resultMsg).failureCause(resultMsg).build();
313             }
314         }
315
316         if (document.getId() == null || document.getId().isEmpty()) {
317             return createDocumentWithoutId(indexName, document);
318         } else {
319             return createDocumentWithId(indexName, document);
320         }
321     }
322
323     @Override
324     public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
325             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
326         if (!allowImplicitIndexCreation) {
327             // Before we do anything, make sure that the specified index actually exists in the
328             // document store - we don't want to rely on ElasticSearch to fail the document
329             // create because it could be configured to implicitly create a non-existent index,
330             // which can lead to hard-to-debug behaviour with queries down the road.
331             OperationResult indexExistsResult = checkIndexExistence(indexName);
332             if (!isSuccess(indexExistsResult)) {
333                 DocumentOperationResult opResult = new DocumentOperationResult();
334                 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
335                 String resultMsg = "Document Index '" + indexName + "' does not exist.";
336                 opResult.setResult(resultMsg);
337                 opResult.setFailureCause(resultMsg);
338                 return opResult;
339             }
340         }
341
342         MdcOverride override = getStartTime(new MdcOverride());
343
344         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId())
345                 .replaceQuery(URL_QUERY_VERSION + document.getVersion()));
346
347         HttpURLConnection conn = createConnection(url, HttpMethod.PUT);
348         attachDocument(conn, document);
349
350         DocumentOperationResult opResult = getOperationResult(conn);
351         buildDocumentResult(opResult, indexName);
352
353         logMetricsInfo(override, SearchDbMsgs.UPDATE_DOCUMENT_TIME, opResult, indexName, document.getId());
354
355         shutdownConnection(conn);
356
357         return opResult;
358     }
359
360     @Override
361     public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
362             throws DocumentStoreOperationException {
363         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId())
364                 .replaceQuery(URL_QUERY_VERSION + document.getVersion()));
365
366         MdcOverride override = getStartTime(new MdcOverride());
367
368         HttpURLConnection conn = createConnection(url, HttpMethod.DELETE);
369
370         DocumentOperationResult opResult = getOperationResult(conn);
371         buildDocumentResult(opResult, indexName);
372         // supress the etag and url in response for delete as they are not required
373         if (opResult.getDocument() != null) {
374             opResult.getDocument().setEtag(null);
375             opResult.getDocument().setUrl(null);
376         }
377
378         logMetricsInfo(override, SearchDbMsgs.DELETE_DOCUMENT_TIME, opResult, indexName, document.getId());
379
380         shutdownConnection(conn);
381
382         return opResult;
383     }
384
385     @Override
386     public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
387             throws DocumentStoreOperationException {
388         final UriBuilder uriBuilder = createUriBuilder(indexName, DEFAULT_TYPE, document.getId());
389         if (document.getVersion() != null) {
390             uriBuilder.replaceQuery(URL_QUERY_VERSION + document.getVersion());
391         }
392
393         MdcOverride override = getStartTime(new MdcOverride());
394         HttpURLConnection conn = createConnection(buildUrl(uriBuilder), "GET");
395
396         DocumentOperationResult opResult = getOperationResult(conn);
397         buildDocumentResult(opResult, indexName);
398
399         logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, document.getId());
400
401         shutdownConnection(conn);
402
403         return opResult;
404     }
405
406     @Override
407     public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
408         final URL url = buildUrl(createUriBuilder(indexName, "_search").replaceQuery(queryString));
409
410         MdcOverride override = getStartTime(new MdcOverride());
411
412         HttpURLConnection conn = createConnection(url, "GET");
413         SearchOperationResult opResult = getSearchOperationResult(conn);
414         buildSearchResult(opResult, indexName);
415
416         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, queryString);
417
418         return opResult;
419     }
420
421     @Override
422     public SearchOperationResult searchWithPayload(String indexName, String query)
423             throws DocumentStoreOperationException {
424         if (logger.isDebugEnabled()) {
425             logger.debug("Querying index: " + indexName + " with query string: " + query);
426         }
427         final URL url = buildUrl(createUriBuilder(indexName, "_search"));
428
429         MdcOverride override = getStartTime(new MdcOverride());
430
431         HttpURLConnection conn = createConnection(url, HttpMethod.POST);
432         attachContent(conn, query);
433         logger.debug("Request body =  Elasticsearch query = " + query);
434
435         SearchOperationResult opResult = getSearchOperationResult(conn);
436         buildSearchResult(opResult, indexName);
437
438         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
439
440         shutdownConnection(conn);
441
442         return opResult;
443     }
444
445     @Override
446     public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
447             throws DocumentStoreOperationException {
448         if (logger.isDebugEnabled()) {
449             logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
450         }
451
452         MdcOverride override = getStartTime(new MdcOverride());
453         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName, "_suggest")), HttpMethod.POST);
454         attachContent(conn, query);
455
456         logger.debug("Request body =  Elasticsearch query = " + query);
457
458         SearchOperationResult opResult = getSearchOperationResult(conn);
459         buildSuggestResult(opResult, indexName);
460
461         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
462
463         shutdownConnection(conn);
464
465         return opResult;
466     }
467
468     @Override
469     public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
470         if (logger.isDebugEnabled()) {
471             StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: ");
472
473             for (BulkRequest request : requests) {
474                 dbgString.append("[").append(request).append("] ");
475             }
476
477             logger.debug(dbgString.toString());
478         }
479
480         MdcOverride override = getStartTime(new MdcOverride());
481
482         // Parse the supplied set of operations.
483         // Iterate over the list of operations which we were provided and
484         // translate them into a format that ElasticSearh understands.
485         int opCount = 0;
486         StringBuilder esOperationSet = new StringBuilder(128);
487         List<ElasticSearchResultItem> rejected = new ArrayList<>();
488         for (BulkRequest request : requests) {
489
490             // Convert the request to the syntax ElasticSearch likes.
491             if (buildEsOperation(request, esOperationSet, rejected)) {
492                 opCount++;
493             }
494         }
495
496         ElasticSearchBulkOperationResult opResult = null;
497         if (opCount > 0) {
498             HttpURLConnection conn;
499             try {
500                 conn = (HttpURLConnection) buildUrl(createUriBuilder("_bulk")).openConnection();
501                 conn.setRequestMethod(HttpMethod.PUT);
502                 conn.setDoOutput(true);
503                 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_FORM_URLENCODED);
504                 conn.setRequestProperty("Connection", "Close");
505
506             } catch (IOException e) {
507
508                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
509                 if (logger.isDebugEnabled()) {
510                     logger.debug(Throwables.getStackTraceAsString(e));
511                 }
512
513                 throw new DocumentStoreOperationException(
514                         "Failed to open connection to document store.  Cause: " + e.getMessage(), e);
515             }
516
517             StringBuilder bulkResult = new StringBuilder(128);
518             try {
519                 // Create an output stream to write our request to.
520                 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
521
522                 if (logger.isDebugEnabled()) {
523                     logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
524                     logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
525                 }
526
527                 // Write the resulting request string to our output stream. (this sends the request to ES?)
528                 out.write(esOperationSet.toString());
529                 out.close();
530
531                 // Open an input stream on our connection in order to read back the results.
532                 InputStream is = conn.getInputStream();
533                 InputStreamReader inputstreamreader = new InputStreamReader(is);
534                 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
535
536                 // Read the contents of the input stream into our result string...
537                 String esResponseString = null;
538
539                 while ((esResponseString = bufferedreader.readLine()) != null) {
540                     bulkResult.append(esResponseString).append("\n");
541                 }
542
543             } catch (IOException e) {
544
545                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
546                 if (logger.isDebugEnabled()) {
547                     StringWriter sw = new StringWriter();
548                     e.printStackTrace(new PrintWriter(sw));
549                     logger.debug(sw.toString());
550                 }
551
552                 throw new DocumentStoreOperationException(
553                         "Failure interacting with document store.  Cause: " + e.getMessage(), e);
554             }
555
556             if (logger.isDebugEnabled()) {
557                 logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
558             }
559
560             // ...and marshal the resulting string into a Java object.
561             try {
562                 opResult = marshallEsBulkResult(bulkResult.toString());
563
564             } catch (IOException e) {
565                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
566                 if (logger.isDebugEnabled()) {
567                     logger.debug(Throwables.getStackTraceAsString(e));
568                 }
569
570                 throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: " + e.getMessage(),
571                         e);
572             }
573         }
574
575         OperationResult result = new OperationResultBuilder() //
576                 .resultCode(HttpStatus.MULTI_STATUS_207) //
577                 .result(buildGenericBulkResultSet(opResult, rejected)) //
578                 .build();
579
580         // In the success case we don't want the entire result string to be dumped into the metrics log, so concatenate
581         // it.
582         String resultStringForMetricsLog = result.getResult();
583         if (isSuccess(result)) {
584             resultStringForMetricsLog =
585                     resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
586         }
587
588         metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
589                 new LogFields() //
590                         .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
591                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
592                 override);
593
594         return result;
595     }
596
597
598     /**
599      * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
600      *
601      * @param indexName - The index to look for.
602      * @return - An operation result indicating the success or failure of the check.
603      * @throws DocumentStoreOperationException
604      */
605     private OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
606         MdcOverride override = getStartTime(new MdcOverride());
607
608         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.HEAD);
609         int resultCode;
610         try {
611             resultCode = conn.getResponseCode();
612         } catch (IOException ex) {
613             shutdownConnection(conn);
614             throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex);
615         }
616         logger.debug(MSG_RESPONSE_CODE + resultCode);
617
618         OperationResult opResult = new OperationResultBuilder().useDefaults().resultCode(resultCode).build();
619         logMetricsInfo(override, SearchDbMsgs.CHECK_INDEX_TIME, opResult, indexName);
620         shutdownConnection(conn);
621
622         return opResult;
623     }
624
625     private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
626             throws DocumentStoreOperationException {
627         // check if the document already exists
628         DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
629
630         if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
631             if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) {
632                 opResult.setFailureCause("A document with the same id already exists.");
633             } else {
634                 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
635             }
636             opResult.setResultCode(Status.CONFLICT.getStatusCode());
637             return opResult;
638         }
639
640         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId()));
641         MdcOverride override = getStartTime(new MdcOverride());
642
643         HttpURLConnection conn = createConnection(url, HttpMethod.PUT);
644
645
646         attachDocument(conn, document);
647
648
649         opResult = getOperationResult(conn);
650         buildDocumentResult(opResult, indexName);
651
652         logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, opResult, indexName);
653
654         shutdownConnection(conn);
655
656         return opResult;
657     }
658
659     private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
660             throws DocumentStoreOperationException {
661         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE));
662
663         MdcOverride override = getStartTime(new MdcOverride());
664         HttpURLConnection conn = createConnection(url, HttpMethod.POST);
665         attachDocument(conn, document);
666
667         DocumentOperationResult response = getOperationResult(conn);
668         buildDocumentResult(response, indexName);
669
670         logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, response, indexName);
671
672         shutdownConnection(conn);
673
674         return response;
675     }
676
677     private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
678             throws DocumentStoreOperationException {
679         conn.setRequestProperty("Connection", "Close");
680         attachContent(conn, doc.getContentInJson());
681     }
682
683     private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
684             throws DocumentStoreOperationException {
685         MdcOverride override = getStartTime(new MdcOverride());
686         HttpURLConnection conn =
687                 createConnection(buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, docId)), HttpMethod.HEAD);
688         int resultCode;
689         try {
690             resultCode = conn.getResponseCode();
691         } catch (IOException ex) {
692             shutdownConnection(conn);
693             throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex);
694         }
695
696         logger.debug(MSG_RESPONSE_CODE + resultCode);
697
698         DocumentOperationResult opResult = (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT)
699                 .useDefaults().resultCode(resultCode).build();
700
701         logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, docId);
702         shutdownConnection(conn);
703
704         return opResult;
705     }
706
707     private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException {
708         OutputStream outputStream = null;
709         OutputStreamWriter out = null;
710
711         try {
712             outputStream = conn.getOutputStream();
713         } catch (IOException e) {
714             shutdownConnection(conn);
715             throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
716         }
717
718         out = new OutputStreamWriter(outputStream);
719
720         try {
721             out.write(content);
722             out.close();
723         } catch (IOException e) {
724             shutdownConnection(conn);
725             throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
726         }
727     }
728
729     private HttpURLConnection initializeConnection(URL url) throws DocumentStoreOperationException {
730         HttpURLConnection conn = null;
731         try {
732             conn = (HttpURLConnection) url.openConnection();
733             conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON);
734             conn.setDoOutput(true);
735         } catch (IOException e) {
736             shutdownConnection(conn);
737             throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
738         }
739
740         return conn;
741     }
742
743     private OperationResult handleResponse(HttpURLConnection conn) throws DocumentStoreOperationException {
744         return handleResponse(conn, new OperationResultBuilder().useDefaults());
745     }
746
747     private OperationResult handleResponse(HttpURLConnection conn, OperationResultBuilder rb)
748             throws DocumentStoreOperationException {
749         int resultCode;
750
751         try {
752             resultCode = conn.getResponseCode();
753         } catch (IOException ex) {
754             shutdownConnection(conn);
755             throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex);
756         }
757
758         logger.debug(MSG_RESPONSE_CODE + resultCode);
759
760         InputStream inputStream = null;
761
762         if (!isSuccessCode(resultCode)) {
763             inputStream = conn.getErrorStream();
764         } else {
765             try {
766                 inputStream = conn.getInputStream();
767             } catch (IOException e) {
768                 shutdownConnection(conn);
769                 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
770             }
771         }
772
773         InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
774         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
775
776         StringBuilder result = new StringBuilder(128);
777         String string = null;
778
779         try {
780             while ((string = bufferedreader.readLine()) != null) {
781                 result.append(string).append("\n");
782             }
783         } catch (IOException e) {
784             shutdownConnection(conn);
785             throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
786         }
787
788         if (resultCode == Status.CONFLICT.getStatusCode()) {
789             rb.resultCode(Status.PRECONDITION_FAILED.getStatusCode());
790         } else {
791             rb.resultCode(resultCode);
792         }
793         if (logger.isDebugEnabled()) {
794             logger.debug("Raw result string from ElasticSearch = " + result.toString());
795         }
796         rb.result(result.toString());
797         rb.resultVersion(extractVersion(result.toString()));
798         return rb.build();
799     }
800
801     private String extractVersion(String result) {
802         JSONParser parser = new JSONParser();
803         String version = null;
804         try {
805             JSONObject root = (JSONObject) parser.parse(result);
806             if (root.get(JSON_ATTR_VERSION) != null) {
807                 version = root.get(JSON_ATTR_VERSION).toString();
808             }
809         } catch (ParseException e) {
810             // Not all responses from ElasticSearch include a version, so
811             // if we don't get one back, just return an empty string rather
812             // than trigger a false failure.
813             version = "";
814         }
815         return version;
816     }
817
818     /**
819      * This convenience method gets the current system time and stores it in an attribute in the supplied
820      * {@link MdcOverride} object so that it can be used later by the metrics logger.
821      *
822      * @param override - The {@link MdcOverride} object to update.
823      * @return - The supplied {@link MdcOverride} object.
824      */
825     private MdcOverride getStartTime(MdcOverride override) {
826
827         // Grab the current time...
828         long startTimeInMs = System.currentTimeMillis();
829
830         // ...and add it as an attribute to the supplied MDC Override object.
831         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
832         override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
833
834         // Return the MdcOverride object that we were passed.
835         // This looks odd, but it allows us to do stuff like:
836         //
837         // MdcOverride ov = getStartTime(new MdcOverride())
838         //
839         // which is quite handy, but also allows us to pass in an existing
840         // MdcOverride object which already has some attributes set.
841         return override;
842     }
843
844     private boolean isSuccess(OperationResult result) {
845         return isSuccessCode(result.getResultCode());
846     }
847
848     private boolean isSuccessCode(int statusCode) {
849         return Family.familyOf(statusCode).equals(Family.SUCCESSFUL);
850     }
851
852     private UriBuilder createUriBuilder(String path, String... paths) {
853         UriBuilder builder = UriBuilder.fromPath(path);
854         for (String other : paths) {
855             builder.path(other);
856         }
857         builder.host(config.getIpAddress());
858         String port = Optional.ofNullable(config.getHttpPort()).orElse("0");
859         builder.port(Integer.valueOf(port));
860         builder.scheme("http");
861         return builder;
862     }
863
864     private URL buildUrl(UriBuilder builder) throws DocumentStoreOperationException {
865         try {
866             return builder.build().toURL();
867         } catch (MalformedURLException e) {
868             logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "buildUrl", e.getLocalizedMessage());
869             throw new DocumentStoreOperationException("Error building a URL with " + builder.toString(), e);
870         }
871     }
872
873     private HttpURLConnection createConnection(final URL url, final String method)
874             throws DocumentStoreOperationException {
875         HttpURLConnection conn = initializeConnection(url);
876         try {
877             logger.debug("\nSending '" + method + "' request to URL : " + conn.getURL());
878             conn.setRequestMethod(method);
879         } catch (ProtocolException e) {
880             shutdownConnection(conn);
881             throw new DocumentStoreOperationException(ErrorMessage.SET_REQUEST_METHOD_FAILED, e, method);
882         }
883         return conn;
884     }
885
886     private OperationResult checkConnection() throws IOException, DocumentStoreOperationException {
887         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder("_cluster/health")), HttpMethod.GET);
888         int resultCode = conn.getResponseCode();
889         logger.debug("getClusterHealth() response Code : " + resultCode);
890         shutdownConnection(conn);
891         return new OperationResultBuilder().resultCode(resultCode).build();
892     }
893
894     private void shutdownConnection(HttpURLConnection connection) {
895         if (connection == null) {
896             return;
897         }
898
899         final String methodName = "shutdownConnection";
900         InputStream inputstream = null;
901         OutputStream outputstream = null;
902
903         try {
904             inputstream = connection.getInputStream();
905         } catch (IOException e) {
906             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
907         } finally {
908             if (inputstream != null) {
909                 try {
910                     inputstream.close();
911                 } catch (IOException e) {
912                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
913                 }
914             }
915         }
916
917         try {
918             outputstream = connection.getOutputStream();
919         } catch (IOException e) {
920             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
921         } finally {
922             if (outputstream != null) {
923                 try {
924                     outputstream.close();
925                 } catch (IOException e) {
926                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
927                 }
928             }
929         }
930
931         connection.disconnect();
932     }
933
934     /**
935      * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch.
936      *
937      * @param request - The request to be performed.
938      * @param sb - The string builder to append the json data to
939      * @throws DocumentStoreOperationException
940      */
941     private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails)
942             throws DocumentStoreOperationException {
943
944         boolean retVal = true;
945         // What kind of operation are we performing?
946         switch (request.getOperationType()) {
947
948             // Create a new document.
949             case CREATE:
950
951                 // Make sure that we were supplied a document payload.
952                 if (request.getOperation().getDocument() == null) {
953
954                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
955                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
956                     return false;
957                 }
958
959                 // Make sure that the supplied document URL is formatted
960                 // correctly.
961                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
962                     fails.add(generateRejectionEntry(request.getOperationType(),
963                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
964                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
965                     return false;
966                 }
967
968                 // Validate that the specified index actually exists before we
969                 // try to perform the create.
970                 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
971
972                     fails.add(generateRejectionEntry(request.getOperationType(),
973                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
974                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
975                     return false;
976                 }
977
978                 // If we were supplied an id for the new document, then
979                 // include it in the bulk operation to Elastic Search
980                 if (request.getId() == null) {
981
982                     sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
983
984                     // Otherwise, we just leave that parameter off and ElasticSearch
985                     // will generate one for us.
986                 } else {
987                     sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
988                             request.getId()));
989                 }
990
991                 try {
992                     // Append the document that we want to create.
993                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
994                 } catch (JsonProcessingException e) {
995                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
996                 }
997
998                 break;
999
1000             // Update an existing document.
1001             case UPDATE:
1002
1003                 // Make sure that we were supplied a document payload.
1004                 if (request.getOperation().getDocument() == null) {
1005
1006                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1007                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1008                     return false;
1009                 }
1010
1011                 // Make sure that the supplied document URL is formatted
1012                 // correctly.
1013                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1014                     fails.add(generateRejectionEntry(request.getOperationType(),
1015                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1016                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1017                     return false;
1018                 }
1019
1020                 // Validate that the specified index actually exists before we
1021                 // try to perform the update.
1022                 if (!indexExists(request.getIndex())) {
1023
1024                     fails.add(generateRejectionEntry(request.getOperationType(),
1025                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1026                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1027                     return false;
1028                 }
1029
1030                 // Validate that the document we are trying to update actually
1031                 // exists before we try to perform the update.
1032                 if (!documentExists(request.getIndex(), request.getId())) {
1033
1034                     fails.add(generateRejectionEntry(request.getOperationType(),
1035                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1036                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1037                     return false;
1038                 }
1039
1040                 // It is mandatory that a version be supplied for an update operation,
1041                 // so validate that now.
1042                 if (request.getOperation().getMetaData().getEtag() == null) {
1043
1044                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1045                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1046                     return false;
1047                 }
1048
1049                 // Generate the update request...
1050                 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1051                         request.getOperation().getMetaData().getEtag()));
1052
1053                 // ...and append the document that we want to update.
1054                 try {
1055                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
1056                 } catch (JsonProcessingException e) {
1057                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
1058                 }
1059                 break;
1060
1061             // Delete an existing document.
1062             case DELETE:
1063
1064                 // Make sure that the supplied document URL is formatted
1065                 // correctly.
1066                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1067                     fails.add(generateRejectionEntry(request.getOperationType(),
1068                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1069                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1070                     return false;
1071                 }
1072
1073                 // Validate that the specified index actually exists before we
1074                 // try to perform the delete.
1075                 if (!indexExists(request.getIndex())) {
1076
1077                     fails.add(generateRejectionEntry(request.getOperationType(),
1078                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1079                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1080                     return false;
1081                 }
1082
1083                 // Validate that the document we are trying to update actually
1084                 // exists before we try to perform the delete.
1085                 if (!documentExists(request.getIndex(), request.getId())) {
1086
1087                     fails.add(generateRejectionEntry(request.getOperationType(),
1088                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1089                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1090                     return false;
1091                 }
1092
1093                 // It is mandatory that a version be supplied for a delete operation,
1094                 // so validate that now.
1095                 if (request.getOperation().getMetaData().getEtag() == null) {
1096
1097                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1098                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1099                     return false;
1100                 }
1101
1102                 // Generate the delete request.
1103                 sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1104                         request.getOperation().getMetaData().getEtag()));
1105                 break;
1106             default:
1107         }
1108
1109         return retVal;
1110     }
1111
1112     private boolean indexExists(String index) throws DocumentStoreOperationException {
1113         return isSuccess(checkIndexExistence(index));
1114     }
1115
1116     private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1117         return isSuccess(checkDocumentExistence(index, id));
1118     }
1119
1120     /**
1121      * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the
1122      * document store.
1123      *
1124      * @param rejectReason - A message describing why the operation was rejected.
1125      * @param anId - The identifier associated with the document being acted on.
1126      * @param statusCode - An HTTP status code.
1127      * @return - A result set item.
1128      */
1129     private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index,
1130             String anId, int statusCode, String originalUrl) {
1131
1132         ElasticSearchError err = new ElasticSearchError();
1133         err.setReason(rejectReason);
1134
1135         ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1136         op.setIndex(index);
1137         op.setId(anId);
1138         op.setStatus(statusCode);
1139         op.setError(err);
1140         op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1141
1142         ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1143
1144         switch (opType) {
1145             case CREATE:
1146                 rejectionResult.setCreate(op);
1147                 break;
1148             case UPDATE:
1149                 rejectionResult.setIndex(op);
1150                 break;
1151             case DELETE:
1152                 rejectionResult.setDelete(op);
1153                 break;
1154             default:
1155         }
1156
1157         return rejectionResult;
1158     }
1159
1160     /**
1161      * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and
1162      * marshals it into a Java object.
1163      *
1164      * @param jsonResult - The bulk operations response returned from ElasticSearch.
1165      * @return - The marshalled response.
1166      * @throws JsonParseException
1167      * @throws JsonMappingException
1168      * @throws IOException
1169      */
1170     private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException {
1171         if (jsonResult != null) {
1172             if (logger.isDebugEnabled()) {
1173                 logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", ""));
1174             }
1175
1176             ObjectMapper mapper = new ObjectMapper();
1177             mapper.setSerializationInclusion(Include.NON_EMPTY);
1178
1179             return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1180         }
1181
1182         return null;
1183     }
1184
1185     /**
1186      * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload.
1187      *
1188      * @param esResult - ElasticSearch bulk operations response.
1189      * @return - A generic result set.
1190      */
1191     private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1192             List<ElasticSearchResultItem> rejectedOps) {
1193         int totalOps = 0;
1194         int totalSuccess = 0;
1195         int totalFails = 0;
1196
1197         if (logger.isDebugEnabled()) {
1198
1199             logger.debug("ESController: Build generic result set.  ES Results: "
1200                     + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString());
1201         }
1202
1203         // Build a combined list of result items from the results returned
1204         // from ElasticSearch and the list of operations that we rejected
1205         // without sending to ElasticSearch.
1206         List<ElasticSearchResultItem> combinedResults = new ArrayList<>();
1207         if (esResult != null) {
1208             combinedResults.addAll(Arrays.asList(esResult.getItems()));
1209         }
1210         combinedResults.addAll(rejectedOps);
1211
1212         // Iterate over the individual results in the resulting result set.
1213         StringBuilder resultsBuilder = new StringBuilder();
1214         AtomicBoolean firstItem = new AtomicBoolean(true);
1215         for (ElasticSearchResultItem item : combinedResults) {
1216
1217             // Increment the operation counts.
1218             totalOps++;
1219             if (isSuccessCode(item.operationStatus().getStatus())) {
1220                 totalSuccess++;
1221             } else {
1222                 totalFails++;
1223             }
1224
1225             // Prepend a comma to our response string unless this it the
1226             // first result in the set.
1227             if (!firstItem.compareAndSet(true, false)) {
1228                 resultsBuilder.append(", ");
1229             }
1230
1231             // Append the current result as a generic json structure.
1232             resultsBuilder.append(item.toJson());
1233         }
1234
1235         return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", "
1236                 + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}";
1237     }
1238
1239     private DocumentOperationResult getOperationResult(HttpURLConnection conn) throws DocumentStoreOperationException {
1240         return (DocumentOperationResult) handleResponse(conn, new OperationResultBuilder(Type.DOCUMENT).useDefaults());
1241     }
1242
1243     private SearchOperationResult getSearchOperationResult(HttpURLConnection conn)
1244             throws DocumentStoreOperationException {
1245         return (SearchOperationResult) handleResponse(conn, new OperationResultBuilder(Type.SEARCH).useDefaults());
1246     }
1247
1248     private void buildDocumentResult(DocumentOperationResult result, String index)
1249             throws DocumentStoreOperationException {
1250
1251         JSONParser parser = new JSONParser();
1252         JSONObject root;
1253         try {
1254             root = (JSONObject) parser.parse(result.getResult());
1255             if (isSuccess(result)) {
1256                 // Success response object
1257                 Document doc = new Document();
1258                 doc.setEtag(result.getResultVersion());
1259                 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1260
1261                 doc.setContent((JSONObject) root.get("_source"));
1262                 result.setDocument(doc);
1263
1264             } else {
1265                 // Error response object
1266                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1267                 if (error != null) {
1268                     result.setError(
1269                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1270                 }
1271
1272             }
1273         } catch (Exception e) {
1274             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1275         }
1276     }
1277
1278     private String buildDocumentResponseUrl(String index, String id) {
1279         return ApiUtils.buildDocumentUri(index, id);
1280     }
1281
1282     private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1283         JSONParser parser = new JSONParser();
1284         JSONObject root;
1285
1286         try {
1287             root = (JSONObject) parser.parse(result.getResult());
1288             if (isSuccess(result)) {
1289                 JSONObject hits = (JSONObject) root.get("hits");
1290                 JSONArray hitArray = (JSONArray) hits.get("hits");
1291                 SearchHits searchHits = new SearchHits();
1292                 searchHits.setTotalHits(hits.get("total").toString());
1293                 ArrayList<SearchHit> searchHitArray = new ArrayList<>();
1294
1295                 for (int i = 0; i < hitArray.size(); i++) {
1296                     JSONObject hit = (JSONObject) hitArray.get(i);
1297                     SearchHit searchHit = new SearchHit();
1298                     searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1299                     Document doc = new Document();
1300                     if (hit.get(JSON_ATTR_VERSION) != null) {
1301                         doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1302                     }
1303
1304                     doc.setUrl(
1305                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1306                     doc.setContent((JSONObject) hit.get("_source"));
1307                     searchHit.setDocument(doc);
1308                     searchHitArray.add(searchHit);
1309                 }
1310                 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1311                 result.setSearchResult(searchHits);
1312
1313                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1314                 if (aggregations != null) {
1315                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1316                     AggregationResults aggs = new AggregationResults();
1317                     aggs.setAggregations(aggResults);
1318                     result.setAggregationResult(aggs);
1319                 }
1320
1321                 // success
1322             } else {
1323                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1324                 if (error != null) {
1325                     result.setError(
1326                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1327                 }
1328             }
1329         } catch (Exception e) {
1330             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1331         }
1332     }
1333
1334     private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1335         JSONParser parser = new JSONParser();
1336         JSONObject root;
1337         try {
1338             root = (JSONObject) parser.parse(result.getResult());
1339             if (isSuccess(result)) {
1340                 JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
1341                 JSONObject hitdata = (JSONObject) hitArray.get(0);
1342                 JSONArray optionsArray = (JSONArray) hitdata.get("options");
1343                 SuggestHits suggestHits = new SuggestHits();
1344                 suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
1345
1346                 ArrayList<SuggestHit> suggestHitArray = new ArrayList<>();
1347
1348                 for (int i = 0; i < optionsArray.size(); i++) {
1349                     JSONObject hit = (JSONObject) optionsArray.get(i);
1350
1351                     SuggestHit suggestHit = new SuggestHit();
1352                     suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
1353                     suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
1354                     Document doc = new Document();
1355                     if (hit.get(JSON_ATTR_VERSION) != null) {
1356                         doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1357                     }
1358                     doc.setUrl(
1359                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1360
1361                     doc.setContent((JSONObject) hit.get("payload"));
1362                     suggestHit.setDocument(doc);
1363                     suggestHitArray.add(suggestHit);
1364                 }
1365                 suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
1366                 result.setSuggestResult(suggestHits);
1367
1368                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1369                 if (aggregations != null) {
1370                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1371                     AggregationResults aggs = new AggregationResults();
1372                     aggs.setAggregations(aggResults);
1373                     result.setAggregationResult(aggs);
1374                 }
1375
1376                 // success
1377             } else {
1378                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1379                 if (error != null) {
1380                     result.setError(
1381                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1382                 }
1383             }
1384         } catch (Exception e) {
1385             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1386         }
1387     }
1388
1389     /**
1390      * Record the timing of the operation in the metrics log.
1391      *
1392      */
1393     private void logMetricsInfo(MdcOverride override, SearchDbMsgs message, OperationResult operationResult,
1394             String... args) {
1395         metricsLogger.info(message,
1396                 new LogFields() //
1397                         .setField(LogLine.DefinedFields.RESPONSE_CODE, operationResult.getResultCode())
1398                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, operationResult.getResult()),
1399                 override, args);
1400     }
1401 }