cd0720086666a4e5303d47f6e7b9de2f696e2038
[aai/search-data-service.git] / src / main / java / org / openecomp / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.openecomp.sa.searchdbabstraction.elasticsearch.dao;
24
25 import com.att.aft.dme2.internal.google.common.base.Throwables;
26 import com.fasterxml.jackson.annotation.JsonInclude.Include;
27 import com.fasterxml.jackson.core.JsonParseException;
28 import com.fasterxml.jackson.core.JsonProcessingException;
29 import com.fasterxml.jackson.databind.JsonMappingException;
30 import com.fasterxml.jackson.databind.ObjectMapper;
31 import edu.emory.mathcs.backport.java.util.Arrays;
32 import org.json.simple.JSONArray;
33 import org.json.simple.JSONObject;
34 import org.json.simple.parser.JSONParser;
35 import org.json.simple.parser.ParseException;
36 import org.openecomp.cl.api.LogFields;
37 import org.openecomp.cl.api.LogLine;
38 import org.openecomp.cl.api.Logger;
39 import org.openecomp.cl.eelf.LoggerFactory;
40 import org.openecomp.cl.mdc.MdcContext;
41 import org.openecomp.cl.mdc.MdcOverride;
42 import org.openecomp.sa.rest.AnalysisConfiguration;
43 import org.openecomp.sa.rest.ApiUtils;
44 import org.openecomp.sa.rest.BulkRequest;
45 import org.openecomp.sa.rest.BulkRequest.OperationType;
46 import org.openecomp.sa.rest.DocumentSchema;
47 import org.openecomp.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
48 import org.openecomp.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
49 import org.openecomp.sa.searchdbabstraction.entity.AggregationResult;
50 import org.openecomp.sa.searchdbabstraction.entity.AggregationResults;
51 import org.openecomp.sa.searchdbabstraction.entity.Document;
52 import org.openecomp.sa.searchdbabstraction.entity.DocumentOperationResult;
53 import org.openecomp.sa.searchdbabstraction.entity.ErrorResult;
54 import org.openecomp.sa.searchdbabstraction.entity.OperationResult;
55 import org.openecomp.sa.searchdbabstraction.entity.SearchHit;
56 import org.openecomp.sa.searchdbabstraction.entity.SearchHits;
57 import org.openecomp.sa.searchdbabstraction.entity.SearchOperationResult;
58 import org.openecomp.sa.searchdbabstraction.logging.SearchDbMsgs;
59 import org.openecomp.sa.searchdbabstraction.util.AggregationParsingUtil;
60 import org.openecomp.sa.searchdbabstraction.util.DocumentSchemaUtil;
61 import org.openecomp.sa.searchdbabstraction.util.SearchDbConstants;
62
63 import java.io.BufferedReader;
64 import java.io.File;
65 import java.io.FileInputStream;
66 import java.io.IOException;
67 import java.io.InputStream;
68 import java.io.InputStreamReader;
69 import java.io.OutputStream;
70 import java.io.OutputStreamWriter;
71 import java.io.PrintWriter;
72 import java.io.StringWriter;
73 import java.net.HttpURLConnection;
74 import java.net.MalformedURLException;
75 import java.net.ProtocolException;
76 import java.net.URL;
77 import java.text.SimpleDateFormat;
78 import java.util.ArrayList;
79 import java.util.List;
80 import java.util.Properties;
81 import java.util.concurrent.atomic.AtomicBoolean;
82 import javax.ws.rs.core.Response.Status;
83
84
85 /**
86  * This class has the Elasticsearch implementation of the
87  * DB operations defined in DocumentStoreInterface.
88  */
89 public class ElasticSearchHttpController implements DocumentStoreInterface {
90
91   private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
92       "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
93   private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
94       "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
95   private static final String BULK_IMPORT_INDEX_TEMPLATE =
96       "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
97   private static final String BULK_DELETE_TEMPLATE =
98       "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
99
100   private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
101       "Internal Error: ElasticSearch operation fault occurred";
102   private static final Logger logger = LoggerFactory.getInstance()
103       .getLogger(ElasticSearchHttpController.class.getName());
104   private static final Logger metricsLogger = LoggerFactory.getInstance()
105       .getMetricsLogger(ElasticSearchHttpController.class.getName());
106   private final ElasticSearchConfig config;
107
108   private static final String DEFAULT_TYPE = "default";
109
110   private static ElasticSearchHttpController instance = null;
111
112   protected AnalysisConfiguration analysisConfig;
113
114   public static ElasticSearchHttpController getInstance() {
115
116     synchronized (ElasticSearchHttpController.class) {
117
118       if (instance == null) {
119
120         Properties properties = new Properties();
121         File file = new File(SearchDbConstants.ES_CONFIG_FILE);
122         try {
123           properties.load(new FileInputStream(file));
124         } catch (Exception e) {
125           logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL,
126             "ElasticSearchHTTPController.getInstance",
127             e.getLocalizedMessage());
128         }
129
130         ElasticSearchConfig config = new ElasticSearchConfig(properties);
131         instance = new ElasticSearchHttpController(config);
132       }
133     }
134
135     return instance;
136   }
137
138   public ElasticSearchHttpController(ElasticSearchConfig config) {
139     this.config = config;
140     analysisConfig = new AnalysisConfiguration();
141
142     try {
143       logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
144       checkConnection();
145       logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
146     } catch (Exception e) {
147       logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e,
148           getFullUrl("", false), e.getMessage());
149     }
150   }
151
152
153   public AnalysisConfiguration getAnalysisConfig() {
154     return analysisConfig;
155   }
156
157   @Override
158   public OperationResult createIndex(String index, DocumentSchema documentSchema) {
159
160     OperationResult result = new OperationResult();
161     result.setResultCode(500);
162
163     try {
164
165       // Submit the request to ElasticSearch to create the index using a
166       // default document type.
167       result = createTable(index,
168           DEFAULT_TYPE,
169           analysisConfig.getEsIndexSettings(),
170           DocumentSchemaUtil.generateDocumentMappings(documentSchema));
171
172       // ElasticSearch will return us a 200 code on success when we
173       // want to report a 201, so translate the result here.
174       result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
175       if (isSuccess(result)) {
176         result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
177         //result.setResult("{\"index\": \"" + index + ", \"type\": \"" + DEFAULT_TYPE + "\"}");
178       }
179
180     } catch (DocumentStoreOperationException e) {
181
182       result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
183     }
184
185     return result;
186   }
187
188
189   @Override
190   public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
191
192     //Initialize operation result with a failure codes / fault string
193     OperationResult opResult = new OperationResult();
194     opResult.setResultCode(500);
195     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
196
197     // Grab the current time so we can use it to generate a metrics log.
198     MdcOverride override = getStartTime(new MdcOverride());
199
200     String fullUrl = getFullUrl("/" + indexName + "/", false);
201     HttpURLConnection conn = initializeConnection(fullUrl);
202
203     logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
204
205     try {
206       conn.setRequestMethod("DELETE");
207     } catch (ProtocolException e) {
208       shutdownConnection(conn);
209       throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
210     }
211
212     handleResponse(conn, opResult);
213
214     // Generate a metrics log so we can track how long the operation took.
215     metricsLogger.info(SearchDbMsgs.DELETE_INDEX_TIME,
216         new LogFields()
217             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
218             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
219         override,
220         indexName);
221
222     shutdownConnection(conn);
223
224     return opResult;
225   }
226
227
228   private OperationResult checkConnection() throws Exception {
229
230     String fullUrl = getFullUrl("/_cluster/health", false);
231     URL url = null;
232     HttpURLConnection conn = null;
233
234     url = new URL(fullUrl);
235     conn = (HttpURLConnection) url.openConnection();
236     conn.setRequestMethod("GET");
237     conn.setDoOutput(true);
238     logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
239
240     int resultCode = conn.getResponseCode();
241     logger.debug("getClusterHealth() response Code : " + resultCode);
242     OperationResult opResult = new OperationResult();
243     opResult.setResultCode(resultCode);
244
245     shutdownConnection(conn);
246
247     return opResult;
248   }
249
250   private String getFullUrl(String resourceUrl, boolean isSecure) {
251
252     final String host = config.getIpAddress();
253     final String port = config.getHttpPort();
254
255     if (isSecure) {
256       return String.format("https://%s:%s%s", host, port, resourceUrl);
257     } else {
258       return String.format("http://%s:%s%s", host, port, resourceUrl);
259     }
260   }
261
262   private void shutdownConnection(HttpURLConnection connection) {
263     if (connection == null) {
264       return;
265     }
266
267     InputStream inputstream = null;
268     OutputStream outputstream = null;
269
270     try {
271       inputstream = connection.getInputStream();
272     } catch (IOException e) {
273       logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
274     } finally {
275       if (inputstream != null) {
276         try {
277           inputstream.close();
278         } catch (IOException e) {
279           logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
280               e.getLocalizedMessage());
281         }
282       }
283     }
284
285     try {
286       outputstream = connection.getOutputStream();
287     } catch (IOException e) {
288       logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
289     } finally {
290       if (outputstream != null) {
291         try {
292           outputstream.close();
293         } catch (IOException e) {
294           logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
295               e.getLocalizedMessage());
296         }
297       }
298     }
299
300     connection.disconnect();
301   }
302
303   //@Override
304   protected OperationResult createTable(String indexName, String typeName,
305                                         String indexSettings, String indexMappings)
306       throws DocumentStoreOperationException {
307
308     if (indexSettings == null) {
309       logger.debug("No settings provided.");
310     }
311
312     if (indexMappings == null) {
313       logger.debug("No mappings provided.");
314     }
315
316     OperationResult opResult = new OperationResult();
317
318     // Initialize operation result with a failure codes / fault string
319     opResult.setResultCode(500);
320     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
321
322     // Grab the current time so we can use it to generate a metrics log.
323     MdcOverride override = getStartTime(new MdcOverride());
324
325     String fullUrl = getFullUrl("/" + indexName + "/", false);
326     HttpURLConnection conn = initializeConnection(fullUrl);
327
328     try {
329       conn.setRequestMethod("PUT");
330     } catch (ProtocolException e) {
331       shutdownConnection(conn);
332       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
333     }
334
335     StringBuilder sb = new StringBuilder(128);
336     sb.append("{ \"settings\" : ");
337     sb.append(indexSettings);
338     sb.append(",");
339
340     sb.append("\"mappings\" : {");
341     sb.append("\"" + typeName + "\" :");
342     sb.append(indexMappings);
343     sb.append("}}");
344
345     attachContent(conn, sb.toString());
346
347     logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
348     logger.debug("Request content: " + sb.toString());
349
350     handleResponse(conn, opResult);
351
352     shutdownConnection(conn);
353
354     // Generate a metrics log so we can track how long the operation took.
355     metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
356         new LogFields()
357             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
358             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
359         override,
360         indexName);
361
362     return opResult;
363   }
364
365   @Override
366   public DocumentOperationResult createDocument(String                  indexName, 
367                                                 DocumentStoreDataEntity document,
368                                                 boolean                 allowImplicitIndexCreation)
369       throws DocumentStoreOperationException {
370     
371     if(!allowImplicitIndexCreation) {
372       
373       // Before we do anything, make sure that the specified index actually exists in the
374       // document store - we don't want to rely on ElasticSearch to fail the document
375       // create because it could be configured to implicitly create a non-existent index,
376       // which can lead to hard-to-debug behaviour with queries down the road.
377       OperationResult indexExistsResult = checkIndexExistence(indexName);
378       if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
379   
380         DocumentOperationResult opResult = new DocumentOperationResult();
381         opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
382         opResult.setResult("Document Index '" + indexName + "' does not exist.");
383         opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
384         return opResult;
385       }
386     }
387     
388     if (document.getId() == null || document.getId().isEmpty()) {
389       return createDocumentWithoutId(indexName, document);
390     } else {
391       return createDocumentWithId(indexName, document);
392     }
393   }
394
395   private DocumentOperationResult createDocumentWithId(String indexName,
396                                                        DocumentStoreDataEntity document)
397       throws DocumentStoreOperationException {
398     // check if the document already exists
399     DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
400
401
402     if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
403       if (opResult.getResultCode() == Status.OK.getStatusCode()) {
404         opResult.setFailureCause("A document with the same id already exists.");
405       } else {
406         opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
407       }
408       opResult.setResultCode(Status.CONFLICT.getStatusCode());
409       return opResult;
410     }
411
412     opResult = new DocumentOperationResult();
413     // Initialize operation result with a failure codes / fault string
414     opResult.setResultCode(500);
415     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
416
417     // Grab the current time so we can use it to generate a metrics log.
418     MdcOverride override = getStartTime(new MdcOverride());
419
420     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE
421         + "/" + document.getId(), false);
422     HttpURLConnection conn = initializeConnection(fullUrl);
423
424     try {
425       conn.setRequestMethod("PUT");
426     } catch (ProtocolException e) {
427       shutdownConnection(conn);
428       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
429     }
430
431     attachDocument(conn, document);
432
433     logger.debug("Sending 'PUT' request to: " + conn.getURL());
434
435     handleResponse(conn, opResult);
436     buildDocumentResult(opResult, indexName);
437
438     // Generate a metrics log so we can track how long the operation took.
439     metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
440         new LogFields()
441             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
442             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
443         override,
444         indexName);
445
446     shutdownConnection(conn);
447
448     return opResult;
449
450   }
451
452   private DocumentOperationResult createDocumentWithoutId(String indexName,
453                                                           DocumentStoreDataEntity document)
454       throws DocumentStoreOperationException {
455
456     DocumentOperationResult response = new DocumentOperationResult();
457     // Initialize operation result with a failure codes / fault string
458     response.setResultCode(500);
459     response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
460
461     // Grab the current time so we can use it to generate a metrics log.
462     MdcOverride override = getStartTime(new MdcOverride());
463
464     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
465     HttpURLConnection conn = initializeConnection(fullUrl);
466
467     try {
468       conn.setRequestMethod("POST");
469     } catch (ProtocolException e) {
470       shutdownConnection(conn);
471       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
472     }
473
474     attachDocument(conn, document);
475
476     logger.debug("Sending 'POST' request to: " + conn.getURL());
477
478     handleResponse(conn, response);
479     buildDocumentResult(response, indexName);
480
481     // Generate a metrics log so we can track how long the operation took.
482     metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
483         new LogFields()
484             .setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
485             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
486         override,
487         indexName);
488
489     shutdownConnection(conn);
490
491     return response;
492   }
493
494   private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
495       throws DocumentStoreOperationException {
496     conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
497     conn.setRequestProperty("Connection", "Close");
498
499     attachContent(conn, doc.getContentInJson());
500   }
501
502   private DocumentOperationResult checkDocumentExistence(String indexName,
503                                                          String docId)
504       throws DocumentStoreOperationException {
505     DocumentOperationResult opResult = new DocumentOperationResult();
506
507     // Initialize operation result with a failure codes / fault string
508     opResult.setResultCode(500);
509
510     // Grab the current time so we can use it to generate a metrics log.
511     MdcOverride override = getStartTime(new MdcOverride());
512
513     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
514     HttpURLConnection conn = initializeConnection(fullUrl);
515
516     try {
517       conn.setRequestMethod("HEAD");
518     } catch (ProtocolException e) {
519       shutdownConnection(conn);
520       throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
521     }
522
523     logger.debug("Sending 'HEAD' request to: " + conn.getURL());
524
525     int resultCode;
526     try {
527       resultCode = conn.getResponseCode();
528     } catch (IOException e) {
529       shutdownConnection(conn);
530       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
531     }
532
533     logger.debug("Response Code : " + resultCode);
534
535     opResult.setResultCode(resultCode);
536
537     // Generate a metrics log so we can track how long the operation took.
538     metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
539         new LogFields()
540             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
541             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
542         override,
543         indexName,
544         docId);
545
546     shutdownConnection(conn);
547
548     return opResult;
549   }
550
551   @Override
552   public DocumentOperationResult updateDocument(String                  indexName, 
553                                                 DocumentStoreDataEntity document,
554                                                 boolean                 allowImplicitIndexCreation)
555       throws DocumentStoreOperationException {
556     
557     if(!allowImplicitIndexCreation) {
558       
559       // Before we do anything, make sure that the specified index actually exists in the
560       // document store - we don't want to rely on ElasticSearch to fail the document
561       // create because it could be configured to implicitly create a non-existent index,
562       // which can lead to hard-to-debug behaviour with queries down the road.
563       OperationResult indexExistsResult = checkIndexExistence(indexName);
564       if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
565   
566         DocumentOperationResult opResult = new DocumentOperationResult();
567         opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
568         opResult.setResult("Document Index '" + indexName + "' does not exist.");
569         opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
570         return opResult;
571       }
572     }
573     
574     DocumentOperationResult opResult = new DocumentOperationResult();
575
576     // Initialize operation result with a failure codes / fault string
577     opResult.setResultCode(500);
578     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
579
580     // Grab the current time so we can use it to generate a metrics log.
581     MdcOverride override = getStartTime(new MdcOverride());
582
583     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
584         + "?version=" + document.getVersion(), false);
585     HttpURLConnection conn = initializeConnection(fullUrl);
586
587     try {
588       conn.setRequestMethod("PUT");
589     } catch (ProtocolException e) {
590       shutdownConnection(conn);
591       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
592     }
593
594     attachDocument(conn, document);
595
596     logger.debug("Sending 'PUT' request to: " + conn.getURL());
597
598     handleResponse(conn, opResult);
599     buildDocumentResult(opResult, indexName);
600
601     // Generate a metrics log so we can track how long the operation took.
602     metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
603         new LogFields()
604             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
605             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
606         override,
607         indexName,
608         document.getId());
609
610     shutdownConnection(conn);
611
612     return opResult;
613   }
614
615   @Override
616   public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
617       throws DocumentStoreOperationException {
618     DocumentOperationResult opResult = new DocumentOperationResult();
619
620     // Initialize operation result with a failure codes / fault string
621     opResult.setResultCode(500);
622     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
623
624     // Grab the current time so we can use it to generate a metrics log.
625     MdcOverride override = getStartTime(new MdcOverride());
626
627     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
628         + "?version=" + document.getVersion(), false);
629     HttpURLConnection conn = initializeConnection(fullUrl);
630
631     try {
632       conn.setRequestMethod("DELETE");
633     } catch (ProtocolException e) {
634       shutdownConnection(conn);
635       throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
636     }
637
638     logger.debug("\nSending 'DELETE' request to " + conn.getURL());
639
640     handleResponse(conn, opResult);
641     buildDocumentResult(opResult, indexName);
642     //supress the etag and url in response for delete as they are not required
643     if (opResult.getDocument() != null) {
644       opResult.getDocument().setEtag(null);
645       opResult.getDocument().setUrl(null);
646     }
647
648     // Generate a metrics log so we can track how long the operation took.
649     metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
650         new LogFields()
651             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
652             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
653         override,
654         indexName,
655         document.getId());
656
657     shutdownConnection(conn);
658
659     return opResult;
660   }
661
662   @Override
663   public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
664       throws DocumentStoreOperationException {
665     DocumentOperationResult opResult = new DocumentOperationResult();
666
667     // Initialize operation result with a failure codes / fault string
668     opResult.setResultCode(500);
669     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
670
671     // Grab the current time so we can use it to generate a metrics log.
672     MdcOverride override = getStartTime(new MdcOverride());
673
674     String fullUrl = null;
675     if (document.getVersion() == null) {
676       fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
677     } else {
678       fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
679           + "?version=" + document.getVersion(), false);
680     }
681     HttpURLConnection conn = initializeConnection(fullUrl);
682
683     logger.debug("\nSending 'GET' request to: " + conn.getURL());
684
685     handleResponse(conn, opResult);
686     buildDocumentResult(opResult, indexName);
687
688     // Generate a metrics log so we can track how long the operation took.
689     metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
690         new LogFields()
691             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
692             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
693         override,
694         indexName,
695         document.getId());
696
697     shutdownConnection(conn);
698
699     return opResult;
700   }
701
702   public SearchOperationResult search(String indexName, String queryString)
703       throws DocumentStoreOperationException {
704     SearchOperationResult opResult = new SearchOperationResult();
705
706     // Initialize operation result with a failure codes / fault string
707     opResult.setResultCode(500);
708     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
709
710     String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
711
712     // Grab the current time so we can use it to generate a metrics log.
713     MdcOverride override = getStartTime(new MdcOverride());
714
715     HttpURLConnection conn = initializeConnection(fullUrl);
716
717     try {
718       conn.setRequestMethod("GET");
719     } catch (ProtocolException e) {
720       shutdownConnection(conn);
721       throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
722     }
723
724     logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
725
726     handleResponse(conn, opResult);
727     buildSearchResult(opResult, indexName);
728
729
730     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
731         new LogFields()
732             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
733             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
734         override,
735         indexName,
736         queryString);
737
738     return opResult;
739   }
740
741   public SearchOperationResult searchWithPayload(String indexName, String query)
742       throws DocumentStoreOperationException {
743     SearchOperationResult opResult = new SearchOperationResult();
744
745     if (logger.isDebugEnabled()) {
746       logger.debug("Querying index: " + indexName + " with query string: " + query);
747     }
748
749     // Initialize operation result with a failure codes / fault string
750     opResult.setResultCode(500);
751     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
752
753     String fullUrl = getFullUrl("/" + indexName + "/_search", false);
754
755     // Grab the current time so we can use it to generate a metrics log.
756     MdcOverride override = getStartTime(new MdcOverride());
757
758     HttpURLConnection conn = initializeConnection(fullUrl);
759
760     try {
761       conn.setRequestMethod("POST");
762     } catch (ProtocolException e) {
763       shutdownConnection(conn);
764       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
765     }
766
767     attachContent(conn, query);
768
769     logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
770     logger.debug("Request body =  Elasticsearch query = " + query);
771
772     handleResponse(conn, opResult);
773     buildSearchResult(opResult, indexName);
774
775     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
776         new LogFields()
777             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
778             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
779         override,
780         indexName,
781         query);
782
783     shutdownConnection(conn);
784
785     return opResult;
786   }
787
788   private void attachContent(HttpURLConnection conn, String content)
789       throws DocumentStoreOperationException {
790     OutputStream outputStream = null;
791     OutputStreamWriter out = null;
792
793     try {
794       outputStream = conn.getOutputStream();
795     } catch (IOException e) {
796       shutdownConnection(conn);
797       throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
798     }
799
800     out = new OutputStreamWriter(outputStream);
801
802     try {
803       out.write(content);
804       out.close();
805     } catch (IOException e) {
806       shutdownConnection(conn);
807       throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
808     }
809   }
810
811   private HttpURLConnection initializeConnection(String fullUrl)
812       throws DocumentStoreOperationException {
813     URL url = null;
814     HttpURLConnection conn = null;
815
816     try {
817       url = new URL(fullUrl);
818     } catch (MalformedURLException e) {
819       throw new DocumentStoreOperationException("Error building a URL with " + url, e);
820     }
821
822     try {
823       conn = (HttpURLConnection) url.openConnection();
824       conn.setDoOutput(true);
825     } catch (IOException e) {
826       shutdownConnection(conn);
827       throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
828     }
829
830     return conn;
831   }
832
833   private void handleResponse(HttpURLConnection conn, OperationResult opResult)
834       throws DocumentStoreOperationException {
835     int resultCode = 200;
836
837     try {
838       resultCode = conn.getResponseCode();
839     } catch (IOException e) {
840       shutdownConnection(conn);
841       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
842     }
843
844     logger.debug("Response Code : " + resultCode);
845
846     InputStream inputStream = null;
847
848     if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
849       inputStream = conn.getErrorStream();
850     } else {
851       try {
852         inputStream = conn.getInputStream();
853       } catch (IOException e) {
854         shutdownConnection(conn);
855         throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
856       }
857     }
858
859     InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
860     BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
861
862     StringBuilder result = new StringBuilder(128);
863     String string = null;
864
865     try {
866       while ((string = bufferedreader.readLine()) != null) {
867         result.append(string).append("\n");
868       }
869     } catch (IOException e) {
870       shutdownConnection(conn);
871       throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
872     }
873
874     if (resultCode == Status.CONFLICT.getStatusCode()) {
875       opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode());
876     } else {
877       opResult.setResultCode(resultCode);
878     }
879     if (logger.isDebugEnabled()) {
880       logger.debug("Raw result string from ElasticSearch = " + result.toString());
881     }
882     opResult.setResult(result.toString());
883     opResult.setResultVersion(extractVersion(result.toString()));
884   }
885
886   private String extractVersion(String result) throws DocumentStoreOperationException {
887
888     JSONParser parser = new JSONParser();
889     String version = null;
890     try {
891       JSONObject root = (JSONObject) parser.parse(result);
892       if (root.get("_version") != null) {
893         version = root.get("_version").toString();
894       }
895
896     } catch (ParseException e) {
897
898       // Not all responses from ElasticSearch include a version, so
899       // if we don't get one back, just return an empty string rather
900       // than trigger a false failure.
901       version = "";
902     }
903     return version;
904   }
905
906   /**
907    * This convenience method gets the current system time and stores
908    * it in an attribute in the supplied {@link MdcOverride} object so
909    * that it can be used later by the metrics logger.
910    *
911    * @param override - The {@link MdcOverride} object to update.
912    * @return - The supplied {@link MdcOverride} object.
913    */
914   private MdcOverride getStartTime(MdcOverride override) {
915
916     // Grab the current time...
917     long startTimeInMs = System.currentTimeMillis();
918
919     // ...and add it as an attribute to the supplied MDC Override
920     // object.
921     SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
922     override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
923
924     // Return the MdcOverride object that we were passed.
925     // This looks odd, but it allows us to do stuff like:
926     //
927     //    MdcOverride ov = getStartTime(new MdcOverride())
928     //
929     // which is quite handy, but also allows us to pass in an existing
930     // MdcOverride object which already has some attributes set.
931     return override;
932   }
933
934   private boolean isSuccess(OperationResult result) {
935
936     return isSuccessCode(result.getResultCode());
937   }
938
939
940   private boolean isSuccessCode(int statusCode) {
941     return ((statusCode >= 200) && (statusCode < 300));
942   }
943
944
945   @Override
946   public OperationResult performBulkOperations(BulkRequest[] requests)
947       throws DocumentStoreOperationException {
948
949     if (logger.isDebugEnabled()) {
950       String dbgString = "ESController: performBulkOperations - Operations: ";
951
952       for (BulkRequest request : requests) {
953         dbgString += "[" + request.toString() + "] ";
954       }
955
956       logger.debug(dbgString);
957     }
958
959     // Grab the current time so we can use it to generate a metrics log.
960     MdcOverride override = getStartTime(new MdcOverride());
961
962     // Parse the supplied set of operations.
963     // Iterate over the list of operations which we were provided and
964     // translate them into a format that ElasticSearh understands.
965     int opCount = 0;
966     StringBuilder esOperationSet = new StringBuilder(128);
967     List<ElasticSearchResultItem> rejected = new ArrayList<ElasticSearchResultItem>();
968     for (BulkRequest request : requests) {
969
970       // Convert the request to the syntax ElasticSearch likes.
971       if (buildEsOperation(request, esOperationSet, rejected)) {
972         opCount++;
973       }
974     }
975
976     ElasticSearchBulkOperationResult opResult = null;
977     if (opCount > 0) {
978
979       // Open an HTTP connection to the ElasticSearch back end.
980       String fullUrl = getFullUrl("/_bulk", false);
981       URL url;
982       HttpURLConnection conn;
983       try {
984
985         url = new URL(fullUrl);
986         conn = (HttpURLConnection) url.openConnection();
987         conn.setRequestMethod("PUT");
988         conn.setDoOutput(true);
989         conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
990         conn.setRequestProperty("Connection", "Close");
991
992       } catch (IOException e) {
993
994         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
995         if (logger.isDebugEnabled()) {
996           logger.debug(Throwables.getStackTraceAsString(e));
997         }
998
999         throw new DocumentStoreOperationException("Failed to open connection to document store.  Cause: "
1000             + e.getMessage(), e);
1001       }
1002
1003       StringBuilder bulkResult = new StringBuilder(128);
1004       try {
1005         // Create an output stream to write our request to.
1006         OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1007         ;
1008
1009         if (logger.isDebugEnabled()) {
1010           logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1011           logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n",
1012               "\\n"));
1013         }
1014
1015         // Write the resulting request string to our output stream. (this sends the request to ES?)
1016         out.write(esOperationSet.toString());
1017         out.close();
1018
1019         // Open an input stream on our connection in order to read back the results.
1020         InputStream is = conn.getInputStream();
1021         InputStreamReader inputstreamreader = new InputStreamReader(is);
1022         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1023
1024         // Read the contents of the input stream into our result string...
1025         String esResponseString = null;
1026
1027         while ((esResponseString = bufferedreader.readLine()) != null) {
1028           bulkResult.append(esResponseString).append("\n");
1029         }
1030
1031       } catch (IOException e) {
1032
1033         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1034         if (logger.isDebugEnabled()) {
1035           StringWriter sw = new StringWriter();
1036           e.printStackTrace(new PrintWriter(sw));
1037           logger.debug(sw.toString());
1038         }
1039
1040         throw new DocumentStoreOperationException("Failure interacting with document store.  Cause: "
1041             + e.getMessage(), e);
1042       }
1043
1044       if (logger.isDebugEnabled()) {
1045         logger.debug("ESController: Received result string from ElasticSearch: = "
1046             + bulkResult.toString());
1047       }
1048
1049       // ...and marshal the resulting string into a Java object.
1050       try {
1051         opResult = marshallEsBulkResult(bulkResult.toString());
1052
1053       } catch (IOException e) {
1054
1055         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1056         if (logger.isDebugEnabled()) {
1057           logger.debug(Throwables.getStackTraceAsString(e));
1058         }
1059
1060         throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: "
1061             + e.getMessage(), e);
1062       }
1063     }
1064
1065     // Finally, build the operation result and return it to the caller.
1066     OperationResult result = new OperationResult();
1067     result.setResultCode(207);
1068     result.setResult(buildGenericBulkResultSet(opResult, rejected));
1069
1070     // In the success case we don't want the entire result string to be
1071     // dumped into the metrics log, so concatenate it.
1072     String resultStringForMetricsLog = result.getResult();
1073     if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1074       resultStringForMetricsLog = resultStringForMetricsLog.substring(0,
1075           Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1076     }
1077
1078     metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1079         new LogFields()
1080             .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1081             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1082         override);
1083
1084     return result;
1085   }
1086
1087
1088   /**
1089    * This method converts a {@link BulkRequest} object into a json structure
1090    * which can be understood by ElasticSearch.
1091    *
1092    * @param request - The request to be performed.
1093    * @param sb      - The string builder to append the json data to
1094    * @throws DocumentStoreOperationException
1095    */
1096   private boolean buildEsOperation(BulkRequest request, StringBuilder sb,
1097                                    List<ElasticSearchResultItem> fails)
1098       throws DocumentStoreOperationException {
1099
1100     boolean retVal = true;
1101     OperationResult indexExistsResult = null;
1102
1103     // What kind of operation are we performing?
1104     switch (request.getOperationType()) {
1105
1106       // Create a new document.
1107       case CREATE:
1108
1109         // Make sure that we were supplied a document payload.
1110         if (request.getOperation().getDocument() == null) {
1111
1112           fails.add(generateRejectionEntry(request.getOperationType(),
1113               "Missing document payload",
1114               request.getIndex(),
1115               request.getId(),
1116               400,
1117               request.getOperation().getMetaData().getUrl()));
1118           return false;
1119         }
1120
1121         // Make sure that the supplied document URL is formatted
1122         // correctly.
1123         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1124           fails.add(generateRejectionEntry(request.getOperationType(),
1125               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1126               request.getIndex(),
1127               "",
1128               400,
1129               request.getOperation().getMetaData().getUrl()));
1130           return false;
1131         }
1132
1133         // Validate that the specified index actually exists before we
1134         // try to perform the create.
1135         if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1136
1137           fails.add(generateRejectionEntry(request.getOperationType(),
1138               "Specified resource does not exist: "
1139                   + request.getOperation().getMetaData().getUrl(),
1140               request.getIndex(),
1141               request.getId(),
1142               404,
1143               request.getOperation().getMetaData().getUrl()));
1144           return false;
1145         }
1146
1147         // If we were supplied an id for the new document, then
1148         // include it in the bulk operation to Elastic Search
1149         if (request.getId() == null) {
1150
1151           sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE,
1152               request.getIndex(),
1153               DEFAULT_TYPE));
1154
1155           // Otherwise, we just leave that parameter off and ElasticSearch
1156           // will generate one for us.
1157         } else {
1158           sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE,
1159               request.getIndex(),
1160               DEFAULT_TYPE,
1161               request.getId()));
1162         }
1163
1164         try {
1165           // Append the document that we want to create.
1166           sb.append(request.getOperation().getDocument().toJson()).append("\n");
1167         } catch (JsonProcessingException e) {
1168           throw new DocumentStoreOperationException("Failure parsing document to json", e);
1169         }
1170
1171         break;
1172
1173       // Update an existing document.
1174       case UPDATE:
1175
1176         // Make sure that we were supplied a document payload.
1177         if (request.getOperation().getDocument() == null) {
1178
1179           fails.add(generateRejectionEntry(request.getOperationType(),
1180               "Missing document payload",
1181               request.getIndex(),
1182               request.getId(),
1183               400,
1184               request.getOperation().getMetaData().getUrl()));
1185           return false;
1186         }
1187
1188         // Make sure that the supplied document URL is formatted
1189         // correctly.
1190         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1191           fails.add(generateRejectionEntry(request.getOperationType(),
1192               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1193               request.getIndex(),
1194               "",
1195               400,
1196               request.getOperation().getMetaData().getUrl()));
1197           return false;
1198         }
1199
1200         // Validate that the specified index actually exists before we
1201         // try to perform the update.
1202         if (!indexExists(request.getIndex())) {
1203
1204           fails.add(generateRejectionEntry(request.getOperationType(),
1205               "Specified resource does not exist: "
1206                   + request.getOperation().getMetaData().getUrl(),
1207               request.getIndex(),
1208               request.getId(),
1209               404,
1210               request.getOperation().getMetaData().getUrl()));
1211           return false;
1212         }
1213
1214         // Validate that the document we are trying to update actually
1215         // exists before we try to perform the update.
1216         if (!documentExists(request.getIndex(), request.getId())) {
1217
1218           fails.add(generateRejectionEntry(request.getOperationType(),
1219               "Specified resource does not exist: "
1220                   + request.getOperation().getMetaData().getUrl(),
1221               request.getIndex(),
1222               request.getId(),
1223               404,
1224               request.getOperation().getMetaData().getUrl()));
1225           return false;
1226         }
1227
1228         // It is mandatory that a version be supplied for an update operation,
1229         // so validate that now.
1230         if (request.getOperation().getMetaData().getEtag() == null) {
1231
1232           fails.add(generateRejectionEntry(request.getOperationType(),
1233               "Missing mandatory ETag field",
1234               request.getIndex(),
1235               request.getId(),
1236               400,
1237               request.getOperation().getMetaData().getUrl()));
1238           return false;
1239         }
1240
1241         // Generate the update request...
1242         sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE,
1243             request.getIndex(),
1244             DEFAULT_TYPE,
1245             request.getId(),
1246             request.getOperation().getMetaData().getEtag()));
1247
1248         // ...and append the document that we want to update.
1249         try {
1250           sb.append(request.getOperation().getDocument().toJson()).append("\n");
1251         } catch (JsonProcessingException e) {
1252           throw new DocumentStoreOperationException("Failure parsing document to json", e);
1253         }
1254         break;
1255
1256       // Delete an existing document.
1257       case DELETE:
1258
1259         // Make sure that the supplied document URL is formatted
1260         // correctly.
1261         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1262           fails.add(generateRejectionEntry(request.getOperationType(),
1263               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1264               request.getIndex(),
1265               "",
1266               400,
1267               request.getOperation().getMetaData().getUrl()));
1268           return false;
1269         }
1270
1271         // Validate that the specified index actually exists before we
1272         // try to perform the delete.
1273         if (!indexExists(request.getIndex())) {
1274
1275           fails.add(generateRejectionEntry(request.getOperationType(),
1276               "Specified resource does not exist: "
1277                   + request.getOperation().getMetaData().getUrl(),
1278               request.getIndex(),
1279               request.getId(),
1280               404,
1281               request.getOperation().getMetaData().getUrl()));
1282           return false;
1283         }
1284
1285         // Validate that the document we are trying to update actually
1286         // exists before we try to perform the delete.
1287         if (!documentExists(request.getIndex(), request.getId())) {
1288
1289           fails.add(generateRejectionEntry(request.getOperationType(),
1290               "Specified resource does not exist: "
1291                   + request.getOperation().getMetaData().getUrl(),
1292               request.getIndex(),
1293               request.getId(),
1294               404,
1295               request.getOperation().getMetaData().getUrl()));
1296           return false;
1297         }
1298
1299         // It is mandatory that a version be supplied for a delete operation,
1300         // so validate that now.
1301         if (request.getOperation().getMetaData().getEtag() == null) {
1302
1303           fails.add(generateRejectionEntry(request.getOperationType(),
1304               "Missing mandatory ETag field",
1305               request.getIndex(),
1306               request.getId(),
1307               400,
1308               request.getOperation().getMetaData().getUrl()));
1309           return false;
1310         }
1311
1312         // Generate the delete request.
1313         sb.append(String.format(BULK_DELETE_TEMPLATE,
1314             request.getIndex(),
1315             DEFAULT_TYPE,
1316             request.getId(),
1317             request.getOperation().getMetaData().getEtag()));
1318         break;
1319       default:
1320     }
1321
1322     return retVal;
1323   }
1324
1325   private boolean indexExists(String index) throws DocumentStoreOperationException {
1326
1327     OperationResult indexExistsResult = checkIndexExistence(index);
1328
1329     return ((indexExistsResult.getResultCode() >= 200)
1330         && (indexExistsResult.getResultCode() < 300));
1331   }
1332
1333   private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1334
1335     OperationResult docExistsResult = checkDocumentExistence(index, id);
1336
1337     return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1338   }
1339
1340   /**
1341    * This method constructs a status entry for a bulk operation which has
1342    * been rejected before even sending it to the document store.
1343    *
1344    * @param rejectReason - A message describing why the operation was rejected.
1345    * @param anId         - The identifier associated with the document being
1346    *                     acted on.
1347    * @param statusCode  - An HTTP status code.
1348    * @return - A result set item.
1349    */
1350   private ElasticSearchResultItem generateRejectionEntry(OperationType opType,
1351                                                          String rejectReason,
1352                                                          String index,
1353                                                          String anId,
1354                                                          int statusCode,
1355                                                          String originalUrl) {
1356
1357     ElasticSearchError err = new ElasticSearchError();
1358     err.setReason(rejectReason);
1359
1360     ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1361     op.setIndex(index);
1362     op.setId(anId);
1363     op.setStatus(statusCode);
1364     op.setError(err);
1365     op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1366
1367     ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1368
1369     switch (opType) {
1370       case CREATE:
1371         rejectionResult.setCreate(op);
1372         break;
1373       case UPDATE:
1374         rejectionResult.setIndex(op);
1375         break;
1376       case DELETE:
1377         rejectionResult.setDelete(op);
1378         break;
1379       default:
1380     }
1381
1382     return rejectionResult;
1383   }
1384
1385
1386   /**
1387    * This method takes the json structure returned from ElasticSearch in
1388    * response to a bulk operations request and marshals it into a Java
1389    * object.
1390    *
1391    * @param jsonResult - The bulk operations response returned from
1392    *                   ElasticSearch.
1393    * @return - The marshalled response.
1394    * @throws JsonParseException
1395    * @throws JsonMappingException
1396    * @throws IOException
1397    */
1398   private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult)
1399       throws JsonParseException, JsonMappingException, IOException {
1400
1401     if (jsonResult != null) {
1402       if (logger.isDebugEnabled()) {
1403         logger.debug("ESController: Marshalling ES result set from json: "
1404             + jsonResult.replaceAll("\n", ""));
1405       }
1406
1407       ObjectMapper mapper = new ObjectMapper();
1408       mapper.setSerializationInclusion(Include.NON_EMPTY);
1409
1410       return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1411     }
1412
1413     return null;
1414   }
1415
1416
1417   /**
1418    * This method takes the marshalled ElasticSearch bulk response and
1419    * converts it into a generic response payload.
1420    *
1421    * @param esResult - ElasticSearch bulk operations response.
1422    * @return - A generic result set.
1423    */
1424   private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1425                                            List<ElasticSearchResultItem> rejectedOps) {
1426
1427     int totalOps = 0;
1428     int totalSuccess = 0;
1429     int totalFails = 0;
1430
1431     if (logger.isDebugEnabled()) {
1432
1433       logger.debug("ESController: Build generic result set.  ES Results: "
1434           + ((esResult != null) ? esResult.toString() : "[]")
1435           + " Rejected Ops: " + rejectedOps.toString());
1436     }
1437
1438     // Build a combined list of result items from the results returned
1439     // from ElasticSearch and the list of operations that we rejected
1440     // without sending to ElasticSearch.
1441     List<ElasticSearchResultItem> combinedResults = new ArrayList<ElasticSearchResultItem>();
1442     if (esResult != null) {
1443       combinedResults.addAll(Arrays.asList(esResult.getItems()));
1444     }
1445     combinedResults.addAll(rejectedOps);
1446
1447     // Iterate over the individual results in the resulting result set.
1448     StringBuilder resultsBuilder = new StringBuilder();
1449     AtomicBoolean firstItem = new AtomicBoolean(true);
1450     for (ElasticSearchResultItem item : combinedResults) {
1451
1452       // Increment the operation counts.
1453       totalOps++;
1454       if (isSuccessCode(item.operationStatus().getStatus())) {
1455         totalSuccess++;
1456       } else {
1457         totalFails++;
1458       }
1459
1460       // Prepend a comma to our response string unless this it the
1461       // first result in the set.
1462       if (!firstItem.compareAndSet(true, false)) {
1463         resultsBuilder.append(", ");
1464       }
1465
1466       // Append the current result as a generic json structure.
1467       resultsBuilder.append(item.toJson());
1468     }
1469
1470     // Now, build the result string and return it.
1471     String responseBody = "{ \"total_operations\": " + totalOps + ", "
1472         + "\"total_success\": " + totalSuccess + ", "
1473         + "\"total_fails\": " + totalFails + ", "
1474         + "\"results\": ["
1475         + resultsBuilder.toString()
1476         + "]}";
1477
1478     return responseBody;
1479   }
1480
1481
1482   /**
1483    * This method queryies ElasticSearch to determine if the supplied
1484    * index is present in the document store.
1485    *
1486    * @param indexName - The index to look for.
1487    * @return - An operation result indicating the success or failure of
1488    * the check.
1489    * @throws DocumentStoreOperationException
1490    */
1491   public OperationResult checkIndexExistence(String indexName)
1492       throws DocumentStoreOperationException {
1493
1494     // Initialize operation result with a failure codes / fault string
1495     OperationResult opResult = new OperationResult();
1496     opResult.setResultCode(500);
1497
1498     // Grab the current time so we can use it to generate a metrics log.
1499     MdcOverride override = getStartTime(new MdcOverride());
1500
1501     String fullUrl = getFullUrl("/" + indexName, false);
1502     HttpURLConnection conn = initializeConnection(fullUrl);
1503
1504     try {
1505       conn.setRequestMethod("HEAD");
1506
1507     } catch (ProtocolException e) {
1508       shutdownConnection(conn);
1509       throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1510     }
1511
1512     logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1513
1514     int resultCode;
1515     try {
1516       resultCode = conn.getResponseCode();
1517     } catch (IOException e) {
1518       shutdownConnection(conn);
1519       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1520     }
1521     logger.debug("Response Code : " + resultCode);
1522
1523     opResult.setResultCode(resultCode);
1524
1525     // Generate a metrics log so we can track how long the operation took.
1526     metricsLogger.info(SearchDbMsgs.CHECK_INDEX_TIME,
1527         new LogFields()
1528             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1529             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1530         override,
1531         indexName);
1532
1533     shutdownConnection(conn);
1534
1535     return opResult;
1536   }
1537
1538
1539   private void buildDocumentResult(DocumentOperationResult result, String index)
1540       throws DocumentStoreOperationException {
1541
1542     JSONParser parser = new JSONParser();
1543     JSONObject root;
1544     try {
1545       root = (JSONObject) parser.parse(result.getResult());
1546
1547       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1548         // Success response object
1549         Document doc = new Document();
1550         doc.setEtag(result.getResultVersion());
1551         doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1552
1553         doc.setContent((JSONObject) root.get("_source"));
1554         result.setDocument(doc);
1555
1556       } else {
1557         // Error response object
1558         JSONObject error = (JSONObject) root.get("error");
1559         if (error != null) {
1560           result.setError(new ErrorResult(error.get("type").toString(),
1561               error.get("reason").toString()));
1562         }
1563
1564       }
1565     } catch (Exception e) {
1566       throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1567           + result.getResult());
1568     }
1569
1570
1571   }
1572
1573   private String buildDocumentResponseUrl(String index, String id) {
1574     return ApiUtils.buildDocumentUri(index, id);
1575   }
1576
1577   private void buildSearchResult(SearchOperationResult result, String index)
1578       throws DocumentStoreOperationException {
1579
1580     JSONParser parser = new JSONParser();
1581     JSONObject root;
1582
1583     try {
1584       root = (JSONObject) parser.parse(result.getResult());
1585       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1586         JSONObject hits = (JSONObject) root.get("hits");
1587         JSONArray hitArray = (JSONArray) hits.get("hits");
1588         SearchHits searchHits = new SearchHits();
1589         searchHits.setTotalHits(hits.get("total").toString());
1590         ArrayList<SearchHit> searchHitArray = new ArrayList<SearchHit>();
1591
1592         for (int i = 0; i < hitArray.size(); i++) {
1593           JSONObject hit = (JSONObject) hitArray.get(i);
1594           SearchHit searchHit = new SearchHit();
1595           searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1596           Document doc = new Document();
1597           if (hit.get("_version") != null) {
1598             doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1599           }
1600
1601           doc.setUrl(buildDocumentResponseUrl(index, (hit.get("_id") != null)
1602               ? hit.get("_id").toString() : ""));
1603           doc.setContent((JSONObject) hit.get("_source"));
1604           searchHit.setDocument(doc);
1605           searchHitArray.add(searchHit);
1606         }
1607         searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1608         result.setSearchResult(searchHits);
1609
1610         JSONObject aggregations = (JSONObject) root.get("aggregations");
1611         if (aggregations != null) {
1612           AggregationResult[] aggResults =
1613               AggregationParsingUtil.parseAggregationResults(aggregations);
1614           AggregationResults aggs = new AggregationResults();
1615           aggs.setAggregations(aggResults);
1616           result.setAggregationResult(aggs);
1617         }
1618
1619         // success
1620       } else {
1621         JSONObject error = (JSONObject) root.get("error");
1622         if (error != null) {
1623           result.setError(new ErrorResult(error.get("type").toString(),
1624               error.get("reason").toString()));
1625         }
1626       }
1627     } catch (Exception e) {
1628       throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1629           + result.getResult());
1630     }
1631
1632   }
1633
1634 }