fix bulk api to support secure rest calls to an ES
[aai/search-data-service.git] / src / main / java / org / onap / aai / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
23
24 import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
25 import static javax.ws.rs.core.MediaType.APPLICATION_FORM_URLENCODED;
26 import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
27
28 import com.fasterxml.jackson.annotation.JsonInclude.Include;
29 import com.fasterxml.jackson.core.JsonParseException;
30 import com.fasterxml.jackson.core.JsonProcessingException;
31 import com.fasterxml.jackson.databind.JsonMappingException;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33 import com.google.common.base.Throwables;
34 import java.io.BufferedReader;
35 import java.io.File;
36 import java.io.FileInputStream;
37 import java.io.IOException;
38 import java.io.InputStream;
39 import java.io.InputStreamReader;
40 import java.io.OutputStream;
41 import java.io.OutputStreamWriter;
42 import java.io.PrintWriter;
43 import java.io.StringWriter;
44 import java.net.HttpURLConnection;
45 import java.net.MalformedURLException;
46 import java.net.ProtocolException;
47 import java.net.URL;
48 import java.text.SimpleDateFormat;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.List;
52 import java.util.Optional;
53 import java.util.Properties;
54 import java.util.concurrent.atomic.AtomicBoolean;
55 import javax.ws.rs.HttpMethod;
56 import javax.ws.rs.core.MediaType;
57 import javax.ws.rs.core.Response.Status;
58 import javax.ws.rs.core.UriBuilder;
59 import org.eclipse.jetty.http.HttpStatus;
60 import org.json.simple.JSONArray;
61 import org.json.simple.JSONObject;
62 import org.json.simple.parser.JSONParser;
63 import org.json.simple.parser.ParseException;
64 import org.onap.aai.cl.api.LogFields;
65 import org.onap.aai.cl.api.LogLine;
66 import org.onap.aai.cl.api.Logger;
67 import org.onap.aai.cl.eelf.LoggerFactory;
68 import org.onap.aai.cl.mdc.MdcContext;
69 import org.onap.aai.cl.mdc.MdcOverride;
70 import org.onap.aai.sa.rest.AnalysisConfiguration;
71 import org.onap.aai.sa.rest.ApiUtils;
72 import org.onap.aai.sa.rest.BulkRequest;
73 import org.onap.aai.sa.rest.BulkRequest.OperationType;
74 import org.onap.aai.sa.rest.DocumentSchema;
75 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
76 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
77 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException.ErrorMessage;
78 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
79 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
80 import org.onap.aai.sa.searchdbabstraction.entity.Document;
81 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
82 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
83 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
84 import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder;
85 import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder.Type;
86 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
87 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
88 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
89 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
90 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
91 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
92 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
93 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
94 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
95 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
96
97 /**
98  * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface.
99  */
100 public class ElasticSearchHttpController implements DocumentStoreInterface {
101
102     private static ElasticSearchHttpController instance = null;
103
104     private static final Logger logger =
105             LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
106     private static final Logger metricsLogger =
107             LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
108
109     private static final String URL_QUERY_VERSION = "version=";
110
111     private static final String JSON_ATTR_VERSION = "_version";
112     private static final String JSON_ATTR_ERROR = "error";
113     private static final String JSON_ATTR_REASON = "reason";
114
115     private static final String DEFAULT_TYPE = "default";
116
117     private static final String MSG_RESOURCE_MISSING = "Specified resource does not exist: ";
118     private static final String MSG_RESPONSE_CODE = "Response Code : ";
119     private static final String MSG_INVALID_DOCUMENT_URL = "Invalid document URL: ";
120
121     private static final String FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE = "Failed to parse Elastic Search response.";
122
123     private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
124             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
125     private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
126             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
127     private static final String BULK_IMPORT_INDEX_TEMPLATE =
128             "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
129     private static final String BULK_DELETE_TEMPLATE =
130             "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
131     public final static String APPLICATION_XND_JSON_TYPE = new MediaType("application", "x-ndjson").toString();
132
133     private final ElasticSearchConfig config;
134
135     protected AnalysisConfiguration analysisConfig;
136
137     public ElasticSearchHttpController(ElasticSearchConfig config) {
138         this.config = config;
139         analysisConfig = new AnalysisConfiguration();
140
141         String rootUrl = null;
142         try {
143             if ("https".equals(config.getUriScheme())) {
144                 new ElasticSearchHttpsController(config);
145             }
146             rootUrl = buildUrl(createUriBuilder("")).toString();
147             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, rootUrl);
148             checkConnection();
149             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, rootUrl);
150         } catch (Exception e) {
151             logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, rootUrl, e.getMessage());
152         }
153     }
154
155     public static ElasticSearchHttpController getInstance() {
156         synchronized (ElasticSearchHttpController.class) {
157             if (instance == null) {
158                 Properties properties = new Properties();
159                 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
160                 try {
161                     properties.load(new FileInputStream(file));
162                 } catch (Exception e) {
163                     logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance",
164                             e.getLocalizedMessage());
165                 }
166
167                 ElasticSearchConfig config = new ElasticSearchConfig(properties);
168                 instance = new ElasticSearchHttpController(config);
169             }
170         }
171
172         return instance;
173     }
174
175     public AnalysisConfiguration getAnalysisConfig() {
176         return analysisConfig;
177     }
178
179     @Override
180     public OperationResult createIndex(String index, DocumentSchema documentSchema) {
181         try {
182             // Submit the request to ElasticSearch to create the index using a default document type.
183             OperationResult result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
184                     DocumentSchemaUtil.generateDocumentMappings(documentSchema));
185
186             // ElasticSearch will return us a 200 code on success when we
187             // want to report a 201, so translate the result here.
188             if (result.getResultCode() == Status.OK.getStatusCode()) {
189                 result.setResultCode(Status.CREATED.getStatusCode());
190             }
191
192             if (isSuccess(result)) {
193                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
194             }
195             return result;
196         } catch (DocumentStoreOperationException | IOException e) {
197             return new OperationResultBuilder().useDefaults()
198                     .failureCause("Document store operation failure.  Cause: " + e.getMessage()).build();
199         }
200     }
201
202     @Override
203     public OperationResult createDynamicIndex(String index, String dynamicSchema) {
204         try {
205             OperationResult result = createTable(index, dynamicSchema);
206
207             // ElasticSearch will return us a 200 code on success when we
208             // want to report a 201, so translate the result here.
209             if (result.getResultCode() == Status.OK.getStatusCode()) {
210                 result.setResultCode(Status.CREATED.getStatusCode());
211             }
212             if (isSuccess(result)) {
213                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
214             }
215             return result;
216         } catch (DocumentStoreOperationException e) {
217             return new OperationResultBuilder().useDefaults()
218                     .failureCause("Document store operation failure.  Cause: " + e.getMessage()).build();
219         }
220     }
221
222     @Override
223     public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
224         MdcOverride override = getStartTime(new MdcOverride());
225
226         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.DELETE);
227         OperationResult opResult = handleResponse(conn);
228         logMetricsInfo(override, SearchDbMsgs.DELETE_INDEX_TIME, opResult, indexName);
229         shutdownConnection(conn);
230
231         return opResult;
232     }
233
234     // @Override
235     protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings)
236             throws DocumentStoreOperationException {
237         if (indexSettings == null) {
238             logger.debug("No settings provided.");
239         }
240
241         if (indexMappings == null) {
242             logger.debug("No mappings provided.");
243         }
244
245         MdcOverride override = getStartTime(new MdcOverride());
246
247         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.PUT);
248
249         StringBuilder sb = new StringBuilder(128);
250         sb.append("{ \"settings\" : ");
251         sb.append(indexSettings);
252         sb.append(",");
253
254         sb.append("\"mappings\" : {");
255         sb.append("\"" + typeName + "\" :");
256         sb.append(indexMappings);
257         sb.append("}}");
258
259         try {
260             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
261         } catch (IOException e) {
262             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
263             throw new DocumentStoreOperationException(e.getMessage(), e);
264         }
265
266         logger.debug("Request content: " + sb);
267
268         OperationResult opResult = handleResponse(conn);
269         shutdownConnection(conn);
270         logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, opResult, indexName);
271
272         return opResult;
273     }
274
275     /**
276      * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the
277      * index.
278      *
279      * @param indexName - The name of the index to be created
280      * @param settingsAndMappings - The actual JSON object that will define the index
281      * @return - The operation result of writing into Elasticsearch
282      * @throws DocumentStoreOperationException
283      */
284     protected OperationResult createTable(String indexName, String settingsAndMappings)
285             throws DocumentStoreOperationException {
286         MdcOverride override = getStartTime(new MdcOverride());
287
288         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.PUT);
289         try {
290             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
291         } catch (IOException e) {
292             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
293             throw new DocumentStoreOperationException(e.getMessage());
294         }
295
296         OperationResult result = handleResponse(conn);
297         logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, result, indexName);
298
299         return result;
300     }
301
302     @Override
303     public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
304             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
305
306         if (!allowImplicitIndexCreation) {
307             // Before we do anything, make sure that the specified index actually exists in the
308             // document store - we don't want to rely on ElasticSearch to fail the document
309             // create because it could be configured to implicitly create a non-existent index,
310             // which can lead to hard-to-debug behaviour with queries down the road.
311             OperationResult indexExistsResult = checkIndexExistence(indexName);
312             if (!isSuccess(indexExistsResult)) {
313                 String resultMsg = "Document Index '" + indexName + "' does not exist.";
314                 return (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT).status(Status.NOT_FOUND)
315                         .result(resultMsg).failureCause(resultMsg).build();
316             }
317         }
318
319         if (document.getId() == null || document.getId().isEmpty()) {
320             return createDocumentWithoutId(indexName, document);
321         } else {
322             return createDocumentWithId(indexName, document);
323         }
324     }
325
326     @Override
327     public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
328             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
329         if (!allowImplicitIndexCreation) {
330             // Before we do anything, make sure that the specified index actually exists in the
331             // document store - we don't want to rely on ElasticSearch to fail the document
332             // create because it could be configured to implicitly create a non-existent index,
333             // which can lead to hard-to-debug behaviour with queries down the road.
334             OperationResult indexExistsResult = checkIndexExistence(indexName);
335             if (!isSuccess(indexExistsResult)) {
336                 DocumentOperationResult opResult = new DocumentOperationResult();
337                 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
338                 String resultMsg = "Document Index '" + indexName + "' does not exist.";
339                 opResult.setResult(resultMsg);
340                 opResult.setFailureCause(resultMsg);
341                 return opResult;
342             }
343         }
344
345         MdcOverride override = getStartTime(new MdcOverride());
346
347         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId())
348                 .replaceQuery(URL_QUERY_VERSION + document.getVersion()));
349
350         HttpURLConnection conn = createConnection(url, HttpMethod.PUT);
351         attachDocument(conn, document);
352
353         DocumentOperationResult opResult = getOperationResult(conn);
354         buildDocumentResult(opResult, indexName);
355
356         logMetricsInfo(override, SearchDbMsgs.UPDATE_DOCUMENT_TIME, opResult, indexName, document.getId());
357
358         shutdownConnection(conn);
359
360         return opResult;
361     }
362
363     @Override
364     public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
365             throws DocumentStoreOperationException {
366         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId())
367                 .replaceQuery(URL_QUERY_VERSION + document.getVersion()));
368
369         MdcOverride override = getStartTime(new MdcOverride());
370
371         HttpURLConnection conn = createConnection(url, HttpMethod.DELETE);
372
373         DocumentOperationResult opResult = getOperationResult(conn);
374         buildDocumentResult(opResult, indexName);
375         // supress the etag and url in response for delete as they are not required
376         if (opResult.getDocument() != null) {
377             opResult.getDocument().setEtag(null);
378             opResult.getDocument().setUrl(null);
379         }
380
381         logMetricsInfo(override, SearchDbMsgs.DELETE_DOCUMENT_TIME, opResult, indexName, document.getId());
382
383         shutdownConnection(conn);
384
385         return opResult;
386     }
387
388     @Override
389     public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
390             throws DocumentStoreOperationException {
391         final UriBuilder uriBuilder = createUriBuilder(indexName, DEFAULT_TYPE, document.getId());
392         if (document.getVersion() != null) {
393             uriBuilder.replaceQuery(URL_QUERY_VERSION + document.getVersion());
394         }
395
396         MdcOverride override = getStartTime(new MdcOverride());
397         HttpURLConnection conn = createConnection(buildUrl(uriBuilder), "GET");
398
399         DocumentOperationResult opResult = getOperationResult(conn);
400         buildDocumentResult(opResult, indexName);
401
402         logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, document.getId());
403
404         shutdownConnection(conn);
405
406         return opResult;
407     }
408
409     @Override
410     public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
411         final URL url = buildUrl(createUriBuilder(indexName, "_search").replaceQuery(queryString));
412
413         MdcOverride override = getStartTime(new MdcOverride());
414
415         HttpURLConnection conn = createConnection(url, "GET");
416         SearchOperationResult opResult = getSearchOperationResult(conn);
417         buildSearchResult(opResult, indexName);
418
419         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, queryString);
420
421         return opResult;
422     }
423
424     @Override
425     public SearchOperationResult searchWithPayload(String indexName, String query)
426             throws DocumentStoreOperationException {
427         if (logger.isDebugEnabled()) {
428             logger.debug("Querying index: " + indexName + " with query string: " + query);
429         }
430         final URL url = buildUrl(createUriBuilder(indexName, "_search"));
431
432         MdcOverride override = getStartTime(new MdcOverride());
433
434         HttpURLConnection conn = createConnection(url, HttpMethod.POST);
435         attachContent(conn, query);
436         logger.debug("Request body =  Elasticsearch query = " + query);
437
438         SearchOperationResult opResult = getSearchOperationResult(conn);
439         buildSearchResult(opResult, indexName);
440
441         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
442
443         shutdownConnection(conn);
444
445         return opResult;
446     }
447
448     @Override
449     public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
450             throws DocumentStoreOperationException {
451         if (logger.isDebugEnabled()) {
452             logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
453         }
454
455         MdcOverride override = getStartTime(new MdcOverride());
456         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName, "_suggest")), HttpMethod.POST);
457         attachContent(conn, query);
458
459         logger.debug("Request body =  Elasticsearch query = " + query);
460
461         SearchOperationResult opResult = getSearchOperationResult(conn);
462         buildSuggestResult(opResult, indexName);
463
464         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
465
466         shutdownConnection(conn);
467
468         return opResult;
469     }
470
471     @Override
472     public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
473         if (logger.isDebugEnabled()) {
474             StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: ");
475
476             for (BulkRequest request : requests) {
477                 dbgString.append("[").append(request).append("] ");
478             }
479
480             logger.debug(dbgString.toString());
481         }
482
483         MdcOverride override = getStartTime(new MdcOverride());
484
485         // Parse the supplied set of operations.
486         // Iterate over the list of operations which we were provided and
487         // translate them into a format that ElasticSearh understands.
488         int opCount = 0;
489         StringBuilder esOperationSet = new StringBuilder(128);
490         List<ElasticSearchResultItem> rejected = new ArrayList<>();
491         for (BulkRequest request : requests) {
492
493             // Convert the request to the syntax ElasticSearch likes.
494             if (buildEsOperation(request, esOperationSet, rejected)) {
495                 opCount++;
496             }
497         }
498
499         ElasticSearchBulkOperationResult opResult = null;
500         if (opCount > 0) {
501             HttpURLConnection conn;
502             try {
503                 conn = (HttpURLConnection) buildUrl(createUriBuilder("_bulk")).openConnection();
504                 conn.setRequestMethod(HttpMethod.PUT);
505                 conn.setDoOutput(true);
506                 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_XND_JSON_TYPE);
507                 if(config.useAuth()){
508                         conn.setRequestProperty("Authorization", config.getAuthValue());
509                 }
510                 conn.setRequestProperty("Connection", "Close");
511
512             } catch (IOException e) {
513
514                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
515                 if (logger.isDebugEnabled()) {
516                     logger.debug(Throwables.getStackTraceAsString(e));
517                 }
518
519                 throw new DocumentStoreOperationException(
520                         "Failed to open connection to document store.  Cause: " + e.getMessage(), e);
521             }
522
523             StringBuilder bulkResult = new StringBuilder(128);
524             try {
525                 // Create an output stream to write our request to.
526                 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
527
528                 if (logger.isDebugEnabled()) {
529                     logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
530                     logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
531                 }
532
533                 // Write the resulting request string to our output stream. (this sends the request to ES?)
534                 out.write(esOperationSet.toString());
535                 out.close();
536
537                 // Open an input stream on our connection in order to read back the results.
538                 InputStream is = conn.getInputStream();
539                 InputStreamReader inputstreamreader = new InputStreamReader(is);
540                 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
541
542                 // Read the contents of the input stream into our result string...
543                 String esResponseString = null;
544
545                 while ((esResponseString = bufferedreader.readLine()) != null) {
546                     bulkResult.append(esResponseString).append("\n");
547                 }
548
549             } catch (IOException e) {
550
551                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
552                 if (logger.isDebugEnabled()) {
553                     StringWriter sw = new StringWriter();
554                     e.printStackTrace(new PrintWriter(sw));
555                     logger.debug(sw.toString());
556                 }
557
558                 throw new DocumentStoreOperationException(
559                         "Failure interacting with document store.  Cause: " + e.getMessage(), e);
560             }
561
562             if (logger.isDebugEnabled()) {
563                 logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
564             }
565
566             // ...and marshal the resulting string into a Java object.
567             try {
568                 opResult = marshallEsBulkResult(bulkResult.toString());
569
570             } catch (IOException e) {
571                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
572                 if (logger.isDebugEnabled()) {
573                     logger.debug(Throwables.getStackTraceAsString(e));
574                 }
575
576                 throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: " + e.getMessage(),
577                         e);
578             }
579         }
580
581         OperationResult result = new OperationResultBuilder() //
582                 .resultCode(HttpStatus.MULTI_STATUS_207) //
583                 .result(buildGenericBulkResultSet(opResult, rejected)) //
584                 .build();
585
586         // In the success case we don't want the entire result string to be dumped into the metrics log, so concatenate
587         // it.
588         String resultStringForMetricsLog = result.getResult();
589         if (isSuccess(result)) {
590             resultStringForMetricsLog =
591                     resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
592         }
593
594         metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
595                 new LogFields() //
596                         .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
597                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
598                 override);
599
600         return result;
601     }
602
603
604     /**
605      * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
606      *
607      * @param indexName - The index to look for.
608      * @return - An operation result indicating the success or failure of the check.
609      * @throws DocumentStoreOperationException
610      */
611     private OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
612         MdcOverride override = getStartTime(new MdcOverride());
613
614         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.HEAD);
615         int resultCode;
616         try {
617             resultCode = conn.getResponseCode();
618         } catch (IOException ex) {
619             shutdownConnection(conn);
620             throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex);
621         }
622         logger.debug(MSG_RESPONSE_CODE + resultCode);
623
624         OperationResult opResult = new OperationResultBuilder().useDefaults().resultCode(resultCode).build();
625         logMetricsInfo(override, SearchDbMsgs.CHECK_INDEX_TIME, opResult, indexName);
626         shutdownConnection(conn);
627
628         return opResult;
629     }
630
631     private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
632             throws DocumentStoreOperationException {
633         // check if the document already exists
634         DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
635
636         if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
637             if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) {
638                 opResult.setFailureCause("A document with the same id already exists.");
639             } else {
640                 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
641             }
642             opResult.setResultCode(Status.CONFLICT.getStatusCode());
643             return opResult;
644         }
645
646         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId()));
647         MdcOverride override = getStartTime(new MdcOverride());
648
649         HttpURLConnection conn = createConnection(url, HttpMethod.PUT);
650         attachDocument(conn, document);
651
652         opResult = getOperationResult(conn);
653         buildDocumentResult(opResult, indexName);
654
655         logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, opResult, indexName);
656
657         shutdownConnection(conn);
658
659         return opResult;
660     }
661
662     private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
663             throws DocumentStoreOperationException {
664         final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE));
665
666         MdcOverride override = getStartTime(new MdcOverride());
667         HttpURLConnection conn = createConnection(url, HttpMethod.POST);
668         attachDocument(conn, document);
669
670         DocumentOperationResult response = getOperationResult(conn);
671         buildDocumentResult(response, indexName);
672
673         logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, response, indexName);
674
675         shutdownConnection(conn);
676
677         return response;
678     }
679
680     private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
681             throws DocumentStoreOperationException {
682         conn.setRequestProperty("Connection", "Close");
683         attachContent(conn, doc.getContentInJson());
684     }
685
686     private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
687             throws DocumentStoreOperationException {
688         MdcOverride override = getStartTime(new MdcOverride());
689         HttpURLConnection conn =
690                 createConnection(buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, docId)), HttpMethod.HEAD);
691         int resultCode;
692         try {
693             resultCode = conn.getResponseCode();
694         } catch (IOException ex) {
695             shutdownConnection(conn);
696             throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex);
697         }
698
699         logger.debug(MSG_RESPONSE_CODE + resultCode);
700
701         DocumentOperationResult opResult = (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT)
702                 .useDefaults().resultCode(resultCode).build();
703
704         logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, docId);
705         shutdownConnection(conn);
706
707         return opResult;
708     }
709
710     private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException {
711         OutputStream outputStream = null;
712         OutputStreamWriter out = null;
713
714         try {
715             outputStream = conn.getOutputStream();
716         } catch (IOException e) {
717             shutdownConnection(conn);
718             throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
719         }
720
721         out = new OutputStreamWriter(outputStream);
722
723         try {
724             out.write(content);
725             out.close();
726         } catch (IOException e) {
727             shutdownConnection(conn);
728             throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
729         }
730     }
731
732     private HttpURLConnection initializeConnection(URL url) throws DocumentStoreOperationException {
733         HttpURLConnection conn = null;
734         try {
735             conn = (HttpURLConnection) url.openConnection();
736             conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON);
737             conn.setDoOutput(true);
738             if (config.useAuth()) {
739                 conn.setRequestProperty("Authorization", config.getAuthValue());
740             }
741         } catch (IOException e) {
742             shutdownConnection(conn);
743             throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
744         }
745
746         return conn;
747     }
748
749     private OperationResult handleResponse(HttpURLConnection conn) throws DocumentStoreOperationException {
750         return handleResponse(conn, new OperationResultBuilder().useDefaults());
751     }
752
753     private OperationResult handleResponse(HttpURLConnection conn, OperationResultBuilder rb)
754             throws DocumentStoreOperationException {
755         int resultCode;
756
757         try {
758             resultCode = conn.getResponseCode();
759         } catch (IOException ex) {
760             shutdownConnection(conn);
761             throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex);
762         }
763
764         logger.debug(MSG_RESPONSE_CODE + resultCode);
765
766         InputStream inputStream = null;
767
768         if (!ApiUtils.isSuccessStatusCode(resultCode)) {
769             inputStream = conn.getErrorStream();
770         } else {
771             try {
772                 inputStream = conn.getInputStream();
773             } catch (IOException e) {
774                 shutdownConnection(conn);
775                 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
776             }
777         }
778
779         InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
780         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
781
782         StringBuilder result = new StringBuilder(128);
783         String string = null;
784
785         try {
786             while ((string = bufferedreader.readLine()) != null) {
787                 result.append(string).append("\n");
788             }
789         } catch (IOException e) {
790             shutdownConnection(conn);
791             throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
792         }
793
794         if (resultCode == Status.CONFLICT.getStatusCode()) {
795             rb.resultCode(Status.PRECONDITION_FAILED.getStatusCode());
796         } else {
797             rb.resultCode(resultCode);
798         }
799         if (logger.isDebugEnabled()) {
800             logger.debug("Raw result string from ElasticSearch = " + result.toString());
801         }
802         rb.result(result.toString());
803         rb.resultVersion(extractVersion(result.toString()));
804         return rb.build();
805     }
806
807     private String extractVersion(String result) {
808         JSONParser parser = new JSONParser();
809         String version = null;
810         try {
811             JSONObject root = (JSONObject) parser.parse(result);
812             if (root.get(JSON_ATTR_VERSION) != null) {
813                 version = root.get(JSON_ATTR_VERSION).toString();
814             }
815         } catch (ParseException e) {
816             // Not all responses from ElasticSearch include a version, so
817             // if we don't get one back, just return an empty string rather
818             // than trigger a false failure.
819             version = "";
820         }
821         return version;
822     }
823
824     /**
825      * This convenience method gets the current system time and stores it in an attribute in the supplied
826      * {@link MdcOverride} object so that it can be used later by the metrics logger.
827      *
828      * @param override - The {@link MdcOverride} object to update.
829      * @return - The supplied {@link MdcOverride} object.
830      */
831     private MdcOverride getStartTime(MdcOverride override) {
832
833         // Grab the current time...
834         long startTimeInMs = System.currentTimeMillis();
835
836         // ...and add it as an attribute to the supplied MDC Override object.
837         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
838         override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
839
840         // Return the MdcOverride object that we were passed.
841         // This looks odd, but it allows us to do stuff like:
842         //
843         // MdcOverride ov = getStartTime(new MdcOverride())
844         //
845         // which is quite handy, but also allows us to pass in an existing
846         // MdcOverride object which already has some attributes set.
847         return override;
848     }
849
850     private boolean isSuccess(OperationResult result) {
851         return ApiUtils.isSuccessStatusCode(result.getResultCode());
852     }
853
854     private UriBuilder createUriBuilder(String path, String... paths) {
855         UriBuilder builder = UriBuilder.fromPath(path);
856         for (String other : paths) {
857             builder.path(other);
858         }
859         builder.host(config.getIpAddress());
860         String port = Optional.ofNullable(config.getHttpPort()).orElse("0");
861         builder.port(Integer.valueOf(port));
862         builder.scheme(config.getUriScheme());
863         return builder;
864     }
865
866     private URL buildUrl(UriBuilder builder) throws DocumentStoreOperationException {
867         try {
868             return builder.build().toURL();
869         } catch (MalformedURLException e) {
870             logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "buildUrl", e.getLocalizedMessage());
871             throw new DocumentStoreOperationException("Error building a URL with " + builder.toString(), e);
872         }
873     }
874
875     private HttpURLConnection createConnection(final URL url, final String method)
876             throws DocumentStoreOperationException {
877         HttpURLConnection conn = initializeConnection(url);
878         try {
879             logger.debug("\nSending '" + method + "' request to URL : " + conn.getURL());
880             conn.setRequestMethod(method);
881         } catch (ProtocolException e) {
882             shutdownConnection(conn);
883             throw new DocumentStoreOperationException(ErrorMessage.SET_REQUEST_METHOD_FAILED, e, method);
884         }
885         return conn;
886     }
887
888     private OperationResult checkConnection() throws IOException, DocumentStoreOperationException {
889         HttpURLConnection conn = createConnection(buildUrl(createUriBuilder("_cluster/health")), HttpMethod.GET);
890         int resultCode = conn.getResponseCode();
891         logger.debug("getClusterHealth() response Code : " + resultCode);
892         shutdownConnection(conn);
893         return new OperationResultBuilder().resultCode(resultCode).build();
894     }
895
896     private void shutdownConnection(HttpURLConnection connection) {
897         if (connection == null) {
898             return;
899         }
900
901         final String methodName = "shutdownConnection";
902         InputStream inputstream = null;
903         OutputStream outputstream = null;
904
905         try {
906             inputstream = connection.getInputStream();
907         } catch (IOException e) {
908             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
909         } finally {
910             if (inputstream != null) {
911                 try {
912                     inputstream.close();
913                 } catch (IOException e) {
914                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
915                 }
916             }
917         }
918
919         try {
920             outputstream = connection.getOutputStream();
921         } catch (IOException e) {
922             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
923         } finally {
924             if (outputstream != null) {
925                 try {
926                     outputstream.close();
927                 } catch (IOException e) {
928                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
929                 }
930             }
931         }
932
933         connection.disconnect();
934     }
935
936     /**
937      * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch.
938      *
939      * @param request - The request to be performed.
940      * @param sb - The string builder to append the json data to
941      * @throws DocumentStoreOperationException
942      */
943     private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails)
944             throws DocumentStoreOperationException {
945
946         boolean retVal = true;
947         // What kind of operation are we performing?
948         switch (request.getOperationType()) {
949
950             // Create a new document.
951             case CREATE:
952
953                 // Make sure that we were supplied a document payload.
954                 if (request.getOperation().getDocument() == null) {
955
956                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
957                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
958                     return false;
959                 }
960
961                 // Make sure that the supplied document URL is formatted
962                 // correctly.
963                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
964                     fails.add(generateRejectionEntry(request.getOperationType(),
965                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
966                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
967                     return false;
968                 }
969
970                 // Validate that the specified index actually exists before we
971                 // try to perform the create.
972                 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
973
974                     fails.add(generateRejectionEntry(request.getOperationType(),
975                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
976                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
977                     return false;
978                 }
979
980                 // If we were supplied an id for the new document, then
981                 // include it in the bulk operation to Elastic Search
982                 if (request.getId() == null) {
983
984                     sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
985
986                     // Otherwise, we just leave that parameter off and ElasticSearch
987                     // will generate one for us.
988                 } else {
989                     sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
990                             request.getId()));
991                 }
992
993                 try {
994                     // Append the document that we want to create.
995                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
996                 } catch (JsonProcessingException e) {
997                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
998                 }
999
1000                 break;
1001
1002             // Update an existing document.
1003             case UPDATE:
1004
1005                 // Make sure that we were supplied a document payload.
1006                 if (request.getOperation().getDocument() == null) {
1007
1008                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1009                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1010                     return false;
1011                 }
1012
1013                 // Make sure that the supplied document URL is formatted
1014                 // correctly.
1015                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1016                     fails.add(generateRejectionEntry(request.getOperationType(),
1017                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1018                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1019                     return false;
1020                 }
1021
1022                 // Validate that the specified index actually exists before we
1023                 // try to perform the update.
1024                 if (!indexExists(request.getIndex())) {
1025
1026                     fails.add(generateRejectionEntry(request.getOperationType(),
1027                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1028                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1029                     return false;
1030                 }
1031
1032                 // Validate that the document we are trying to update actually
1033                 // exists before we try to perform the update.
1034                 if (!documentExists(request.getIndex(), request.getId())) {
1035
1036                     fails.add(generateRejectionEntry(request.getOperationType(),
1037                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1038                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1039                     return false;
1040                 }
1041
1042                 // It is mandatory that a version be supplied for an update operation,
1043                 // so validate that now.
1044                 if (request.getOperation().getMetaData().getEtag() == null) {
1045
1046                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1047                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1048                     return false;
1049                 }
1050
1051                 // Generate the update request...
1052                 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1053                         request.getOperation().getMetaData().getEtag()));
1054
1055                 // ...and append the document that we want to update.
1056                 try {
1057                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
1058                 } catch (JsonProcessingException e) {
1059                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
1060                 }
1061                 break;
1062
1063             // Delete an existing document.
1064             case DELETE:
1065
1066                 // Make sure that the supplied document URL is formatted
1067                 // correctly.
1068                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1069                     fails.add(generateRejectionEntry(request.getOperationType(),
1070                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1071                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1072                     return false;
1073                 }
1074
1075                 // Validate that the specified index actually exists before we
1076                 // try to perform the delete.
1077                 if (!indexExists(request.getIndex())) {
1078
1079                     fails.add(generateRejectionEntry(request.getOperationType(),
1080                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1081                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1082                     return false;
1083                 }
1084
1085                 // Validate that the document we are trying to update actually
1086                 // exists before we try to perform the delete.
1087                 if (!documentExists(request.getIndex(), request.getId())) {
1088
1089                     fails.add(generateRejectionEntry(request.getOperationType(),
1090                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1091                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1092                     return false;
1093                 }
1094
1095                 // It is mandatory that a version be supplied for a delete operation,
1096                 // so validate that now.
1097                 if (request.getOperation().getMetaData().getEtag() == null) {
1098
1099                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1100                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1101                     return false;
1102                 }
1103
1104                 // Generate the delete request.
1105                 sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1106                         request.getOperation().getMetaData().getEtag()));
1107                 break;
1108             default:
1109         }
1110
1111         return retVal;
1112     }
1113
1114     private boolean indexExists(String index) throws DocumentStoreOperationException {
1115         return isSuccess(checkIndexExistence(index));
1116     }
1117
1118     private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1119         return isSuccess(checkDocumentExistence(index, id));
1120     }
1121
1122     /**
1123      * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the
1124      * document store.
1125      *
1126      * @param rejectReason - A message describing why the operation was rejected.
1127      * @param anId - The identifier associated with the document being acted on.
1128      * @param statusCode - An HTTP status code.
1129      * @return - A result set item.
1130      */
1131     private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index,
1132             String anId, int statusCode, String originalUrl) {
1133
1134         ElasticSearchError err = new ElasticSearchError();
1135         err.setReason(rejectReason);
1136
1137         ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1138         op.setIndex(index);
1139         op.setId(anId);
1140         op.setStatus(statusCode);
1141         op.setError(err);
1142         op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1143
1144         ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1145
1146         switch (opType) {
1147             case CREATE:
1148                 rejectionResult.setCreate(op);
1149                 break;
1150             case UPDATE:
1151                 rejectionResult.setIndex(op);
1152                 break;
1153             case DELETE:
1154                 rejectionResult.setDelete(op);
1155                 break;
1156             default:
1157         }
1158
1159         return rejectionResult;
1160     }
1161
1162     /**
1163      * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and
1164      * marshals it into a Java object.
1165      *
1166      * @param jsonResult - The bulk operations response returned from ElasticSearch.
1167      * @return - The marshalled response.
1168      * @throws JsonParseException
1169      * @throws JsonMappingException
1170      * @throws IOException
1171      */
1172     private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException {
1173         if (jsonResult != null) {
1174             if (logger.isDebugEnabled()) {
1175                 logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", ""));
1176             }
1177
1178             ObjectMapper mapper = new ObjectMapper();
1179             mapper.setSerializationInclusion(Include.NON_EMPTY);
1180
1181             return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1182         }
1183
1184         return null;
1185     }
1186
1187     /**
1188      * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload.
1189      *
1190      * @param esResult - ElasticSearch bulk operations response.
1191      * @return - A generic result set.
1192      */
1193     private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1194             List<ElasticSearchResultItem> rejectedOps) {
1195         int totalOps = 0;
1196         int totalSuccess = 0;
1197         int totalFails = 0;
1198
1199         if (logger.isDebugEnabled()) {
1200
1201             logger.debug("ESController: Build generic result set.  ES Results: "
1202                     + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString());
1203         }
1204
1205         // Build a combined list of result items from the results returned
1206         // from ElasticSearch and the list of operations that we rejected
1207         // without sending to ElasticSearch.
1208         List<ElasticSearchResultItem> combinedResults = new ArrayList<>();
1209         if (esResult != null) {
1210             combinedResults.addAll(Arrays.asList(esResult.getItems()));
1211         }
1212         combinedResults.addAll(rejectedOps);
1213
1214         // Iterate over the individual results in the resulting result set.
1215         StringBuilder resultsBuilder = new StringBuilder();
1216         AtomicBoolean firstItem = new AtomicBoolean(true);
1217         for (ElasticSearchResultItem item : combinedResults) {
1218
1219             // Increment the operation counts.
1220             totalOps++;
1221             if (ApiUtils.isSuccessStatusCode(item.operationStatus().getStatus())) {
1222                 totalSuccess++;
1223             } else {
1224                 totalFails++;
1225             }
1226
1227             // Prepend a comma to our response string unless this it the
1228             // first result in the set.
1229             if (!firstItem.compareAndSet(true, false)) {
1230                 resultsBuilder.append(", ");
1231             }
1232
1233             // Append the current result as a generic json structure.
1234             resultsBuilder.append(item.toJson());
1235         }
1236
1237         return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", "
1238                 + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}";
1239     }
1240
1241     private DocumentOperationResult getOperationResult(HttpURLConnection conn) throws DocumentStoreOperationException {
1242         return (DocumentOperationResult) handleResponse(conn, new OperationResultBuilder(Type.DOCUMENT).useDefaults());
1243     }
1244
1245     private SearchOperationResult getSearchOperationResult(HttpURLConnection conn)
1246             throws DocumentStoreOperationException {
1247         return (SearchOperationResult) handleResponse(conn, new OperationResultBuilder(Type.SEARCH).useDefaults());
1248     }
1249
1250     private void buildDocumentResult(DocumentOperationResult result, String index)
1251             throws DocumentStoreOperationException {
1252
1253         JSONParser parser = new JSONParser();
1254         JSONObject root;
1255         try {
1256             root = (JSONObject) parser.parse(result.getResult());
1257             if (isSuccess(result)) {
1258                 // Success response object
1259                 Document doc = new Document();
1260                 doc.setEtag(result.getResultVersion());
1261                 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1262
1263                 doc.setContent((JSONObject) root.get("_source"));
1264                 result.setDocument(doc);
1265
1266             } else {
1267                 // Error response object
1268                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1269                 if (error != null) {
1270                     result.setError(
1271                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1272                 }
1273
1274             }
1275         } catch (Exception e) {
1276             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1277         }
1278     }
1279
1280     private String buildDocumentResponseUrl(String index, String id) {
1281         return ApiUtils.buildDocumentUri(index, id);
1282     }
1283
1284     private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1285         JSONParser parser = new JSONParser();
1286         JSONObject root;
1287
1288         try {
1289             root = (JSONObject) parser.parse(result.getResult());
1290             if (isSuccess(result)) {
1291                 JSONObject hits = (JSONObject) root.get("hits");
1292                 JSONArray hitArray = (JSONArray) hits.get("hits");
1293                 SearchHits searchHits = new SearchHits();
1294                 searchHits.setTotalHits(hits.get("total").toString());
1295                 ArrayList<SearchHit> searchHitArray = new ArrayList<>();
1296
1297                 for (int i = 0; i < hitArray.size(); i++) {
1298                     JSONObject hit = (JSONObject) hitArray.get(i);
1299                     SearchHit searchHit = new SearchHit();
1300                     searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1301                     Document doc = new Document();
1302                     if (hit.get(JSON_ATTR_VERSION) != null) {
1303                         doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1304                     }
1305
1306                     doc.setUrl(
1307                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1308                     doc.setContent((JSONObject) hit.get("_source"));
1309                     searchHit.setDocument(doc);
1310                     searchHitArray.add(searchHit);
1311                 }
1312                 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1313                 result.setSearchResult(searchHits);
1314
1315                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1316                 if (aggregations != null) {
1317                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1318                     AggregationResults aggs = new AggregationResults();
1319                     aggs.setAggregations(aggResults);
1320                     result.setAggregationResult(aggs);
1321                 }
1322
1323                 // success
1324             } else {
1325                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1326                 if (error != null) {
1327                     result.setError(
1328                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1329                 }
1330             }
1331         } catch (Exception e) {
1332             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1333         }
1334     }
1335
1336     private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1337         JSONParser parser = new JSONParser();
1338         JSONObject root;
1339         try {
1340             root = (JSONObject) parser.parse(result.getResult());
1341             if (isSuccess(result)) {
1342                 JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
1343                 JSONObject hitdata = (JSONObject) hitArray.get(0);
1344                 JSONArray optionsArray = (JSONArray) hitdata.get("options");
1345                 SuggestHits suggestHits = new SuggestHits();
1346                 suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
1347
1348                 ArrayList<SuggestHit> suggestHitArray = new ArrayList<>();
1349
1350                 for (int i = 0; i < optionsArray.size(); i++) {
1351                     JSONObject hit = (JSONObject) optionsArray.get(i);
1352
1353                     SuggestHit suggestHit = new SuggestHit();
1354                     suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
1355                     suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
1356                     Document doc = new Document();
1357                     if (hit.get(JSON_ATTR_VERSION) != null) {
1358                         doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1359                     }
1360                     doc.setUrl(
1361                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1362
1363                     doc.setContent((JSONObject) hit.get("payload"));
1364                     suggestHit.setDocument(doc);
1365                     suggestHitArray.add(suggestHit);
1366                 }
1367                 suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
1368                 result.setSuggestResult(suggestHits);
1369
1370                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1371                 if (aggregations != null) {
1372                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1373                     AggregationResults aggs = new AggregationResults();
1374                     aggs.setAggregations(aggResults);
1375                     result.setAggregationResult(aggs);
1376                 }
1377
1378                 // success
1379             } else {
1380                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1381                 if (error != null) {
1382                     result.setError(
1383                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1384                 }
1385             }
1386         } catch (Exception e) {
1387             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1388         }
1389     }
1390
1391     /**
1392      * Record the timing of the operation in the metrics log.
1393      *
1394      */
1395     private void logMetricsInfo(MdcOverride override, SearchDbMsgs message, OperationResult operationResult,
1396             String... args) {
1397         metricsLogger.info(message,
1398                 new LogFields() //
1399                         .setField(LogLine.DefinedFields.RESPONSE_CODE, operationResult.getResultCode())
1400                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, operationResult.getResult()),
1401                 override, args);
1402     }
1403 }