Reject doc create requests if index does not exist
[aai/search-data-service.git] / src / main / java / org / openecomp / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * Search Data Service
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License ati
12  *
13  *    http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25 package org.openecomp.sa.searchdbabstraction.elasticsearch.dao;
26
27 import com.att.aft.dme2.internal.google.common.base.Throwables;
28 import com.fasterxml.jackson.annotation.JsonInclude.Include;
29 import com.fasterxml.jackson.core.JsonParseException;
30 import com.fasterxml.jackson.core.JsonProcessingException;
31 import com.fasterxml.jackson.databind.JsonMappingException;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33 import edu.emory.mathcs.backport.java.util.Arrays;
34 import org.json.simple.JSONArray;
35 import org.json.simple.JSONObject;
36 import org.json.simple.parser.JSONParser;
37 import org.json.simple.parser.ParseException;
38 import org.openecomp.cl.api.LogFields;
39 import org.openecomp.cl.api.LogLine;
40 import org.openecomp.cl.api.Logger;
41 import org.openecomp.cl.eelf.LoggerFactory;
42 import org.openecomp.cl.mdc.MdcContext;
43 import org.openecomp.cl.mdc.MdcOverride;
44 import org.openecomp.sa.rest.AnalysisConfiguration;
45 import org.openecomp.sa.rest.ApiUtils;
46 import org.openecomp.sa.rest.BulkRequest;
47 import org.openecomp.sa.rest.BulkRequest.OperationType;
48 import org.openecomp.sa.rest.DocumentSchema;
49 import org.openecomp.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
50 import org.openecomp.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
51 import org.openecomp.sa.searchdbabstraction.entity.AggregationResult;
52 import org.openecomp.sa.searchdbabstraction.entity.AggregationResults;
53 import org.openecomp.sa.searchdbabstraction.entity.Document;
54 import org.openecomp.sa.searchdbabstraction.entity.DocumentOperationResult;
55 import org.openecomp.sa.searchdbabstraction.entity.ErrorResult;
56 import org.openecomp.sa.searchdbabstraction.entity.OperationResult;
57 import org.openecomp.sa.searchdbabstraction.entity.SearchHit;
58 import org.openecomp.sa.searchdbabstraction.entity.SearchHits;
59 import org.openecomp.sa.searchdbabstraction.entity.SearchOperationResult;
60 import org.openecomp.sa.searchdbabstraction.logging.SearchDbMsgs;
61 import org.openecomp.sa.searchdbabstraction.util.AggregationParsingUtil;
62 import org.openecomp.sa.searchdbabstraction.util.DocumentSchemaUtil;
63 import org.openecomp.sa.searchdbabstraction.util.SearchDbConstants;
64
65 import java.io.BufferedReader;
66 import java.io.File;
67 import java.io.FileInputStream;
68 import java.io.IOException;
69 import java.io.InputStream;
70 import java.io.InputStreamReader;
71 import java.io.OutputStream;
72 import java.io.OutputStreamWriter;
73 import java.io.PrintWriter;
74 import java.io.StringWriter;
75 import java.net.HttpURLConnection;
76 import java.net.MalformedURLException;
77 import java.net.ProtocolException;
78 import java.net.URL;
79 import java.text.SimpleDateFormat;
80 import java.util.ArrayList;
81 import java.util.List;
82 import java.util.Properties;
83 import java.util.concurrent.atomic.AtomicBoolean;
84 import javax.ws.rs.core.Response.Status;
85
86
87 /**
88  * This class has the Elasticsearch implementation of the
89  * DB operations defined in DocumentStoreInterface.
90  */
91 public class ElasticSearchHttpController implements DocumentStoreInterface {
92
93   private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
94       "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
95   private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
96       "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
97   private static final String BULK_IMPORT_INDEX_TEMPLATE =
98       "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
99   private static final String BULK_DELETE_TEMPLATE =
100       "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
101
102   private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
103       "Internal Error: ElasticSearch operation fault occurred";
104   private static final Logger logger = LoggerFactory.getInstance()
105       .getLogger(ElasticSearchHttpController.class.getName());
106   private static final Logger metricsLogger = LoggerFactory.getInstance()
107       .getMetricsLogger(ElasticSearchHttpController.class.getName());
108   private final ElasticSearchConfig config;
109
110   private static final String DEFAULT_TYPE = "default";
111
112   private static ElasticSearchHttpController instance = null;
113
114   protected AnalysisConfiguration analysisConfig;
115
116   public static ElasticSearchHttpController getInstance() {
117
118     synchronized (ElasticSearchHttpController.class) {
119
120       if (instance == null) {
121
122         Properties properties = new Properties();
123         File file = new File(SearchDbConstants.ES_CONFIG_FILE);
124         try {
125           properties.load(new FileInputStream(file));
126         } catch (Exception e) {
127           logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL,
128             "ElasticSearchHTTPController.getInstance",
129             e.getLocalizedMessage());
130         }
131
132         ElasticSearchConfig config = new ElasticSearchConfig(properties);
133         instance = new ElasticSearchHttpController(config);
134       }
135     }
136
137     return instance;
138   }
139
140   public ElasticSearchHttpController(ElasticSearchConfig config) {
141     this.config = config;
142     analysisConfig = new AnalysisConfiguration();
143
144     try {
145       logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
146       checkConnection();
147       logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
148     } catch (Exception e) {
149       logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e,
150           getFullUrl("", false), e.getMessage());
151     }
152   }
153
154
155   public AnalysisConfiguration getAnalysisConfig() {
156     return analysisConfig;
157   }
158
159   @Override
160   public OperationResult createIndex(String index, DocumentSchema documentSchema) {
161
162     OperationResult result = new OperationResult();
163     result.setResultCode(500);
164
165     try {
166
167       // Submit the request to ElasticSearch to create the index using a
168       // default document type.
169       result = createTable(index,
170           DEFAULT_TYPE,
171           analysisConfig.getEsIndexSettings(),
172           DocumentSchemaUtil.generateDocumentMappings(documentSchema));
173
174       // ElasticSearch will return us a 200 code on success when we
175       // want to report a 201, so translate the result here.
176       result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
177       if (isSuccess(result)) {
178         result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
179         //result.setResult("{\"index\": \"" + index + ", \"type\": \"" + DEFAULT_TYPE + "\"}");
180       }
181
182     } catch (DocumentStoreOperationException e) {
183
184       result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
185     }
186
187     return result;
188   }
189
190
191   @Override
192   public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
193
194     //Initialize operation result with a failure codes / fault string
195     OperationResult opResult = new OperationResult();
196     opResult.setResultCode(500);
197     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
198
199     // Grab the current time so we can use it to generate a metrics log.
200     MdcOverride override = getStartTime(new MdcOverride());
201
202     String fullUrl = getFullUrl("/" + indexName + "/", false);
203     HttpURLConnection conn = initializeConnection(fullUrl);
204
205     logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
206
207     try {
208       conn.setRequestMethod("DELETE");
209     } catch (ProtocolException e) {
210       shutdownConnection(conn);
211       throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
212     }
213
214     handleResponse(conn, opResult);
215
216     // Generate a metrics log so we can track how long the operation took.
217     metricsLogger.info(SearchDbMsgs.DELETE_INDEX_TIME,
218         new LogFields()
219             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
220             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
221         override,
222         indexName);
223
224     shutdownConnection(conn);
225
226     return opResult;
227   }
228
229
230   private OperationResult checkConnection() throws Exception {
231
232     String fullUrl = getFullUrl("/_cluster/health", false);
233     URL url = null;
234     HttpURLConnection conn = null;
235
236     url = new URL(fullUrl);
237     conn = (HttpURLConnection) url.openConnection();
238     conn.setRequestMethod("GET");
239     conn.setDoOutput(true);
240     logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
241
242     int resultCode = conn.getResponseCode();
243     logger.debug("getClusterHealth() response Code : " + resultCode);
244     OperationResult opResult = new OperationResult();
245     opResult.setResultCode(resultCode);
246
247     shutdownConnection(conn);
248
249     return opResult;
250   }
251
252   private String getFullUrl(String resourceUrl, boolean isSecure) {
253
254     final String host = config.getIpAddress();
255     final String port = config.getHttpPort();
256
257     if (isSecure) {
258       return String.format("https://%s:%s%s", host, port, resourceUrl);
259     } else {
260       return String.format("http://%s:%s%s", host, port, resourceUrl);
261     }
262   }
263
264   private void shutdownConnection(HttpURLConnection connection) {
265     if (connection == null) {
266       return;
267     }
268
269     InputStream inputstream = null;
270     OutputStream outputstream = null;
271
272     try {
273       inputstream = connection.getInputStream();
274     } catch (IOException e) {
275       logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
276     } finally {
277       if (inputstream != null) {
278         try {
279           inputstream.close();
280         } catch (IOException e) {
281           logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
282               e.getLocalizedMessage());
283         }
284       }
285     }
286
287     try {
288       outputstream = connection.getOutputStream();
289     } catch (IOException e) {
290       logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
291     } finally {
292       if (outputstream != null) {
293         try {
294           outputstream.close();
295         } catch (IOException e) {
296           logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
297               e.getLocalizedMessage());
298         }
299       }
300     }
301
302     connection.disconnect();
303   }
304
305   //@Override
306   protected OperationResult createTable(String indexName, String typeName,
307                                         String indexSettings, String indexMappings)
308       throws DocumentStoreOperationException {
309
310     if (indexSettings == null) {
311       logger.debug("No settings provided.");
312     }
313
314     if (indexMappings == null) {
315       logger.debug("No mappings provided.");
316     }
317
318     OperationResult opResult = new OperationResult();
319
320     // Initialize operation result with a failure codes / fault string
321     opResult.setResultCode(500);
322     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
323
324     // Grab the current time so we can use it to generate a metrics log.
325     MdcOverride override = getStartTime(new MdcOverride());
326
327     String fullUrl = getFullUrl("/" + indexName + "/", false);
328     HttpURLConnection conn = initializeConnection(fullUrl);
329
330     try {
331       conn.setRequestMethod("PUT");
332     } catch (ProtocolException e) {
333       shutdownConnection(conn);
334       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
335     }
336
337     StringBuilder sb = new StringBuilder(128);
338     sb.append("{ \"settings\" : ");
339     sb.append(indexSettings);
340     sb.append(",");
341
342     sb.append("\"mappings\" : {");
343     sb.append("\"" + typeName + "\" :");
344     sb.append(indexMappings);
345     sb.append("}}");
346
347     attachContent(conn, sb.toString());
348
349     logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
350     logger.debug("Request content: " + sb.toString());
351
352     handleResponse(conn, opResult);
353
354     shutdownConnection(conn);
355
356     // Generate a metrics log so we can track how long the operation took.
357     metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
358         new LogFields()
359             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
360             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
361         override,
362         indexName);
363
364     return opResult;
365   }
366
367   @Override
368   public DocumentOperationResult createDocument(String                  indexName, 
369                                                 DocumentStoreDataEntity document,
370                                                 boolean                 allowImplicitIndexCreation)
371       throws DocumentStoreOperationException {
372     
373     if(!allowImplicitIndexCreation) {
374       
375       // Before we do anything, make sure that the specified index actually exists in the
376       // document store - we don't want to rely on ElasticSearch to fail the document
377       // create because it could be configured to implicitly create a non-existent index,
378       // which can lead to hard-to-debug behaviour with queries down the road.
379       OperationResult indexExistsResult = checkIndexExistence(indexName);
380       if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
381   
382         DocumentOperationResult opResult = new DocumentOperationResult();
383         opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
384         opResult.setResult("Document Index '" + indexName + "' does not exist.");
385         opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
386         return opResult;
387       }
388     }
389     
390     if (document.getId() == null || document.getId().isEmpty()) {
391       return createDocumentWithoutId(indexName, document);
392     } else {
393       return createDocumentWithId(indexName, document);
394     }
395   }
396
397   private DocumentOperationResult createDocumentWithId(String indexName,
398                                                        DocumentStoreDataEntity document)
399       throws DocumentStoreOperationException {
400     // check if the document already exists
401     DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
402
403
404     if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
405       if (opResult.getResultCode() == Status.OK.getStatusCode()) {
406         opResult.setFailureCause("A document with the same id already exists.");
407       } else {
408         opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
409       }
410       opResult.setResultCode(Status.CONFLICT.getStatusCode());
411       return opResult;
412     }
413
414     opResult = new DocumentOperationResult();
415     // Initialize operation result with a failure codes / fault string
416     opResult.setResultCode(500);
417     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
418
419     // Grab the current time so we can use it to generate a metrics log.
420     MdcOverride override = getStartTime(new MdcOverride());
421
422     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE
423         + "/" + document.getId(), false);
424     HttpURLConnection conn = initializeConnection(fullUrl);
425
426     try {
427       conn.setRequestMethod("PUT");
428     } catch (ProtocolException e) {
429       shutdownConnection(conn);
430       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
431     }
432
433     attachDocument(conn, document);
434
435     logger.debug("Sending 'PUT' request to: " + conn.getURL());
436
437     handleResponse(conn, opResult);
438     buildDocumentResult(opResult, indexName);
439
440     // Generate a metrics log so we can track how long the operation took.
441     metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
442         new LogFields()
443             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
444             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
445         override,
446         indexName);
447
448     shutdownConnection(conn);
449
450     return opResult;
451
452   }
453
454   private DocumentOperationResult createDocumentWithoutId(String indexName,
455                                                           DocumentStoreDataEntity document)
456       throws DocumentStoreOperationException {
457
458     DocumentOperationResult response = new DocumentOperationResult();
459     // Initialize operation result with a failure codes / fault string
460     response.setResultCode(500);
461     response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
462
463     // Grab the current time so we can use it to generate a metrics log.
464     MdcOverride override = getStartTime(new MdcOverride());
465
466     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
467     HttpURLConnection conn = initializeConnection(fullUrl);
468
469     try {
470       conn.setRequestMethod("POST");
471     } catch (ProtocolException e) {
472       shutdownConnection(conn);
473       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
474     }
475
476     attachDocument(conn, document);
477
478     logger.debug("Sending 'POST' request to: " + conn.getURL());
479
480     handleResponse(conn, response);
481     buildDocumentResult(response, indexName);
482
483     // Generate a metrics log so we can track how long the operation took.
484     metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
485         new LogFields()
486             .setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
487             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
488         override,
489         indexName);
490
491     shutdownConnection(conn);
492
493     return response;
494   }
495
496   private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
497       throws DocumentStoreOperationException {
498     conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
499     conn.setRequestProperty("Connection", "Close");
500
501     attachContent(conn, doc.getContentInJson());
502   }
503
504   private DocumentOperationResult checkDocumentExistence(String indexName,
505                                                          String docId)
506       throws DocumentStoreOperationException {
507     DocumentOperationResult opResult = new DocumentOperationResult();
508
509     // Initialize operation result with a failure codes / fault string
510     opResult.setResultCode(500);
511
512     // Grab the current time so we can use it to generate a metrics log.
513     MdcOverride override = getStartTime(new MdcOverride());
514
515     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
516     HttpURLConnection conn = initializeConnection(fullUrl);
517
518     try {
519       conn.setRequestMethod("HEAD");
520     } catch (ProtocolException e) {
521       shutdownConnection(conn);
522       throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
523     }
524
525     logger.debug("Sending 'HEAD' request to: " + conn.getURL());
526
527     int resultCode;
528     try {
529       resultCode = conn.getResponseCode();
530     } catch (IOException e) {
531       shutdownConnection(conn);
532       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
533     }
534
535     logger.debug("Response Code : " + resultCode);
536
537     opResult.setResultCode(resultCode);
538
539     // Generate a metrics log so we can track how long the operation took.
540     metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
541         new LogFields()
542             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
543             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
544         override,
545         indexName,
546         docId);
547
548     shutdownConnection(conn);
549
550     return opResult;
551   }
552
553   @Override
554   public DocumentOperationResult updateDocument(String                  indexName, 
555                                                 DocumentStoreDataEntity document,
556                                                 boolean                 allowImplicitIndexCreation)
557       throws DocumentStoreOperationException {
558     
559     if(!allowImplicitIndexCreation) {
560       
561       // Before we do anything, make sure that the specified index actually exists in the
562       // document store - we don't want to rely on ElasticSearch to fail the document
563       // create because it could be configured to implicitly create a non-existent index,
564       // which can lead to hard-to-debug behaviour with queries down the road.
565       OperationResult indexExistsResult = checkIndexExistence(indexName);
566       if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
567   
568         DocumentOperationResult opResult = new DocumentOperationResult();
569         opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
570         opResult.setResult("Document Index '" + indexName + "' does not exist.");
571         opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
572         return opResult;
573       }
574     }
575     
576     DocumentOperationResult opResult = new DocumentOperationResult();
577
578     // Initialize operation result with a failure codes / fault string
579     opResult.setResultCode(500);
580     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
581
582     // Grab the current time so we can use it to generate a metrics log.
583     MdcOverride override = getStartTime(new MdcOverride());
584
585     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
586         + "?version=" + document.getVersion(), false);
587     HttpURLConnection conn = initializeConnection(fullUrl);
588
589     try {
590       conn.setRequestMethod("PUT");
591     } catch (ProtocolException e) {
592       shutdownConnection(conn);
593       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
594     }
595
596     attachDocument(conn, document);
597
598     logger.debug("Sending 'PUT' request to: " + conn.getURL());
599
600     handleResponse(conn, opResult);
601     buildDocumentResult(opResult, indexName);
602
603     // Generate a metrics log so we can track how long the operation took.
604     metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
605         new LogFields()
606             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
607             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
608         override,
609         indexName,
610         document.getId());
611
612     shutdownConnection(conn);
613
614     return opResult;
615   }
616
617   @Override
618   public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
619       throws DocumentStoreOperationException {
620     DocumentOperationResult opResult = new DocumentOperationResult();
621
622     // Initialize operation result with a failure codes / fault string
623     opResult.setResultCode(500);
624     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
625
626     // Grab the current time so we can use it to generate a metrics log.
627     MdcOverride override = getStartTime(new MdcOverride());
628
629     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
630         + "?version=" + document.getVersion(), false);
631     HttpURLConnection conn = initializeConnection(fullUrl);
632
633     try {
634       conn.setRequestMethod("DELETE");
635     } catch (ProtocolException e) {
636       shutdownConnection(conn);
637       throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
638     }
639
640     logger.debug("\nSending 'DELETE' request to " + conn.getURL());
641
642     handleResponse(conn, opResult);
643     buildDocumentResult(opResult, indexName);
644     //supress the etag and url in response for delete as they are not required
645     if (opResult.getDocument() != null) {
646       opResult.getDocument().setEtag(null);
647       opResult.getDocument().setUrl(null);
648     }
649
650     // Generate a metrics log so we can track how long the operation took.
651     metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
652         new LogFields()
653             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
654             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
655         override,
656         indexName,
657         document.getId());
658
659     shutdownConnection(conn);
660
661     return opResult;
662   }
663
664   @Override
665   public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
666       throws DocumentStoreOperationException {
667     DocumentOperationResult opResult = new DocumentOperationResult();
668
669     // Initialize operation result with a failure codes / fault string
670     opResult.setResultCode(500);
671     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
672
673     // Grab the current time so we can use it to generate a metrics log.
674     MdcOverride override = getStartTime(new MdcOverride());
675
676     String fullUrl = null;
677     if (document.getVersion() == null) {
678       fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
679     } else {
680       fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
681           + "?version=" + document.getVersion(), false);
682     }
683     HttpURLConnection conn = initializeConnection(fullUrl);
684
685     logger.debug("\nSending 'GET' request to: " + conn.getURL());
686
687     handleResponse(conn, opResult);
688     buildDocumentResult(opResult, indexName);
689
690     // Generate a metrics log so we can track how long the operation took.
691     metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
692         new LogFields()
693             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
694             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
695         override,
696         indexName,
697         document.getId());
698
699     shutdownConnection(conn);
700
701     return opResult;
702   }
703
704   public SearchOperationResult search(String indexName, String queryString)
705       throws DocumentStoreOperationException {
706     SearchOperationResult opResult = new SearchOperationResult();
707
708     // Initialize operation result with a failure codes / fault string
709     opResult.setResultCode(500);
710     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
711
712     String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
713
714     // Grab the current time so we can use it to generate a metrics log.
715     MdcOverride override = getStartTime(new MdcOverride());
716
717     HttpURLConnection conn = initializeConnection(fullUrl);
718
719     try {
720       conn.setRequestMethod("GET");
721     } catch (ProtocolException e) {
722       shutdownConnection(conn);
723       throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
724     }
725
726     logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
727
728     handleResponse(conn, opResult);
729     buildSearchResult(opResult, indexName);
730
731
732     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
733         new LogFields()
734             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
735             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
736         override,
737         indexName,
738         queryString);
739
740     return opResult;
741   }
742
743   public SearchOperationResult searchWithPayload(String indexName, String query)
744       throws DocumentStoreOperationException {
745     SearchOperationResult opResult = new SearchOperationResult();
746
747     if (logger.isDebugEnabled()) {
748       logger.debug("Querying index: " + indexName + " with query string: " + query);
749     }
750
751     // Initialize operation result with a failure codes / fault string
752     opResult.setResultCode(500);
753     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
754
755     String fullUrl = getFullUrl("/" + indexName + "/_search", false);
756
757     // Grab the current time so we can use it to generate a metrics log.
758     MdcOverride override = getStartTime(new MdcOverride());
759
760     HttpURLConnection conn = initializeConnection(fullUrl);
761
762     try {
763       conn.setRequestMethod("POST");
764     } catch (ProtocolException e) {
765       shutdownConnection(conn);
766       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
767     }
768
769     attachContent(conn, query);
770
771     logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
772     logger.debug("Request body =  Elasticsearch query = " + query);
773
774     handleResponse(conn, opResult);
775     buildSearchResult(opResult, indexName);
776
777     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
778         new LogFields()
779             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
780             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
781         override,
782         indexName,
783         query);
784
785     shutdownConnection(conn);
786
787     return opResult;
788   }
789
790   private void attachContent(HttpURLConnection conn, String content)
791       throws DocumentStoreOperationException {
792     OutputStream outputStream = null;
793     OutputStreamWriter out = null;
794
795     try {
796       outputStream = conn.getOutputStream();
797     } catch (IOException e) {
798       shutdownConnection(conn);
799       throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
800     }
801
802     out = new OutputStreamWriter(outputStream);
803
804     try {
805       out.write(content);
806       out.close();
807     } catch (IOException e) {
808       shutdownConnection(conn);
809       throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
810     }
811   }
812
813   private HttpURLConnection initializeConnection(String fullUrl)
814       throws DocumentStoreOperationException {
815     URL url = null;
816     HttpURLConnection conn = null;
817
818     try {
819       url = new URL(fullUrl);
820     } catch (MalformedURLException e) {
821       throw new DocumentStoreOperationException("Error building a URL with " + url, e);
822     }
823
824     try {
825       conn = (HttpURLConnection) url.openConnection();
826       conn.setDoOutput(true);
827     } catch (IOException e) {
828       shutdownConnection(conn);
829       throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
830     }
831
832     return conn;
833   }
834
835   private void handleResponse(HttpURLConnection conn, OperationResult opResult)
836       throws DocumentStoreOperationException {
837     int resultCode = 200;
838
839     try {
840       resultCode = conn.getResponseCode();
841     } catch (IOException e) {
842       shutdownConnection(conn);
843       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
844     }
845
846     logger.debug("Response Code : " + resultCode);
847
848     InputStream inputStream = null;
849
850     if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
851       inputStream = conn.getErrorStream();
852     } else {
853       try {
854         inputStream = conn.getInputStream();
855       } catch (IOException e) {
856         shutdownConnection(conn);
857         throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
858       }
859     }
860
861     InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
862     BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
863
864     StringBuilder result = new StringBuilder(128);
865     String string = null;
866
867     try {
868       while ((string = bufferedreader.readLine()) != null) {
869         result.append(string).append("\n");
870       }
871     } catch (IOException e) {
872       shutdownConnection(conn);
873       throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
874     }
875
876     if (resultCode == Status.CONFLICT.getStatusCode()) {
877       opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode());
878     } else {
879       opResult.setResultCode(resultCode);
880     }
881     if (logger.isDebugEnabled()) {
882       logger.debug("Raw result string from ElasticSearch = " + result.toString());
883     }
884     opResult.setResult(result.toString());
885     opResult.setResultVersion(extractVersion(result.toString()));
886   }
887
888   private String extractVersion(String result) throws DocumentStoreOperationException {
889
890     JSONParser parser = new JSONParser();
891     String version = null;
892     try {
893       JSONObject root = (JSONObject) parser.parse(result);
894       if (root.get("_version") != null) {
895         version = root.get("_version").toString();
896       }
897
898     } catch (ParseException e) {
899
900       // Not all responses from ElasticSearch include a version, so
901       // if we don't get one back, just return an empty string rather
902       // than trigger a false failure.
903       version = "";
904     }
905     return version;
906   }
907
908   /**
909    * This convenience method gets the current system time and stores
910    * it in an attribute in the supplied {@link MdcOverride} object so
911    * that it can be used later by the metrics logger.
912    *
913    * @param override - The {@link MdcOverride} object to update.
914    * @return - The supplied {@link MdcOverride} object.
915    */
916   private MdcOverride getStartTime(MdcOverride override) {
917
918     // Grab the current time...
919     long startTimeInMs = System.currentTimeMillis();
920
921     // ...and add it as an attribute to the supplied MDC Override
922     // object.
923     SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
924     override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
925
926     // Return the MdcOverride object that we were passed.
927     // This looks odd, but it allows us to do stuff like:
928     //
929     //    MdcOverride ov = getStartTime(new MdcOverride())
930     //
931     // which is quite handy, but also allows us to pass in an existing
932     // MdcOverride object which already has some attributes set.
933     return override;
934   }
935
936   private boolean isSuccess(OperationResult result) {
937
938     return isSuccessCode(result.getResultCode());
939   }
940
941
942   private boolean isSuccessCode(int statusCode) {
943     return ((statusCode >= 200) && (statusCode < 300));
944   }
945
946
947   @Override
948   public OperationResult performBulkOperations(BulkRequest[] requests)
949       throws DocumentStoreOperationException {
950
951     if (logger.isDebugEnabled()) {
952       String dbgString = "ESController: performBulkOperations - Operations: ";
953
954       for (BulkRequest request : requests) {
955         dbgString += "[" + request.toString() + "] ";
956       }
957
958       logger.debug(dbgString);
959     }
960
961     // Grab the current time so we can use it to generate a metrics log.
962     MdcOverride override = getStartTime(new MdcOverride());
963
964     // Parse the supplied set of operations.
965     // Iterate over the list of operations which we were provided and
966     // translate them into a format that ElasticSearh understands.
967     int opCount = 0;
968     StringBuilder esOperationSet = new StringBuilder(128);
969     List<ElasticSearchResultItem> rejected = new ArrayList<ElasticSearchResultItem>();
970     for (BulkRequest request : requests) {
971
972       // Convert the request to the syntax ElasticSearch likes.
973       if (buildEsOperation(request, esOperationSet, rejected)) {
974         opCount++;
975       }
976     }
977
978     ElasticSearchBulkOperationResult opResult = null;
979     if (opCount > 0) {
980
981       // Open an HTTP connection to the ElasticSearch back end.
982       String fullUrl = getFullUrl("/_bulk", false);
983       URL url;
984       HttpURLConnection conn;
985       try {
986
987         url = new URL(fullUrl);
988         conn = (HttpURLConnection) url.openConnection();
989         conn.setRequestMethod("PUT");
990         conn.setDoOutput(true);
991         conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
992         conn.setRequestProperty("Connection", "Close");
993
994       } catch (IOException e) {
995
996         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
997         if (logger.isDebugEnabled()) {
998           logger.debug(Throwables.getStackTraceAsString(e));
999         }
1000
1001         throw new DocumentStoreOperationException("Failed to open connection to document store.  Cause: "
1002             + e.getMessage(), e);
1003       }
1004
1005       StringBuilder bulkResult = new StringBuilder(128);
1006       try {
1007         // Create an output stream to write our request to.
1008         OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1009         ;
1010
1011         if (logger.isDebugEnabled()) {
1012           logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1013           logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n",
1014               "\\n"));
1015         }
1016
1017         // Write the resulting request string to our output stream. (this sends the request to ES?)
1018         out.write(esOperationSet.toString());
1019         out.close();
1020
1021         // Open an input stream on our connection in order to read back the results.
1022         InputStream is = conn.getInputStream();
1023         InputStreamReader inputstreamreader = new InputStreamReader(is);
1024         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1025
1026         // Read the contents of the input stream into our result string...
1027         String esResponseString = null;
1028
1029         while ((esResponseString = bufferedreader.readLine()) != null) {
1030           bulkResult.append(esResponseString).append("\n");
1031         }
1032
1033       } catch (IOException e) {
1034
1035         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1036         if (logger.isDebugEnabled()) {
1037           StringWriter sw = new StringWriter();
1038           e.printStackTrace(new PrintWriter(sw));
1039           logger.debug(sw.toString());
1040         }
1041
1042         throw new DocumentStoreOperationException("Failure interacting with document store.  Cause: "
1043             + e.getMessage(), e);
1044       }
1045
1046       if (logger.isDebugEnabled()) {
1047         logger.debug("ESController: Received result string from ElasticSearch: = "
1048             + bulkResult.toString());
1049       }
1050
1051       // ...and marshal the resulting string into a Java object.
1052       try {
1053         opResult = marshallEsBulkResult(bulkResult.toString());
1054
1055       } catch (IOException e) {
1056
1057         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1058         if (logger.isDebugEnabled()) {
1059           logger.debug(Throwables.getStackTraceAsString(e));
1060         }
1061
1062         throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: "
1063             + e.getMessage(), e);
1064       }
1065     }
1066
1067     // Finally, build the operation result and return it to the caller.
1068     OperationResult result = new OperationResult();
1069     result.setResultCode(207);
1070     result.setResult(buildGenericBulkResultSet(opResult, rejected));
1071
1072     // In the success case we don't want the entire result string to be
1073     // dumped into the metrics log, so concatenate it.
1074     String resultStringForMetricsLog = result.getResult();
1075     if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1076       resultStringForMetricsLog = resultStringForMetricsLog.substring(0,
1077           Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1078     }
1079
1080     metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1081         new LogFields()
1082             .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1083             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1084         override);
1085
1086     return result;
1087   }
1088
1089
1090   /**
1091    * This method converts a {@link BulkRequest} object into a json structure
1092    * which can be understood by ElasticSearch.
1093    *
1094    * @param request - The request to be performed.
1095    * @param sb      - The string builder to append the json data to
1096    * @throws DocumentStoreOperationException
1097    */
1098   private boolean buildEsOperation(BulkRequest request, StringBuilder sb,
1099                                    List<ElasticSearchResultItem> fails)
1100       throws DocumentStoreOperationException {
1101
1102     boolean retVal = true;
1103     OperationResult indexExistsResult = null;
1104
1105     // What kind of operation are we performing?
1106     switch (request.getOperationType()) {
1107
1108       // Create a new document.
1109       case CREATE:
1110
1111         // Make sure that we were supplied a document payload.
1112         if (request.getOperation().getDocument() == null) {
1113
1114           fails.add(generateRejectionEntry(request.getOperationType(),
1115               "Missing document payload",
1116               request.getIndex(),
1117               request.getId(),
1118               400,
1119               request.getOperation().getMetaData().getUrl()));
1120           return false;
1121         }
1122
1123         // Make sure that the supplied document URL is formatted
1124         // correctly.
1125         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1126           fails.add(generateRejectionEntry(request.getOperationType(),
1127               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1128               request.getIndex(),
1129               "",
1130               400,
1131               request.getOperation().getMetaData().getUrl()));
1132           return false;
1133         }
1134
1135         // Validate that the specified index actually exists before we
1136         // try to perform the create.
1137         if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1138
1139           fails.add(generateRejectionEntry(request.getOperationType(),
1140               "Specified resource does not exist: "
1141                   + request.getOperation().getMetaData().getUrl(),
1142               request.getIndex(),
1143               request.getId(),
1144               404,
1145               request.getOperation().getMetaData().getUrl()));
1146           return false;
1147         }
1148
1149         // If we were supplied an id for the new document, then
1150         // include it in the bulk operation to Elastic Search
1151         if (request.getId() == null) {
1152
1153           sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE,
1154               request.getIndex(),
1155               DEFAULT_TYPE));
1156
1157           // Otherwise, we just leave that parameter off and ElasticSearch
1158           // will generate one for us.
1159         } else {
1160           sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE,
1161               request.getIndex(),
1162               DEFAULT_TYPE,
1163               request.getId()));
1164         }
1165
1166         try {
1167           // Append the document that we want to create.
1168           sb.append(request.getOperation().getDocument().toJson()).append("\n");
1169         } catch (JsonProcessingException e) {
1170           throw new DocumentStoreOperationException("Failure parsing document to json", e);
1171         }
1172
1173         break;
1174
1175       // Update an existing document.
1176       case UPDATE:
1177
1178         // Make sure that we were supplied a document payload.
1179         if (request.getOperation().getDocument() == null) {
1180
1181           fails.add(generateRejectionEntry(request.getOperationType(),
1182               "Missing document payload",
1183               request.getIndex(),
1184               request.getId(),
1185               400,
1186               request.getOperation().getMetaData().getUrl()));
1187           return false;
1188         }
1189
1190         // Make sure that the supplied document URL is formatted
1191         // correctly.
1192         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1193           fails.add(generateRejectionEntry(request.getOperationType(),
1194               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1195               request.getIndex(),
1196               "",
1197               400,
1198               request.getOperation().getMetaData().getUrl()));
1199           return false;
1200         }
1201
1202         // Validate that the specified index actually exists before we
1203         // try to perform the update.
1204         if (!indexExists(request.getIndex())) {
1205
1206           fails.add(generateRejectionEntry(request.getOperationType(),
1207               "Specified resource does not exist: "
1208                   + request.getOperation().getMetaData().getUrl(),
1209               request.getIndex(),
1210               request.getId(),
1211               404,
1212               request.getOperation().getMetaData().getUrl()));
1213           return false;
1214         }
1215
1216         // Validate that the document we are trying to update actually
1217         // exists before we try to perform the update.
1218         if (!documentExists(request.getIndex(), request.getId())) {
1219
1220           fails.add(generateRejectionEntry(request.getOperationType(),
1221               "Specified resource does not exist: "
1222                   + request.getOperation().getMetaData().getUrl(),
1223               request.getIndex(),
1224               request.getId(),
1225               404,
1226               request.getOperation().getMetaData().getUrl()));
1227           return false;
1228         }
1229
1230         // It is mandatory that a version be supplied for an update operation,
1231         // so validate that now.
1232         if (request.getOperation().getMetaData().getEtag() == null) {
1233
1234           fails.add(generateRejectionEntry(request.getOperationType(),
1235               "Missing mandatory ETag field",
1236               request.getIndex(),
1237               request.getId(),
1238               400,
1239               request.getOperation().getMetaData().getUrl()));
1240           return false;
1241         }
1242
1243         // Generate the update request...
1244         sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE,
1245             request.getIndex(),
1246             DEFAULT_TYPE,
1247             request.getId(),
1248             request.getOperation().getMetaData().getEtag()));
1249
1250         // ...and append the document that we want to update.
1251         try {
1252           sb.append(request.getOperation().getDocument().toJson()).append("\n");
1253         } catch (JsonProcessingException e) {
1254           throw new DocumentStoreOperationException("Failure parsing document to json", e);
1255         }
1256         break;
1257
1258       // Delete an existing document.
1259       case DELETE:
1260
1261         // Make sure that the supplied document URL is formatted
1262         // correctly.
1263         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1264           fails.add(generateRejectionEntry(request.getOperationType(),
1265               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1266               request.getIndex(),
1267               "",
1268               400,
1269               request.getOperation().getMetaData().getUrl()));
1270           return false;
1271         }
1272
1273         // Validate that the specified index actually exists before we
1274         // try to perform the delete.
1275         if (!indexExists(request.getIndex())) {
1276
1277           fails.add(generateRejectionEntry(request.getOperationType(),
1278               "Specified resource does not exist: "
1279                   + request.getOperation().getMetaData().getUrl(),
1280               request.getIndex(),
1281               request.getId(),
1282               404,
1283               request.getOperation().getMetaData().getUrl()));
1284           return false;
1285         }
1286
1287         // Validate that the document we are trying to update actually
1288         // exists before we try to perform the delete.
1289         if (!documentExists(request.getIndex(), request.getId())) {
1290
1291           fails.add(generateRejectionEntry(request.getOperationType(),
1292               "Specified resource does not exist: "
1293                   + request.getOperation().getMetaData().getUrl(),
1294               request.getIndex(),
1295               request.getId(),
1296               404,
1297               request.getOperation().getMetaData().getUrl()));
1298           return false;
1299         }
1300
1301         // It is mandatory that a version be supplied for a delete operation,
1302         // so validate that now.
1303         if (request.getOperation().getMetaData().getEtag() == null) {
1304
1305           fails.add(generateRejectionEntry(request.getOperationType(),
1306               "Missing mandatory ETag field",
1307               request.getIndex(),
1308               request.getId(),
1309               400,
1310               request.getOperation().getMetaData().getUrl()));
1311           return false;
1312         }
1313
1314         // Generate the delete request.
1315         sb.append(String.format(BULK_DELETE_TEMPLATE,
1316             request.getIndex(),
1317             DEFAULT_TYPE,
1318             request.getId(),
1319             request.getOperation().getMetaData().getEtag()));
1320         break;
1321       default:
1322     }
1323
1324     return retVal;
1325   }
1326
1327   private boolean indexExists(String index) throws DocumentStoreOperationException {
1328
1329     OperationResult indexExistsResult = checkIndexExistence(index);
1330
1331     return ((indexExistsResult.getResultCode() >= 200)
1332         && (indexExistsResult.getResultCode() < 300));
1333   }
1334
1335   private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1336
1337     OperationResult docExistsResult = checkDocumentExistence(index, id);
1338
1339     return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1340   }
1341
1342   /**
1343    * This method constructs a status entry for a bulk operation which has
1344    * been rejected before even sending it to the document store.
1345    *
1346    * @param rejectReason - A message describing why the operation was rejected.
1347    * @param anId         - The identifier associated with the document being
1348    *                     acted on.
1349    * @param statusCode  - An HTTP status code.
1350    * @return - A result set item.
1351    */
1352   private ElasticSearchResultItem generateRejectionEntry(OperationType opType,
1353                                                          String rejectReason,
1354                                                          String index,
1355                                                          String anId,
1356                                                          int statusCode,
1357                                                          String originalUrl) {
1358
1359     ElasticSearchError err = new ElasticSearchError();
1360     err.setReason(rejectReason);
1361
1362     ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1363     op.setIndex(index);
1364     op.setId(anId);
1365     op.setStatus(statusCode);
1366     op.setError(err);
1367     op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1368
1369     ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1370
1371     switch (opType) {
1372       case CREATE:
1373         rejectionResult.setCreate(op);
1374         break;
1375       case UPDATE:
1376         rejectionResult.setIndex(op);
1377         break;
1378       case DELETE:
1379         rejectionResult.setDelete(op);
1380         break;
1381       default:
1382     }
1383
1384     return rejectionResult;
1385   }
1386
1387
1388   /**
1389    * This method takes the json structure returned from ElasticSearch in
1390    * response to a bulk operations request and marshals it into a Java
1391    * object.
1392    *
1393    * @param jsonResult - The bulk operations response returned from
1394    *                   ElasticSearch.
1395    * @return - The marshalled response.
1396    * @throws JsonParseException
1397    * @throws JsonMappingException
1398    * @throws IOException
1399    */
1400   private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult)
1401       throws JsonParseException, JsonMappingException, IOException {
1402
1403     if (jsonResult != null) {
1404       if (logger.isDebugEnabled()) {
1405         logger.debug("ESController: Marshalling ES result set from json: "
1406             + jsonResult.replaceAll("\n", ""));
1407       }
1408
1409       ObjectMapper mapper = new ObjectMapper();
1410       mapper.setSerializationInclusion(Include.NON_EMPTY);
1411
1412       return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1413     }
1414
1415     return null;
1416   }
1417
1418
1419   /**
1420    * This method takes the marshalled ElasticSearch bulk response and
1421    * converts it into a generic response payload.
1422    *
1423    * @param esResult - ElasticSearch bulk operations response.
1424    * @return - A generic result set.
1425    */
1426   private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1427                                            List<ElasticSearchResultItem> rejectedOps) {
1428
1429     int totalOps = 0;
1430     int totalSuccess = 0;
1431     int totalFails = 0;
1432
1433     if (logger.isDebugEnabled()) {
1434
1435       logger.debug("ESController: Build generic result set.  ES Results: "
1436           + ((esResult != null) ? esResult.toString() : "[]")
1437           + " Rejected Ops: " + rejectedOps.toString());
1438     }
1439
1440     // Build a combined list of result items from the results returned
1441     // from ElasticSearch and the list of operations that we rejected
1442     // without sending to ElasticSearch.
1443     List<ElasticSearchResultItem> combinedResults = new ArrayList<ElasticSearchResultItem>();
1444     if (esResult != null) {
1445       combinedResults.addAll(Arrays.asList(esResult.getItems()));
1446     }
1447     combinedResults.addAll(rejectedOps);
1448
1449     // Iterate over the individual results in the resulting result set.
1450     StringBuilder resultsBuilder = new StringBuilder();
1451     AtomicBoolean firstItem = new AtomicBoolean(true);
1452     for (ElasticSearchResultItem item : combinedResults) {
1453
1454       // Increment the operation counts.
1455       totalOps++;
1456       if (isSuccessCode(item.operationStatus().getStatus())) {
1457         totalSuccess++;
1458       } else {
1459         totalFails++;
1460       }
1461
1462       // Prepend a comma to our response string unless this it the
1463       // first result in the set.
1464       if (!firstItem.compareAndSet(true, false)) {
1465         resultsBuilder.append(", ");
1466       }
1467
1468       // Append the current result as a generic json structure.
1469       resultsBuilder.append(item.toJson());
1470     }
1471
1472     // Now, build the result string and return it.
1473     String responseBody = "{ \"total_operations\": " + totalOps + ", "
1474         + "\"total_success\": " + totalSuccess + ", "
1475         + "\"total_fails\": " + totalFails + ", "
1476         + "\"results\": ["
1477         + resultsBuilder.toString()
1478         + "]}";
1479
1480     return responseBody;
1481   }
1482
1483
1484   /**
1485    * This method queryies ElasticSearch to determine if the supplied
1486    * index is present in the document store.
1487    *
1488    * @param indexName - The index to look for.
1489    * @return - An operation result indicating the success or failure of
1490    * the check.
1491    * @throws DocumentStoreOperationException
1492    */
1493   public OperationResult checkIndexExistence(String indexName)
1494       throws DocumentStoreOperationException {
1495
1496     // Initialize operation result with a failure codes / fault string
1497     OperationResult opResult = new OperationResult();
1498     opResult.setResultCode(500);
1499
1500     // Grab the current time so we can use it to generate a metrics log.
1501     MdcOverride override = getStartTime(new MdcOverride());
1502
1503     String fullUrl = getFullUrl("/" + indexName, false);
1504     HttpURLConnection conn = initializeConnection(fullUrl);
1505
1506     try {
1507       conn.setRequestMethod("HEAD");
1508
1509     } catch (ProtocolException e) {
1510       shutdownConnection(conn);
1511       throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1512     }
1513
1514     logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1515
1516     int resultCode;
1517     try {
1518       resultCode = conn.getResponseCode();
1519     } catch (IOException e) {
1520       shutdownConnection(conn);
1521       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1522     }
1523     logger.debug("Response Code : " + resultCode);
1524
1525     opResult.setResultCode(resultCode);
1526
1527     // Generate a metrics log so we can track how long the operation took.
1528     metricsLogger.info(SearchDbMsgs.CHECK_INDEX_TIME,
1529         new LogFields()
1530             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1531             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1532         override,
1533         indexName);
1534
1535     shutdownConnection(conn);
1536
1537     return opResult;
1538   }
1539
1540
1541   private void buildDocumentResult(DocumentOperationResult result, String index)
1542       throws DocumentStoreOperationException {
1543
1544     JSONParser parser = new JSONParser();
1545     JSONObject root;
1546     try {
1547       root = (JSONObject) parser.parse(result.getResult());
1548
1549       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1550         // Success response object
1551         Document doc = new Document();
1552         doc.setEtag(result.getResultVersion());
1553         doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1554
1555         doc.setContent((JSONObject) root.get("_source"));
1556         result.setDocument(doc);
1557
1558       } else {
1559         // Error response object
1560         JSONObject error = (JSONObject) root.get("error");
1561         if (error != null) {
1562           result.setError(new ErrorResult(error.get("type").toString(),
1563               error.get("reason").toString()));
1564         }
1565
1566       }
1567     } catch (Exception e) {
1568       throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1569           + result.getResult());
1570     }
1571
1572
1573   }
1574
1575   private String buildDocumentResponseUrl(String index, String id) {
1576     return ApiUtils.buildDocumentUri(index, id);
1577   }
1578
1579   private void buildSearchResult(SearchOperationResult result, String index)
1580       throws DocumentStoreOperationException {
1581
1582     JSONParser parser = new JSONParser();
1583     JSONObject root;
1584
1585     try {
1586       root = (JSONObject) parser.parse(result.getResult());
1587       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1588         JSONObject hits = (JSONObject) root.get("hits");
1589         JSONArray hitArray = (JSONArray) hits.get("hits");
1590         SearchHits searchHits = new SearchHits();
1591         searchHits.setTotalHits(hits.get("total").toString());
1592         ArrayList<SearchHit> searchHitArray = new ArrayList<SearchHit>();
1593
1594         for (int i = 0; i < hitArray.size(); i++) {
1595           JSONObject hit = (JSONObject) hitArray.get(i);
1596           SearchHit searchHit = new SearchHit();
1597           searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1598           Document doc = new Document();
1599           if (hit.get("_version") != null) {
1600             doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1601           }
1602
1603           doc.setUrl(buildDocumentResponseUrl(index, (hit.get("_id") != null)
1604               ? hit.get("_id").toString() : ""));
1605           doc.setContent((JSONObject) hit.get("_source"));
1606           searchHit.setDocument(doc);
1607           searchHitArray.add(searchHit);
1608         }
1609         searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1610         result.setSearchResult(searchHits);
1611
1612         JSONObject aggregations = (JSONObject) root.get("aggregations");
1613         if (aggregations != null) {
1614           AggregationResult[] aggResults =
1615               AggregationParsingUtil.parseAggregationResults(aggregations);
1616           AggregationResults aggs = new AggregationResults();
1617           aggs.setAggregations(aggResults);
1618           result.setAggregationResult(aggs);
1619         }
1620
1621         // success
1622       } else {
1623         JSONObject error = (JSONObject) root.get("error");
1624         if (error != null) {
1625           result.setError(new ErrorResult(error.get("type").toString(),
1626               error.get("reason").toString()));
1627         }
1628       }
1629     } catch (Exception e) {
1630       throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1631           + result.getResult());
1632     }
1633
1634   }
1635
1636 }