Reduce code duplication in the HTTP Controller
[aai/search-data-service.git] / src / main / java / org / onap / aai / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
23
24 import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
25 import static javax.ws.rs.core.MediaType.APPLICATION_FORM_URLENCODED;
26 import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
27
28 import com.fasterxml.jackson.annotation.JsonInclude.Include;
29 import com.fasterxml.jackson.core.JsonParseException;
30 import com.fasterxml.jackson.core.JsonProcessingException;
31 import com.fasterxml.jackson.databind.JsonMappingException;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33 import com.google.common.base.Throwables;
34 import java.io.BufferedReader;
35 import java.io.File;
36 import java.io.FileInputStream;
37 import java.io.IOException;
38 import java.io.InputStream;
39 import java.io.InputStreamReader;
40 import java.io.OutputStream;
41 import java.io.OutputStreamWriter;
42 import java.io.PrintWriter;
43 import java.io.StringWriter;
44 import java.net.HttpURLConnection;
45 import java.net.MalformedURLException;
46 import java.net.ProtocolException;
47 import java.net.URL;
48 import java.text.SimpleDateFormat;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.List;
52 import java.util.Properties;
53 import java.util.concurrent.atomic.AtomicBoolean;
54 import javax.ws.rs.core.Response.Status;
55 import javax.ws.rs.core.Response.Status.Family;
56 import org.json.simple.JSONArray;
57 import org.json.simple.JSONObject;
58 import org.json.simple.parser.JSONParser;
59 import org.json.simple.parser.ParseException;
60 import org.onap.aai.cl.api.LogFields;
61 import org.onap.aai.cl.api.LogLine;
62 import org.onap.aai.cl.api.Logger;
63 import org.onap.aai.cl.eelf.LoggerFactory;
64 import org.onap.aai.cl.mdc.MdcContext;
65 import org.onap.aai.cl.mdc.MdcOverride;
66 import org.onap.aai.sa.rest.AnalysisConfiguration;
67 import org.onap.aai.sa.rest.ApiUtils;
68 import org.onap.aai.sa.rest.BulkRequest;
69 import org.onap.aai.sa.rest.BulkRequest.OperationType;
70 import org.onap.aai.sa.rest.DocumentSchema;
71 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
72 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
73 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
74 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
75 import org.onap.aai.sa.searchdbabstraction.entity.Document;
76 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
77 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
78 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
79 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
80 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
81 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
82 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
83 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
84 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
85 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
86 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
87 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
88 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
89
90 /**
91  * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface.
92  */
93 public class ElasticSearchHttpController implements DocumentStoreInterface {
94
95     private static ElasticSearchHttpController instance = null;
96
97     private static final Logger logger =
98             LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
99     private static final Logger metricsLogger =
100             LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
101
102     private static final String JSON_ATTR_VERSION = "_version";
103     private static final String JSON_ATTR_ERROR = "error";
104     private static final String JSON_ATTR_REASON = "reason";
105
106     private static final String DEFAULT_TYPE = "default";
107     private static final String QUERY_PARAM_VERSION = "?version=";
108
109     private static final String MSG_RESOURCE_MISSING = "Specified resource does not exist: ";
110     private static final String MSG_RESPONSE_CODE = "Response Code : ";
111     private static final String MSG_INVALID_DOCUMENT_URL = "Invalid document URL: ";
112     private static final String MSG_HTTP_PUT_FAILED = "Failed to set HTTP request method to PUT.";
113     private static final String MSG_HTTP_POST_FAILED = "Failed to set HTTP request method to POST.";
114     private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
115             "Internal Error: ElasticSearch operation fault occurred";
116     private static final String FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION =
117             "Failed to get the response code from the connection.";
118     private static final String FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE = "Failed to parse Elastic Search response.";
119
120     private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
121             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
122     private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
123             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
124     private static final String BULK_IMPORT_INDEX_TEMPLATE =
125             "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
126     private static final String BULK_DELETE_TEMPLATE =
127             "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
128
129     private final ElasticSearchConfig config;
130
131     protected AnalysisConfiguration analysisConfig;
132
133
134     public ElasticSearchHttpController(ElasticSearchConfig config) {
135         this.config = config;
136         analysisConfig = new AnalysisConfiguration();
137
138         try {
139             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
140             checkConnection();
141             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
142         } catch (Exception e) {
143             logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, getFullUrl("", false),
144                     e.getMessage());
145         }
146     }
147
148     public static ElasticSearchHttpController getInstance() {
149         synchronized (ElasticSearchHttpController.class) {
150             if (instance == null) {
151                 Properties properties = new Properties();
152                 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
153                 try {
154                     properties.load(new FileInputStream(file));
155                 } catch (Exception e) {
156                     logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance",
157                             e.getLocalizedMessage());
158                 }
159
160                 ElasticSearchConfig config = new ElasticSearchConfig(properties);
161                 instance = new ElasticSearchHttpController(config);
162             }
163         }
164
165         return instance;
166     }
167
168     public AnalysisConfiguration getAnalysisConfig() {
169         return analysisConfig;
170     }
171
172     @Override
173     public OperationResult createIndex(String index, DocumentSchema documentSchema) {
174         OperationResult result = new OperationResult();
175         result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
176
177         try {
178             // Submit the request to ElasticSearch to create the index using a
179             // default document type.
180             result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
181                     DocumentSchemaUtil.generateDocumentMappings(documentSchema));
182
183             // ElasticSearch will return us a 200 code on success when we
184             // want to report a 201, so translate the result here.
185             if (result.getResultCode() == Status.OK.getStatusCode()) {
186                 result.setResultCode(Status.CREATED.getStatusCode());
187             }
188
189             if (isSuccess(result)) {
190                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
191             }
192         } catch (DocumentStoreOperationException | IOException e) {
193             result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
194         }
195
196         return result;
197     }
198
199     @Override
200     public OperationResult createDynamicIndex(String index, String dynamicSchema) {
201         OperationResult result = new OperationResult();
202         result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
203
204         try {
205             result = createTable(index, dynamicSchema);
206
207             // ElasticSearch will return us a 200 code on success when we
208             // want to report a 201, so translate the result here.
209             if (result.getResultCode() == Status.OK.getStatusCode()) {
210                 result.setResultCode(Status.CREATED.getStatusCode());
211             }
212             if (isSuccess(result)) {
213                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
214             }
215         } catch (DocumentStoreOperationException e) {
216             result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
217         }
218
219         return result;
220     }
221
222     @Override
223     public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
224
225         // Initialize operation result with a failure codes / fault string
226         OperationResult opResult = new OperationResult();
227         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
228         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
229
230         // Grab the current time so we can use it to generate a metrics log.
231         MdcOverride override = getStartTime(new MdcOverride());
232
233         String fullUrl = getFullUrl("/" + indexName + "/", false);
234         HttpURLConnection conn = initializeConnection(fullUrl);
235
236         logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
237
238         try {
239             conn.setRequestMethod("DELETE");
240         } catch (ProtocolException e) {
241             shutdownConnection(conn);
242             throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
243         }
244
245         handleResponse(conn, opResult);
246
247         logMetricsInfo(override, SearchDbMsgs.DELETE_INDEX_TIME, opResult, indexName);
248
249         shutdownConnection(conn);
250
251         return opResult;
252     }
253
254     private OperationResult checkConnection() throws IOException {
255         String fullUrl = getFullUrl("/_cluster/health", false);
256         URL url = null;
257         HttpURLConnection conn = null;
258
259         url = new URL(fullUrl);
260         conn = (HttpURLConnection) url.openConnection();
261         conn.setRequestMethod("GET");
262         conn.setDoOutput(true);
263         logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
264
265         int resultCode = conn.getResponseCode();
266         logger.debug("getClusterHealth() response Code : " + resultCode);
267         OperationResult opResult = new OperationResult();
268         opResult.setResultCode(resultCode);
269
270         shutdownConnection(conn);
271
272         return opResult;
273     }
274
275     private String getFullUrl(String resourceUrl, boolean isSecure) {
276
277         final String host = config.getIpAddress();
278         final String port = config.getHttpPort();
279
280         if (isSecure) {
281             return String.format("https://%s:%s%s", host, port, resourceUrl);
282         } else {
283             return String.format("http://%s:%s%s", host, port, resourceUrl);
284         }
285     }
286
287     private void shutdownConnection(HttpURLConnection connection) {
288         if (connection == null) {
289             return;
290         }
291
292         final String methodName = "shutdownConnection";
293         InputStream inputstream = null;
294         OutputStream outputstream = null;
295
296         try {
297             inputstream = connection.getInputStream();
298         } catch (IOException e) {
299             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
300         } finally {
301             if (inputstream != null) {
302                 try {
303                     inputstream.close();
304                 } catch (IOException e) {
305                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
306                 }
307             }
308         }
309
310         try {
311             outputstream = connection.getOutputStream();
312         } catch (IOException e) {
313             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
314         } finally {
315             if (outputstream != null) {
316                 try {
317                     outputstream.close();
318                 } catch (IOException e) {
319                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
320                 }
321             }
322         }
323
324         connection.disconnect();
325     }
326
327     // @Override
328     protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings)
329             throws DocumentStoreOperationException {
330
331         if (indexSettings == null) {
332             logger.debug("No settings provided.");
333         }
334
335         if (indexMappings == null) {
336             logger.debug("No mappings provided.");
337         }
338
339         OperationResult opResult = new OperationResult();
340
341         // Initialize operation result with a failure codes / fault string
342         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
343         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
344
345         // Grab the current time so we can use it to generate a metrics log.
346         MdcOverride override = getStartTime(new MdcOverride());
347
348         String fullUrl = getFullUrl("/" + indexName + "/", false);
349         HttpURLConnection conn = initializeConnection(fullUrl);
350
351         try {
352             conn.setRequestMethod("PUT");
353         } catch (ProtocolException e) {
354             shutdownConnection(conn);
355             throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
356         }
357
358         StringBuilder sb = new StringBuilder(128);
359         sb.append("{ \"settings\" : ");
360         sb.append(indexSettings);
361         sb.append(",");
362
363         sb.append("\"mappings\" : {");
364         sb.append("\"" + typeName + "\" :");
365         sb.append(indexMappings);
366         sb.append("}}");
367
368         try {
369             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
370         } catch (IOException e) {
371             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
372             throw new DocumentStoreOperationException(e.getMessage(), e);
373         }
374
375         logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
376         logger.debug("Request content: " + sb.toString());
377
378         handleResponse(conn, opResult);
379
380         shutdownConnection(conn);
381
382         logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, opResult, indexName);
383
384         return opResult;
385     }
386
387     /**
388      * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the
389      * index.
390      *
391      * @param indexName - The name of the index to be created
392      * @param settingsAndMappings - The actual JSON object that will define the index
393      * @return - The operation result of writing into Elasticsearch
394      * @throws DocumentStoreOperationException
395      */
396     protected OperationResult createTable(String indexName, String settingsAndMappings)
397             throws DocumentStoreOperationException {
398         OperationResult result = new OperationResult();
399         result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
400         result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
401
402         // Grab the current time so we can use it to generate a metrics log.
403         MdcOverride override = getStartTime(new MdcOverride());
404
405         String fullUrl = getFullUrl("/" + indexName + "/", false);
406         HttpURLConnection conn = initializeConnection(fullUrl);
407
408         try {
409             conn.setRequestMethod("PUT");
410         } catch (ProtocolException e) {
411             shutdownConnection(conn);
412             throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
413         }
414
415         try {
416             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
417         } catch (IOException e) {
418             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
419             throw new DocumentStoreOperationException(e.getMessage());
420         }
421         handleResponse(conn, result);
422
423         logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, result, indexName);
424
425         return result;
426     }
427
428     @Override
429     public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
430             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
431
432         if (!allowImplicitIndexCreation) {
433             // Before we do anything, make sure that the specified index actually exists in the
434             // document store - we don't want to rely on ElasticSearch to fail the document
435             // create because it could be configured to implicitly create a non-existent index,
436             // which can lead to hard-to-debug behaviour with queries down the road.
437             OperationResult indexExistsResult = checkIndexExistence(indexName);
438             if (!isSuccess(indexExistsResult)) {
439                 DocumentOperationResult opResult = new DocumentOperationResult();
440                 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
441                 String resultMsg = "Document Index '" + indexName + "' does not exist.";
442                 opResult.setResult(resultMsg);
443                 opResult.setFailureCause(resultMsg);
444                 return opResult;
445             }
446         }
447
448         if (document.getId() == null || document.getId().isEmpty()) {
449             return createDocumentWithoutId(indexName, document);
450         } else {
451             return createDocumentWithId(indexName, document);
452         }
453     }
454
455     private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
456             throws DocumentStoreOperationException {
457         // check if the document already exists
458         DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
459
460         if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
461             if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) {
462                 opResult.setFailureCause("A document with the same id already exists.");
463             } else {
464                 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
465             }
466             opResult.setResultCode(Status.CONFLICT.getStatusCode());
467             return opResult;
468         }
469
470
471         // Grab the current time so we can use it to generate a metrics log.
472         MdcOverride override = getStartTime(new MdcOverride());
473
474         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
475         HttpURLConnection conn = initializeConnection(fullUrl);
476
477         try {
478             conn.setRequestMethod("PUT");
479         } catch (ProtocolException e) {
480             shutdownConnection(conn);
481             throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
482         }
483
484         attachDocument(conn, document);
485
486         logger.debug("Sending 'PUT' request to: " + conn.getURL());
487
488         opResult = getOperationResult(conn);
489         buildDocumentResult(opResult, indexName);
490
491         logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, opResult, indexName);
492
493         shutdownConnection(conn);
494
495         return opResult;
496     }
497
498     private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
499             throws DocumentStoreOperationException {
500         // Grab the current time so we can use it to generate a metrics log.
501         MdcOverride override = getStartTime(new MdcOverride());
502
503         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
504         HttpURLConnection conn = initializeConnection(fullUrl);
505
506         try {
507             conn.setRequestMethod("POST");
508         } catch (ProtocolException e) {
509             shutdownConnection(conn);
510             throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
511         }
512
513         attachDocument(conn, document);
514
515         logger.debug("Sending 'POST' request to: " + conn.getURL());
516
517         DocumentOperationResult response = getOperationResult(conn);
518         buildDocumentResult(response, indexName);
519
520         logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, response, indexName);
521
522         shutdownConnection(conn);
523
524         return response;
525     }
526
527     private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
528             throws DocumentStoreOperationException {
529         conn.setRequestProperty("Connection", "Close");
530         attachContent(conn, doc.getContentInJson());
531     }
532
533     private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
534             throws DocumentStoreOperationException {
535         // Grab the current time so we can use it to generate a metrics log.
536         MdcOverride override = getStartTime(new MdcOverride());
537
538         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
539         HttpURLConnection conn = initializeConnection(fullUrl);
540
541         try {
542             conn.setRequestMethod("HEAD");
543         } catch (ProtocolException e) {
544             shutdownConnection(conn);
545             throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
546         }
547
548         logger.debug("Sending 'HEAD' request to: " + conn.getURL());
549
550         int resultCode;
551         try {
552             resultCode = conn.getResponseCode();
553         } catch (IOException e) {
554             shutdownConnection(conn);
555             throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
556         }
557
558         logger.debug(MSG_RESPONSE_CODE + resultCode);
559
560         DocumentOperationResult opResult = createDefaultOperationResult();
561         opResult.setResultCode(resultCode);
562
563         logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, docId);
564
565         shutdownConnection(conn);
566
567         return opResult;
568     }
569
570     @Override
571     public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
572             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
573
574         if (!allowImplicitIndexCreation) {
575             // Before we do anything, make sure that the specified index actually exists in the
576             // document store - we don't want to rely on ElasticSearch to fail the document
577             // create because it could be configured to implicitly create a non-existent index,
578             // which can lead to hard-to-debug behaviour with queries down the road.
579             OperationResult indexExistsResult = checkIndexExistence(indexName);
580             if (!isSuccess(indexExistsResult)) {
581                 DocumentOperationResult opResult = new DocumentOperationResult();
582                 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
583                 String resultMsg = "Document Index '" + indexName + "' does not exist.";
584                 opResult.setResult(resultMsg);
585                 opResult.setFailureCause(resultMsg);
586                 return opResult;
587             }
588         }
589
590         // Grab the current time so we can use it to generate a metrics log.
591         MdcOverride override = getStartTime(new MdcOverride());
592
593         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
594                 + document.getVersion(), false);
595         HttpURLConnection conn = initializeConnection(fullUrl);
596
597         try {
598             conn.setRequestMethod("PUT");
599         } catch (ProtocolException e) {
600             shutdownConnection(conn);
601             throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
602         }
603
604         attachDocument(conn, document);
605
606         logger.debug("Sending 'PUT' request to: " + conn.getURL());
607
608         DocumentOperationResult opResult = getOperationResult(conn);
609         buildDocumentResult(opResult, indexName);
610
611         logMetricsInfo(override, SearchDbMsgs.UPDATE_DOCUMENT_TIME, opResult, indexName, document.getId());
612
613         shutdownConnection(conn);
614
615         return opResult;
616     }
617
618     @Override
619     public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
620             throws DocumentStoreOperationException {
621         // Grab the current time so we can use it to generate a metrics log.
622         MdcOverride override = getStartTime(new MdcOverride());
623
624         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
625                 + document.getVersion(), false);
626         HttpURLConnection conn = initializeConnection(fullUrl);
627
628         try {
629             conn.setRequestMethod("DELETE");
630         } catch (ProtocolException e) {
631             shutdownConnection(conn);
632             throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
633         }
634
635         logger.debug("\nSending 'DELETE' request to " + conn.getURL());
636
637         DocumentOperationResult opResult = getOperationResult(conn);
638         buildDocumentResult(opResult, indexName);
639         // supress the etag and url in response for delete as they are not required
640         if (opResult.getDocument() != null) {
641             opResult.getDocument().setEtag(null);
642             opResult.getDocument().setUrl(null);
643         }
644
645         logMetricsInfo(override, SearchDbMsgs.DELETE_DOCUMENT_TIME, opResult, indexName, document.getId());
646
647         shutdownConnection(conn);
648
649         return opResult;
650     }
651
652     @Override
653     public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
654             throws DocumentStoreOperationException {
655         // Grab the current time so we can use it to generate a metrics log.
656         MdcOverride override = getStartTime(new MdcOverride());
657
658         String fullUrl = null;
659         if (document.getVersion() == null) {
660             fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
661         } else {
662             fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
663                     + document.getVersion(), false);
664         }
665         HttpURLConnection conn = initializeConnection(fullUrl);
666
667         logger.debug("\nSending 'GET' request to: " + conn.getURL());
668
669         DocumentOperationResult opResult = getOperationResult(conn);
670         buildDocumentResult(opResult, indexName);
671
672         logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, document.getId());
673
674         shutdownConnection(conn);
675
676         return opResult;
677     }
678
679     @Override
680     public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
681
682         String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
683
684         // Grab the current time so we can use it to generate a metrics log.
685         MdcOverride override = getStartTime(new MdcOverride());
686
687         HttpURLConnection conn = initializeConnection(fullUrl);
688
689         try {
690             conn.setRequestMethod("GET");
691         } catch (ProtocolException e) {
692             shutdownConnection(conn);
693             throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
694         }
695
696         logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
697
698         SearchOperationResult opResult = getSearchOperationResult(conn);
699         buildSearchResult(opResult, indexName);
700
701         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, queryString);
702
703         return opResult;
704     }
705
706     @Override
707     public SearchOperationResult searchWithPayload(String indexName, String query)
708             throws DocumentStoreOperationException {
709         if (logger.isDebugEnabled()) {
710             logger.debug("Querying index: " + indexName + " with query string: " + query);
711         }
712
713         String fullUrl = getFullUrl("/" + indexName + "/_search", false);
714
715         // Grab the current time so we can use it to generate a metrics log.
716         MdcOverride override = getStartTime(new MdcOverride());
717
718         HttpURLConnection conn = initializeConnection(fullUrl);
719
720         try {
721             conn.setRequestMethod("POST");
722         } catch (ProtocolException e) {
723             shutdownConnection(conn);
724             throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
725         }
726
727         attachContent(conn, query);
728
729         logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
730         logger.debug("Request body =  Elasticsearch query = " + query);
731
732         SearchOperationResult opResult = getSearchOperationResult(conn);
733
734         buildSearchResult(opResult, indexName);
735
736         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
737
738         shutdownConnection(conn);
739
740         return opResult;
741     }
742
743
744     @Override
745     public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
746             throws DocumentStoreOperationException {
747         if (logger.isDebugEnabled()) {
748             logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
749         }
750
751         String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
752
753         // Grab the current time so we can use it to generate a metrics log.
754         MdcOverride override = getStartTime(new MdcOverride());
755
756         HttpURLConnection conn = initializeConnection(fullUrl);
757
758         try {
759             conn.setRequestMethod("POST");
760         } catch (ProtocolException e) {
761             shutdownConnection(conn);
762             throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
763         }
764
765         attachContent(conn, query);
766
767         logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
768         logger.debug("Request body =  Elasticsearch query = " + query);
769
770         SearchOperationResult opResult = getSearchOperationResult(conn);
771         buildSuggestResult(opResult, indexName);
772
773         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
774
775         shutdownConnection(conn);
776
777         return opResult;
778     }
779
780     private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException {
781         OutputStream outputStream = null;
782         OutputStreamWriter out = null;
783
784         try {
785             outputStream = conn.getOutputStream();
786         } catch (IOException e) {
787             shutdownConnection(conn);
788             throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
789         }
790
791         out = new OutputStreamWriter(outputStream);
792
793         try {
794             out.write(content);
795             out.close();
796         } catch (IOException e) {
797             shutdownConnection(conn);
798             throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
799         }
800     }
801
802     private HttpURLConnection initializeConnection(String fullUrl) throws DocumentStoreOperationException {
803         URL url = null;
804         HttpURLConnection conn = null;
805
806         try {
807             url = new URL(fullUrl);
808         } catch (MalformedURLException e) {
809             throw new DocumentStoreOperationException("Error building a URL with " + url, e);
810         }
811
812         try {
813             conn = (HttpURLConnection) url.openConnection();
814             conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON);
815             conn.setDoOutput(true);
816         } catch (IOException e) {
817             shutdownConnection(conn);
818             throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
819         }
820
821         return conn;
822     }
823
824     private void handleResponse(HttpURLConnection conn, OperationResult opResult)
825             throws DocumentStoreOperationException {
826         int resultCode;
827
828         try {
829             resultCode = conn.getResponseCode();
830         } catch (IOException e) {
831             shutdownConnection(conn);
832             throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
833         }
834
835         logger.debug(MSG_RESPONSE_CODE + resultCode);
836
837         InputStream inputStream = null;
838
839         if (!isSuccessCode(resultCode)) {
840             inputStream = conn.getErrorStream();
841         } else {
842             try {
843                 inputStream = conn.getInputStream();
844             } catch (IOException e) {
845                 shutdownConnection(conn);
846                 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
847             }
848         }
849
850         InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
851         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
852
853         StringBuilder result = new StringBuilder(128);
854         String string = null;
855
856         try {
857             while ((string = bufferedreader.readLine()) != null) {
858                 result.append(string).append("\n");
859             }
860         } catch (IOException e) {
861             shutdownConnection(conn);
862             throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
863         }
864
865         if (resultCode == Status.CONFLICT.getStatusCode()) {
866             opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode());
867         } else {
868             opResult.setResultCode(resultCode);
869         }
870         if (logger.isDebugEnabled()) {
871             logger.debug("Raw result string from ElasticSearch = " + result.toString());
872         }
873         opResult.setResult(result.toString());
874         opResult.setResultVersion(extractVersion(result.toString()));
875     }
876
877     private String extractVersion(String result) {
878         JSONParser parser = new JSONParser();
879         String version = null;
880         try {
881             JSONObject root = (JSONObject) parser.parse(result);
882             if (root.get(JSON_ATTR_VERSION) != null) {
883                 version = root.get(JSON_ATTR_VERSION).toString();
884             }
885         } catch (ParseException e) {
886             // Not all responses from ElasticSearch include a version, so
887             // if we don't get one back, just return an empty string rather
888             // than trigger a false failure.
889             version = "";
890         }
891         return version;
892     }
893
894     /**
895      * This convenience method gets the current system time and stores it in an attribute in the supplied
896      * {@link MdcOverride} object so that it can be used later by the metrics logger.
897      *
898      * @param override - The {@link MdcOverride} object to update.
899      * @return - The supplied {@link MdcOverride} object.
900      */
901     private MdcOverride getStartTime(MdcOverride override) {
902
903         // Grab the current time...
904         long startTimeInMs = System.currentTimeMillis();
905
906         // ...and add it as an attribute to the supplied MDC Override
907         // object.
908         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
909         override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
910
911         // Return the MdcOverride object that we were passed.
912         // This looks odd, but it allows us to do stuff like:
913         //
914         // MdcOverride ov = getStartTime(new MdcOverride())
915         //
916         // which is quite handy, but also allows us to pass in an existing
917         // MdcOverride object which already has some attributes set.
918         return override;
919     }
920
921     private boolean isSuccess(OperationResult result) {
922         return isSuccessCode(result.getResultCode());
923     }
924
925     private boolean isSuccessCode(int statusCode) {
926         return Family.familyOf(statusCode).equals(Family.SUCCESSFUL);
927     }
928
929     @Override
930     public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
931         if (logger.isDebugEnabled()) {
932             StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: ");
933
934             for (BulkRequest request : requests) {
935                 dbgString.append("[").append(request).append("] ");
936             }
937
938             logger.debug(dbgString.toString());
939         }
940
941         // Grab the current time so we can use it to generate a metrics log.
942         MdcOverride override = getStartTime(new MdcOverride());
943
944         // Parse the supplied set of operations.
945         // Iterate over the list of operations which we were provided and
946         // translate them into a format that ElasticSearh understands.
947         int opCount = 0;
948         StringBuilder esOperationSet = new StringBuilder(128);
949         List<ElasticSearchResultItem> rejected = new ArrayList<>();
950         for (BulkRequest request : requests) {
951
952             // Convert the request to the syntax ElasticSearch likes.
953             if (buildEsOperation(request, esOperationSet, rejected)) {
954                 opCount++;
955             }
956         }
957
958         ElasticSearchBulkOperationResult opResult = null;
959         if (opCount > 0) {
960
961             // Open an HTTP connection to the ElasticSearch back end.
962             String fullUrl = getFullUrl("/_bulk", false);
963             URL url;
964             HttpURLConnection conn;
965             try {
966
967                 url = new URL(fullUrl);
968                 conn = (HttpURLConnection) url.openConnection();
969                 conn.setRequestMethod("PUT");
970                 conn.setDoOutput(true);
971                 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_FORM_URLENCODED);
972                 conn.setRequestProperty("Connection", "Close");
973
974             } catch (IOException e) {
975
976                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
977                 if (logger.isDebugEnabled()) {
978                     logger.debug(Throwables.getStackTraceAsString(e));
979                 }
980
981                 throw new DocumentStoreOperationException(
982                         "Failed to open connection to document store.  Cause: " + e.getMessage(), e);
983             }
984
985             StringBuilder bulkResult = new StringBuilder(128);
986             try {
987                 // Create an output stream to write our request to.
988                 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
989
990                 if (logger.isDebugEnabled()) {
991                     logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
992                     logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
993                 }
994
995                 // Write the resulting request string to our output stream. (this sends the request to ES?)
996                 out.write(esOperationSet.toString());
997                 out.close();
998
999                 // Open an input stream on our connection in order to read back the results.
1000                 InputStream is = conn.getInputStream();
1001                 InputStreamReader inputstreamreader = new InputStreamReader(is);
1002                 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1003
1004                 // Read the contents of the input stream into our result string...
1005                 String esResponseString = null;
1006
1007                 while ((esResponseString = bufferedreader.readLine()) != null) {
1008                     bulkResult.append(esResponseString).append("\n");
1009                 }
1010
1011             } catch (IOException e) {
1012
1013                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1014                 if (logger.isDebugEnabled()) {
1015                     StringWriter sw = new StringWriter();
1016                     e.printStackTrace(new PrintWriter(sw));
1017                     logger.debug(sw.toString());
1018                 }
1019
1020                 throw new DocumentStoreOperationException(
1021                         "Failure interacting with document store.  Cause: " + e.getMessage(), e);
1022             }
1023
1024             if (logger.isDebugEnabled()) {
1025                 logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
1026             }
1027
1028             // ...and marshal the resulting string into a Java object.
1029             try {
1030                 opResult = marshallEsBulkResult(bulkResult.toString());
1031
1032             } catch (IOException e) {
1033
1034                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1035                 if (logger.isDebugEnabled()) {
1036                     logger.debug(Throwables.getStackTraceAsString(e));
1037                 }
1038
1039                 throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: " + e.getMessage(),
1040                         e);
1041             }
1042         }
1043
1044         // Finally, build the operation result and return it to the caller.
1045         OperationResult result = new OperationResult();
1046         result.setResultCode(207);
1047         result.setResult(buildGenericBulkResultSet(opResult, rejected));
1048
1049         // In the success case we don't want the entire result string to be
1050         // dumped into the metrics log, so concatenate it.
1051         String resultStringForMetricsLog = result.getResult();
1052         if (isSuccess(result)) {
1053             resultStringForMetricsLog =
1054                     resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1055         }
1056
1057         metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1058                 new LogFields() //
1059                         .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1060                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1061                 override);
1062
1063         return result;
1064     }
1065
1066     /**
1067      * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch.
1068      *
1069      * @param request - The request to be performed.
1070      * @param sb - The string builder to append the json data to
1071      * @throws DocumentStoreOperationException
1072      */
1073     private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails)
1074             throws DocumentStoreOperationException {
1075
1076         boolean retVal = true;
1077         // What kind of operation are we performing?
1078         switch (request.getOperationType()) {
1079
1080             // Create a new document.
1081             case CREATE:
1082
1083                 // Make sure that we were supplied a document payload.
1084                 if (request.getOperation().getDocument() == null) {
1085
1086                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1087                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1088                     return false;
1089                 }
1090
1091                 // Make sure that the supplied document URL is formatted
1092                 // correctly.
1093                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1094                     fails.add(generateRejectionEntry(request.getOperationType(),
1095                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1096                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1097                     return false;
1098                 }
1099
1100                 // Validate that the specified index actually exists before we
1101                 // try to perform the create.
1102                 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1103
1104                     fails.add(generateRejectionEntry(request.getOperationType(),
1105                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1106                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1107                     return false;
1108                 }
1109
1110                 // If we were supplied an id for the new document, then
1111                 // include it in the bulk operation to Elastic Search
1112                 if (request.getId() == null) {
1113
1114                     sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
1115
1116                     // Otherwise, we just leave that parameter off and ElasticSearch
1117                     // will generate one for us.
1118                 } else {
1119                     sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
1120                             request.getId()));
1121                 }
1122
1123                 try {
1124                     // Append the document that we want to create.
1125                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
1126                 } catch (JsonProcessingException e) {
1127                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
1128                 }
1129
1130                 break;
1131
1132             // Update an existing document.
1133             case UPDATE:
1134
1135                 // Make sure that we were supplied a document payload.
1136                 if (request.getOperation().getDocument() == null) {
1137
1138                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1139                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1140                     return false;
1141                 }
1142
1143                 // Make sure that the supplied document URL is formatted
1144                 // correctly.
1145                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1146                     fails.add(generateRejectionEntry(request.getOperationType(),
1147                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1148                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1149                     return false;
1150                 }
1151
1152                 // Validate that the specified index actually exists before we
1153                 // try to perform the update.
1154                 if (!indexExists(request.getIndex())) {
1155
1156                     fails.add(generateRejectionEntry(request.getOperationType(),
1157                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1158                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1159                     return false;
1160                 }
1161
1162                 // Validate that the document we are trying to update actually
1163                 // exists before we try to perform the update.
1164                 if (!documentExists(request.getIndex(), request.getId())) {
1165
1166                     fails.add(generateRejectionEntry(request.getOperationType(),
1167                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1168                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1169                     return false;
1170                 }
1171
1172                 // It is mandatory that a version be supplied for an update operation,
1173                 // so validate that now.
1174                 if (request.getOperation().getMetaData().getEtag() == null) {
1175
1176                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1177                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1178                     return false;
1179                 }
1180
1181                 // Generate the update request...
1182                 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1183                         request.getOperation().getMetaData().getEtag()));
1184
1185                 // ...and append the document that we want to update.
1186                 try {
1187                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
1188                 } catch (JsonProcessingException e) {
1189                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
1190                 }
1191                 break;
1192
1193             // Delete an existing document.
1194             case DELETE:
1195
1196                 // Make sure that the supplied document URL is formatted
1197                 // correctly.
1198                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1199                     fails.add(generateRejectionEntry(request.getOperationType(),
1200                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1201                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1202                     return false;
1203                 }
1204
1205                 // Validate that the specified index actually exists before we
1206                 // try to perform the delete.
1207                 if (!indexExists(request.getIndex())) {
1208
1209                     fails.add(generateRejectionEntry(request.getOperationType(),
1210                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1211                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1212                     return false;
1213                 }
1214
1215                 // Validate that the document we are trying to update actually
1216                 // exists before we try to perform the delete.
1217                 if (!documentExists(request.getIndex(), request.getId())) {
1218
1219                     fails.add(generateRejectionEntry(request.getOperationType(),
1220                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1221                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1222                     return false;
1223                 }
1224
1225                 // It is mandatory that a version be supplied for a delete operation,
1226                 // so validate that now.
1227                 if (request.getOperation().getMetaData().getEtag() == null) {
1228
1229                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1230                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1231                     return false;
1232                 }
1233
1234                 // Generate the delete request.
1235                 sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1236                         request.getOperation().getMetaData().getEtag()));
1237                 break;
1238             default:
1239         }
1240
1241         return retVal;
1242     }
1243
1244     private boolean indexExists(String index) throws DocumentStoreOperationException {
1245         return isSuccess(checkIndexExistence(index));
1246     }
1247
1248     private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1249         return isSuccess(checkDocumentExistence(index, id));
1250     }
1251
1252     /**
1253      * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the
1254      * document store.
1255      *
1256      * @param rejectReason - A message describing why the operation was rejected.
1257      * @param anId - The identifier associated with the document being acted on.
1258      * @param statusCode - An HTTP status code.
1259      * @return - A result set item.
1260      */
1261     private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index,
1262             String anId, int statusCode, String originalUrl) {
1263
1264         ElasticSearchError err = new ElasticSearchError();
1265         err.setReason(rejectReason);
1266
1267         ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1268         op.setIndex(index);
1269         op.setId(anId);
1270         op.setStatus(statusCode);
1271         op.setError(err);
1272         op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1273
1274         ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1275
1276         switch (opType) {
1277             case CREATE:
1278                 rejectionResult.setCreate(op);
1279                 break;
1280             case UPDATE:
1281                 rejectionResult.setIndex(op);
1282                 break;
1283             case DELETE:
1284                 rejectionResult.setDelete(op);
1285                 break;
1286             default:
1287         }
1288
1289         return rejectionResult;
1290     }
1291
1292     /**
1293      * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and
1294      * marshals it into a Java object.
1295      *
1296      * @param jsonResult - The bulk operations response returned from ElasticSearch.
1297      * @return - The marshalled response.
1298      * @throws JsonParseException
1299      * @throws JsonMappingException
1300      * @throws IOException
1301      */
1302     private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException {
1303         if (jsonResult != null) {
1304             if (logger.isDebugEnabled()) {
1305                 logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", ""));
1306             }
1307
1308             ObjectMapper mapper = new ObjectMapper();
1309             mapper.setSerializationInclusion(Include.NON_EMPTY);
1310
1311             return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1312         }
1313
1314         return null;
1315     }
1316
1317     /**
1318      * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload.
1319      *
1320      * @param esResult - ElasticSearch bulk operations response.
1321      * @return - A generic result set.
1322      */
1323     private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1324             List<ElasticSearchResultItem> rejectedOps) {
1325         int totalOps = 0;
1326         int totalSuccess = 0;
1327         int totalFails = 0;
1328
1329         if (logger.isDebugEnabled()) {
1330
1331             logger.debug("ESController: Build generic result set.  ES Results: "
1332                     + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString());
1333         }
1334
1335         // Build a combined list of result items from the results returned
1336         // from ElasticSearch and the list of operations that we rejected
1337         // without sending to ElasticSearch.
1338         List<ElasticSearchResultItem> combinedResults = new ArrayList<>();
1339         if (esResult != null) {
1340             combinedResults.addAll(Arrays.asList(esResult.getItems()));
1341         }
1342         combinedResults.addAll(rejectedOps);
1343
1344         // Iterate over the individual results in the resulting result set.
1345         StringBuilder resultsBuilder = new StringBuilder();
1346         AtomicBoolean firstItem = new AtomicBoolean(true);
1347         for (ElasticSearchResultItem item : combinedResults) {
1348
1349             // Increment the operation counts.
1350             totalOps++;
1351             if (isSuccessCode(item.operationStatus().getStatus())) {
1352                 totalSuccess++;
1353             } else {
1354                 totalFails++;
1355             }
1356
1357             // Prepend a comma to our response string unless this it the
1358             // first result in the set.
1359             if (!firstItem.compareAndSet(true, false)) {
1360                 resultsBuilder.append(", ");
1361             }
1362
1363             // Append the current result as a generic json structure.
1364             resultsBuilder.append(item.toJson());
1365         }
1366
1367         return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", "
1368                 + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}";
1369     }
1370
1371
1372     /**
1373      * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
1374      *
1375      * @param indexName - The index to look for.
1376      * @return - An operation result indicating the success or failure of the check.
1377      * @throws DocumentStoreOperationException
1378      */
1379     public OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
1380         // Grab the current time so we can use it to generate a metrics log.
1381         MdcOverride override = getStartTime(new MdcOverride());
1382
1383         String fullUrl = getFullUrl("/" + indexName, false);
1384         HttpURLConnection conn = initializeConnection(fullUrl);
1385
1386         try {
1387             conn.setRequestMethod("HEAD");
1388
1389         } catch (ProtocolException e) {
1390             shutdownConnection(conn);
1391             throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1392         }
1393
1394         logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1395
1396         int resultCode;
1397         try {
1398             resultCode = conn.getResponseCode();
1399         } catch (IOException e) {
1400             shutdownConnection(conn);
1401             throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
1402         }
1403         logger.debug(MSG_RESPONSE_CODE + resultCode);
1404
1405         // Initialize operation result with a failure codes / fault string
1406         OperationResult opResult = new OperationResult();
1407         setDefaultOperationResultValues(opResult);
1408         opResult.setResultCode(resultCode);
1409
1410         logMetricsInfo(override, SearchDbMsgs.CHECK_INDEX_TIME, opResult, indexName);
1411
1412         shutdownConnection(conn);
1413
1414         return opResult;
1415     }
1416
1417     private DocumentOperationResult getOperationResult(HttpURLConnection conn) throws DocumentStoreOperationException {
1418         DocumentOperationResult opResult = createDefaultOperationResult();
1419         handleResponse(conn, opResult);
1420         return opResult;
1421     }
1422
1423     private SearchOperationResult getSearchOperationResult(HttpURLConnection conn)
1424             throws DocumentStoreOperationException {
1425         SearchOperationResult opResult = createDefaultSearchOperationResult();
1426         handleResponse(conn, opResult);
1427         return opResult;
1428     }
1429
1430     private DocumentOperationResult createDefaultOperationResult() {
1431         DocumentOperationResult opResult = new DocumentOperationResult();
1432         setDefaultOperationResultValues(opResult);
1433         return opResult;
1434     }
1435
1436     private SearchOperationResult createDefaultSearchOperationResult() {
1437         SearchOperationResult opResult = new SearchOperationResult();
1438         setDefaultOperationResultValues(opResult);
1439         return opResult;
1440     }
1441
1442     private void setDefaultOperationResultValues(OperationResult opResult) {
1443         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
1444         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
1445     }
1446
1447     private void buildDocumentResult(DocumentOperationResult result, String index)
1448             throws DocumentStoreOperationException {
1449
1450         JSONParser parser = new JSONParser();
1451         JSONObject root;
1452         try {
1453             root = (JSONObject) parser.parse(result.getResult());
1454             if (isSuccess(result)) {
1455                 // Success response object
1456                 Document doc = new Document();
1457                 doc.setEtag(result.getResultVersion());
1458                 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1459
1460                 doc.setContent((JSONObject) root.get("_source"));
1461                 result.setDocument(doc);
1462
1463             } else {
1464                 // Error response object
1465                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1466                 if (error != null) {
1467                     result.setError(
1468                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1469                 }
1470
1471             }
1472         } catch (Exception e) {
1473             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1474         }
1475     }
1476
1477     private String buildDocumentResponseUrl(String index, String id) {
1478         return ApiUtils.buildDocumentUri(index, id);
1479     }
1480
1481     private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1482
1483         JSONParser parser = new JSONParser();
1484         JSONObject root;
1485
1486         try {
1487             root = (JSONObject) parser.parse(result.getResult());
1488             if (isSuccess(result)) {
1489                 JSONObject hits = (JSONObject) root.get("hits");
1490                 JSONArray hitArray = (JSONArray) hits.get("hits");
1491                 SearchHits searchHits = new SearchHits();
1492                 searchHits.setTotalHits(hits.get("total").toString());
1493                 ArrayList<SearchHit> searchHitArray = new ArrayList<>();
1494
1495                 for (int i = 0; i < hitArray.size(); i++) {
1496                     JSONObject hit = (JSONObject) hitArray.get(i);
1497                     SearchHit searchHit = new SearchHit();
1498                     searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1499                     Document doc = new Document();
1500                     if (hit.get(JSON_ATTR_VERSION) != null) {
1501                         doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1502                     }
1503
1504                     doc.setUrl(
1505                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1506                     doc.setContent((JSONObject) hit.get("_source"));
1507                     searchHit.setDocument(doc);
1508                     searchHitArray.add(searchHit);
1509                 }
1510                 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1511                 result.setSearchResult(searchHits);
1512
1513                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1514                 if (aggregations != null) {
1515                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1516                     AggregationResults aggs = new AggregationResults();
1517                     aggs.setAggregations(aggResults);
1518                     result.setAggregationResult(aggs);
1519                 }
1520
1521                 // success
1522             } else {
1523                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1524                 if (error != null) {
1525                     result.setError(
1526                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1527                 }
1528             }
1529         } catch (Exception e) {
1530             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1531         }
1532     }
1533
1534     private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1535         JSONParser parser = new JSONParser();
1536         JSONObject root;
1537         try {
1538             root = (JSONObject) parser.parse(result.getResult());
1539             if (isSuccess(result)) {
1540                 JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
1541                 JSONObject hitdata = (JSONObject) hitArray.get(0);
1542                 JSONArray optionsArray = (JSONArray) hitdata.get("options");
1543                 SuggestHits suggestHits = new SuggestHits();
1544                 suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
1545
1546                 ArrayList<SuggestHit> suggestHitArray = new ArrayList<>();
1547
1548                 for (int i = 0; i < optionsArray.size(); i++) {
1549                     JSONObject hit = (JSONObject) optionsArray.get(i);
1550
1551                     SuggestHit suggestHit = new SuggestHit();
1552                     suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
1553                     suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
1554                     Document doc = new Document();
1555                     if (hit.get(JSON_ATTR_VERSION) != null) {
1556                         doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1557                     }
1558                     doc.setUrl(
1559                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1560
1561                     doc.setContent((JSONObject) hit.get("payload"));
1562                     suggestHit.setDocument(doc);
1563                     suggestHitArray.add(suggestHit);
1564                 }
1565                 suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
1566                 result.setSuggestResult(suggestHits);
1567
1568                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1569                 if (aggregations != null) {
1570                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1571                     AggregationResults aggs = new AggregationResults();
1572                     aggs.setAggregations(aggResults);
1573                     result.setAggregationResult(aggs);
1574                 }
1575
1576                 // success
1577             } else {
1578                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1579                 if (error != null) {
1580                     result.setError(
1581                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1582                 }
1583             }
1584         } catch (Exception e) {
1585             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1586         }
1587     }
1588
1589     /**
1590      * Record the timing of the operation in the metrics log.
1591      *
1592      */
1593     private void logMetricsInfo(MdcOverride override, SearchDbMsgs message, OperationResult operationResult,
1594             String... args) {
1595         metricsLogger.info(message,
1596                 new LogFields() //
1597                         .setField(LogLine.DefinedFields.RESPONSE_CODE, operationResult.getResultCode())
1598                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, operationResult.getResult()),
1599                 override, args);
1600     }
1601 }