Fix minor code smells
[aai/search-data-service.git] / src / main / java / org / onap / aai / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
23
24 import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
25 import static javax.ws.rs.core.MediaType.APPLICATION_FORM_URLENCODED;
26 import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
27
28 import com.fasterxml.jackson.annotation.JsonInclude.Include;
29 import com.fasterxml.jackson.core.JsonParseException;
30 import com.fasterxml.jackson.core.JsonProcessingException;
31 import com.fasterxml.jackson.databind.JsonMappingException;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33 import com.google.common.base.Throwables;
34 import java.io.BufferedReader;
35 import java.io.File;
36 import java.io.FileInputStream;
37 import java.io.IOException;
38 import java.io.InputStream;
39 import java.io.InputStreamReader;
40 import java.io.OutputStream;
41 import java.io.OutputStreamWriter;
42 import java.io.PrintWriter;
43 import java.io.StringWriter;
44 import java.net.HttpURLConnection;
45 import java.net.MalformedURLException;
46 import java.net.ProtocolException;
47 import java.net.URL;
48 import java.text.SimpleDateFormat;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.List;
52 import java.util.Optional;
53 import java.util.Properties;
54 import java.util.concurrent.atomic.AtomicBoolean;
55 import javax.ws.rs.HttpMethod;
56 import javax.ws.rs.core.Response.Status;
57 import javax.ws.rs.core.UriBuilder;
58 import org.eclipse.jetty.http.HttpStatus;
59 import org.json.simple.JSONArray;
60 import org.json.simple.JSONObject;
61 import org.json.simple.parser.JSONParser;
62 import org.json.simple.parser.ParseException;
63 import org.onap.aai.cl.api.LogFields;
64 import org.onap.aai.cl.api.LogLine;
65 import org.onap.aai.cl.api.Logger;
66 import org.onap.aai.cl.eelf.LoggerFactory;
67 import org.onap.aai.cl.mdc.MdcContext;
68 import org.onap.aai.cl.mdc.MdcOverride;
69 import org.onap.aai.sa.rest.AnalysisConfiguration;
70 import org.onap.aai.sa.rest.ApiUtils;
71 import org.onap.aai.sa.rest.BulkRequest;
72 import org.onap.aai.sa.rest.BulkRequest.OperationType;
73 import org.onap.aai.sa.rest.DocumentSchema;
74 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
75 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
76 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException.ErrorMessage;
77 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
78 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
79 import org.onap.aai.sa.searchdbabstraction.entity.Document;
80 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
81 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
82 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
83 import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder;
84 import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder.Type;
85 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
86 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
87 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
88 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
89 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
90 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
91 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
92 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
93 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
94 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
95
96 /**
97  * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface.
98  */
99 public class ElasticSearchHttpController implements DocumentStoreInterface {
100
101     private static ElasticSearchHttpController instance = null;
102
103     private static final Logger logger =
104             LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
105     private static final Logger metricsLogger =
106             LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
107
108     private static final String URL_QUERY_VERSION = "version=";
109
110     private static final String JSON_ATTR_VERSION = "_version";
111     private static final String JSON_ATTR_ERROR = "error";
112     private static final String JSON_ATTR_REASON = "reason";
113
114     private static final String DEFAULT_TYPE = "default";
115
116     private static final String MSG_RESOURCE_MISSING = "Specified resource does not exist: ";
117     private static final String MSG_RESPONSE_CODE = "Response Code : ";
118     private static final String MSG_INVALID_DOCUMENT_URL = "Invalid document URL: ";
119
120     private static final String FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE = "Failed to parse Elastic Search response.";
121
122     private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
123             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
124     private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
125             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
126     private static final String BULK_IMPORT_INDEX_TEMPLATE =
127             "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
128     private static final String BULK_DELETE_TEMPLATE =
129             "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
130
131     private final ElasticSearchConfig config;
132
133     protected AnalysisConfiguration analysisConfig;
134
135
136     public ElasticSearchHttpController(ElasticSearchConfig config) {
137         this.config = config;
138         analysisConfig = new AnalysisConfiguration();
139
140         String rootUrl = null;
141         try {
142             rootUrl = buildUrl(createUriBuilder("")).toString();
143             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, rootUrl);
144             checkConnection();
145             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, rootUrl);
146         } catch (Exception e) {
147             logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, rootUrl, e.getMessage());
148         }
149     }
150
151     public static ElasticSearchHttpController getInstance() {
152         synchronized (ElasticSearchHttpController.class) {
153             if (instance == null) {
154                 Properties properties = new Properties();
155                 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
156                 try {
157                     properties.load(new FileInputStream(file));
158                 } catch (Exception e) {
159                     logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance",
160                             e.getLocalizedMessage());
161                 }
162
163                 ElasticSearchConfig config = new ElasticSearchConfig(properties);
164                 instance = new ElasticSearchHttpController(config);
165             }
166         }
167
168         return instance;
169     }
170
171     public AnalysisConfiguration getAnalysisConfig() {
172         return analysisConfig;
173     }
174
175     @Override
176     public OperationResult createIndex(String index, DocumentSchema documentSchema) {
177         try {
178             // Submit the request to ElasticSearch to create the index using a default document type.
179             OperationResult result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
180                     DocumentSchemaUtil.generateDocumentMappings(documentSchema));
181
182             // ElasticSearch will return us a 200 code on success when we
183             // want to report a 201, so translate the result here.
184             if (result.getResultCode() == Status.OK.getStatusCode()) {
185                 result.setResultCode(Status.CREATED.getStatusCode());
186             }
187
188             if (isSuccess(result)) {
189                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
190             }
191             return result;
192         } catch (DocumentStoreOperationException | IOException e) {
193             return new OperationResultBuilder().useDefaults()
194                     .failureCause("Document store operation failure.  Cause: " + e.getMessage()).build();
195         }
196     }
197
198     @Override
199     public OperationResult createDynamicIndex(String index, String dynamicSchema) {
200         try {
201             OperationResult result = createTable(index, dynamicSchema);
202
203             // ElasticSearch will return us a 200 code on success when we
204             // want to report a 201, so translate the result here.
205             if (result.getResultCode() == Status.OK.getStatusCode()) {
206                 result.setResultCode(Status.CREATED.getStatusCode());
207             }
208             if (isSuccess(result)) {
209                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
210             }
211             return result;
212         } catch (DocumentStoreOperationException e) {
213             return new OperationResultBuilder().useDefaults()
214                     .failureCause("Document store operation failure.  Cause: " + e.getMessage()).build();
215         }
216     }
217
218     @Override
219     public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
220         MdcOverride override = getStartTime(new MdcOverride());
221
222         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.DELETE);
223         OperationResult opResult = handleResponse(conn);
224         logMetricsInfo(override, SearchDbMsgs.DELETE_INDEX_TIME, opResult, indexName);
225         shutdownConnection(conn);
226
227         return opResult;
228     }
229
230     // @Override
231     protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings)
232             throws DocumentStoreOperationException {
233         if (indexSettings == null) {
234             logger.debug("No settings provided.");
235         }
236
237         if (indexMappings == null) {
238             logger.debug("No mappings provided.");
239         }
240
241         MdcOverride override = getStartTime(new MdcOverride());
242
243         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.PUT);
244
245         StringBuilder sb = new StringBuilder(128);
246         sb.append("{ \"settings\" : ");
247         sb.append(indexSettings);
248         sb.append(",");
249
250         sb.append("\"mappings\" : {");
251         sb.append("\"" + typeName + "\" :");
252         sb.append(indexMappings);
253         sb.append("}}");
254
255         try {
256             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
257         } catch (IOException e) {
258             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
259             throw new DocumentStoreOperationException(e.getMessage(), e);
260         }
261
262         logger.debug("Request content: " + sb);
263
264         OperationResult opResult = handleResponse(conn);
265         shutdownConnection(conn);
266         logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, opResult, indexName);
267
268         return opResult;
269     }
270
271     /**
272      * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the
273      * index.
274      *
275      * @param indexName - The name of the index to be created
276      * @param settingsAndMappings - The actual JSON object that will define the index
277      * @return - The operation result of writing into Elasticsearch
278      * @throws DocumentStoreOperationException
279      */
280     protected OperationResult createTable(String indexName, String settingsAndMappings)
281             throws DocumentStoreOperationException {
282         MdcOverride override = getStartTime(new MdcOverride());
283
284         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.PUT);
285         try {
286             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
287         } catch (IOException e) {
288             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
289             throw new DocumentStoreOperationException(e.getMessage());
290         }
291
292         OperationResult result = handleResponse(conn);
293         logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, result, indexName);
294
295         return result;
296     }
297
298     @Override
299     public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
300             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
301
302         if (!allowImplicitIndexCreation) {
303             // Before we do anything, make sure that the specified index actually exists in the
304             // document store - we don't want to rely on ElasticSearch to fail the document
305             // create because it could be configured to implicitly create a non-existent index,
306             // which can lead to hard-to-debug behaviour with queries down the road.
307             OperationResult indexExistsResult = checkIndexExistence(indexName);
308             if (!isSuccess(indexExistsResult)) {
309                 String resultMsg = "Document Index '" + indexName + "' does not exist.";
310                 return (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT).status(Status.NOT_FOUND)
311                         .result(resultMsg).failureCause(resultMsg).build();
312             }
313         }
314
315         if (document.getId() == null || document.getId().isEmpty()) {
316             return createDocumentWithoutId(indexName, document);
317         } else {
318             return createDocumentWithId(indexName, document);
319         }
320     }
321
322     @Override
323     public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
324             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
325         if (!allowImplicitIndexCreation) {
326             // Before we do anything, make sure that the specified index actually exists in the
327             // document store - we don't want to rely on ElasticSearch to fail the document
328             // create because it could be configured to implicitly create a non-existent index,
329             // which can lead to hard-to-debug behaviour with queries down the road.
330             OperationResult indexExistsResult = checkIndexExistence(indexName);
331             if (!isSuccess(indexExistsResult)) {
332                 DocumentOperationResult opResult = new DocumentOperationResult();
333                 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
334                 String resultMsg = "Document Index '" + indexName + "' does not exist.";
335                 opResult.setResult(resultMsg);
336                 opResult.setFailureCause(resultMsg);
337                 return opResult;
338             }
339         }
340
341         MdcOverride override = getStartTime(new MdcOverride());
342
343         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId())
344                 .replaceQuery(URL_QUERY_VERSION + document.getVersion()));
345
346         HttpURLConnection conn = createConnection(url, HttpMethod.PUT);
347         attachDocument(conn, document);
348
349         DocumentOperationResult opResult = getOperationResult(conn);
350         buildDocumentResult(opResult, indexName);
351
352         logMetricsInfo(override, SearchDbMsgs.UPDATE_DOCUMENT_TIME, opResult, indexName, document.getId());
353
354         shutdownConnection(conn);
355
356         return opResult;
357     }
358
359     @Override
360     public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
361             throws DocumentStoreOperationException {
362         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId())
363                 .replaceQuery(URL_QUERY_VERSION + document.getVersion()));
364
365         MdcOverride override = getStartTime(new MdcOverride());
366
367         HttpURLConnection conn = createConnection(url, HttpMethod.DELETE);
368
369         DocumentOperationResult opResult = getOperationResult(conn);
370         buildDocumentResult(opResult, indexName);
371         // supress the etag and url in response for delete as they are not required
372         if (opResult.getDocument() != null) {
373             opResult.getDocument().setEtag(null);
374             opResult.getDocument().setUrl(null);
375         }
376
377         logMetricsInfo(override, SearchDbMsgs.DELETE_DOCUMENT_TIME, opResult, indexName, document.getId());
378
379         shutdownConnection(conn);
380
381         return opResult;
382     }
383
384     @Override
385     public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
386             throws DocumentStoreOperationException {
387         final UriBuilder uriBuilder = createUriBuilder(indexName, DEFAULT_TYPE, document.getId());
388         if (document.getVersion() != null) {
389             uriBuilder.replaceQuery(URL_QUERY_VERSION + document.getVersion());
390         }
391
392         MdcOverride override = getStartTime(new MdcOverride());
393         HttpURLConnection conn = createConnection(buildUrl(uriBuilder), "GET");
394
395         DocumentOperationResult opResult = getOperationResult(conn);
396         buildDocumentResult(opResult, indexName);
397
398         logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, document.getId());
399
400         shutdownConnection(conn);
401
402         return opResult;
403     }
404
405     @Override
406     public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
407         final URL url = buildUrl(createUriBuilder(indexName, "_search").replaceQuery(queryString));
408
409         MdcOverride override = getStartTime(new MdcOverride());
410
411         HttpURLConnection conn = createConnection(url, "GET");
412         SearchOperationResult opResult = getSearchOperationResult(conn);
413         buildSearchResult(opResult, indexName);
414
415         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, queryString);
416
417         return opResult;
418     }
419
420     @Override
421     public SearchOperationResult searchWithPayload(String indexName, String query)
422             throws DocumentStoreOperationException {
423         if (logger.isDebugEnabled()) {
424             logger.debug("Querying index: " + indexName + " with query string: " + query);
425         }
426         final URL url = buildUrl(createUriBuilder(indexName, "_search"));
427
428         MdcOverride override = getStartTime(new MdcOverride());
429
430         HttpURLConnection conn = createConnection(url, HttpMethod.POST);
431         attachContent(conn, query);
432         logger.debug("Request body =  Elasticsearch query = " + query);
433
434         SearchOperationResult opResult = getSearchOperationResult(conn);
435         buildSearchResult(opResult, indexName);
436
437         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
438
439         shutdownConnection(conn);
440
441         return opResult;
442     }
443
444     @Override
445     public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
446             throws DocumentStoreOperationException {
447         if (logger.isDebugEnabled()) {
448             logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
449         }
450
451         MdcOverride override = getStartTime(new MdcOverride());
452         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName, "_suggest")), HttpMethod.POST);
453         attachContent(conn, query);
454
455         logger.debug("Request body =  Elasticsearch query = " + query);
456
457         SearchOperationResult opResult = getSearchOperationResult(conn);
458         buildSuggestResult(opResult, indexName);
459
460         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
461
462         shutdownConnection(conn);
463
464         return opResult;
465     }
466
467     @Override
468     public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
469         if (logger.isDebugEnabled()) {
470             StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: ");
471
472             for (BulkRequest request : requests) {
473                 dbgString.append("[").append(request).append("] ");
474             }
475
476             logger.debug(dbgString.toString());
477         }
478
479         MdcOverride override = getStartTime(new MdcOverride());
480
481         // Parse the supplied set of operations.
482         // Iterate over the list of operations which we were provided and
483         // translate them into a format that ElasticSearh understands.
484         int opCount = 0;
485         StringBuilder esOperationSet = new StringBuilder(128);
486         List<ElasticSearchResultItem> rejected = new ArrayList<>();
487         for (BulkRequest request : requests) {
488
489             // Convert the request to the syntax ElasticSearch likes.
490             if (buildEsOperation(request, esOperationSet, rejected)) {
491                 opCount++;
492             }
493         }
494
495         ElasticSearchBulkOperationResult opResult = null;
496         if (opCount > 0) {
497             HttpURLConnection conn;
498             try {
499                 conn = (HttpURLConnection) buildUrl(createUriBuilder("_bulk")).openConnection();
500                 conn.setRequestMethod(HttpMethod.PUT);
501                 conn.setDoOutput(true);
502                 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_FORM_URLENCODED);
503                 conn.setRequestProperty("Connection", "Close");
504
505             } catch (IOException e) {
506
507                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
508                 if (logger.isDebugEnabled()) {
509                     logger.debug(Throwables.getStackTraceAsString(e));
510                 }
511
512                 throw new DocumentStoreOperationException(
513                         "Failed to open connection to document store.  Cause: " + e.getMessage(), e);
514             }
515
516             StringBuilder bulkResult = new StringBuilder(128);
517             try {
518                 // Create an output stream to write our request to.
519                 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
520
521                 if (logger.isDebugEnabled()) {
522                     logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
523                     logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
524                 }
525
526                 // Write the resulting request string to our output stream. (this sends the request to ES?)
527                 out.write(esOperationSet.toString());
528                 out.close();
529
530                 // Open an input stream on our connection in order to read back the results.
531                 InputStream is = conn.getInputStream();
532                 InputStreamReader inputstreamreader = new InputStreamReader(is);
533                 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
534
535                 // Read the contents of the input stream into our result string...
536                 String esResponseString = null;
537
538                 while ((esResponseString = bufferedreader.readLine()) != null) {
539                     bulkResult.append(esResponseString).append("\n");
540                 }
541
542             } catch (IOException e) {
543
544                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
545                 if (logger.isDebugEnabled()) {
546                     StringWriter sw = new StringWriter();
547                     e.printStackTrace(new PrintWriter(sw));
548                     logger.debug(sw.toString());
549                 }
550
551                 throw new DocumentStoreOperationException(
552                         "Failure interacting with document store.  Cause: " + e.getMessage(), e);
553             }
554
555             if (logger.isDebugEnabled()) {
556                 logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
557             }
558
559             // ...and marshal the resulting string into a Java object.
560             try {
561                 opResult = marshallEsBulkResult(bulkResult.toString());
562
563             } catch (IOException e) {
564                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
565                 if (logger.isDebugEnabled()) {
566                     logger.debug(Throwables.getStackTraceAsString(e));
567                 }
568
569                 throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: " + e.getMessage(),
570                         e);
571             }
572         }
573
574         OperationResult result = new OperationResultBuilder() //
575                 .resultCode(HttpStatus.MULTI_STATUS_207) //
576                 .result(buildGenericBulkResultSet(opResult, rejected)) //
577                 .build();
578
579         // In the success case we don't want the entire result string to be dumped into the metrics log, so concatenate
580         // it.
581         String resultStringForMetricsLog = result.getResult();
582         if (isSuccess(result)) {
583             resultStringForMetricsLog =
584                     resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
585         }
586
587         metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
588                 new LogFields() //
589                         .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
590                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
591                 override);
592
593         return result;
594     }
595
596
597     /**
598      * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
599      *
600      * @param indexName - The index to look for.
601      * @return - An operation result indicating the success or failure of the check.
602      * @throws DocumentStoreOperationException
603      */
604     private OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
605         MdcOverride override = getStartTime(new MdcOverride());
606
607         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.HEAD);
608         int resultCode;
609         try {
610             resultCode = conn.getResponseCode();
611         } catch (IOException ex) {
612             shutdownConnection(conn);
613             throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex);
614         }
615         logger.debug(MSG_RESPONSE_CODE + resultCode);
616
617         OperationResult opResult = new OperationResultBuilder().useDefaults().resultCode(resultCode).build();
618         logMetricsInfo(override, SearchDbMsgs.CHECK_INDEX_TIME, opResult, indexName);
619         shutdownConnection(conn);
620
621         return opResult;
622     }
623
624     private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
625             throws DocumentStoreOperationException {
626         // check if the document already exists
627         DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
628
629         if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
630             if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) {
631                 opResult.setFailureCause("A document with the same id already exists.");
632             } else {
633                 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
634             }
635             opResult.setResultCode(Status.CONFLICT.getStatusCode());
636             return opResult;
637         }
638
639         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId()));
640         MdcOverride override = getStartTime(new MdcOverride());
641
642         HttpURLConnection conn = createConnection(url, HttpMethod.PUT);
643         attachDocument(conn, document);
644
645         opResult = getOperationResult(conn);
646         buildDocumentResult(opResult, indexName);
647
648         logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, opResult, indexName);
649
650         shutdownConnection(conn);
651
652         return opResult;
653     }
654
655     private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
656             throws DocumentStoreOperationException {
657         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE));
658
659         MdcOverride override = getStartTime(new MdcOverride());
660         HttpURLConnection conn = createConnection(url, HttpMethod.POST);
661         attachDocument(conn, document);
662
663         DocumentOperationResult response = getOperationResult(conn);
664         buildDocumentResult(response, indexName);
665
666         logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, response, indexName);
667
668         shutdownConnection(conn);
669
670         return response;
671     }
672
673     private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
674             throws DocumentStoreOperationException {
675         conn.setRequestProperty("Connection", "Close");
676         attachContent(conn, doc.getContentInJson());
677     }
678
679     private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
680             throws DocumentStoreOperationException {
681         MdcOverride override = getStartTime(new MdcOverride());
682         HttpURLConnection conn =
683                 createConnection(buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, docId)), HttpMethod.HEAD);
684         int resultCode;
685         try {
686             resultCode = conn.getResponseCode();
687         } catch (IOException ex) {
688             shutdownConnection(conn);
689             throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex);
690         }
691
692         logger.debug(MSG_RESPONSE_CODE + resultCode);
693
694         DocumentOperationResult opResult = (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT)
695                 .useDefaults().resultCode(resultCode).build();
696
697         logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, docId);
698         shutdownConnection(conn);
699
700         return opResult;
701     }
702
703     private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException {
704         OutputStream outputStream = null;
705         OutputStreamWriter out = null;
706
707         try {
708             outputStream = conn.getOutputStream();
709         } catch (IOException e) {
710             shutdownConnection(conn);
711             throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
712         }
713
714         out = new OutputStreamWriter(outputStream);
715
716         try {
717             out.write(content);
718             out.close();
719         } catch (IOException e) {
720             shutdownConnection(conn);
721             throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
722         }
723     }
724
725     private HttpURLConnection initializeConnection(URL url) throws DocumentStoreOperationException {
726         HttpURLConnection conn = null;
727         try {
728             conn = (HttpURLConnection) url.openConnection();
729             conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON);
730             conn.setDoOutput(true);
731         } catch (IOException e) {
732             shutdownConnection(conn);
733             throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
734         }
735
736         return conn;
737     }
738
739     private OperationResult handleResponse(HttpURLConnection conn) throws DocumentStoreOperationException {
740         return handleResponse(conn, new OperationResultBuilder().useDefaults());
741     }
742
743     private OperationResult handleResponse(HttpURLConnection conn, OperationResultBuilder rb)
744             throws DocumentStoreOperationException {
745         int resultCode;
746
747         try {
748             resultCode = conn.getResponseCode();
749         } catch (IOException ex) {
750             shutdownConnection(conn);
751             throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex);
752         }
753
754         logger.debug(MSG_RESPONSE_CODE + resultCode);
755
756         InputStream inputStream = null;
757
758         if (!ApiUtils.isSuccessStatusCode(resultCode)) {
759             inputStream = conn.getErrorStream();
760         } else {
761             try {
762                 inputStream = conn.getInputStream();
763             } catch (IOException e) {
764                 shutdownConnection(conn);
765                 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
766             }
767         }
768
769         InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
770         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
771
772         StringBuilder result = new StringBuilder(128);
773         String string = null;
774
775         try {
776             while ((string = bufferedreader.readLine()) != null) {
777                 result.append(string).append("\n");
778             }
779         } catch (IOException e) {
780             shutdownConnection(conn);
781             throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
782         }
783
784         if (resultCode == Status.CONFLICT.getStatusCode()) {
785             rb.resultCode(Status.PRECONDITION_FAILED.getStatusCode());
786         } else {
787             rb.resultCode(resultCode);
788         }
789         if (logger.isDebugEnabled()) {
790             logger.debug("Raw result string from ElasticSearch = " + result.toString());
791         }
792         rb.result(result.toString());
793         rb.resultVersion(extractVersion(result.toString()));
794         return rb.build();
795     }
796
797     private String extractVersion(String result) {
798         JSONParser parser = new JSONParser();
799         String version = null;
800         try {
801             JSONObject root = (JSONObject) parser.parse(result);
802             if (root.get(JSON_ATTR_VERSION) != null) {
803                 version = root.get(JSON_ATTR_VERSION).toString();
804             }
805         } catch (ParseException e) {
806             // Not all responses from ElasticSearch include a version, so
807             // if we don't get one back, just return an empty string rather
808             // than trigger a false failure.
809             version = "";
810         }
811         return version;
812     }
813
814     /**
815      * This convenience method gets the current system time and stores it in an attribute in the supplied
816      * {@link MdcOverride} object so that it can be used later by the metrics logger.
817      *
818      * @param override - The {@link MdcOverride} object to update.
819      * @return - The supplied {@link MdcOverride} object.
820      */
821     private MdcOverride getStartTime(MdcOverride override) {
822
823         // Grab the current time...
824         long startTimeInMs = System.currentTimeMillis();
825
826         // ...and add it as an attribute to the supplied MDC Override object.
827         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
828         override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
829
830         // Return the MdcOverride object that we were passed.
831         // This looks odd, but it allows us to do stuff like:
832         //
833         // MdcOverride ov = getStartTime(new MdcOverride())
834         //
835         // which is quite handy, but also allows us to pass in an existing
836         // MdcOverride object which already has some attributes set.
837         return override;
838     }
839
840     private boolean isSuccess(OperationResult result) {
841         return ApiUtils.isSuccessStatusCode(result.getResultCode());
842     }
843
844     private UriBuilder createUriBuilder(String path, String... paths) {
845         UriBuilder builder = UriBuilder.fromPath(path);
846         for (String other : paths) {
847             builder.path(other);
848         }
849         builder.host(config.getIpAddress());
850         String port = Optional.ofNullable(config.getHttpPort()).orElse("0");
851         builder.port(Integer.valueOf(port));
852         builder.scheme("http");
853         return builder;
854     }
855
856     private URL buildUrl(UriBuilder builder) throws DocumentStoreOperationException {
857         try {
858             return builder.build().toURL();
859         } catch (MalformedURLException e) {
860             logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "buildUrl", e.getLocalizedMessage());
861             throw new DocumentStoreOperationException("Error building a URL with " + builder.toString(), e);
862         }
863     }
864
865     private HttpURLConnection createConnection(final URL url, final String method)
866             throws DocumentStoreOperationException {
867         HttpURLConnection conn = initializeConnection(url);
868         try {
869             logger.debug("\nSending '" + method + "' request to URL : " + conn.getURL());
870             conn.setRequestMethod(method);
871         } catch (ProtocolException e) {
872             shutdownConnection(conn);
873             throw new DocumentStoreOperationException(ErrorMessage.SET_REQUEST_METHOD_FAILED, e, method);
874         }
875         return conn;
876     }
877
878     private OperationResult checkConnection() throws IOException, DocumentStoreOperationException {
879         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder("_cluster/health")), HttpMethod.GET);
880         int resultCode = conn.getResponseCode();
881         logger.debug("getClusterHealth() response Code : " + resultCode);
882         shutdownConnection(conn);
883         return new OperationResultBuilder().resultCode(resultCode).build();
884     }
885
886     private void shutdownConnection(HttpURLConnection connection) {
887         if (connection == null) {
888             return;
889         }
890
891         final String methodName = "shutdownConnection";
892         InputStream inputstream = null;
893         OutputStream outputstream = null;
894
895         try {
896             inputstream = connection.getInputStream();
897         } catch (IOException e) {
898             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
899         } finally {
900             if (inputstream != null) {
901                 try {
902                     inputstream.close();
903                 } catch (IOException e) {
904                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
905                 }
906             }
907         }
908
909         try {
910             outputstream = connection.getOutputStream();
911         } catch (IOException e) {
912             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
913         } finally {
914             if (outputstream != null) {
915                 try {
916                     outputstream.close();
917                 } catch (IOException e) {
918                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
919                 }
920             }
921         }
922
923         connection.disconnect();
924     }
925
926     /**
927      * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch.
928      *
929      * @param request - The request to be performed.
930      * @param sb - The string builder to append the json data to
931      * @throws DocumentStoreOperationException
932      */
933     private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails)
934             throws DocumentStoreOperationException {
935
936         boolean retVal = true;
937         // What kind of operation are we performing?
938         switch (request.getOperationType()) {
939
940             // Create a new document.
941             case CREATE:
942
943                 // Make sure that we were supplied a document payload.
944                 if (request.getOperation().getDocument() == null) {
945
946                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
947                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
948                     return false;
949                 }
950
951                 // Make sure that the supplied document URL is formatted
952                 // correctly.
953                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
954                     fails.add(generateRejectionEntry(request.getOperationType(),
955                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
956                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
957                     return false;
958                 }
959
960                 // Validate that the specified index actually exists before we
961                 // try to perform the create.
962                 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
963
964                     fails.add(generateRejectionEntry(request.getOperationType(),
965                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
966                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
967                     return false;
968                 }
969
970                 // If we were supplied an id for the new document, then
971                 // include it in the bulk operation to Elastic Search
972                 if (request.getId() == null) {
973
974                     sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
975
976                     // Otherwise, we just leave that parameter off and ElasticSearch
977                     // will generate one for us.
978                 } else {
979                     sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
980                             request.getId()));
981                 }
982
983                 try {
984                     // Append the document that we want to create.
985                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
986                 } catch (JsonProcessingException e) {
987                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
988                 }
989
990                 break;
991
992             // Update an existing document.
993             case UPDATE:
994
995                 // Make sure that we were supplied a document payload.
996                 if (request.getOperation().getDocument() == null) {
997
998                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
999                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1000                     return false;
1001                 }
1002
1003                 // Make sure that the supplied document URL is formatted
1004                 // correctly.
1005                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1006                     fails.add(generateRejectionEntry(request.getOperationType(),
1007                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1008                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1009                     return false;
1010                 }
1011
1012                 // Validate that the specified index actually exists before we
1013                 // try to perform the update.
1014                 if (!indexExists(request.getIndex())) {
1015
1016                     fails.add(generateRejectionEntry(request.getOperationType(),
1017                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1018                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1019                     return false;
1020                 }
1021
1022                 // Validate that the document we are trying to update actually
1023                 // exists before we try to perform the update.
1024                 if (!documentExists(request.getIndex(), request.getId())) {
1025
1026                     fails.add(generateRejectionEntry(request.getOperationType(),
1027                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1028                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1029                     return false;
1030                 }
1031
1032                 // It is mandatory that a version be supplied for an update operation,
1033                 // so validate that now.
1034                 if (request.getOperation().getMetaData().getEtag() == null) {
1035
1036                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1037                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1038                     return false;
1039                 }
1040
1041                 // Generate the update request...
1042                 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1043                         request.getOperation().getMetaData().getEtag()));
1044
1045                 // ...and append the document that we want to update.
1046                 try {
1047                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
1048                 } catch (JsonProcessingException e) {
1049                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
1050                 }
1051                 break;
1052
1053             // Delete an existing document.
1054             case DELETE:
1055
1056                 // Make sure that the supplied document URL is formatted
1057                 // correctly.
1058                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1059                     fails.add(generateRejectionEntry(request.getOperationType(),
1060                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1061                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1062                     return false;
1063                 }
1064
1065                 // Validate that the specified index actually exists before we
1066                 // try to perform the delete.
1067                 if (!indexExists(request.getIndex())) {
1068
1069                     fails.add(generateRejectionEntry(request.getOperationType(),
1070                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1071                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1072                     return false;
1073                 }
1074
1075                 // Validate that the document we are trying to update actually
1076                 // exists before we try to perform the delete.
1077                 if (!documentExists(request.getIndex(), request.getId())) {
1078
1079                     fails.add(generateRejectionEntry(request.getOperationType(),
1080                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1081                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1082                     return false;
1083                 }
1084
1085                 // It is mandatory that a version be supplied for a delete operation,
1086                 // so validate that now.
1087                 if (request.getOperation().getMetaData().getEtag() == null) {
1088
1089                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1090                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1091                     return false;
1092                 }
1093
1094                 // Generate the delete request.
1095                 sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1096                         request.getOperation().getMetaData().getEtag()));
1097                 break;
1098             default:
1099         }
1100
1101         return retVal;
1102     }
1103
1104     private boolean indexExists(String index) throws DocumentStoreOperationException {
1105         return isSuccess(checkIndexExistence(index));
1106     }
1107
1108     private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1109         return isSuccess(checkDocumentExistence(index, id));
1110     }
1111
1112     /**
1113      * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the
1114      * document store.
1115      *
1116      * @param rejectReason - A message describing why the operation was rejected.
1117      * @param anId - The identifier associated with the document being acted on.
1118      * @param statusCode - An HTTP status code.
1119      * @return - A result set item.
1120      */
1121     private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index,
1122             String anId, int statusCode, String originalUrl) {
1123
1124         ElasticSearchError err = new ElasticSearchError();
1125         err.setReason(rejectReason);
1126
1127         ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1128         op.setIndex(index);
1129         op.setId(anId);
1130         op.setStatus(statusCode);
1131         op.setError(err);
1132         op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1133
1134         ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1135
1136         switch (opType) {
1137             case CREATE:
1138                 rejectionResult.setCreate(op);
1139                 break;
1140             case UPDATE:
1141                 rejectionResult.setIndex(op);
1142                 break;
1143             case DELETE:
1144                 rejectionResult.setDelete(op);
1145                 break;
1146             default:
1147         }
1148
1149         return rejectionResult;
1150     }
1151
1152     /**
1153      * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and
1154      * marshals it into a Java object.
1155      *
1156      * @param jsonResult - The bulk operations response returned from ElasticSearch.
1157      * @return - The marshalled response.
1158      * @throws JsonParseException
1159      * @throws JsonMappingException
1160      * @throws IOException
1161      */
1162     private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException {
1163         if (jsonResult != null) {
1164             if (logger.isDebugEnabled()) {
1165                 logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", ""));
1166             }
1167
1168             ObjectMapper mapper = new ObjectMapper();
1169             mapper.setSerializationInclusion(Include.NON_EMPTY);
1170
1171             return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1172         }
1173
1174         return null;
1175     }
1176
1177     /**
1178      * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload.
1179      *
1180      * @param esResult - ElasticSearch bulk operations response.
1181      * @return - A generic result set.
1182      */
1183     private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1184             List<ElasticSearchResultItem> rejectedOps) {
1185         int totalOps = 0;
1186         int totalSuccess = 0;
1187         int totalFails = 0;
1188
1189         if (logger.isDebugEnabled()) {
1190
1191             logger.debug("ESController: Build generic result set.  ES Results: "
1192                     + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString());
1193         }
1194
1195         // Build a combined list of result items from the results returned
1196         // from ElasticSearch and the list of operations that we rejected
1197         // without sending to ElasticSearch.
1198         List<ElasticSearchResultItem> combinedResults = new ArrayList<>();
1199         if (esResult != null) {
1200             combinedResults.addAll(Arrays.asList(esResult.getItems()));
1201         }
1202         combinedResults.addAll(rejectedOps);
1203
1204         // Iterate over the individual results in the resulting result set.
1205         StringBuilder resultsBuilder = new StringBuilder();
1206         AtomicBoolean firstItem = new AtomicBoolean(true);
1207         for (ElasticSearchResultItem item : combinedResults) {
1208
1209             // Increment the operation counts.
1210             totalOps++;
1211             if (ApiUtils.isSuccessStatusCode(item.operationStatus().getStatus())) {
1212                 totalSuccess++;
1213             } else {
1214                 totalFails++;
1215             }
1216
1217             // Prepend a comma to our response string unless this it the
1218             // first result in the set.
1219             if (!firstItem.compareAndSet(true, false)) {
1220                 resultsBuilder.append(", ");
1221             }
1222
1223             // Append the current result as a generic json structure.
1224             resultsBuilder.append(item.toJson());
1225         }
1226
1227         return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", "
1228                 + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}";
1229     }
1230
1231     private DocumentOperationResult getOperationResult(HttpURLConnection conn) throws DocumentStoreOperationException {
1232         return (DocumentOperationResult) handleResponse(conn, new OperationResultBuilder(Type.DOCUMENT).useDefaults());
1233     }
1234
1235     private SearchOperationResult getSearchOperationResult(HttpURLConnection conn)
1236             throws DocumentStoreOperationException {
1237         return (SearchOperationResult) handleResponse(conn, new OperationResultBuilder(Type.SEARCH).useDefaults());
1238     }
1239
1240     private void buildDocumentResult(DocumentOperationResult result, String index)
1241             throws DocumentStoreOperationException {
1242
1243         JSONParser parser = new JSONParser();
1244         JSONObject root;
1245         try {
1246             root = (JSONObject) parser.parse(result.getResult());
1247             if (isSuccess(result)) {
1248                 // Success response object
1249                 Document doc = new Document();
1250                 doc.setEtag(result.getResultVersion());
1251                 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1252
1253                 doc.setContent((JSONObject) root.get("_source"));
1254                 result.setDocument(doc);
1255
1256             } else {
1257                 // Error response object
1258                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1259                 if (error != null) {
1260                     result.setError(
1261                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1262                 }
1263
1264             }
1265         } catch (Exception e) {
1266             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1267         }
1268     }
1269
1270     private String buildDocumentResponseUrl(String index, String id) {
1271         return ApiUtils.buildDocumentUri(index, id);
1272     }
1273
1274     private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1275         JSONParser parser = new JSONParser();
1276         JSONObject root;
1277
1278         try {
1279             root = (JSONObject) parser.parse(result.getResult());
1280             if (isSuccess(result)) {
1281                 JSONObject hits = (JSONObject) root.get("hits");
1282                 JSONArray hitArray = (JSONArray) hits.get("hits");
1283                 SearchHits searchHits = new SearchHits();
1284                 searchHits.setTotalHits(hits.get("total").toString());
1285                 ArrayList<SearchHit> searchHitArray = new ArrayList<>();
1286
1287                 for (int i = 0; i < hitArray.size(); i++) {
1288                     JSONObject hit = (JSONObject) hitArray.get(i);
1289                     SearchHit searchHit = new SearchHit();
1290                     searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1291                     Document doc = new Document();
1292                     if (hit.get(JSON_ATTR_VERSION) != null) {
1293                         doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1294                     }
1295
1296                     doc.setUrl(
1297                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1298                     doc.setContent((JSONObject) hit.get("_source"));
1299                     searchHit.setDocument(doc);
1300                     searchHitArray.add(searchHit);
1301                 }
1302                 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1303                 result.setSearchResult(searchHits);
1304
1305                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1306                 if (aggregations != null) {
1307                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1308                     AggregationResults aggs = new AggregationResults();
1309                     aggs.setAggregations(aggResults);
1310                     result.setAggregationResult(aggs);
1311                 }
1312
1313                 // success
1314             } else {
1315                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1316                 if (error != null) {
1317                     result.setError(
1318                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1319                 }
1320             }
1321         } catch (Exception e) {
1322             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1323         }
1324     }
1325
1326     private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1327         JSONParser parser = new JSONParser();
1328         JSONObject root;
1329         try {
1330             root = (JSONObject) parser.parse(result.getResult());
1331             if (isSuccess(result)) {
1332                 JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
1333                 JSONObject hitdata = (JSONObject) hitArray.get(0);
1334                 JSONArray optionsArray = (JSONArray) hitdata.get("options");
1335                 SuggestHits suggestHits = new SuggestHits();
1336                 suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
1337
1338                 ArrayList<SuggestHit> suggestHitArray = new ArrayList<>();
1339
1340                 for (int i = 0; i < optionsArray.size(); i++) {
1341                     JSONObject hit = (JSONObject) optionsArray.get(i);
1342
1343                     SuggestHit suggestHit = new SuggestHit();
1344                     suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
1345                     suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
1346                     Document doc = new Document();
1347                     if (hit.get(JSON_ATTR_VERSION) != null) {
1348                         doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1349                     }
1350                     doc.setUrl(
1351                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1352
1353                     doc.setContent((JSONObject) hit.get("payload"));
1354                     suggestHit.setDocument(doc);
1355                     suggestHitArray.add(suggestHit);
1356                 }
1357                 suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
1358                 result.setSuggestResult(suggestHits);
1359
1360                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1361                 if (aggregations != null) {
1362                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1363                     AggregationResults aggs = new AggregationResults();
1364                     aggs.setAggregations(aggResults);
1365                     result.setAggregationResult(aggs);
1366                 }
1367
1368                 // success
1369             } else {
1370                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1371                 if (error != null) {
1372                     result.setError(
1373                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1374                 }
1375             }
1376         } catch (Exception e) {
1377             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1378         }
1379     }
1380
1381     /**
1382      * Record the timing of the operation in the metrics log.
1383      *
1384      */
1385     private void logMetricsInfo(MdcOverride override, SearchDbMsgs message, OperationResult operationResult,
1386             String... args) {
1387         metricsLogger.info(message,
1388                 new LogFields() //
1389                         .setField(LogLine.DefinedFields.RESPONSE_CODE, operationResult.getResultCode())
1390                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, operationResult.getResult()),
1391                 override, args);
1392     }
1393 }