Replace duplicated String literals with Constants
[aai/search-data-service.git] / src / main / java / org / onap / aai / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
23
24 import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
25 import static javax.ws.rs.core.MediaType.APPLICATION_FORM_URLENCODED;
26 import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
27
28 import com.fasterxml.jackson.annotation.JsonInclude.Include;
29 import com.fasterxml.jackson.core.JsonParseException;
30 import com.fasterxml.jackson.core.JsonProcessingException;
31 import com.fasterxml.jackson.databind.JsonMappingException;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33 import com.google.common.base.Throwables;
34 import java.io.BufferedReader;
35 import java.io.File;
36 import java.io.FileInputStream;
37 import java.io.IOException;
38 import java.io.InputStream;
39 import java.io.InputStreamReader;
40 import java.io.OutputStream;
41 import java.io.OutputStreamWriter;
42 import java.io.PrintWriter;
43 import java.io.StringWriter;
44 import java.net.HttpURLConnection;
45 import java.net.MalformedURLException;
46 import java.net.ProtocolException;
47 import java.net.URL;
48 import java.text.SimpleDateFormat;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.List;
52 import java.util.Properties;
53 import java.util.concurrent.atomic.AtomicBoolean;
54 import javax.ws.rs.core.Response.Status;
55 import javax.ws.rs.core.Response.Status.Family;
56 import org.json.simple.JSONArray;
57 import org.json.simple.JSONObject;
58 import org.json.simple.parser.JSONParser;
59 import org.json.simple.parser.ParseException;
60 import org.onap.aai.cl.api.LogFields;
61 import org.onap.aai.cl.api.LogLine;
62 import org.onap.aai.cl.api.Logger;
63 import org.onap.aai.cl.eelf.LoggerFactory;
64 import org.onap.aai.cl.mdc.MdcContext;
65 import org.onap.aai.cl.mdc.MdcOverride;
66 import org.onap.aai.sa.rest.AnalysisConfiguration;
67 import org.onap.aai.sa.rest.ApiUtils;
68 import org.onap.aai.sa.rest.BulkRequest;
69 import org.onap.aai.sa.rest.BulkRequest.OperationType;
70 import org.onap.aai.sa.rest.DocumentSchema;
71 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
72 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
73 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
74 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
75 import org.onap.aai.sa.searchdbabstraction.entity.Document;
76 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
77 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
78 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
79 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
80 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
81 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
82 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
83 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
84 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
85 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
86 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
87 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
88 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
89
90 /**
91  * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface.
92  */
93 public class ElasticSearchHttpController implements DocumentStoreInterface {
94
95     private static ElasticSearchHttpController instance = null;
96
97     private static final Logger logger =
98             LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
99     private static final Logger metricsLogger =
100             LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
101
102     private static final String JSON_ATTR_VERSION = "_version";
103     private static final String JSON_ATTR_ERROR = "error";
104     private static final String JSON_ATTR_REASON = "reason";
105
106     private static final String QUERY_PARAM_VERSION = "?version=";
107
108     private static final String MSG_RESOURCE_MISSING = "Specified resource does not exist: ";
109     private static final String MSG_RESPONSE_CODE = "Response Code : ";
110     private static final String MSG_INVALID_DOCUMENT_URL = "Invalid document URL: ";
111     private static final String MSG_HTTP_PUT_FAILED = "Failed to set HTTP request method to PUT.";
112     private static final String MSG_HTTP_POST_FAILED = "Failed to set HTTP request method to POST.";
113     private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
114             "Internal Error: ElasticSearch operation fault occurred";
115     private static final String FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION =
116             "Failed to get the response code from the connection.";
117     private static final String FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE = "Failed to parse Elastic Search response.";
118
119     private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
120             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
121     private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
122             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
123     private static final String BULK_IMPORT_INDEX_TEMPLATE =
124             "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
125     private static final String BULK_DELETE_TEMPLATE =
126             "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
127
128     private final ElasticSearchConfig config;
129
130     private static final String DEFAULT_TYPE = "default";
131
132     protected AnalysisConfiguration analysisConfig;
133
134     public static ElasticSearchHttpController getInstance() {
135         synchronized (ElasticSearchHttpController.class) {
136             if (instance == null) {
137                 Properties properties = new Properties();
138                 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
139                 try {
140                     properties.load(new FileInputStream(file));
141                 } catch (Exception e) {
142                     logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance",
143                             e.getLocalizedMessage());
144                 }
145
146                 ElasticSearchConfig config = new ElasticSearchConfig(properties);
147                 instance = new ElasticSearchHttpController(config);
148             }
149         }
150
151         return instance;
152     }
153
154     public ElasticSearchHttpController(ElasticSearchConfig config) {
155         this.config = config;
156         analysisConfig = new AnalysisConfiguration();
157
158         try {
159             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
160             checkConnection();
161             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
162         } catch (Exception e) {
163             logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, getFullUrl("", false),
164                     e.getMessage());
165         }
166     }
167
168     public AnalysisConfiguration getAnalysisConfig() {
169         return analysisConfig;
170     }
171
172     @Override
173     public OperationResult createIndex(String index, DocumentSchema documentSchema) {
174         OperationResult result = new OperationResult();
175         result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
176
177         try {
178
179             // Submit the request to ElasticSearch to create the index using a
180             // default document type.
181             result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
182                     DocumentSchemaUtil.generateDocumentMappings(documentSchema));
183
184             // ElasticSearch will return us a 200 code on success when we
185             // want to report a 201, so translate the result here.
186             if (result.getResultCode() == Status.OK.getStatusCode()) {
187                 result.setResultCode(Status.CREATED.getStatusCode());
188             }
189
190             if (isSuccess(result)) {
191                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
192             }
193         } catch (DocumentStoreOperationException | IOException e) {
194             result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
195         }
196
197         return result;
198     }
199
200     @Override
201     public OperationResult createDynamicIndex(String index, String dynamicSchema) {
202         OperationResult result = new OperationResult();
203         result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
204
205         try {
206             result = createTable(index, dynamicSchema);
207
208             // ElasticSearch will return us a 200 code on success when we
209             // want to report a 201, so translate the result here.
210             if (result.getResultCode() == Status.OK.getStatusCode()) {
211                 result.setResultCode(Status.CREATED.getStatusCode());
212             }
213             if (isSuccess(result)) {
214                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
215             }
216         } catch (DocumentStoreOperationException e) {
217             result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
218         }
219
220         return result;
221     }
222
223     @Override
224     public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
225
226         // Initialize operation result with a failure codes / fault string
227         OperationResult opResult = new OperationResult();
228         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
229         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
230
231         // Grab the current time so we can use it to generate a metrics log.
232         MdcOverride override = getStartTime(new MdcOverride());
233
234         String fullUrl = getFullUrl("/" + indexName + "/", false);
235         HttpURLConnection conn = initializeConnection(fullUrl);
236
237         logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
238
239         try {
240             conn.setRequestMethod("DELETE");
241         } catch (ProtocolException e) {
242             shutdownConnection(conn);
243             throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
244         }
245
246         handleResponse(conn, opResult);
247
248         // Generate a metrics log so we can track how long the operation took.
249         metricsLogger
250                 .info(SearchDbMsgs.DELETE_INDEX_TIME,
251                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
252                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
253                         override, indexName);
254
255         shutdownConnection(conn);
256
257         return opResult;
258     }
259
260     private OperationResult checkConnection() throws IOException {
261         String fullUrl = getFullUrl("/_cluster/health", false);
262         URL url = null;
263         HttpURLConnection conn = null;
264
265         url = new URL(fullUrl);
266         conn = (HttpURLConnection) url.openConnection();
267         conn.setRequestMethod("GET");
268         conn.setDoOutput(true);
269         logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
270
271         int resultCode = conn.getResponseCode();
272         logger.debug("getClusterHealth() response Code : " + resultCode);
273         OperationResult opResult = new OperationResult();
274         opResult.setResultCode(resultCode);
275
276         shutdownConnection(conn);
277
278         return opResult;
279     }
280
281     private String getFullUrl(String resourceUrl, boolean isSecure) {
282
283         final String host = config.getIpAddress();
284         final String port = config.getHttpPort();
285
286         if (isSecure) {
287             return String.format("https://%s:%s%s", host, port, resourceUrl);
288         } else {
289             return String.format("http://%s:%s%s", host, port, resourceUrl);
290         }
291     }
292
293     private void shutdownConnection(HttpURLConnection connection) {
294         if (connection == null) {
295             return;
296         }
297
298         final String methodName = "shutdownConnection";
299         InputStream inputstream = null;
300         OutputStream outputstream = null;
301
302         try {
303             inputstream = connection.getInputStream();
304         } catch (IOException e) {
305             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
306         } finally {
307             if (inputstream != null) {
308                 try {
309                     inputstream.close();
310                 } catch (IOException e) {
311                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
312                 }
313             }
314         }
315
316         try {
317             outputstream = connection.getOutputStream();
318         } catch (IOException e) {
319             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
320         } finally {
321             if (outputstream != null) {
322                 try {
323                     outputstream.close();
324                 } catch (IOException e) {
325                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
326                 }
327             }
328         }
329
330         connection.disconnect();
331     }
332
333     // @Override
334     protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings)
335             throws DocumentStoreOperationException {
336
337         if (indexSettings == null) {
338             logger.debug("No settings provided.");
339         }
340
341         if (indexMappings == null) {
342             logger.debug("No mappings provided.");
343         }
344
345         OperationResult opResult = new OperationResult();
346
347         // Initialize operation result with a failure codes / fault string
348         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
349         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
350
351         // Grab the current time so we can use it to generate a metrics log.
352         MdcOverride override = getStartTime(new MdcOverride());
353
354         String fullUrl = getFullUrl("/" + indexName + "/", false);
355         HttpURLConnection conn = initializeConnection(fullUrl);
356
357         try {
358             conn.setRequestMethod("PUT");
359         } catch (ProtocolException e) {
360             shutdownConnection(conn);
361             throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
362         }
363
364         StringBuilder sb = new StringBuilder(128);
365         sb.append("{ \"settings\" : ");
366         sb.append(indexSettings);
367         sb.append(",");
368
369         sb.append("\"mappings\" : {");
370         sb.append("\"" + typeName + "\" :");
371         sb.append(indexMappings);
372         sb.append("}}");
373
374         try {
375             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
376         } catch (IOException e) {
377             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
378             throw new DocumentStoreOperationException(e.getMessage(), e);
379         }
380
381         logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
382         logger.debug("Request content: " + sb.toString());
383
384         handleResponse(conn, opResult);
385
386         shutdownConnection(conn);
387
388         // Generate a metrics log so we can track how long the operation took.
389         metricsLogger
390                 .info(SearchDbMsgs.CREATE_INDEX_TIME,
391                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
392                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
393                         override, indexName);
394
395         return opResult;
396     }
397
398     /**
399      * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the
400      * index.
401      *
402      * @param indexName - The name of the index to be created
403      * @param settingsAndMappings - The actual JSON object that will define the index
404      * @return - The operation result of writing into Elasticsearch
405      * @throws DocumentStoreOperationException
406      */
407     protected OperationResult createTable(String indexName, String settingsAndMappings)
408             throws DocumentStoreOperationException {
409         OperationResult result = new OperationResult();
410         result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
411         result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
412
413         // Grab the current time so we can use it to generate a metrics log.
414         MdcOverride override = getStartTime(new MdcOverride());
415
416         String fullUrl = getFullUrl("/" + indexName + "/", false);
417         HttpURLConnection conn = initializeConnection(fullUrl);
418
419         try {
420             conn.setRequestMethod("PUT");
421         } catch (ProtocolException e) {
422             shutdownConnection(conn);
423             throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
424         }
425
426         try {
427             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
428         } catch (IOException e) {
429             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
430             throw new DocumentStoreOperationException(e.getMessage());
431         }
432         handleResponse(conn, result);
433
434         // Generate a metrics log so we can track how long the operation took.
435         metricsLogger
436                 .info(SearchDbMsgs.CREATE_INDEX_TIME,
437                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
438                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()),
439                         override, indexName);
440
441         return result;
442     }
443
444     @Override
445     public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
446             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
447
448         if (!allowImplicitIndexCreation) {
449             // Before we do anything, make sure that the specified index actually exists in the
450             // document store - we don't want to rely on ElasticSearch to fail the document
451             // create because it could be configured to implicitly create a non-existent index,
452             // which can lead to hard-to-debug behaviour with queries down the road.
453             OperationResult indexExistsResult = checkIndexExistence(indexName);
454             if (!isSuccess(indexExistsResult)) {
455                 DocumentOperationResult opResult = new DocumentOperationResult();
456                 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
457                 String resultMsg = "Document Index '" + indexName + "' does not exist.";
458                 opResult.setResult(resultMsg);
459                 opResult.setFailureCause(resultMsg);
460                 return opResult;
461             }
462         }
463
464         if (document.getId() == null || document.getId().isEmpty()) {
465             return createDocumentWithoutId(indexName, document);
466         } else {
467             return createDocumentWithId(indexName, document);
468         }
469     }
470
471     private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
472             throws DocumentStoreOperationException {
473         // check if the document already exists
474         DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
475
476         if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
477             if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) {
478                 opResult.setFailureCause("A document with the same id already exists.");
479             } else {
480                 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
481             }
482             opResult.setResultCode(Status.CONFLICT.getStatusCode());
483             return opResult;
484         }
485
486         opResult = new DocumentOperationResult();
487         // Initialize operation result with a failure codes / fault string
488         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
489         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
490
491         // Grab the current time so we can use it to generate a metrics log.
492         MdcOverride override = getStartTime(new MdcOverride());
493
494         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
495         HttpURLConnection conn = initializeConnection(fullUrl);
496
497         try {
498             conn.setRequestMethod("PUT");
499         } catch (ProtocolException e) {
500             shutdownConnection(conn);
501             throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
502         }
503
504         attachDocument(conn, document);
505
506         logger.debug("Sending 'PUT' request to: " + conn.getURL());
507
508         handleResponse(conn, opResult);
509         buildDocumentResult(opResult, indexName);
510
511         // Generate a metrics log so we can track how long the operation took.
512         metricsLogger
513                 .info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
514                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
515                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
516                         override, indexName);
517
518         shutdownConnection(conn);
519
520         return opResult;
521     }
522
523     private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
524             throws DocumentStoreOperationException {
525
526         DocumentOperationResult response = new DocumentOperationResult();
527         // Initialize operation result with a failure codes / fault string
528         response.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
529         response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
530
531         // Grab the current time so we can use it to generate a metrics log.
532         MdcOverride override = getStartTime(new MdcOverride());
533
534         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
535         HttpURLConnection conn = initializeConnection(fullUrl);
536
537         try {
538             conn.setRequestMethod("POST");
539         } catch (ProtocolException e) {
540             shutdownConnection(conn);
541             throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
542         }
543
544         attachDocument(conn, document);
545
546         logger.debug("Sending 'POST' request to: " + conn.getURL());
547
548         handleResponse(conn, response);
549         buildDocumentResult(response, indexName);
550
551         // Generate a metrics log so we can track how long the operation took.
552         metricsLogger
553                 .info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
554                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
555                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
556                         override, indexName);
557
558         shutdownConnection(conn);
559
560         return response;
561     }
562
563     private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
564             throws DocumentStoreOperationException {
565         conn.setRequestProperty("Connection", "Close");
566         attachContent(conn, doc.getContentInJson());
567     }
568
569     private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
570             throws DocumentStoreOperationException {
571         DocumentOperationResult opResult = new DocumentOperationResult();
572
573         // Initialize operation result with a failure codes / fault string
574         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
575
576         // Grab the current time so we can use it to generate a metrics log.
577         MdcOverride override = getStartTime(new MdcOverride());
578
579         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
580         HttpURLConnection conn = initializeConnection(fullUrl);
581
582         try {
583             conn.setRequestMethod("HEAD");
584         } catch (ProtocolException e) {
585             shutdownConnection(conn);
586             throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
587         }
588
589         logger.debug("Sending 'HEAD' request to: " + conn.getURL());
590
591         int resultCode;
592         try {
593             resultCode = conn.getResponseCode();
594         } catch (IOException e) {
595             shutdownConnection(conn);
596             throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
597         }
598
599         logger.debug(MSG_RESPONSE_CODE + resultCode);
600
601         opResult.setResultCode(resultCode);
602
603         // Generate a metrics log so we can track how long the operation took.
604         metricsLogger
605                 .info(SearchDbMsgs.GET_DOCUMENT_TIME,
606                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
607                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
608                         override, indexName, docId);
609
610         shutdownConnection(conn);
611
612         return opResult;
613     }
614
615     @Override
616     public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
617             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
618
619         if (!allowImplicitIndexCreation) {
620             // Before we do anything, make sure that the specified index actually exists in the
621             // document store - we don't want to rely on ElasticSearch to fail the document
622             // create because it could be configured to implicitly create a non-existent index,
623             // which can lead to hard-to-debug behaviour with queries down the road.
624             OperationResult indexExistsResult = checkIndexExistence(indexName);
625             if (!isSuccess(indexExistsResult)) {
626                 DocumentOperationResult opResult = new DocumentOperationResult();
627                 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
628                 String resultMsg = "Document Index '" + indexName + "' does not exist.";
629                 opResult.setResult(resultMsg);
630                 opResult.setFailureCause(resultMsg);
631                 return opResult;
632             }
633         }
634
635         DocumentOperationResult opResult = new DocumentOperationResult();
636
637         // Initialize operation result with a failure codes / fault string
638         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
639         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
640
641         // Grab the current time so we can use it to generate a metrics log.
642         MdcOverride override = getStartTime(new MdcOverride());
643
644         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
645                 + document.getVersion(), false);
646         HttpURLConnection conn = initializeConnection(fullUrl);
647
648         try {
649             conn.setRequestMethod("PUT");
650         } catch (ProtocolException e) {
651             shutdownConnection(conn);
652             throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
653         }
654
655         attachDocument(conn, document);
656
657         logger.debug("Sending 'PUT' request to: " + conn.getURL());
658
659         handleResponse(conn, opResult);
660         buildDocumentResult(opResult, indexName);
661
662         // Generate a metrics log so we can track how long the operation took.
663         metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
664                 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
665                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
666                 override, indexName, document.getId());
667
668         shutdownConnection(conn);
669
670         return opResult;
671     }
672
673     @Override
674     public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
675             throws DocumentStoreOperationException {
676         DocumentOperationResult opResult = new DocumentOperationResult();
677
678         // Initialize operation result with a failure codes / fault string
679         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
680         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
681
682         // Grab the current time so we can use it to generate a metrics log.
683         MdcOverride override = getStartTime(new MdcOverride());
684
685         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
686                 + document.getVersion(), false);
687         HttpURLConnection conn = initializeConnection(fullUrl);
688
689         try {
690             conn.setRequestMethod("DELETE");
691         } catch (ProtocolException e) {
692             shutdownConnection(conn);
693             throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
694         }
695
696         logger.debug("\nSending 'DELETE' request to " + conn.getURL());
697
698         handleResponse(conn, opResult);
699         buildDocumentResult(opResult, indexName);
700         // supress the etag and url in response for delete as they are not required
701         if (opResult.getDocument() != null) {
702             opResult.getDocument().setEtag(null);
703             opResult.getDocument().setUrl(null);
704         }
705
706         // Generate a metrics log so we can track how long the operation took.
707         metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
708                 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
709                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
710                 override, indexName, document.getId());
711
712         shutdownConnection(conn);
713
714         return opResult;
715     }
716
717     @Override
718     public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
719             throws DocumentStoreOperationException {
720         DocumentOperationResult opResult = new DocumentOperationResult();
721
722         // Initialize operation result with a failure codes / fault string
723         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
724         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
725
726         // Grab the current time so we can use it to generate a metrics log.
727         MdcOverride override = getStartTime(new MdcOverride());
728
729         String fullUrl = null;
730         if (document.getVersion() == null) {
731             fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
732         } else {
733             fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
734                     + document.getVersion(), false);
735         }
736         HttpURLConnection conn = initializeConnection(fullUrl);
737
738         logger.debug("\nSending 'GET' request to: " + conn.getURL());
739
740         handleResponse(conn, opResult);
741         buildDocumentResult(opResult, indexName);
742
743         // Generate a metrics log so we can track how long the operation took.
744         metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
745                 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
746                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
747                 override, indexName, document.getId());
748
749         shutdownConnection(conn);
750
751         return opResult;
752     }
753
754     @Override
755     public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
756         SearchOperationResult opResult = new SearchOperationResult();
757
758         // Initialize operation result with a failure codes / fault string
759         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
760         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
761
762         String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
763
764         // Grab the current time so we can use it to generate a metrics log.
765         MdcOverride override = getStartTime(new MdcOverride());
766
767         HttpURLConnection conn = initializeConnection(fullUrl);
768
769         try {
770             conn.setRequestMethod("GET");
771         } catch (ProtocolException e) {
772             shutdownConnection(conn);
773             throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
774         }
775
776         logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
777
778         handleResponse(conn, opResult);
779         buildSearchResult(opResult, indexName);
780
781
782         metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
783                 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
784                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
785                 override, indexName, queryString);
786
787         return opResult;
788     }
789
790     @Override
791     public SearchOperationResult searchWithPayload(String indexName, String query)
792             throws DocumentStoreOperationException {
793         SearchOperationResult opResult = new SearchOperationResult();
794
795         if (logger.isDebugEnabled()) {
796             logger.debug("Querying index: " + indexName + " with query string: " + query);
797         }
798
799         // Initialize operation result with a failure codes / fault string
800         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
801         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
802
803         String fullUrl = getFullUrl("/" + indexName + "/_search", false);
804
805         // Grab the current time so we can use it to generate a metrics log.
806         MdcOverride override = getStartTime(new MdcOverride());
807
808         HttpURLConnection conn = initializeConnection(fullUrl);
809
810         try {
811             conn.setRequestMethod("POST");
812         } catch (ProtocolException e) {
813             shutdownConnection(conn);
814             throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
815         }
816
817         attachContent(conn, query);
818
819         logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
820         logger.debug("Request body =  Elasticsearch query = " + query);
821
822         handleResponse(conn, opResult);
823         buildSearchResult(opResult, indexName);
824
825         metricsLogger
826                 .info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
827                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
828                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
829                         override, indexName, query);
830
831         shutdownConnection(conn);
832
833         return opResult;
834     }
835
836
837     @Override
838     public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
839             throws DocumentStoreOperationException {
840
841         SearchOperationResult opResult = new SearchOperationResult();
842
843         if (logger.isDebugEnabled()) {
844             logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
845         }
846
847         // Initialize operation result with a failure codes / fault string
848         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
849         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
850
851         String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
852
853         // Grab the current time so we can use it to generate a metrics log.
854         MdcOverride override = getStartTime(new MdcOverride());
855
856         HttpURLConnection conn = initializeConnection(fullUrl);
857
858         try {
859             conn.setRequestMethod("POST");
860         } catch (ProtocolException e) {
861             shutdownConnection(conn);
862             throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
863         }
864
865         attachContent(conn, query);
866
867         logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
868         logger.debug("Request body =  Elasticsearch query = " + query);
869
870         handleResponse(conn, opResult);
871         buildSuggestResult(opResult, indexName);
872
873         metricsLogger
874                 .info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
875                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
876                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
877                         override, indexName, query);
878
879         shutdownConnection(conn);
880
881         return opResult;
882     }
883
884     private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException {
885         OutputStream outputStream = null;
886         OutputStreamWriter out = null;
887
888         try {
889             outputStream = conn.getOutputStream();
890         } catch (IOException e) {
891             shutdownConnection(conn);
892             throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
893         }
894
895         out = new OutputStreamWriter(outputStream);
896
897         try {
898             out.write(content);
899             out.close();
900         } catch (IOException e) {
901             shutdownConnection(conn);
902             throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
903         }
904     }
905
906     private HttpURLConnection initializeConnection(String fullUrl) throws DocumentStoreOperationException {
907         URL url = null;
908         HttpURLConnection conn = null;
909
910         try {
911             url = new URL(fullUrl);
912         } catch (MalformedURLException e) {
913             throw new DocumentStoreOperationException("Error building a URL with " + url, e);
914         }
915
916         try {
917             conn = (HttpURLConnection) url.openConnection();
918             conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON);
919             conn.setDoOutput(true);
920         } catch (IOException e) {
921             shutdownConnection(conn);
922             throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
923         }
924
925         return conn;
926     }
927
928     private void handleResponse(HttpURLConnection conn, OperationResult opResult)
929             throws DocumentStoreOperationException {
930         int resultCode;
931
932         try {
933             resultCode = conn.getResponseCode();
934         } catch (IOException e) {
935             shutdownConnection(conn);
936             throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
937         }
938
939         logger.debug(MSG_RESPONSE_CODE + resultCode);
940
941         InputStream inputStream = null;
942
943         if (!isSuccessCode(resultCode)) {
944             inputStream = conn.getErrorStream();
945         } else {
946             try {
947                 inputStream = conn.getInputStream();
948             } catch (IOException e) {
949                 shutdownConnection(conn);
950                 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
951             }
952         }
953
954         InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
955         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
956
957         StringBuilder result = new StringBuilder(128);
958         String string = null;
959
960         try {
961             while ((string = bufferedreader.readLine()) != null) {
962                 result.append(string).append("\n");
963             }
964         } catch (IOException e) {
965             shutdownConnection(conn);
966             throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
967         }
968
969         if (resultCode == Status.CONFLICT.getStatusCode()) {
970             opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode());
971         } else {
972             opResult.setResultCode(resultCode);
973         }
974         if (logger.isDebugEnabled()) {
975             logger.debug("Raw result string from ElasticSearch = " + result.toString());
976         }
977         opResult.setResult(result.toString());
978         opResult.setResultVersion(extractVersion(result.toString()));
979     }
980
981     private String extractVersion(String result) {
982         JSONParser parser = new JSONParser();
983         String version = null;
984         try {
985             JSONObject root = (JSONObject) parser.parse(result);
986             if (root.get(JSON_ATTR_VERSION) != null) {
987                 version = root.get(JSON_ATTR_VERSION).toString();
988             }
989         } catch (ParseException e) {
990             // Not all responses from ElasticSearch include a version, so
991             // if we don't get one back, just return an empty string rather
992             // than trigger a false failure.
993             version = "";
994         }
995         return version;
996     }
997
998     /**
999      * This convenience method gets the current system time and stores it in an attribute in the supplied
1000      * {@link MdcOverride} object so that it can be used later by the metrics logger.
1001      *
1002      * @param override - The {@link MdcOverride} object to update.
1003      * @return - The supplied {@link MdcOverride} object.
1004      */
1005     private MdcOverride getStartTime(MdcOverride override) {
1006
1007         // Grab the current time...
1008         long startTimeInMs = System.currentTimeMillis();
1009
1010         // ...and add it as an attribute to the supplied MDC Override
1011         // object.
1012         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
1013         override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
1014
1015         // Return the MdcOverride object that we were passed.
1016         // This looks odd, but it allows us to do stuff like:
1017         //
1018         // MdcOverride ov = getStartTime(new MdcOverride())
1019         //
1020         // which is quite handy, but also allows us to pass in an existing
1021         // MdcOverride object which already has some attributes set.
1022         return override;
1023     }
1024
1025     private boolean isSuccess(OperationResult result) {
1026         return isSuccessCode(result.getResultCode());
1027     }
1028
1029     private boolean isSuccessCode(int statusCode) {
1030         return Family.familyOf(statusCode).equals(Family.SUCCESSFUL);
1031     }
1032
1033     @Override
1034     public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
1035
1036         if (logger.isDebugEnabled()) {
1037             StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: ");
1038
1039             for (BulkRequest request : requests) {
1040                 dbgString.append("[").append(request).append("] ");
1041             }
1042
1043             logger.debug(dbgString.toString());
1044         }
1045
1046         // Grab the current time so we can use it to generate a metrics log.
1047         MdcOverride override = getStartTime(new MdcOverride());
1048
1049         // Parse the supplied set of operations.
1050         // Iterate over the list of operations which we were provided and
1051         // translate them into a format that ElasticSearh understands.
1052         int opCount = 0;
1053         StringBuilder esOperationSet = new StringBuilder(128);
1054         List<ElasticSearchResultItem> rejected = new ArrayList<>();
1055         for (BulkRequest request : requests) {
1056
1057             // Convert the request to the syntax ElasticSearch likes.
1058             if (buildEsOperation(request, esOperationSet, rejected)) {
1059                 opCount++;
1060             }
1061         }
1062
1063         ElasticSearchBulkOperationResult opResult = null;
1064         if (opCount > 0) {
1065
1066             // Open an HTTP connection to the ElasticSearch back end.
1067             String fullUrl = getFullUrl("/_bulk", false);
1068             URL url;
1069             HttpURLConnection conn;
1070             try {
1071
1072                 url = new URL(fullUrl);
1073                 conn = (HttpURLConnection) url.openConnection();
1074                 conn.setRequestMethod("PUT");
1075                 conn.setDoOutput(true);
1076                 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_FORM_URLENCODED);
1077                 conn.setRequestProperty("Connection", "Close");
1078
1079             } catch (IOException e) {
1080
1081                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1082                 if (logger.isDebugEnabled()) {
1083                     logger.debug(Throwables.getStackTraceAsString(e));
1084                 }
1085
1086                 throw new DocumentStoreOperationException(
1087                         "Failed to open connection to document store.  Cause: " + e.getMessage(), e);
1088             }
1089
1090             StringBuilder bulkResult = new StringBuilder(128);
1091             try {
1092                 // Create an output stream to write our request to.
1093                 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1094
1095                 if (logger.isDebugEnabled()) {
1096                     logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1097                     logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
1098                 }
1099
1100                 // Write the resulting request string to our output stream. (this sends the request to ES?)
1101                 out.write(esOperationSet.toString());
1102                 out.close();
1103
1104                 // Open an input stream on our connection in order to read back the results.
1105                 InputStream is = conn.getInputStream();
1106                 InputStreamReader inputstreamreader = new InputStreamReader(is);
1107                 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1108
1109                 // Read the contents of the input stream into our result string...
1110                 String esResponseString = null;
1111
1112                 while ((esResponseString = bufferedreader.readLine()) != null) {
1113                     bulkResult.append(esResponseString).append("\n");
1114                 }
1115
1116             } catch (IOException e) {
1117
1118                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1119                 if (logger.isDebugEnabled()) {
1120                     StringWriter sw = new StringWriter();
1121                     e.printStackTrace(new PrintWriter(sw));
1122                     logger.debug(sw.toString());
1123                 }
1124
1125                 throw new DocumentStoreOperationException(
1126                         "Failure interacting with document store.  Cause: " + e.getMessage(), e);
1127             }
1128
1129             if (logger.isDebugEnabled()) {
1130                 logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
1131             }
1132
1133             // ...and marshal the resulting string into a Java object.
1134             try {
1135                 opResult = marshallEsBulkResult(bulkResult.toString());
1136
1137             } catch (IOException e) {
1138
1139                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1140                 if (logger.isDebugEnabled()) {
1141                     logger.debug(Throwables.getStackTraceAsString(e));
1142                 }
1143
1144                 throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: " + e.getMessage(),
1145                         e);
1146             }
1147         }
1148
1149         // Finally, build the operation result and return it to the caller.
1150         OperationResult result = new OperationResult();
1151         result.setResultCode(207);
1152         result.setResult(buildGenericBulkResultSet(opResult, rejected));
1153
1154         // In the success case we don't want the entire result string to be
1155         // dumped into the metrics log, so concatenate it.
1156         String resultStringForMetricsLog = result.getResult();
1157         if (isSuccess(result)) {
1158             resultStringForMetricsLog =
1159                     resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1160         }
1161
1162         metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1163                 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1164                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1165                 override);
1166
1167         return result;
1168     }
1169
1170
1171     /**
1172      * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch.
1173      *
1174      * @param request - The request to be performed.
1175      * @param sb - The string builder to append the json data to
1176      * @throws DocumentStoreOperationException
1177      */
1178     private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails)
1179             throws DocumentStoreOperationException {
1180
1181         boolean retVal = true;
1182         // What kind of operation are we performing?
1183         switch (request.getOperationType()) {
1184
1185             // Create a new document.
1186             case CREATE:
1187
1188                 // Make sure that we were supplied a document payload.
1189                 if (request.getOperation().getDocument() == null) {
1190
1191                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1192                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1193                     return false;
1194                 }
1195
1196                 // Make sure that the supplied document URL is formatted
1197                 // correctly.
1198                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1199                     fails.add(generateRejectionEntry(request.getOperationType(),
1200                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1201                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1202                     return false;
1203                 }
1204
1205                 // Validate that the specified index actually exists before we
1206                 // try to perform the create.
1207                 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1208
1209                     fails.add(generateRejectionEntry(request.getOperationType(),
1210                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1211                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1212                     return false;
1213                 }
1214
1215                 // If we were supplied an id for the new document, then
1216                 // include it in the bulk operation to Elastic Search
1217                 if (request.getId() == null) {
1218
1219                     sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
1220
1221                     // Otherwise, we just leave that parameter off and ElasticSearch
1222                     // will generate one for us.
1223                 } else {
1224                     sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
1225                             request.getId()));
1226                 }
1227
1228                 try {
1229                     // Append the document that we want to create.
1230                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
1231                 } catch (JsonProcessingException e) {
1232                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
1233                 }
1234
1235                 break;
1236
1237             // Update an existing document.
1238             case UPDATE:
1239
1240                 // Make sure that we were supplied a document payload.
1241                 if (request.getOperation().getDocument() == null) {
1242
1243                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1244                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1245                     return false;
1246                 }
1247
1248                 // Make sure that the supplied document URL is formatted
1249                 // correctly.
1250                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1251                     fails.add(generateRejectionEntry(request.getOperationType(),
1252                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1253                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1254                     return false;
1255                 }
1256
1257                 // Validate that the specified index actually exists before we
1258                 // try to perform the update.
1259                 if (!indexExists(request.getIndex())) {
1260
1261                     fails.add(generateRejectionEntry(request.getOperationType(),
1262                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1263                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1264                     return false;
1265                 }
1266
1267                 // Validate that the document we are trying to update actually
1268                 // exists before we try to perform the update.
1269                 if (!documentExists(request.getIndex(), request.getId())) {
1270
1271                     fails.add(generateRejectionEntry(request.getOperationType(),
1272                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1273                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1274                     return false;
1275                 }
1276
1277                 // It is mandatory that a version be supplied for an update operation,
1278                 // so validate that now.
1279                 if (request.getOperation().getMetaData().getEtag() == null) {
1280
1281                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1282                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1283                     return false;
1284                 }
1285
1286                 // Generate the update request...
1287                 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1288                         request.getOperation().getMetaData().getEtag()));
1289
1290                 // ...and append the document that we want to update.
1291                 try {
1292                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
1293                 } catch (JsonProcessingException e) {
1294                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
1295                 }
1296                 break;
1297
1298             // Delete an existing document.
1299             case DELETE:
1300
1301                 // Make sure that the supplied document URL is formatted
1302                 // correctly.
1303                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1304                     fails.add(generateRejectionEntry(request.getOperationType(),
1305                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1306                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1307                     return false;
1308                 }
1309
1310                 // Validate that the specified index actually exists before we
1311                 // try to perform the delete.
1312                 if (!indexExists(request.getIndex())) {
1313
1314                     fails.add(generateRejectionEntry(request.getOperationType(),
1315                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1316                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1317                     return false;
1318                 }
1319
1320                 // Validate that the document we are trying to update actually
1321                 // exists before we try to perform the delete.
1322                 if (!documentExists(request.getIndex(), request.getId())) {
1323
1324                     fails.add(generateRejectionEntry(request.getOperationType(),
1325                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1326                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1327                     return false;
1328                 }
1329
1330                 // It is mandatory that a version be supplied for a delete operation,
1331                 // so validate that now.
1332                 if (request.getOperation().getMetaData().getEtag() == null) {
1333
1334                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1335                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1336                     return false;
1337                 }
1338
1339                 // Generate the delete request.
1340                 sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1341                         request.getOperation().getMetaData().getEtag()));
1342                 break;
1343             default:
1344         }
1345
1346         return retVal;
1347     }
1348
1349     private boolean indexExists(String index) throws DocumentStoreOperationException {
1350         return isSuccess(checkIndexExistence(index));
1351     }
1352
1353     private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1354         return isSuccess(checkDocumentExistence(index, id));
1355     }
1356
1357     /**
1358      * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the
1359      * document store.
1360      *
1361      * @param rejectReason - A message describing why the operation was rejected.
1362      * @param anId - The identifier associated with the document being acted on.
1363      * @param statusCode - An HTTP status code.
1364      * @return - A result set item.
1365      */
1366     private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index,
1367             String anId, int statusCode, String originalUrl) {
1368
1369         ElasticSearchError err = new ElasticSearchError();
1370         err.setReason(rejectReason);
1371
1372         ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1373         op.setIndex(index);
1374         op.setId(anId);
1375         op.setStatus(statusCode);
1376         op.setError(err);
1377         op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1378
1379         ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1380
1381         switch (opType) {
1382             case CREATE:
1383                 rejectionResult.setCreate(op);
1384                 break;
1385             case UPDATE:
1386                 rejectionResult.setIndex(op);
1387                 break;
1388             case DELETE:
1389                 rejectionResult.setDelete(op);
1390                 break;
1391             default:
1392         }
1393
1394         return rejectionResult;
1395     }
1396
1397     /**
1398      * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and
1399      * marshals it into a Java object.
1400      *
1401      * @param jsonResult - The bulk operations response returned from ElasticSearch.
1402      * @return - The marshalled response.
1403      * @throws JsonParseException
1404      * @throws JsonMappingException
1405      * @throws IOException
1406      */
1407     private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException {
1408         if (jsonResult != null) {
1409             if (logger.isDebugEnabled()) {
1410                 logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", ""));
1411             }
1412
1413             ObjectMapper mapper = new ObjectMapper();
1414             mapper.setSerializationInclusion(Include.NON_EMPTY);
1415
1416             return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1417         }
1418
1419         return null;
1420     }
1421
1422     /**
1423      * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload.
1424      *
1425      * @param esResult - ElasticSearch bulk operations response.
1426      * @return - A generic result set.
1427      */
1428     private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1429             List<ElasticSearchResultItem> rejectedOps) {
1430         int totalOps = 0;
1431         int totalSuccess = 0;
1432         int totalFails = 0;
1433
1434         if (logger.isDebugEnabled()) {
1435
1436             logger.debug("ESController: Build generic result set.  ES Results: "
1437                     + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString());
1438         }
1439
1440         // Build a combined list of result items from the results returned
1441         // from ElasticSearch and the list of operations that we rejected
1442         // without sending to ElasticSearch.
1443         List<ElasticSearchResultItem> combinedResults = new ArrayList<>();
1444         if (esResult != null) {
1445             combinedResults.addAll(Arrays.asList(esResult.getItems()));
1446         }
1447         combinedResults.addAll(rejectedOps);
1448
1449         // Iterate over the individual results in the resulting result set.
1450         StringBuilder resultsBuilder = new StringBuilder();
1451         AtomicBoolean firstItem = new AtomicBoolean(true);
1452         for (ElasticSearchResultItem item : combinedResults) {
1453
1454             // Increment the operation counts.
1455             totalOps++;
1456             if (isSuccessCode(item.operationStatus().getStatus())) {
1457                 totalSuccess++;
1458             } else {
1459                 totalFails++;
1460             }
1461
1462             // Prepend a comma to our response string unless this it the
1463             // first result in the set.
1464             if (!firstItem.compareAndSet(true, false)) {
1465                 resultsBuilder.append(", ");
1466             }
1467
1468             // Append the current result as a generic json structure.
1469             resultsBuilder.append(item.toJson());
1470         }
1471
1472         return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", "
1473                 + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}";
1474     }
1475
1476
1477     /**
1478      * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
1479      *
1480      * @param indexName - The index to look for.
1481      * @return - An operation result indicating the success or failure of the check.
1482      * @throws DocumentStoreOperationException
1483      */
1484     public OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
1485
1486         // Initialize operation result with a failure codes / fault string
1487         OperationResult opResult = new OperationResult();
1488         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
1489
1490         // Grab the current time so we can use it to generate a metrics log.
1491         MdcOverride override = getStartTime(new MdcOverride());
1492
1493         String fullUrl = getFullUrl("/" + indexName, false);
1494         HttpURLConnection conn = initializeConnection(fullUrl);
1495
1496         try {
1497             conn.setRequestMethod("HEAD");
1498
1499         } catch (ProtocolException e) {
1500             shutdownConnection(conn);
1501             throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1502         }
1503
1504         logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1505
1506         int resultCode;
1507         try {
1508             resultCode = conn.getResponseCode();
1509         } catch (IOException e) {
1510             shutdownConnection(conn);
1511             throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
1512         }
1513         logger.debug(MSG_RESPONSE_CODE + resultCode);
1514
1515         opResult.setResultCode(resultCode);
1516
1517         // Generate a metrics log so we can track how long the operation took.
1518         metricsLogger
1519                 .info(SearchDbMsgs.CHECK_INDEX_TIME,
1520                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1521                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1522                         override, indexName);
1523
1524         shutdownConnection(conn);
1525
1526         return opResult;
1527     }
1528
1529
1530     private void buildDocumentResult(DocumentOperationResult result, String index)
1531             throws DocumentStoreOperationException {
1532
1533         JSONParser parser = new JSONParser();
1534         JSONObject root;
1535         try {
1536             root = (JSONObject) parser.parse(result.getResult());
1537             if (isSuccess(result)) {
1538                 // Success response object
1539                 Document doc = new Document();
1540                 doc.setEtag(result.getResultVersion());
1541                 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1542
1543                 doc.setContent((JSONObject) root.get("_source"));
1544                 result.setDocument(doc);
1545
1546             } else {
1547                 // Error response object
1548                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1549                 if (error != null) {
1550                     result.setError(
1551                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1552                 }
1553
1554             }
1555         } catch (Exception e) {
1556             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1557         }
1558     }
1559
1560     private String buildDocumentResponseUrl(String index, String id) {
1561         return ApiUtils.buildDocumentUri(index, id);
1562     }
1563
1564     private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1565
1566         JSONParser parser = new JSONParser();
1567         JSONObject root;
1568
1569         try {
1570             root = (JSONObject) parser.parse(result.getResult());
1571             if (isSuccess(result)) {
1572                 JSONObject hits = (JSONObject) root.get("hits");
1573                 JSONArray hitArray = (JSONArray) hits.get("hits");
1574                 SearchHits searchHits = new SearchHits();
1575                 searchHits.setTotalHits(hits.get("total").toString());
1576                 ArrayList<SearchHit> searchHitArray = new ArrayList<>();
1577
1578                 for (int i = 0; i < hitArray.size(); i++) {
1579                     JSONObject hit = (JSONObject) hitArray.get(i);
1580                     SearchHit searchHit = new SearchHit();
1581                     searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1582                     Document doc = new Document();
1583                     if (hit.get(JSON_ATTR_VERSION) != null) {
1584                         doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1585                     }
1586
1587                     doc.setUrl(
1588                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1589                     doc.setContent((JSONObject) hit.get("_source"));
1590                     searchHit.setDocument(doc);
1591                     searchHitArray.add(searchHit);
1592                 }
1593                 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1594                 result.setSearchResult(searchHits);
1595
1596                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1597                 if (aggregations != null) {
1598                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1599                     AggregationResults aggs = new AggregationResults();
1600                     aggs.setAggregations(aggResults);
1601                     result.setAggregationResult(aggs);
1602                 }
1603
1604                 // success
1605             } else {
1606                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1607                 if (error != null) {
1608                     result.setError(
1609                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1610                 }
1611             }
1612         } catch (Exception e) {
1613             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1614         }
1615
1616     }
1617
1618     private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1619         JSONParser parser = new JSONParser();
1620         JSONObject root;
1621         try {
1622             root = (JSONObject) parser.parse(result.getResult());
1623             if (isSuccess(result)) {
1624                 JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
1625                 JSONObject hitdata = (JSONObject) hitArray.get(0);
1626                 JSONArray optionsArray = (JSONArray) hitdata.get("options");
1627                 SuggestHits suggestHits = new SuggestHits();
1628                 suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
1629
1630                 ArrayList<SuggestHit> suggestHitArray = new ArrayList<>();
1631
1632                 for (int i = 0; i < optionsArray.size(); i++) {
1633                     JSONObject hit = (JSONObject) optionsArray.get(i);
1634
1635                     SuggestHit suggestHit = new SuggestHit();
1636                     suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
1637                     suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
1638                     Document doc = new Document();
1639                     if (hit.get(JSON_ATTR_VERSION) != null) {
1640                         doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1641                     }
1642                     doc.setUrl(
1643                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1644
1645                     doc.setContent((JSONObject) hit.get("payload"));
1646                     suggestHit.setDocument(doc);
1647                     suggestHitArray.add(suggestHit);
1648                 }
1649                 suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
1650                 result.setSuggestResult(suggestHits);
1651
1652                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1653                 if (aggregations != null) {
1654                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1655                     AggregationResults aggs = new AggregationResults();
1656                     aggs.setAggregations(aggResults);
1657                     result.setAggregationResult(aggs);
1658                 }
1659
1660                 // success
1661             } else {
1662                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1663                 if (error != null) {
1664                     result.setError(
1665                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1666                 }
1667             }
1668         } catch (Exception e) {
1669             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1670         }
1671     }
1672 }