60874884e78f92ce526e35f730b16545bc3859ee
[aai/search-data-service.git] / src / main / java / org / onap / aai / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
23
24 import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
25 import static javax.ws.rs.core.MediaType.APPLICATION_FORM_URLENCODED;
26 import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
27
28 import com.fasterxml.jackson.annotation.JsonInclude.Include;
29 import com.fasterxml.jackson.core.JsonParseException;
30 import com.fasterxml.jackson.core.JsonProcessingException;
31 import com.fasterxml.jackson.databind.JsonMappingException;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33 import com.google.common.base.Throwables;
34 import java.io.BufferedReader;
35 import java.io.File;
36 import java.io.FileInputStream;
37 import java.io.IOException;
38 import java.io.InputStream;
39 import java.io.InputStreamReader;
40 import java.io.OutputStream;
41 import java.io.OutputStreamWriter;
42 import java.io.PrintWriter;
43 import java.io.StringWriter;
44 import java.net.HttpURLConnection;
45 import java.net.MalformedURLException;
46 import java.net.ProtocolException;
47 import java.net.URL;
48 import java.text.SimpleDateFormat;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.List;
52 import java.util.Optional;
53 import java.util.Properties;
54 import java.util.concurrent.atomic.AtomicBoolean;
55 import javax.ws.rs.HttpMethod;
56 import javax.ws.rs.core.Response.Status;
57 import javax.ws.rs.core.UriBuilder;
58 import org.eclipse.jetty.http.HttpStatus;
59 import org.json.simple.JSONArray;
60 import org.json.simple.JSONObject;
61 import org.json.simple.parser.JSONParser;
62 import org.json.simple.parser.ParseException;
63 import org.onap.aai.cl.api.LogFields;
64 import org.onap.aai.cl.api.LogLine;
65 import org.onap.aai.cl.api.Logger;
66 import org.onap.aai.cl.eelf.LoggerFactory;
67 import org.onap.aai.cl.mdc.MdcContext;
68 import org.onap.aai.cl.mdc.MdcOverride;
69 import org.onap.aai.sa.rest.AnalysisConfiguration;
70 import org.onap.aai.sa.rest.ApiUtils;
71 import org.onap.aai.sa.rest.BulkRequest;
72 import org.onap.aai.sa.rest.BulkRequest.OperationType;
73 import org.onap.aai.sa.rest.DocumentSchema;
74 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
75 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
76 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException.ErrorMessage;
77 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
78 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
79 import org.onap.aai.sa.searchdbabstraction.entity.Document;
80 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
81 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
82 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
83 import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder;
84 import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder.Type;
85 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
86 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
87 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
88 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
89 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
90 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
91 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
92 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
93 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
94 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
95
96 /**
97  * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface.
98  */
99 public class ElasticSearchHttpController implements DocumentStoreInterface {
100
101     private static ElasticSearchHttpController instance = null;
102
103     private static final Logger logger =
104             LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
105     private static final Logger metricsLogger =
106             LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
107
108     private static final String URL_QUERY_VERSION = "version=";
109
110     private static final String JSON_ATTR_VERSION = "_version";
111     private static final String JSON_ATTR_ERROR = "error";
112     private static final String JSON_ATTR_REASON = "reason";
113
114     private static final String DEFAULT_TYPE = "default";
115
116     private static final String MSG_RESOURCE_MISSING = "Specified resource does not exist: ";
117     private static final String MSG_RESPONSE_CODE = "Response Code : ";
118     private static final String MSG_INVALID_DOCUMENT_URL = "Invalid document URL: ";
119
120     private static final String FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE = "Failed to parse Elastic Search response.";
121
122     private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
123             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
124     private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
125             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
126     private static final String BULK_IMPORT_INDEX_TEMPLATE =
127             "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
128     private static final String BULK_DELETE_TEMPLATE =
129             "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
130
131     private final ElasticSearchConfig config;
132
133     protected AnalysisConfiguration analysisConfig;
134
135     public ElasticSearchHttpController(ElasticSearchConfig config) {
136         this.config = config;
137         analysisConfig = new AnalysisConfiguration();
138
139         String rootUrl = null;
140         try {
141             if ("https".equals(config.getUriScheme())) {
142                 new ElasticSearchHttpsController(config);
143             }
144             rootUrl = buildUrl(createUriBuilder("")).toString();
145             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, rootUrl);
146             checkConnection();
147             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, rootUrl);
148         } catch (Exception e) {
149             logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, rootUrl, e.getMessage());
150         }
151     }
152
153     public static ElasticSearchHttpController getInstance() {
154         synchronized (ElasticSearchHttpController.class) {
155             if (instance == null) {
156                 Properties properties = new Properties();
157                 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
158                 try {
159                     properties.load(new FileInputStream(file));
160                 } catch (Exception e) {
161                     logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance",
162                             e.getLocalizedMessage());
163                 }
164
165                 ElasticSearchConfig config = new ElasticSearchConfig(properties);
166                 instance = new ElasticSearchHttpController(config);
167             }
168         }
169
170         return instance;
171     }
172
173     public AnalysisConfiguration getAnalysisConfig() {
174         return analysisConfig;
175     }
176
177     @Override
178     public OperationResult createIndex(String index, DocumentSchema documentSchema) {
179         try {
180             // Submit the request to ElasticSearch to create the index using a default document type.
181             OperationResult result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
182                     DocumentSchemaUtil.generateDocumentMappings(documentSchema));
183
184             // ElasticSearch will return us a 200 code on success when we
185             // want to report a 201, so translate the result here.
186             if (result.getResultCode() == Status.OK.getStatusCode()) {
187                 result.setResultCode(Status.CREATED.getStatusCode());
188             }
189
190             if (isSuccess(result)) {
191                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
192             }
193             return result;
194         } catch (DocumentStoreOperationException | IOException e) {
195             return new OperationResultBuilder().useDefaults()
196                     .failureCause("Document store operation failure.  Cause: " + e.getMessage()).build();
197         }
198     }
199
200     @Override
201     public OperationResult createDynamicIndex(String index, String dynamicSchema) {
202         try {
203             OperationResult result = createTable(index, dynamicSchema);
204
205             // ElasticSearch will return us a 200 code on success when we
206             // want to report a 201, so translate the result here.
207             if (result.getResultCode() == Status.OK.getStatusCode()) {
208                 result.setResultCode(Status.CREATED.getStatusCode());
209             }
210             if (isSuccess(result)) {
211                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
212             }
213             return result;
214         } catch (DocumentStoreOperationException e) {
215             return new OperationResultBuilder().useDefaults()
216                     .failureCause("Document store operation failure.  Cause: " + e.getMessage()).build();
217         }
218     }
219
220     @Override
221     public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
222         MdcOverride override = getStartTime(new MdcOverride());
223
224         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.DELETE);
225         OperationResult opResult = handleResponse(conn);
226         logMetricsInfo(override, SearchDbMsgs.DELETE_INDEX_TIME, opResult, indexName);
227         shutdownConnection(conn);
228
229         return opResult;
230     }
231
232     // @Override
233     protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings)
234             throws DocumentStoreOperationException {
235         if (indexSettings == null) {
236             logger.debug("No settings provided.");
237         }
238
239         if (indexMappings == null) {
240             logger.debug("No mappings provided.");
241         }
242
243         MdcOverride override = getStartTime(new MdcOverride());
244
245         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.PUT);
246
247         StringBuilder sb = new StringBuilder(128);
248         sb.append("{ \"settings\" : ");
249         sb.append(indexSettings);
250         sb.append(",");
251
252         sb.append("\"mappings\" : {");
253         sb.append("\"" + typeName + "\" :");
254         sb.append(indexMappings);
255         sb.append("}}");
256
257         try {
258             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
259         } catch (IOException e) {
260             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
261             throw new DocumentStoreOperationException(e.getMessage(), e);
262         }
263
264         logger.debug("Request content: " + sb);
265
266         OperationResult opResult = handleResponse(conn);
267         shutdownConnection(conn);
268         logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, opResult, indexName);
269
270         return opResult;
271     }
272
273     /**
274      * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the
275      * index.
276      *
277      * @param indexName - The name of the index to be created
278      * @param settingsAndMappings - The actual JSON object that will define the index
279      * @return - The operation result of writing into Elasticsearch
280      * @throws DocumentStoreOperationException
281      */
282     protected OperationResult createTable(String indexName, String settingsAndMappings)
283             throws DocumentStoreOperationException {
284         MdcOverride override = getStartTime(new MdcOverride());
285
286         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.PUT);
287         try {
288             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
289         } catch (IOException e) {
290             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
291             throw new DocumentStoreOperationException(e.getMessage());
292         }
293
294         OperationResult result = handleResponse(conn);
295         logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, result, indexName);
296
297         return result;
298     }
299
300     @Override
301     public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
302             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
303
304         if (!allowImplicitIndexCreation) {
305             // Before we do anything, make sure that the specified index actually exists in the
306             // document store - we don't want to rely on ElasticSearch to fail the document
307             // create because it could be configured to implicitly create a non-existent index,
308             // which can lead to hard-to-debug behaviour with queries down the road.
309             OperationResult indexExistsResult = checkIndexExistence(indexName);
310             if (!isSuccess(indexExistsResult)) {
311                 String resultMsg = "Document Index '" + indexName + "' does not exist.";
312                 return (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT).status(Status.NOT_FOUND)
313                         .result(resultMsg).failureCause(resultMsg).build();
314             }
315         }
316
317         if (document.getId() == null || document.getId().isEmpty()) {
318             return createDocumentWithoutId(indexName, document);
319         } else {
320             return createDocumentWithId(indexName, document);
321         }
322     }
323
324     @Override
325     public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
326             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
327         if (!allowImplicitIndexCreation) {
328             // Before we do anything, make sure that the specified index actually exists in the
329             // document store - we don't want to rely on ElasticSearch to fail the document
330             // create because it could be configured to implicitly create a non-existent index,
331             // which can lead to hard-to-debug behaviour with queries down the road.
332             OperationResult indexExistsResult = checkIndexExistence(indexName);
333             if (!isSuccess(indexExistsResult)) {
334                 DocumentOperationResult opResult = new DocumentOperationResult();
335                 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
336                 String resultMsg = "Document Index '" + indexName + "' does not exist.";
337                 opResult.setResult(resultMsg);
338                 opResult.setFailureCause(resultMsg);
339                 return opResult;
340             }
341         }
342
343         MdcOverride override = getStartTime(new MdcOverride());
344
345         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId())
346                 .replaceQuery(URL_QUERY_VERSION + document.getVersion()));
347
348         HttpURLConnection conn = createConnection(url, HttpMethod.PUT);
349         attachDocument(conn, document);
350
351         DocumentOperationResult opResult = getOperationResult(conn);
352         buildDocumentResult(opResult, indexName);
353
354         logMetricsInfo(override, SearchDbMsgs.UPDATE_DOCUMENT_TIME, opResult, indexName, document.getId());
355
356         shutdownConnection(conn);
357
358         return opResult;
359     }
360
361     @Override
362     public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
363             throws DocumentStoreOperationException {
364         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId())
365                 .replaceQuery(URL_QUERY_VERSION + document.getVersion()));
366
367         MdcOverride override = getStartTime(new MdcOverride());
368
369         HttpURLConnection conn = createConnection(url, HttpMethod.DELETE);
370
371         DocumentOperationResult opResult = getOperationResult(conn);
372         buildDocumentResult(opResult, indexName);
373         // supress the etag and url in response for delete as they are not required
374         if (opResult.getDocument() != null) {
375             opResult.getDocument().setEtag(null);
376             opResult.getDocument().setUrl(null);
377         }
378
379         logMetricsInfo(override, SearchDbMsgs.DELETE_DOCUMENT_TIME, opResult, indexName, document.getId());
380
381         shutdownConnection(conn);
382
383         return opResult;
384     }
385
386     @Override
387     public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
388             throws DocumentStoreOperationException {
389         final UriBuilder uriBuilder = createUriBuilder(indexName, DEFAULT_TYPE, document.getId());
390         if (document.getVersion() != null) {
391             uriBuilder.replaceQuery(URL_QUERY_VERSION + document.getVersion());
392         }
393
394         MdcOverride override = getStartTime(new MdcOverride());
395         HttpURLConnection conn = createConnection(buildUrl(uriBuilder), "GET");
396
397         DocumentOperationResult opResult = getOperationResult(conn);
398         buildDocumentResult(opResult, indexName);
399
400         logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, document.getId());
401
402         shutdownConnection(conn);
403
404         return opResult;
405     }
406
407     @Override
408     public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
409         final URL url = buildUrl(createUriBuilder(indexName, "_search").replaceQuery(queryString));
410
411         MdcOverride override = getStartTime(new MdcOverride());
412
413         HttpURLConnection conn = createConnection(url, "GET");
414         SearchOperationResult opResult = getSearchOperationResult(conn);
415         buildSearchResult(opResult, indexName);
416
417         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, queryString);
418
419         return opResult;
420     }
421
422     @Override
423     public SearchOperationResult searchWithPayload(String indexName, String query)
424             throws DocumentStoreOperationException {
425         if (logger.isDebugEnabled()) {
426             logger.debug("Querying index: " + indexName + " with query string: " + query);
427         }
428         final URL url = buildUrl(createUriBuilder(indexName, "_search"));
429
430         MdcOverride override = getStartTime(new MdcOverride());
431
432         HttpURLConnection conn = createConnection(url, HttpMethod.POST);
433         attachContent(conn, query);
434         logger.debug("Request body =  Elasticsearch query = " + query);
435
436         SearchOperationResult opResult = getSearchOperationResult(conn);
437         buildSearchResult(opResult, indexName);
438
439         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
440
441         shutdownConnection(conn);
442
443         return opResult;
444     }
445
446     @Override
447     public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
448             throws DocumentStoreOperationException {
449         if (logger.isDebugEnabled()) {
450             logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
451         }
452
453         MdcOverride override = getStartTime(new MdcOverride());
454         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName, "_suggest")), HttpMethod.POST);
455         attachContent(conn, query);
456
457         logger.debug("Request body =  Elasticsearch query = " + query);
458
459         SearchOperationResult opResult = getSearchOperationResult(conn);
460         buildSuggestResult(opResult, indexName);
461
462         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
463
464         shutdownConnection(conn);
465
466         return opResult;
467     }
468
469     @Override
470     public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
471         if (logger.isDebugEnabled()) {
472             StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: ");
473
474             for (BulkRequest request : requests) {
475                 dbgString.append("[").append(request).append("] ");
476             }
477
478             logger.debug(dbgString.toString());
479         }
480
481         MdcOverride override = getStartTime(new MdcOverride());
482
483         // Parse the supplied set of operations.
484         // Iterate over the list of operations which we were provided and
485         // translate them into a format that ElasticSearh understands.
486         int opCount = 0;
487         StringBuilder esOperationSet = new StringBuilder(128);
488         List<ElasticSearchResultItem> rejected = new ArrayList<>();
489         for (BulkRequest request : requests) {
490
491             // Convert the request to the syntax ElasticSearch likes.
492             if (buildEsOperation(request, esOperationSet, rejected)) {
493                 opCount++;
494             }
495         }
496
497         ElasticSearchBulkOperationResult opResult = null;
498         if (opCount > 0) {
499             HttpURLConnection conn;
500             try {
501                 conn = (HttpURLConnection) buildUrl(createUriBuilder("_bulk")).openConnection();
502                 conn.setRequestMethod(HttpMethod.PUT);
503                 conn.setDoOutput(true);
504                 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_FORM_URLENCODED);
505                 conn.setRequestProperty("Connection", "Close");
506
507             } catch (IOException e) {
508
509                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
510                 if (logger.isDebugEnabled()) {
511                     logger.debug(Throwables.getStackTraceAsString(e));
512                 }
513
514                 throw new DocumentStoreOperationException(
515                         "Failed to open connection to document store.  Cause: " + e.getMessage(), e);
516             }
517
518             StringBuilder bulkResult = new StringBuilder(128);
519             try {
520                 // Create an output stream to write our request to.
521                 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
522
523                 if (logger.isDebugEnabled()) {
524                     logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
525                     logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
526                 }
527
528                 // Write the resulting request string to our output stream. (this sends the request to ES?)
529                 out.write(esOperationSet.toString());
530                 out.close();
531
532                 // Open an input stream on our connection in order to read back the results.
533                 InputStream is = conn.getInputStream();
534                 InputStreamReader inputstreamreader = new InputStreamReader(is);
535                 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
536
537                 // Read the contents of the input stream into our result string...
538                 String esResponseString = null;
539
540                 while ((esResponseString = bufferedreader.readLine()) != null) {
541                     bulkResult.append(esResponseString).append("\n");
542                 }
543
544             } catch (IOException e) {
545
546                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
547                 if (logger.isDebugEnabled()) {
548                     StringWriter sw = new StringWriter();
549                     e.printStackTrace(new PrintWriter(sw));
550                     logger.debug(sw.toString());
551                 }
552
553                 throw new DocumentStoreOperationException(
554                         "Failure interacting with document store.  Cause: " + e.getMessage(), e);
555             }
556
557             if (logger.isDebugEnabled()) {
558                 logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
559             }
560
561             // ...and marshal the resulting string into a Java object.
562             try {
563                 opResult = marshallEsBulkResult(bulkResult.toString());
564
565             } catch (IOException e) {
566                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
567                 if (logger.isDebugEnabled()) {
568                     logger.debug(Throwables.getStackTraceAsString(e));
569                 }
570
571                 throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: " + e.getMessage(),
572                         e);
573             }
574         }
575
576         OperationResult result = new OperationResultBuilder() //
577                 .resultCode(HttpStatus.MULTI_STATUS_207) //
578                 .result(buildGenericBulkResultSet(opResult, rejected)) //
579                 .build();
580
581         // In the success case we don't want the entire result string to be dumped into the metrics log, so concatenate
582         // it.
583         String resultStringForMetricsLog = result.getResult();
584         if (isSuccess(result)) {
585             resultStringForMetricsLog =
586                     resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
587         }
588
589         metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
590                 new LogFields() //
591                         .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
592                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
593                 override);
594
595         return result;
596     }
597
598
599     /**
600      * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
601      *
602      * @param indexName - The index to look for.
603      * @return - An operation result indicating the success or failure of the check.
604      * @throws DocumentStoreOperationException
605      */
606     private OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
607         MdcOverride override = getStartTime(new MdcOverride());
608
609         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.HEAD);
610         int resultCode;
611         try {
612             resultCode = conn.getResponseCode();
613         } catch (IOException ex) {
614             shutdownConnection(conn);
615             throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex);
616         }
617         logger.debug(MSG_RESPONSE_CODE + resultCode);
618
619         OperationResult opResult = new OperationResultBuilder().useDefaults().resultCode(resultCode).build();
620         logMetricsInfo(override, SearchDbMsgs.CHECK_INDEX_TIME, opResult, indexName);
621         shutdownConnection(conn);
622
623         return opResult;
624     }
625
626     private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
627             throws DocumentStoreOperationException {
628         // check if the document already exists
629         DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
630
631         if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
632             if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) {
633                 opResult.setFailureCause("A document with the same id already exists.");
634             } else {
635                 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
636             }
637             opResult.setResultCode(Status.CONFLICT.getStatusCode());
638             return opResult;
639         }
640
641         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId()));
642         MdcOverride override = getStartTime(new MdcOverride());
643
644         HttpURLConnection conn = createConnection(url, HttpMethod.PUT);
645         attachDocument(conn, document);
646
647         opResult = getOperationResult(conn);
648         buildDocumentResult(opResult, indexName);
649
650         logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, opResult, indexName);
651
652         shutdownConnection(conn);
653
654         return opResult;
655     }
656
657     private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
658             throws DocumentStoreOperationException {
659         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE));
660
661         MdcOverride override = getStartTime(new MdcOverride());
662         HttpURLConnection conn = createConnection(url, HttpMethod.POST);
663         attachDocument(conn, document);
664
665         DocumentOperationResult response = getOperationResult(conn);
666         buildDocumentResult(response, indexName);
667
668         logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, response, indexName);
669
670         shutdownConnection(conn);
671
672         return response;
673     }
674
675     private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
676             throws DocumentStoreOperationException {
677         conn.setRequestProperty("Connection", "Close");
678         attachContent(conn, doc.getContentInJson());
679     }
680
681     private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
682             throws DocumentStoreOperationException {
683         MdcOverride override = getStartTime(new MdcOverride());
684         HttpURLConnection conn =
685                 createConnection(buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, docId)), HttpMethod.HEAD);
686         int resultCode;
687         try {
688             resultCode = conn.getResponseCode();
689         } catch (IOException ex) {
690             shutdownConnection(conn);
691             throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex);
692         }
693
694         logger.debug(MSG_RESPONSE_CODE + resultCode);
695
696         DocumentOperationResult opResult = (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT)
697                 .useDefaults().resultCode(resultCode).build();
698
699         logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, docId);
700         shutdownConnection(conn);
701
702         return opResult;
703     }
704
705     private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException {
706         OutputStream outputStream = null;
707         OutputStreamWriter out = null;
708
709         try {
710             outputStream = conn.getOutputStream();
711         } catch (IOException e) {
712             shutdownConnection(conn);
713             throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
714         }
715
716         out = new OutputStreamWriter(outputStream);
717
718         try {
719             out.write(content);
720             out.close();
721         } catch (IOException e) {
722             shutdownConnection(conn);
723             throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
724         }
725     }
726
727     private HttpURLConnection initializeConnection(URL url) throws DocumentStoreOperationException {
728         HttpURLConnection conn = null;
729         try {
730             conn = (HttpURLConnection) url.openConnection();
731             conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON);
732             conn.setDoOutput(true);
733             if (config.useAuth()) {
734                 conn.setRequestProperty("Authorization", config.getAuthValue());
735             }
736         } catch (IOException e) {
737             shutdownConnection(conn);
738             throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
739         }
740
741         return conn;
742     }
743
744     private OperationResult handleResponse(HttpURLConnection conn) throws DocumentStoreOperationException {
745         return handleResponse(conn, new OperationResultBuilder().useDefaults());
746     }
747
748     private OperationResult handleResponse(HttpURLConnection conn, OperationResultBuilder rb)
749             throws DocumentStoreOperationException {
750         int resultCode;
751
752         try {
753             resultCode = conn.getResponseCode();
754         } catch (IOException ex) {
755             shutdownConnection(conn);
756             throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex);
757         }
758
759         logger.debug(MSG_RESPONSE_CODE + resultCode);
760
761         InputStream inputStream = null;
762
763         if (!ApiUtils.isSuccessStatusCode(resultCode)) {
764             inputStream = conn.getErrorStream();
765         } else {
766             try {
767                 inputStream = conn.getInputStream();
768             } catch (IOException e) {
769                 shutdownConnection(conn);
770                 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
771             }
772         }
773
774         InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
775         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
776
777         StringBuilder result = new StringBuilder(128);
778         String string = null;
779
780         try {
781             while ((string = bufferedreader.readLine()) != null) {
782                 result.append(string).append("\n");
783             }
784         } catch (IOException e) {
785             shutdownConnection(conn);
786             throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
787         }
788
789         if (resultCode == Status.CONFLICT.getStatusCode()) {
790             rb.resultCode(Status.PRECONDITION_FAILED.getStatusCode());
791         } else {
792             rb.resultCode(resultCode);
793         }
794         if (logger.isDebugEnabled()) {
795             logger.debug("Raw result string from ElasticSearch = " + result.toString());
796         }
797         rb.result(result.toString());
798         rb.resultVersion(extractVersion(result.toString()));
799         return rb.build();
800     }
801
802     private String extractVersion(String result) {
803         JSONParser parser = new JSONParser();
804         String version = null;
805         try {
806             JSONObject root = (JSONObject) parser.parse(result);
807             if (root.get(JSON_ATTR_VERSION) != null) {
808                 version = root.get(JSON_ATTR_VERSION).toString();
809             }
810         } catch (ParseException e) {
811             // Not all responses from ElasticSearch include a version, so
812             // if we don't get one back, just return an empty string rather
813             // than trigger a false failure.
814             version = "";
815         }
816         return version;
817     }
818
819     /**
820      * This convenience method gets the current system time and stores it in an attribute in the supplied
821      * {@link MdcOverride} object so that it can be used later by the metrics logger.
822      *
823      * @param override - The {@link MdcOverride} object to update.
824      * @return - The supplied {@link MdcOverride} object.
825      */
826     private MdcOverride getStartTime(MdcOverride override) {
827
828         // Grab the current time...
829         long startTimeInMs = System.currentTimeMillis();
830
831         // ...and add it as an attribute to the supplied MDC Override object.
832         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
833         override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
834
835         // Return the MdcOverride object that we were passed.
836         // This looks odd, but it allows us to do stuff like:
837         //
838         // MdcOverride ov = getStartTime(new MdcOverride())
839         //
840         // which is quite handy, but also allows us to pass in an existing
841         // MdcOverride object which already has some attributes set.
842         return override;
843     }
844
845     private boolean isSuccess(OperationResult result) {
846         return ApiUtils.isSuccessStatusCode(result.getResultCode());
847     }
848
849     private UriBuilder createUriBuilder(String path, String... paths) {
850         UriBuilder builder = UriBuilder.fromPath(path);
851         for (String other : paths) {
852             builder.path(other);
853         }
854         builder.host(config.getIpAddress());
855         String port = Optional.ofNullable(config.getHttpPort()).orElse("0");
856         builder.port(Integer.valueOf(port));
857         builder.scheme(config.getUriScheme());
858         return builder;
859     }
860
861     private URL buildUrl(UriBuilder builder) throws DocumentStoreOperationException {
862         try {
863             return builder.build().toURL();
864         } catch (MalformedURLException e) {
865             logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "buildUrl", e.getLocalizedMessage());
866             throw new DocumentStoreOperationException("Error building a URL with " + builder.toString(), e);
867         }
868     }
869
870     private HttpURLConnection createConnection(final URL url, final String method)
871             throws DocumentStoreOperationException {
872         HttpURLConnection conn = initializeConnection(url);
873         try {
874             logger.debug("\nSending '" + method + "' request to URL : " + conn.getURL());
875             conn.setRequestMethod(method);
876         } catch (ProtocolException e) {
877             shutdownConnection(conn);
878             throw new DocumentStoreOperationException(ErrorMessage.SET_REQUEST_METHOD_FAILED, e, method);
879         }
880         return conn;
881     }
882
883     private OperationResult checkConnection() throws IOException, DocumentStoreOperationException {
884         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder("_cluster/health")), HttpMethod.GET);
885         int resultCode = conn.getResponseCode();
886         logger.debug("getClusterHealth() response Code : " + resultCode);
887         shutdownConnection(conn);
888         return new OperationResultBuilder().resultCode(resultCode).build();
889     }
890
891     private void shutdownConnection(HttpURLConnection connection) {
892         if (connection == null) {
893             return;
894         }
895
896         final String methodName = "shutdownConnection";
897         InputStream inputstream = null;
898         OutputStream outputstream = null;
899
900         try {
901             inputstream = connection.getInputStream();
902         } catch (IOException e) {
903             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
904         } finally {
905             if (inputstream != null) {
906                 try {
907                     inputstream.close();
908                 } catch (IOException e) {
909                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
910                 }
911             }
912         }
913
914         try {
915             outputstream = connection.getOutputStream();
916         } catch (IOException e) {
917             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
918         } finally {
919             if (outputstream != null) {
920                 try {
921                     outputstream.close();
922                 } catch (IOException e) {
923                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
924                 }
925             }
926         }
927
928         connection.disconnect();
929     }
930
931     /**
932      * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch.
933      *
934      * @param request - The request to be performed.
935      * @param sb - The string builder to append the json data to
936      * @throws DocumentStoreOperationException
937      */
938     private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails)
939             throws DocumentStoreOperationException {
940
941         boolean retVal = true;
942         // What kind of operation are we performing?
943         switch (request.getOperationType()) {
944
945             // Create a new document.
946             case CREATE:
947
948                 // Make sure that we were supplied a document payload.
949                 if (request.getOperation().getDocument() == null) {
950
951                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
952                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
953                     return false;
954                 }
955
956                 // Make sure that the supplied document URL is formatted
957                 // correctly.
958                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
959                     fails.add(generateRejectionEntry(request.getOperationType(),
960                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
961                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
962                     return false;
963                 }
964
965                 // Validate that the specified index actually exists before we
966                 // try to perform the create.
967                 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
968
969                     fails.add(generateRejectionEntry(request.getOperationType(),
970                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
971                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
972                     return false;
973                 }
974
975                 // If we were supplied an id for the new document, then
976                 // include it in the bulk operation to Elastic Search
977                 if (request.getId() == null) {
978
979                     sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
980
981                     // Otherwise, we just leave that parameter off and ElasticSearch
982                     // will generate one for us.
983                 } else {
984                     sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
985                             request.getId()));
986                 }
987
988                 try {
989                     // Append the document that we want to create.
990                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
991                 } catch (JsonProcessingException e) {
992                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
993                 }
994
995                 break;
996
997             // Update an existing document.
998             case UPDATE:
999
1000                 // Make sure that we were supplied a document payload.
1001                 if (request.getOperation().getDocument() == null) {
1002
1003                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1004                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1005                     return false;
1006                 }
1007
1008                 // Make sure that the supplied document URL is formatted
1009                 // correctly.
1010                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1011                     fails.add(generateRejectionEntry(request.getOperationType(),
1012                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1013                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1014                     return false;
1015                 }
1016
1017                 // Validate that the specified index actually exists before we
1018                 // try to perform the update.
1019                 if (!indexExists(request.getIndex())) {
1020
1021                     fails.add(generateRejectionEntry(request.getOperationType(),
1022                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1023                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1024                     return false;
1025                 }
1026
1027                 // Validate that the document we are trying to update actually
1028                 // exists before we try to perform the update.
1029                 if (!documentExists(request.getIndex(), request.getId())) {
1030
1031                     fails.add(generateRejectionEntry(request.getOperationType(),
1032                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1033                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1034                     return false;
1035                 }
1036
1037                 // It is mandatory that a version be supplied for an update operation,
1038                 // so validate that now.
1039                 if (request.getOperation().getMetaData().getEtag() == null) {
1040
1041                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1042                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1043                     return false;
1044                 }
1045
1046                 // Generate the update request...
1047                 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1048                         request.getOperation().getMetaData().getEtag()));
1049
1050                 // ...and append the document that we want to update.
1051                 try {
1052                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
1053                 } catch (JsonProcessingException e) {
1054                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
1055                 }
1056                 break;
1057
1058             // Delete an existing document.
1059             case DELETE:
1060
1061                 // Make sure that the supplied document URL is formatted
1062                 // correctly.
1063                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1064                     fails.add(generateRejectionEntry(request.getOperationType(),
1065                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1066                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1067                     return false;
1068                 }
1069
1070                 // Validate that the specified index actually exists before we
1071                 // try to perform the delete.
1072                 if (!indexExists(request.getIndex())) {
1073
1074                     fails.add(generateRejectionEntry(request.getOperationType(),
1075                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1076                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1077                     return false;
1078                 }
1079
1080                 // Validate that the document we are trying to update actually
1081                 // exists before we try to perform the delete.
1082                 if (!documentExists(request.getIndex(), request.getId())) {
1083
1084                     fails.add(generateRejectionEntry(request.getOperationType(),
1085                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1086                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1087                     return false;
1088                 }
1089
1090                 // It is mandatory that a version be supplied for a delete operation,
1091                 // so validate that now.
1092                 if (request.getOperation().getMetaData().getEtag() == null) {
1093
1094                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1095                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1096                     return false;
1097                 }
1098
1099                 // Generate the delete request.
1100                 sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1101                         request.getOperation().getMetaData().getEtag()));
1102                 break;
1103             default:
1104         }
1105
1106         return retVal;
1107     }
1108
1109     private boolean indexExists(String index) throws DocumentStoreOperationException {
1110         return isSuccess(checkIndexExistence(index));
1111     }
1112
1113     private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1114         return isSuccess(checkDocumentExistence(index, id));
1115     }
1116
1117     /**
1118      * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the
1119      * document store.
1120      *
1121      * @param rejectReason - A message describing why the operation was rejected.
1122      * @param anId - The identifier associated with the document being acted on.
1123      * @param statusCode - An HTTP status code.
1124      * @return - A result set item.
1125      */
1126     private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index,
1127             String anId, int statusCode, String originalUrl) {
1128
1129         ElasticSearchError err = new ElasticSearchError();
1130         err.setReason(rejectReason);
1131
1132         ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1133         op.setIndex(index);
1134         op.setId(anId);
1135         op.setStatus(statusCode);
1136         op.setError(err);
1137         op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1138
1139         ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1140
1141         switch (opType) {
1142             case CREATE:
1143                 rejectionResult.setCreate(op);
1144                 break;
1145             case UPDATE:
1146                 rejectionResult.setIndex(op);
1147                 break;
1148             case DELETE:
1149                 rejectionResult.setDelete(op);
1150                 break;
1151             default:
1152         }
1153
1154         return rejectionResult;
1155     }
1156
1157     /**
1158      * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and
1159      * marshals it into a Java object.
1160      *
1161      * @param jsonResult - The bulk operations response returned from ElasticSearch.
1162      * @return - The marshalled response.
1163      * @throws JsonParseException
1164      * @throws JsonMappingException
1165      * @throws IOException
1166      */
1167     private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException {
1168         if (jsonResult != null) {
1169             if (logger.isDebugEnabled()) {
1170                 logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", ""));
1171             }
1172
1173             ObjectMapper mapper = new ObjectMapper();
1174             mapper.setSerializationInclusion(Include.NON_EMPTY);
1175
1176             return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1177         }
1178
1179         return null;
1180     }
1181
1182     /**
1183      * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload.
1184      *
1185      * @param esResult - ElasticSearch bulk operations response.
1186      * @return - A generic result set.
1187      */
1188     private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1189             List<ElasticSearchResultItem> rejectedOps) {
1190         int totalOps = 0;
1191         int totalSuccess = 0;
1192         int totalFails = 0;
1193
1194         if (logger.isDebugEnabled()) {
1195
1196             logger.debug("ESController: Build generic result set.  ES Results: "
1197                     + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString());
1198         }
1199
1200         // Build a combined list of result items from the results returned
1201         // from ElasticSearch and the list of operations that we rejected
1202         // without sending to ElasticSearch.
1203         List<ElasticSearchResultItem> combinedResults = new ArrayList<>();
1204         if (esResult != null) {
1205             combinedResults.addAll(Arrays.asList(esResult.getItems()));
1206         }
1207         combinedResults.addAll(rejectedOps);
1208
1209         // Iterate over the individual results in the resulting result set.
1210         StringBuilder resultsBuilder = new StringBuilder();
1211         AtomicBoolean firstItem = new AtomicBoolean(true);
1212         for (ElasticSearchResultItem item : combinedResults) {
1213
1214             // Increment the operation counts.
1215             totalOps++;
1216             if (ApiUtils.isSuccessStatusCode(item.operationStatus().getStatus())) {
1217                 totalSuccess++;
1218             } else {
1219                 totalFails++;
1220             }
1221
1222             // Prepend a comma to our response string unless this it the
1223             // first result in the set.
1224             if (!firstItem.compareAndSet(true, false)) {
1225                 resultsBuilder.append(", ");
1226             }
1227
1228             // Append the current result as a generic json structure.
1229             resultsBuilder.append(item.toJson());
1230         }
1231
1232         return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", "
1233                 + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}";
1234     }
1235
1236     private DocumentOperationResult getOperationResult(HttpURLConnection conn) throws DocumentStoreOperationException {
1237         return (DocumentOperationResult) handleResponse(conn, new OperationResultBuilder(Type.DOCUMENT).useDefaults());
1238     }
1239
1240     private SearchOperationResult getSearchOperationResult(HttpURLConnection conn)
1241             throws DocumentStoreOperationException {
1242         return (SearchOperationResult) handleResponse(conn, new OperationResultBuilder(Type.SEARCH).useDefaults());
1243     }
1244
1245     private void buildDocumentResult(DocumentOperationResult result, String index)
1246             throws DocumentStoreOperationException {
1247
1248         JSONParser parser = new JSONParser();
1249         JSONObject root;
1250         try {
1251             root = (JSONObject) parser.parse(result.getResult());
1252             if (isSuccess(result)) {
1253                 // Success response object
1254                 Document doc = new Document();
1255                 doc.setEtag(result.getResultVersion());
1256                 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1257
1258                 doc.setContent((JSONObject) root.get("_source"));
1259                 result.setDocument(doc);
1260
1261             } else {
1262                 // Error response object
1263                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1264                 if (error != null) {
1265                     result.setError(
1266                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1267                 }
1268
1269             }
1270         } catch (Exception e) {
1271             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1272         }
1273     }
1274
1275     private String buildDocumentResponseUrl(String index, String id) {
1276         return ApiUtils.buildDocumentUri(index, id);
1277     }
1278
1279     private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1280         JSONParser parser = new JSONParser();
1281         JSONObject root;
1282
1283         try {
1284             root = (JSONObject) parser.parse(result.getResult());
1285             if (isSuccess(result)) {
1286                 JSONObject hits = (JSONObject) root.get("hits");
1287                 JSONArray hitArray = (JSONArray) hits.get("hits");
1288                 SearchHits searchHits = new SearchHits();
1289                 searchHits.setTotalHits(hits.get("total").toString());
1290                 ArrayList<SearchHit> searchHitArray = new ArrayList<>();
1291
1292                 for (int i = 0; i < hitArray.size(); i++) {
1293                     JSONObject hit = (JSONObject) hitArray.get(i);
1294                     SearchHit searchHit = new SearchHit();
1295                     searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1296                     Document doc = new Document();
1297                     if (hit.get(JSON_ATTR_VERSION) != null) {
1298                         doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1299                     }
1300
1301                     doc.setUrl(
1302                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1303                     doc.setContent((JSONObject) hit.get("_source"));
1304                     searchHit.setDocument(doc);
1305                     searchHitArray.add(searchHit);
1306                 }
1307                 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1308                 result.setSearchResult(searchHits);
1309
1310                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1311                 if (aggregations != null) {
1312                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1313                     AggregationResults aggs = new AggregationResults();
1314                     aggs.setAggregations(aggResults);
1315                     result.setAggregationResult(aggs);
1316                 }
1317
1318                 // success
1319             } else {
1320                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1321                 if (error != null) {
1322                     result.setError(
1323                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1324                 }
1325             }
1326         } catch (Exception e) {
1327             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1328         }
1329     }
1330
1331     private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1332         JSONParser parser = new JSONParser();
1333         JSONObject root;
1334         try {
1335             root = (JSONObject) parser.parse(result.getResult());
1336             if (isSuccess(result)) {
1337                 JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
1338                 JSONObject hitdata = (JSONObject) hitArray.get(0);
1339                 JSONArray optionsArray = (JSONArray) hitdata.get("options");
1340                 SuggestHits suggestHits = new SuggestHits();
1341                 suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
1342
1343                 ArrayList<SuggestHit> suggestHitArray = new ArrayList<>();
1344
1345                 for (int i = 0; i < optionsArray.size(); i++) {
1346                     JSONObject hit = (JSONObject) optionsArray.get(i);
1347
1348                     SuggestHit suggestHit = new SuggestHit();
1349                     suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
1350                     suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
1351                     Document doc = new Document();
1352                     if (hit.get(JSON_ATTR_VERSION) != null) {
1353                         doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1354                     }
1355                     doc.setUrl(
1356                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1357
1358                     doc.setContent((JSONObject) hit.get("payload"));
1359                     suggestHit.setDocument(doc);
1360                     suggestHitArray.add(suggestHit);
1361                 }
1362                 suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
1363                 result.setSuggestResult(suggestHits);
1364
1365                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1366                 if (aggregations != null) {
1367                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1368                     AggregationResults aggs = new AggregationResults();
1369                     aggs.setAggregations(aggResults);
1370                     result.setAggregationResult(aggs);
1371                 }
1372
1373                 // success
1374             } else {
1375                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1376                 if (error != null) {
1377                     result.setError(
1378                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1379                 }
1380             }
1381         } catch (Exception e) {
1382             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1383         }
1384     }
1385
1386     /**
1387      * Record the timing of the operation in the metrics log.
1388      *
1389      */
1390     private void logMetricsInfo(MdcOverride override, SearchDbMsgs message, OperationResult operationResult,
1391             String... args) {
1392         metricsLogger.info(message,
1393                 new LogFields() //
1394                         .setField(LogLine.DefinedFields.RESPONSE_CODE, operationResult.getResultCode())
1395                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, operationResult.getResult()),
1396                 override, args);
1397     }
1398 }