3c19610ef05bf68a3b8c31adb7982ea7731dd2b4
[aai/search-data-service.git] / src / main / java / org / onap / aai / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
22
23 import com.att.aft.dme2.internal.google.common.base.Throwables;
24 import com.fasterxml.jackson.annotation.JsonInclude.Include;
25 import com.fasterxml.jackson.core.JsonParseException;
26 import com.fasterxml.jackson.core.JsonProcessingException;
27 import com.fasterxml.jackson.databind.JsonMappingException;
28 import com.fasterxml.jackson.databind.ObjectMapper;
29 import edu.emory.mathcs.backport.java.util.Arrays;
30 import org.json.simple.JSONArray;
31 import org.json.simple.JSONObject;
32 import org.json.simple.parser.JSONParser;
33 import org.json.simple.parser.ParseException;
34 import org.onap.aai.sa.rest.AnalysisConfiguration;
35 import org.onap.aai.sa.rest.ApiUtils;
36 import org.onap.aai.sa.rest.BulkRequest;
37 import org.onap.aai.sa.rest.BulkRequest.OperationType;
38 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
39 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
40 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
41 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
42 import org.onap.aai.sa.searchdbabstraction.entity.Document;
43 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
44 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
45 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
46 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
47 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
48 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
49 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
50 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
51 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
52 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
53 import org.onap.aai.cl.api.LogFields;
54 import org.onap.aai.cl.api.LogLine;
55 import org.onap.aai.cl.api.Logger;
56 import org.onap.aai.cl.eelf.LoggerFactory;
57 import org.onap.aai.cl.mdc.MdcContext;
58 import org.onap.aai.cl.mdc.MdcOverride;
59 import org.onap.aai.sa.rest.DocumentSchema;
60
61 import java.io.BufferedReader;
62 import java.io.File;
63 import java.io.FileInputStream;
64 import java.io.IOException;
65 import java.io.InputStream;
66 import java.io.InputStreamReader;
67 import java.io.OutputStream;
68 import java.io.OutputStreamWriter;
69 import java.io.PrintWriter;
70 import java.io.StringWriter;
71 import java.net.HttpURLConnection;
72 import java.net.MalformedURLException;
73 import java.net.ProtocolException;
74 import java.net.URL;
75 import java.text.SimpleDateFormat;
76 import java.util.ArrayList;
77 import java.util.List;
78 import java.util.Properties;
79 import java.util.concurrent.atomic.AtomicBoolean;
80 import javax.ws.rs.core.Response.Status;
81
82
83 /**
84  * This class has the Elasticsearch implementation of the
85  * DB operations defined in DocumentStoreInterface.
86  */
87 public class ElasticSearchHttpController implements DocumentStoreInterface {
88
89   private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
90       "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
91   private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
92       "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
93   private static final String BULK_IMPORT_INDEX_TEMPLATE =
94       "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
95   private static final String BULK_DELETE_TEMPLATE =
96       "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
97
98   private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
99       "Internal Error: ElasticSearch operation fault occurred";
100   private static final Logger logger = LoggerFactory.getInstance()
101       .getLogger(ElasticSearchHttpController.class.getName());
102   private static final Logger metricsLogger = LoggerFactory.getInstance()
103       .getMetricsLogger(ElasticSearchHttpController.class.getName());
104   private final ElasticSearchConfig config;
105
106   private static final String DEFAULT_TYPE = "default";
107
108   private static ElasticSearchHttpController instance = null;
109
110   protected AnalysisConfiguration analysisConfig;
111
112   public static ElasticSearchHttpController getInstance() {
113
114     synchronized (ElasticSearchHttpController.class) {
115
116       if (instance == null) {
117
118         Properties properties = new Properties();
119         File file = new File(SearchDbConstants.ES_CONFIG_FILE);
120         try {
121           properties.load(new FileInputStream(file));
122         } catch (Exception e) {
123           logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL,
124             "ElasticSearchHTTPController.getInstance",
125             e.getLocalizedMessage());
126         }
127
128         ElasticSearchConfig config = new ElasticSearchConfig(properties);
129         instance = new ElasticSearchHttpController(config);
130       }
131     }
132
133     return instance;
134   }
135
136   public ElasticSearchHttpController(ElasticSearchConfig config) {
137     this.config = config;
138     analysisConfig = new AnalysisConfiguration();
139
140     try {
141       logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
142       checkConnection();
143       logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
144     } catch (Exception e) {
145       logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e,
146           getFullUrl("", false), e.getMessage());
147     }
148   }
149
150
151   public AnalysisConfiguration getAnalysisConfig() {
152     return analysisConfig;
153   }
154
155   @Override
156   public OperationResult createIndex(String index, DocumentSchema documentSchema) {
157
158     OperationResult result = new OperationResult();
159     result.setResultCode(500);
160
161     try {
162
163       // Submit the request to ElasticSearch to create the index using a
164       // default document type.
165       result = createTable(index,
166           DEFAULT_TYPE,
167           analysisConfig.getEsIndexSettings(),
168           DocumentSchemaUtil.generateDocumentMappings(documentSchema));
169
170       // ElasticSearch will return us a 200 code on success when we
171       // want to report a 201, so translate the result here.
172       result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
173       if (isSuccess(result)) {
174         result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
175         //result.setResult("{\"index\": \"" + index + ", \"type\": \"" + DEFAULT_TYPE + "\"}");
176       }
177
178     } catch (DocumentStoreOperationException e) {
179
180       result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
181     }
182
183     return result;
184   }
185
186
187   @Override
188   public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
189
190     //Initialize operation result with a failure codes / fault string
191     OperationResult opResult = new OperationResult();
192     opResult.setResultCode(500);
193     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
194
195     // Grab the current time so we can use it to generate a metrics log.
196     MdcOverride override = getStartTime(new MdcOverride());
197
198     String fullUrl = getFullUrl("/" + indexName + "/", false);
199     HttpURLConnection conn = initializeConnection(fullUrl);
200
201     logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
202
203     try {
204       conn.setRequestMethod("DELETE");
205     } catch (ProtocolException e) {
206       shutdownConnection(conn);
207       throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
208     }
209
210     handleResponse(conn, opResult);
211
212     // Generate a metrics log so we can track how long the operation took.
213     metricsLogger.info(SearchDbMsgs.DELETE_INDEX_TIME,
214         new LogFields()
215             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
216             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
217         override,
218         indexName);
219
220     shutdownConnection(conn);
221
222     return opResult;
223   }
224
225
226   private OperationResult checkConnection() throws Exception {
227
228     String fullUrl = getFullUrl("/_cluster/health", false);
229     URL url = null;
230     HttpURLConnection conn = null;
231
232     url = new URL(fullUrl);
233     conn = (HttpURLConnection) url.openConnection();
234     conn.setRequestMethod("GET");
235     conn.setDoOutput(true);
236     logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
237
238     int resultCode = conn.getResponseCode();
239     logger.debug("getClusterHealth() response Code : " + resultCode);
240     OperationResult opResult = new OperationResult();
241     opResult.setResultCode(resultCode);
242
243     shutdownConnection(conn);
244
245     return opResult;
246   }
247
248   private String getFullUrl(String resourceUrl, boolean isSecure) {
249
250     final String host = config.getIpAddress();
251     final String port = config.getHttpPort();
252
253     if (isSecure) {
254       return String.format("https://%s:%s%s", host, port, resourceUrl);
255     } else {
256       return String.format("http://%s:%s%s", host, port, resourceUrl);
257     }
258   }
259
260   private void shutdownConnection(HttpURLConnection connection) {
261     if (connection == null) {
262       return;
263     }
264
265     InputStream inputstream = null;
266     OutputStream outputstream = null;
267
268     try {
269       inputstream = connection.getInputStream();
270     } catch (IOException e) {
271       logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
272     } finally {
273       if (inputstream != null) {
274         try {
275           inputstream.close();
276         } catch (IOException e) {
277           logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
278               e.getLocalizedMessage());
279         }
280       }
281     }
282
283     try {
284       outputstream = connection.getOutputStream();
285     } catch (IOException e) {
286       logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
287     } finally {
288       if (outputstream != null) {
289         try {
290           outputstream.close();
291         } catch (IOException e) {
292           logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
293               e.getLocalizedMessage());
294         }
295       }
296     }
297
298     connection.disconnect();
299   }
300
301   //@Override
302   protected OperationResult createTable(String indexName, String typeName,
303                                         String indexSettings, String indexMappings)
304       throws DocumentStoreOperationException {
305
306     if (indexSettings == null) {
307       logger.debug("No settings provided.");
308     }
309
310     if (indexMappings == null) {
311       logger.debug("No mappings provided.");
312     }
313
314     OperationResult opResult = new OperationResult();
315
316     // Initialize operation result with a failure codes / fault string
317     opResult.setResultCode(500);
318     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
319
320     // Grab the current time so we can use it to generate a metrics log.
321     MdcOverride override = getStartTime(new MdcOverride());
322
323     String fullUrl = getFullUrl("/" + indexName + "/", false);
324     HttpURLConnection conn = initializeConnection(fullUrl);
325
326     try {
327       conn.setRequestMethod("PUT");
328     } catch (ProtocolException e) {
329       shutdownConnection(conn);
330       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
331     }
332
333     StringBuilder sb = new StringBuilder(128);
334     sb.append("{ \"settings\" : ");
335     sb.append(indexSettings);
336     sb.append(",");
337
338     sb.append("\"mappings\" : {");
339     sb.append("\"" + typeName + "\" :");
340     sb.append(indexMappings);
341     sb.append("}}");
342
343     attachContent(conn, sb.toString());
344
345     logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
346     logger.debug("Request content: " + sb.toString());
347
348     handleResponse(conn, opResult);
349
350     shutdownConnection(conn);
351
352     // Generate a metrics log so we can track how long the operation took.
353     metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
354         new LogFields()
355             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
356             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
357         override,
358         indexName);
359
360     return opResult;
361   }
362
363   @Override
364   public DocumentOperationResult createDocument(String                  indexName, 
365                                                 DocumentStoreDataEntity document,
366                                                 boolean                 allowImplicitIndexCreation)
367       throws DocumentStoreOperationException {
368     
369     if(!allowImplicitIndexCreation) {
370       
371       // Before we do anything, make sure that the specified index actually exists in the
372       // document store - we don't want to rely on ElasticSearch to fail the document
373       // create because it could be configured to implicitly create a non-existent index,
374       // which can lead to hard-to-debug behaviour with queries down the road.
375       OperationResult indexExistsResult = checkIndexExistence(indexName);
376       if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
377   
378         DocumentOperationResult opResult = new DocumentOperationResult();
379         opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
380         opResult.setResult("Document Index '" + indexName + "' does not exist.");
381         opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
382         return opResult;
383       }
384     }
385     
386     if (document.getId() == null || document.getId().isEmpty()) {
387       return createDocumentWithoutId(indexName, document);
388     } else {
389       return createDocumentWithId(indexName, document);
390     }
391   }
392
393   private DocumentOperationResult createDocumentWithId(String indexName,
394                                                        DocumentStoreDataEntity document)
395       throws DocumentStoreOperationException {
396     // check if the document already exists
397     DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
398
399
400     if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
401       if (opResult.getResultCode() == Status.OK.getStatusCode()) {
402         opResult.setFailureCause("A document with the same id already exists.");
403       } else {
404         opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
405       }
406       opResult.setResultCode(Status.CONFLICT.getStatusCode());
407       return opResult;
408     }
409
410     opResult = new DocumentOperationResult();
411     // Initialize operation result with a failure codes / fault string
412     opResult.setResultCode(500);
413     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
414
415     // Grab the current time so we can use it to generate a metrics log.
416     MdcOverride override = getStartTime(new MdcOverride());
417
418     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE
419         + "/" + document.getId(), false);
420     HttpURLConnection conn = initializeConnection(fullUrl);
421
422     try {
423       conn.setRequestMethod("PUT");
424     } catch (ProtocolException e) {
425       shutdownConnection(conn);
426       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
427     }
428
429     attachDocument(conn, document);
430
431     logger.debug("Sending 'PUT' request to: " + conn.getURL());
432
433     handleResponse(conn, opResult);
434     buildDocumentResult(opResult, indexName);
435
436     // Generate a metrics log so we can track how long the operation took.
437     metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
438         new LogFields()
439             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
440             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
441         override,
442         indexName);
443
444     shutdownConnection(conn);
445
446     return opResult;
447
448   }
449
450   private DocumentOperationResult createDocumentWithoutId(String indexName,
451                                                           DocumentStoreDataEntity document)
452       throws DocumentStoreOperationException {
453
454     DocumentOperationResult response = new DocumentOperationResult();
455     // Initialize operation result with a failure codes / fault string
456     response.setResultCode(500);
457     response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
458
459     // Grab the current time so we can use it to generate a metrics log.
460     MdcOverride override = getStartTime(new MdcOverride());
461
462     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
463     HttpURLConnection conn = initializeConnection(fullUrl);
464
465     try {
466       conn.setRequestMethod("POST");
467     } catch (ProtocolException e) {
468       shutdownConnection(conn);
469       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
470     }
471
472     attachDocument(conn, document);
473
474     logger.debug("Sending 'POST' request to: " + conn.getURL());
475
476     handleResponse(conn, response);
477     buildDocumentResult(response, indexName);
478
479     // Generate a metrics log so we can track how long the operation took.
480     metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
481         new LogFields()
482             .setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
483             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
484         override,
485         indexName);
486
487     shutdownConnection(conn);
488
489     return response;
490   }
491
492   private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
493       throws DocumentStoreOperationException {
494     conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
495     conn.setRequestProperty("Connection", "Close");
496
497     attachContent(conn, doc.getContentInJson());
498   }
499
500   private DocumentOperationResult checkDocumentExistence(String indexName,
501                                                          String docId)
502       throws DocumentStoreOperationException {
503     DocumentOperationResult opResult = new DocumentOperationResult();
504
505     // Initialize operation result with a failure codes / fault string
506     opResult.setResultCode(500);
507
508     // Grab the current time so we can use it to generate a metrics log.
509     MdcOverride override = getStartTime(new MdcOverride());
510
511     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
512     HttpURLConnection conn = initializeConnection(fullUrl);
513
514     try {
515       conn.setRequestMethod("HEAD");
516     } catch (ProtocolException e) {
517       shutdownConnection(conn);
518       throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
519     }
520
521     logger.debug("Sending 'HEAD' request to: " + conn.getURL());
522
523     int resultCode;
524     try {
525       resultCode = conn.getResponseCode();
526     } catch (IOException e) {
527       shutdownConnection(conn);
528       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
529     }
530
531     logger.debug("Response Code : " + resultCode);
532
533     opResult.setResultCode(resultCode);
534
535     // Generate a metrics log so we can track how long the operation took.
536     metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
537         new LogFields()
538             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
539             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
540         override,
541         indexName,
542         docId);
543
544     shutdownConnection(conn);
545
546     return opResult;
547   }
548
549   @Override
550   public DocumentOperationResult updateDocument(String                  indexName, 
551                                                 DocumentStoreDataEntity document,
552                                                 boolean                 allowImplicitIndexCreation)
553       throws DocumentStoreOperationException {
554     
555     if(!allowImplicitIndexCreation) {
556       
557       // Before we do anything, make sure that the specified index actually exists in the
558       // document store - we don't want to rely on ElasticSearch to fail the document
559       // create because it could be configured to implicitly create a non-existent index,
560       // which can lead to hard-to-debug behaviour with queries down the road.
561       OperationResult indexExistsResult = checkIndexExistence(indexName);
562       if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
563   
564         DocumentOperationResult opResult = new DocumentOperationResult();
565         opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
566         opResult.setResult("Document Index '" + indexName + "' does not exist.");
567         opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
568         return opResult;
569       }
570     }
571     
572     DocumentOperationResult opResult = new DocumentOperationResult();
573
574     // Initialize operation result with a failure codes / fault string
575     opResult.setResultCode(500);
576     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
577
578     // Grab the current time so we can use it to generate a metrics log.
579     MdcOverride override = getStartTime(new MdcOverride());
580
581     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
582         + "?version=" + document.getVersion(), false);
583     HttpURLConnection conn = initializeConnection(fullUrl);
584
585     try {
586       conn.setRequestMethod("PUT");
587     } catch (ProtocolException e) {
588       shutdownConnection(conn);
589       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
590     }
591
592     attachDocument(conn, document);
593
594     logger.debug("Sending 'PUT' request to: " + conn.getURL());
595
596     handleResponse(conn, opResult);
597     buildDocumentResult(opResult, indexName);
598
599     // Generate a metrics log so we can track how long the operation took.
600     metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
601         new LogFields()
602             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
603             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
604         override,
605         indexName,
606         document.getId());
607
608     shutdownConnection(conn);
609
610     return opResult;
611   }
612
613   @Override
614   public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
615       throws DocumentStoreOperationException {
616     DocumentOperationResult opResult = new DocumentOperationResult();
617
618     // Initialize operation result with a failure codes / fault string
619     opResult.setResultCode(500);
620     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
621
622     // Grab the current time so we can use it to generate a metrics log.
623     MdcOverride override = getStartTime(new MdcOverride());
624
625     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
626         + "?version=" + document.getVersion(), false);
627     HttpURLConnection conn = initializeConnection(fullUrl);
628
629     try {
630       conn.setRequestMethod("DELETE");
631     } catch (ProtocolException e) {
632       shutdownConnection(conn);
633       throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
634     }
635
636     logger.debug("\nSending 'DELETE' request to " + conn.getURL());
637
638     handleResponse(conn, opResult);
639     buildDocumentResult(opResult, indexName);
640     //supress the etag and url in response for delete as they are not required
641     if (opResult.getDocument() != null) {
642       opResult.getDocument().setEtag(null);
643       opResult.getDocument().setUrl(null);
644     }
645
646     // Generate a metrics log so we can track how long the operation took.
647     metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
648         new LogFields()
649             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
650             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
651         override,
652         indexName,
653         document.getId());
654
655     shutdownConnection(conn);
656
657     return opResult;
658   }
659
660   @Override
661   public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
662       throws DocumentStoreOperationException {
663     DocumentOperationResult opResult = new DocumentOperationResult();
664
665     // Initialize operation result with a failure codes / fault string
666     opResult.setResultCode(500);
667     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
668
669     // Grab the current time so we can use it to generate a metrics log.
670     MdcOverride override = getStartTime(new MdcOverride());
671
672     String fullUrl = null;
673     if (document.getVersion() == null) {
674       fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
675     } else {
676       fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
677           + "?version=" + document.getVersion(), false);
678     }
679     HttpURLConnection conn = initializeConnection(fullUrl);
680
681     logger.debug("\nSending 'GET' request to: " + conn.getURL());
682
683     handleResponse(conn, opResult);
684     buildDocumentResult(opResult, indexName);
685
686     // Generate a metrics log so we can track how long the operation took.
687     metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
688         new LogFields()
689             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
690             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
691         override,
692         indexName,
693         document.getId());
694
695     shutdownConnection(conn);
696
697     return opResult;
698   }
699
700   public SearchOperationResult search(String indexName, String queryString)
701       throws DocumentStoreOperationException {
702     SearchOperationResult opResult = new SearchOperationResult();
703
704     // Initialize operation result with a failure codes / fault string
705     opResult.setResultCode(500);
706     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
707
708     String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
709
710     // Grab the current time so we can use it to generate a metrics log.
711     MdcOverride override = getStartTime(new MdcOverride());
712
713     HttpURLConnection conn = initializeConnection(fullUrl);
714
715     try {
716       conn.setRequestMethod("GET");
717     } catch (ProtocolException e) {
718       shutdownConnection(conn);
719       throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
720     }
721
722     logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
723
724     handleResponse(conn, opResult);
725     buildSearchResult(opResult, indexName);
726
727
728     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
729         new LogFields()
730             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
731             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
732         override,
733         indexName,
734         queryString);
735
736     return opResult;
737   }
738
739   public SearchOperationResult searchWithPayload(String indexName, String query)
740       throws DocumentStoreOperationException {
741     SearchOperationResult opResult = new SearchOperationResult();
742
743     if (logger.isDebugEnabled()) {
744       logger.debug("Querying index: " + indexName + " with query string: " + query);
745     }
746
747     // Initialize operation result with a failure codes / fault string
748     opResult.setResultCode(500);
749     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
750
751     String fullUrl = getFullUrl("/" + indexName + "/_search", false);
752
753     // Grab the current time so we can use it to generate a metrics log.
754     MdcOverride override = getStartTime(new MdcOverride());
755
756     HttpURLConnection conn = initializeConnection(fullUrl);
757
758     try {
759       conn.setRequestMethod("POST");
760     } catch (ProtocolException e) {
761       shutdownConnection(conn);
762       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
763     }
764
765     attachContent(conn, query);
766
767     logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
768     logger.debug("Request body =  Elasticsearch query = " + query);
769
770     handleResponse(conn, opResult);
771     buildSearchResult(opResult, indexName);
772
773     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
774         new LogFields()
775             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
776             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
777         override,
778         indexName,
779         query);
780
781     shutdownConnection(conn);
782
783     return opResult;
784   }
785
786   private void attachContent(HttpURLConnection conn, String content)
787       throws DocumentStoreOperationException {
788     OutputStream outputStream = null;
789     OutputStreamWriter out = null;
790
791     try {
792       outputStream = conn.getOutputStream();
793     } catch (IOException e) {
794       shutdownConnection(conn);
795       throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
796     }
797
798     out = new OutputStreamWriter(outputStream);
799
800     try {
801       out.write(content);
802       out.close();
803     } catch (IOException e) {
804       shutdownConnection(conn);
805       throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
806     }
807   }
808
809   private HttpURLConnection initializeConnection(String fullUrl)
810       throws DocumentStoreOperationException {
811     URL url = null;
812     HttpURLConnection conn = null;
813
814     try {
815       url = new URL(fullUrl);
816     } catch (MalformedURLException e) {
817       throw new DocumentStoreOperationException("Error building a URL with " + url, e);
818     }
819
820     try {
821       conn = (HttpURLConnection) url.openConnection();
822       conn.setDoOutput(true);
823     } catch (IOException e) {
824       shutdownConnection(conn);
825       throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
826     }
827
828     return conn;
829   }
830
831   private void handleResponse(HttpURLConnection conn, OperationResult opResult)
832       throws DocumentStoreOperationException {
833     int resultCode = 200;
834
835     try {
836       resultCode = conn.getResponseCode();
837     } catch (IOException e) {
838       shutdownConnection(conn);
839       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
840     }
841
842     logger.debug("Response Code : " + resultCode);
843
844     InputStream inputStream = null;
845
846     if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
847       inputStream = conn.getErrorStream();
848     } else {
849       try {
850         inputStream = conn.getInputStream();
851       } catch (IOException e) {
852         shutdownConnection(conn);
853         throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
854       }
855     }
856
857     InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
858     BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
859
860     StringBuilder result = new StringBuilder(128);
861     String string = null;
862
863     try {
864       while ((string = bufferedreader.readLine()) != null) {
865         result.append(string).append("\n");
866       }
867     } catch (IOException e) {
868       shutdownConnection(conn);
869       throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
870     }
871
872     if (resultCode == Status.CONFLICT.getStatusCode()) {
873       opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode());
874     } else {
875       opResult.setResultCode(resultCode);
876     }
877     if (logger.isDebugEnabled()) {
878       logger.debug("Raw result string from ElasticSearch = " + result.toString());
879     }
880     opResult.setResult(result.toString());
881     opResult.setResultVersion(extractVersion(result.toString()));
882   }
883
884   private String extractVersion(String result) throws DocumentStoreOperationException {
885
886     JSONParser parser = new JSONParser();
887     String version = null;
888     try {
889       JSONObject root = (JSONObject) parser.parse(result);
890       if (root.get("_version") != null) {
891         version = root.get("_version").toString();
892       }
893
894     } catch (ParseException e) {
895
896       // Not all responses from ElasticSearch include a version, so
897       // if we don't get one back, just return an empty string rather
898       // than trigger a false failure.
899       version = "";
900     }
901     return version;
902   }
903
904   /**
905    * This convenience method gets the current system time and stores
906    * it in an attribute in the supplied {@link MdcOverride} object so
907    * that it can be used later by the metrics logger.
908    *
909    * @param override - The {@link MdcOverride} object to update.
910    * @return - The supplied {@link MdcOverride} object.
911    */
912   private MdcOverride getStartTime(MdcOverride override) {
913
914     // Grab the current time...
915     long startTimeInMs = System.currentTimeMillis();
916
917     // ...and add it as an attribute to the supplied MDC Override
918     // object.
919     SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
920     override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
921
922     // Return the MdcOverride object that we were passed.
923     // This looks odd, but it allows us to do stuff like:
924     //
925     //    MdcOverride ov = getStartTime(new MdcOverride())
926     //
927     // which is quite handy, but also allows us to pass in an existing
928     // MdcOverride object which already has some attributes set.
929     return override;
930   }
931
932   private boolean isSuccess(OperationResult result) {
933
934     return isSuccessCode(result.getResultCode());
935   }
936
937
938   private boolean isSuccessCode(int statusCode) {
939     return ((statusCode >= 200) && (statusCode < 300));
940   }
941
942
943   @Override
944   public OperationResult performBulkOperations(BulkRequest[] requests)
945       throws DocumentStoreOperationException {
946
947     if (logger.isDebugEnabled()) {
948       String dbgString = "ESController: performBulkOperations - Operations: ";
949
950       for (BulkRequest request : requests) {
951         dbgString += "[" + request.toString() + "] ";
952       }
953
954       logger.debug(dbgString);
955     }
956
957     // Grab the current time so we can use it to generate a metrics log.
958     MdcOverride override = getStartTime(new MdcOverride());
959
960     // Parse the supplied set of operations.
961     // Iterate over the list of operations which we were provided and
962     // translate them into a format that ElasticSearh understands.
963     int opCount = 0;
964     StringBuilder esOperationSet = new StringBuilder(128);
965     List<ElasticSearchResultItem> rejected = new ArrayList<ElasticSearchResultItem>();
966     for (BulkRequest request : requests) {
967
968       // Convert the request to the syntax ElasticSearch likes.
969       if (buildEsOperation(request, esOperationSet, rejected)) {
970         opCount++;
971       }
972     }
973
974     ElasticSearchBulkOperationResult opResult = null;
975     if (opCount > 0) {
976
977       // Open an HTTP connection to the ElasticSearch back end.
978       String fullUrl = getFullUrl("/_bulk", false);
979       URL url;
980       HttpURLConnection conn;
981       try {
982
983         url = new URL(fullUrl);
984         conn = (HttpURLConnection) url.openConnection();
985         conn.setRequestMethod("PUT");
986         conn.setDoOutput(true);
987         conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
988         conn.setRequestProperty("Connection", "Close");
989
990       } catch (IOException e) {
991
992         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
993         if (logger.isDebugEnabled()) {
994           logger.debug(Throwables.getStackTraceAsString(e));
995         }
996
997         throw new DocumentStoreOperationException("Failed to open connection to document store.  Cause: "
998             + e.getMessage(), e);
999       }
1000
1001       StringBuilder bulkResult = new StringBuilder(128);
1002       try {
1003         // Create an output stream to write our request to.
1004         OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1005         ;
1006
1007         if (logger.isDebugEnabled()) {
1008           logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1009           logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n",
1010               "\\n"));
1011         }
1012
1013         // Write the resulting request string to our output stream. (this sends the request to ES?)
1014         out.write(esOperationSet.toString());
1015         out.close();
1016
1017         // Open an input stream on our connection in order to read back the results.
1018         InputStream is = conn.getInputStream();
1019         InputStreamReader inputstreamreader = new InputStreamReader(is);
1020         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1021
1022         // Read the contents of the input stream into our result string...
1023         String esResponseString = null;
1024
1025         while ((esResponseString = bufferedreader.readLine()) != null) {
1026           bulkResult.append(esResponseString).append("\n");
1027         }
1028
1029       } catch (IOException e) {
1030
1031         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1032         if (logger.isDebugEnabled()) {
1033           StringWriter sw = new StringWriter();
1034           e.printStackTrace(new PrintWriter(sw));
1035           logger.debug(sw.toString());
1036         }
1037
1038         throw new DocumentStoreOperationException("Failure interacting with document store.  Cause: "
1039             + e.getMessage(), e);
1040       }
1041
1042       if (logger.isDebugEnabled()) {
1043         logger.debug("ESController: Received result string from ElasticSearch: = "
1044             + bulkResult.toString());
1045       }
1046
1047       // ...and marshal the resulting string into a Java object.
1048       try {
1049         opResult = marshallEsBulkResult(bulkResult.toString());
1050
1051       } catch (IOException e) {
1052
1053         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1054         if (logger.isDebugEnabled()) {
1055           logger.debug(Throwables.getStackTraceAsString(e));
1056         }
1057
1058         throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: "
1059             + e.getMessage(), e);
1060       }
1061     }
1062
1063     // Finally, build the operation result and return it to the caller.
1064     OperationResult result = new OperationResult();
1065     result.setResultCode(207);
1066     result.setResult(buildGenericBulkResultSet(opResult, rejected));
1067
1068     // In the success case we don't want the entire result string to be
1069     // dumped into the metrics log, so concatenate it.
1070     String resultStringForMetricsLog = result.getResult();
1071     if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1072       resultStringForMetricsLog = resultStringForMetricsLog.substring(0,
1073           Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1074     }
1075
1076     metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1077         new LogFields()
1078             .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1079             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1080         override);
1081
1082     return result;
1083   }
1084
1085
1086   /**
1087    * This method converts a {@link BulkRequest} object into a json structure
1088    * which can be understood by ElasticSearch.
1089    *
1090    * @param request - The request to be performed.
1091    * @param sb      - The string builder to append the json data to
1092    * @throws DocumentStoreOperationException
1093    */
1094   private boolean buildEsOperation(BulkRequest request, StringBuilder sb,
1095                                    List<ElasticSearchResultItem> fails)
1096       throws DocumentStoreOperationException {
1097
1098     boolean retVal = true;
1099     OperationResult indexExistsResult = null;
1100
1101     // What kind of operation are we performing?
1102     switch (request.getOperationType()) {
1103
1104       // Create a new document.
1105       case CREATE:
1106
1107         // Make sure that we were supplied a document payload.
1108         if (request.getOperation().getDocument() == null) {
1109
1110           fails.add(generateRejectionEntry(request.getOperationType(),
1111               "Missing document payload",
1112               request.getIndex(),
1113               request.getId(),
1114               400,
1115               request.getOperation().getMetaData().getUrl()));
1116           return false;
1117         }
1118
1119         // Make sure that the supplied document URL is formatted
1120         // correctly.
1121         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1122           fails.add(generateRejectionEntry(request.getOperationType(),
1123               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1124               request.getIndex(),
1125               "",
1126               400,
1127               request.getOperation().getMetaData().getUrl()));
1128           return false;
1129         }
1130
1131         // Validate that the specified index actually exists before we
1132         // try to perform the create.
1133         if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1134
1135           fails.add(generateRejectionEntry(request.getOperationType(),
1136               "Specified resource does not exist: "
1137                   + request.getOperation().getMetaData().getUrl(),
1138               request.getIndex(),
1139               request.getId(),
1140               404,
1141               request.getOperation().getMetaData().getUrl()));
1142           return false;
1143         }
1144
1145         // If we were supplied an id for the new document, then
1146         // include it in the bulk operation to Elastic Search
1147         if (request.getId() == null) {
1148
1149           sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE,
1150               request.getIndex(),
1151               DEFAULT_TYPE));
1152
1153           // Otherwise, we just leave that parameter off and ElasticSearch
1154           // will generate one for us.
1155         } else {
1156           sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE,
1157               request.getIndex(),
1158               DEFAULT_TYPE,
1159               request.getId()));
1160         }
1161
1162         try {
1163           // Append the document that we want to create.
1164           sb.append(request.getOperation().getDocument().toJson()).append("\n");
1165         } catch (JsonProcessingException e) {
1166           throw new DocumentStoreOperationException("Failure parsing document to json", e);
1167         }
1168
1169         break;
1170
1171       // Update an existing document.
1172       case UPDATE:
1173
1174         // Make sure that we were supplied a document payload.
1175         if (request.getOperation().getDocument() == null) {
1176
1177           fails.add(generateRejectionEntry(request.getOperationType(),
1178               "Missing document payload",
1179               request.getIndex(),
1180               request.getId(),
1181               400,
1182               request.getOperation().getMetaData().getUrl()));
1183           return false;
1184         }
1185
1186         // Make sure that the supplied document URL is formatted
1187         // correctly.
1188         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1189           fails.add(generateRejectionEntry(request.getOperationType(),
1190               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1191               request.getIndex(),
1192               "",
1193               400,
1194               request.getOperation().getMetaData().getUrl()));
1195           return false;
1196         }
1197
1198         // Validate that the specified index actually exists before we
1199         // try to perform the update.
1200         if (!indexExists(request.getIndex())) {
1201
1202           fails.add(generateRejectionEntry(request.getOperationType(),
1203               "Specified resource does not exist: "
1204                   + request.getOperation().getMetaData().getUrl(),
1205               request.getIndex(),
1206               request.getId(),
1207               404,
1208               request.getOperation().getMetaData().getUrl()));
1209           return false;
1210         }
1211
1212         // Validate that the document we are trying to update actually
1213         // exists before we try to perform the update.
1214         if (!documentExists(request.getIndex(), request.getId())) {
1215
1216           fails.add(generateRejectionEntry(request.getOperationType(),
1217               "Specified resource does not exist: "
1218                   + request.getOperation().getMetaData().getUrl(),
1219               request.getIndex(),
1220               request.getId(),
1221               404,
1222               request.getOperation().getMetaData().getUrl()));
1223           return false;
1224         }
1225
1226         // It is mandatory that a version be supplied for an update operation,
1227         // so validate that now.
1228         if (request.getOperation().getMetaData().getEtag() == null) {
1229
1230           fails.add(generateRejectionEntry(request.getOperationType(),
1231               "Missing mandatory ETag field",
1232               request.getIndex(),
1233               request.getId(),
1234               400,
1235               request.getOperation().getMetaData().getUrl()));
1236           return false;
1237         }
1238
1239         // Generate the update request...
1240         sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE,
1241             request.getIndex(),
1242             DEFAULT_TYPE,
1243             request.getId(),
1244             request.getOperation().getMetaData().getEtag()));
1245
1246         // ...and append the document that we want to update.
1247         try {
1248           sb.append(request.getOperation().getDocument().toJson()).append("\n");
1249         } catch (JsonProcessingException e) {
1250           throw new DocumentStoreOperationException("Failure parsing document to json", e);
1251         }
1252         break;
1253
1254       // Delete an existing document.
1255       case DELETE:
1256
1257         // Make sure that the supplied document URL is formatted
1258         // correctly.
1259         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1260           fails.add(generateRejectionEntry(request.getOperationType(),
1261               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1262               request.getIndex(),
1263               "",
1264               400,
1265               request.getOperation().getMetaData().getUrl()));
1266           return false;
1267         }
1268
1269         // Validate that the specified index actually exists before we
1270         // try to perform the delete.
1271         if (!indexExists(request.getIndex())) {
1272
1273           fails.add(generateRejectionEntry(request.getOperationType(),
1274               "Specified resource does not exist: "
1275                   + request.getOperation().getMetaData().getUrl(),
1276               request.getIndex(),
1277               request.getId(),
1278               404,
1279               request.getOperation().getMetaData().getUrl()));
1280           return false;
1281         }
1282
1283         // Validate that the document we are trying to update actually
1284         // exists before we try to perform the delete.
1285         if (!documentExists(request.getIndex(), request.getId())) {
1286
1287           fails.add(generateRejectionEntry(request.getOperationType(),
1288               "Specified resource does not exist: "
1289                   + request.getOperation().getMetaData().getUrl(),
1290               request.getIndex(),
1291               request.getId(),
1292               404,
1293               request.getOperation().getMetaData().getUrl()));
1294           return false;
1295         }
1296
1297         // It is mandatory that a version be supplied for a delete operation,
1298         // so validate that now.
1299         if (request.getOperation().getMetaData().getEtag() == null) {
1300
1301           fails.add(generateRejectionEntry(request.getOperationType(),
1302               "Missing mandatory ETag field",
1303               request.getIndex(),
1304               request.getId(),
1305               400,
1306               request.getOperation().getMetaData().getUrl()));
1307           return false;
1308         }
1309
1310         // Generate the delete request.
1311         sb.append(String.format(BULK_DELETE_TEMPLATE,
1312             request.getIndex(),
1313             DEFAULT_TYPE,
1314             request.getId(),
1315             request.getOperation().getMetaData().getEtag()));
1316         break;
1317       default:
1318     }
1319
1320     return retVal;
1321   }
1322
1323   private boolean indexExists(String index) throws DocumentStoreOperationException {
1324
1325     OperationResult indexExistsResult = checkIndexExistence(index);
1326
1327     return ((indexExistsResult.getResultCode() >= 200)
1328         && (indexExistsResult.getResultCode() < 300));
1329   }
1330
1331   private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1332
1333     OperationResult docExistsResult = checkDocumentExistence(index, id);
1334
1335     return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1336   }
1337
1338   /**
1339    * This method constructs a status entry for a bulk operation which has
1340    * been rejected before even sending it to the document store.
1341    *
1342    * @param rejectReason - A message describing why the operation was rejected.
1343    * @param anId         - The identifier associated with the document being
1344    *                     acted on.
1345    * @param statusCode  - An HTTP status code.
1346    * @return - A result set item.
1347    */
1348   private ElasticSearchResultItem generateRejectionEntry(OperationType opType,
1349                                                          String rejectReason,
1350                                                          String index,
1351                                                          String anId,
1352                                                          int statusCode,
1353                                                          String originalUrl) {
1354
1355     ElasticSearchError err = new ElasticSearchError();
1356     err.setReason(rejectReason);
1357
1358     ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1359     op.setIndex(index);
1360     op.setId(anId);
1361     op.setStatus(statusCode);
1362     op.setError(err);
1363     op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1364
1365     ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1366
1367     switch (opType) {
1368       case CREATE:
1369         rejectionResult.setCreate(op);
1370         break;
1371       case UPDATE:
1372         rejectionResult.setIndex(op);
1373         break;
1374       case DELETE:
1375         rejectionResult.setDelete(op);
1376         break;
1377       default:
1378     }
1379
1380     return rejectionResult;
1381   }
1382
1383
1384   /**
1385    * This method takes the json structure returned from ElasticSearch in
1386    * response to a bulk operations request and marshals it into a Java
1387    * object.
1388    *
1389    * @param jsonResult - The bulk operations response returned from
1390    *                   ElasticSearch.
1391    * @return - The marshalled response.
1392    * @throws JsonParseException
1393    * @throws JsonMappingException
1394    * @throws IOException
1395    */
1396   private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult)
1397       throws JsonParseException, JsonMappingException, IOException {
1398
1399     if (jsonResult != null) {
1400       if (logger.isDebugEnabled()) {
1401         logger.debug("ESController: Marshalling ES result set from json: "
1402             + jsonResult.replaceAll("\n", ""));
1403       }
1404
1405       ObjectMapper mapper = new ObjectMapper();
1406       mapper.setSerializationInclusion(Include.NON_EMPTY);
1407
1408       return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1409     }
1410
1411     return null;
1412   }
1413
1414
1415   /**
1416    * This method takes the marshalled ElasticSearch bulk response and
1417    * converts it into a generic response payload.
1418    *
1419    * @param esResult - ElasticSearch bulk operations response.
1420    * @return - A generic result set.
1421    */
1422   private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1423                                            List<ElasticSearchResultItem> rejectedOps) {
1424
1425     int totalOps = 0;
1426     int totalSuccess = 0;
1427     int totalFails = 0;
1428
1429     if (logger.isDebugEnabled()) {
1430
1431       logger.debug("ESController: Build generic result set.  ES Results: "
1432           + ((esResult != null) ? esResult.toString() : "[]")
1433           + " Rejected Ops: " + rejectedOps.toString());
1434     }
1435
1436     // Build a combined list of result items from the results returned
1437     // from ElasticSearch and the list of operations that we rejected
1438     // without sending to ElasticSearch.
1439     List<ElasticSearchResultItem> combinedResults = new ArrayList<ElasticSearchResultItem>();
1440     if (esResult != null) {
1441       combinedResults.addAll(Arrays.asList(esResult.getItems()));
1442     }
1443     combinedResults.addAll(rejectedOps);
1444
1445     // Iterate over the individual results in the resulting result set.
1446     StringBuilder resultsBuilder = new StringBuilder();
1447     AtomicBoolean firstItem = new AtomicBoolean(true);
1448     for (ElasticSearchResultItem item : combinedResults) {
1449
1450       // Increment the operation counts.
1451       totalOps++;
1452       if (isSuccessCode(item.operationStatus().getStatus())) {
1453         totalSuccess++;
1454       } else {
1455         totalFails++;
1456       }
1457
1458       // Prepend a comma to our response string unless this it the
1459       // first result in the set.
1460       if (!firstItem.compareAndSet(true, false)) {
1461         resultsBuilder.append(", ");
1462       }
1463
1464       // Append the current result as a generic json structure.
1465       resultsBuilder.append(item.toJson());
1466     }
1467
1468     // Now, build the result string and return it.
1469     String responseBody = "{ \"total_operations\": " + totalOps + ", "
1470         + "\"total_success\": " + totalSuccess + ", "
1471         + "\"total_fails\": " + totalFails + ", "
1472         + "\"results\": ["
1473         + resultsBuilder.toString()
1474         + "]}";
1475
1476     return responseBody;
1477   }
1478
1479
1480   /**
1481    * This method queryies ElasticSearch to determine if the supplied
1482    * index is present in the document store.
1483    *
1484    * @param indexName - The index to look for.
1485    * @return - An operation result indicating the success or failure of
1486    * the check.
1487    * @throws DocumentStoreOperationException
1488    */
1489   public OperationResult checkIndexExistence(String indexName)
1490       throws DocumentStoreOperationException {
1491
1492     // Initialize operation result with a failure codes / fault string
1493     OperationResult opResult = new OperationResult();
1494     opResult.setResultCode(500);
1495
1496     // Grab the current time so we can use it to generate a metrics log.
1497     MdcOverride override = getStartTime(new MdcOverride());
1498
1499     String fullUrl = getFullUrl("/" + indexName, false);
1500     HttpURLConnection conn = initializeConnection(fullUrl);
1501
1502     try {
1503       conn.setRequestMethod("HEAD");
1504
1505     } catch (ProtocolException e) {
1506       shutdownConnection(conn);
1507       throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1508     }
1509
1510     logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1511
1512     int resultCode;
1513     try {
1514       resultCode = conn.getResponseCode();
1515     } catch (IOException e) {
1516       shutdownConnection(conn);
1517       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1518     }
1519     logger.debug("Response Code : " + resultCode);
1520
1521     opResult.setResultCode(resultCode);
1522
1523     // Generate a metrics log so we can track how long the operation took.
1524     metricsLogger.info(SearchDbMsgs.CHECK_INDEX_TIME,
1525         new LogFields()
1526             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1527             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1528         override,
1529         indexName);
1530
1531     shutdownConnection(conn);
1532
1533     return opResult;
1534   }
1535
1536
1537   private void buildDocumentResult(DocumentOperationResult result, String index)
1538       throws DocumentStoreOperationException {
1539
1540     JSONParser parser = new JSONParser();
1541     JSONObject root;
1542     try {
1543       root = (JSONObject) parser.parse(result.getResult());
1544
1545       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1546         // Success response object
1547         Document doc = new Document();
1548         doc.setEtag(result.getResultVersion());
1549         doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1550
1551         doc.setContent((JSONObject) root.get("_source"));
1552         result.setDocument(doc);
1553
1554       } else {
1555         // Error response object
1556         JSONObject error = (JSONObject) root.get("error");
1557         if (error != null) {
1558           result.setError(new ErrorResult(error.get("type").toString(),
1559               error.get("reason").toString()));
1560         }
1561
1562       }
1563     } catch (Exception e) {
1564       throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1565           + result.getResult());
1566     }
1567
1568
1569   }
1570
1571   private String buildDocumentResponseUrl(String index, String id) {
1572     return ApiUtils.buildDocumentUri(index, id);
1573   }
1574
1575   private void buildSearchResult(SearchOperationResult result, String index)
1576       throws DocumentStoreOperationException {
1577
1578     JSONParser parser = new JSONParser();
1579     JSONObject root;
1580
1581     try {
1582       root = (JSONObject) parser.parse(result.getResult());
1583       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1584         JSONObject hits = (JSONObject) root.get("hits");
1585         JSONArray hitArray = (JSONArray) hits.get("hits");
1586         SearchHits searchHits = new SearchHits();
1587         searchHits.setTotalHits(hits.get("total").toString());
1588         ArrayList<SearchHit> searchHitArray = new ArrayList<SearchHit>();
1589
1590         for (int i = 0; i < hitArray.size(); i++) {
1591           JSONObject hit = (JSONObject) hitArray.get(i);
1592           SearchHit searchHit = new SearchHit();
1593           searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1594           Document doc = new Document();
1595           if (hit.get("_version") != null) {
1596             doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1597           }
1598
1599           doc.setUrl(buildDocumentResponseUrl(index, (hit.get("_id") != null)
1600               ? hit.get("_id").toString() : ""));
1601           doc.setContent((JSONObject) hit.get("_source"));
1602           searchHit.setDocument(doc);
1603           searchHitArray.add(searchHit);
1604         }
1605         searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1606         result.setSearchResult(searchHits);
1607
1608         JSONObject aggregations = (JSONObject) root.get("aggregations");
1609         if (aggregations != null) {
1610           AggregationResult[] aggResults =
1611               AggregationParsingUtil.parseAggregationResults(aggregations);
1612           AggregationResults aggs = new AggregationResults();
1613           aggs.setAggregations(aggResults);
1614           result.setAggregationResult(aggs);
1615         }
1616
1617         // success
1618       } else {
1619         JSONObject error = (JSONObject) root.get("error");
1620         if (error != null) {
1621           result.setError(new ErrorResult(error.get("type").toString(),
1622               error.get("reason").toString()));
1623         }
1624       }
1625     } catch (Exception e) {
1626       throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1627           + result.getResult());
1628     }
1629
1630   }
1631
1632 }