0e9ff8bc3ec5f926c7b7059ed7bcb34dd7f7213d
[aai/search-data-service.git] / src / main / java / org / openecomp / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * Search Data Service
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License ati
12  *
13  *    http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25 package org.openecomp.sa.searchdbabstraction.elasticsearch.dao;
26
27 import com.att.aft.dme2.internal.google.common.base.Throwables;
28 import com.fasterxml.jackson.annotation.JsonInclude.Include;
29 import com.fasterxml.jackson.core.JsonParseException;
30 import com.fasterxml.jackson.core.JsonProcessingException;
31 import com.fasterxml.jackson.databind.JsonMappingException;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33 import edu.emory.mathcs.backport.java.util.Arrays;
34 import org.json.simple.JSONArray;
35 import org.json.simple.JSONObject;
36 import org.json.simple.parser.JSONParser;
37 import org.json.simple.parser.ParseException;
38 import org.openecomp.cl.api.LogFields;
39 import org.openecomp.cl.api.LogLine;
40 import org.openecomp.cl.api.Logger;
41 import org.openecomp.cl.eelf.LoggerFactory;
42 import org.openecomp.cl.mdc.MdcContext;
43 import org.openecomp.cl.mdc.MdcOverride;
44 import org.openecomp.sa.rest.AnalysisConfiguration;
45 import org.openecomp.sa.rest.ApiUtils;
46 import org.openecomp.sa.rest.BulkRequest;
47 import org.openecomp.sa.rest.BulkRequest.OperationType;
48 import org.openecomp.sa.rest.DocumentSchema;
49 import org.openecomp.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
50 import org.openecomp.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
51 import org.openecomp.sa.searchdbabstraction.entity.AggregationResult;
52 import org.openecomp.sa.searchdbabstraction.entity.AggregationResults;
53 import org.openecomp.sa.searchdbabstraction.entity.Document;
54 import org.openecomp.sa.searchdbabstraction.entity.DocumentOperationResult;
55 import org.openecomp.sa.searchdbabstraction.entity.ErrorResult;
56 import org.openecomp.sa.searchdbabstraction.entity.OperationResult;
57 import org.openecomp.sa.searchdbabstraction.entity.SearchHit;
58 import org.openecomp.sa.searchdbabstraction.entity.SearchHits;
59 import org.openecomp.sa.searchdbabstraction.entity.SearchOperationResult;
60 import org.openecomp.sa.searchdbabstraction.logging.SearchDbMsgs;
61 import org.openecomp.sa.searchdbabstraction.util.AggregationParsingUtil;
62 import org.openecomp.sa.searchdbabstraction.util.DocumentSchemaUtil;
63 import org.openecomp.sa.searchdbabstraction.util.SearchDbConstants;
64
65 import java.io.BufferedReader;
66 import java.io.File;
67 import java.io.FileInputStream;
68 import java.io.IOException;
69 import java.io.InputStream;
70 import java.io.InputStreamReader;
71 import java.io.OutputStream;
72 import java.io.OutputStreamWriter;
73 import java.io.PrintWriter;
74 import java.io.StringWriter;
75 import java.net.HttpURLConnection;
76 import java.net.MalformedURLException;
77 import java.net.ProtocolException;
78 import java.net.URL;
79 import java.text.SimpleDateFormat;
80 import java.util.ArrayList;
81 import java.util.List;
82 import java.util.Properties;
83 import java.util.concurrent.atomic.AtomicBoolean;
84 import javax.ws.rs.core.Response.Status;
85
86
87 /**
88  * This class has the Elasticsearch implementation of the
89  * DB operations defined in DocumentStoreInterface.
90  */
91 public class ElasticSearchHttpController implements DocumentStoreInterface {
92
93   private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
94       "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
95   private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
96       "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
97   private static final String BULK_IMPORT_INDEX_TEMPLATE =
98       "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
99   private static final String BULK_DELETE_TEMPLATE =
100       "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
101
102   private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
103       "Internal Error: ElasticSearch operation fault occurred";
104   private static final Logger logger = LoggerFactory.getInstance()
105       .getLogger(ElasticSearchHttpController.class.getName());
106   private static final Logger metricsLogger = LoggerFactory.getInstance()
107       .getMetricsLogger(ElasticSearchHttpController.class.getName());
108   private final ElasticSearchConfig config;
109
110   private static final String DEFAULT_TYPE = "default";
111
112   private static ElasticSearchHttpController instance = null;
113
114   protected AnalysisConfiguration analysisConfig;
115
116   public static ElasticSearchHttpController getInstance() {
117
118     synchronized (ElasticSearchHttpController.class) {
119
120       if (instance == null) {
121
122         Properties properties = new Properties();
123         File file = new File(SearchDbConstants.ES_CONFIG_FILE);
124         try {
125           properties.load(new FileInputStream(file));
126         } catch (Exception e) {
127           logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL,
128             "ElasticSearchHTTPController.getInstance",
129             e.getLocalizedMessage());
130         }
131
132         ElasticSearchConfig config = new ElasticSearchConfig(properties);
133         instance = new ElasticSearchHttpController(config);
134       }
135     }
136
137     return instance;
138   }
139
140   public ElasticSearchHttpController(ElasticSearchConfig config) {
141     this.config = config;
142     analysisConfig = new AnalysisConfiguration();
143
144     try {
145       logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
146       checkConnection();
147       logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
148     } catch (Exception e) {
149       logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e,
150           getFullUrl("", false), e.getMessage());
151     }
152   }
153
154
155   public AnalysisConfiguration getAnalysisConfig() {
156     return analysisConfig;
157   }
158
159   @Override
160   public OperationResult createIndex(String index, DocumentSchema documentSchema) {
161
162     OperationResult result = new OperationResult();
163     result.setResultCode(500);
164
165     try {
166
167       // Submit the request to ElasticSearch to create the index using a
168       // default document type.
169       result = createTable(index,
170           DEFAULT_TYPE,
171           analysisConfig.getEsIndexSettings(),
172           DocumentSchemaUtil.generateDocumentMappings(documentSchema));
173
174       // ElasticSearch will return us a 200 code on success when we
175       // want to report a 201, so translate the result here.
176       result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
177       if (isSuccess(result)) {
178         result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
179         //result.setResult("{\"index\": \"" + index + ", \"type\": \"" + DEFAULT_TYPE + "\"}");
180       }
181
182     } catch (DocumentStoreOperationException e) {
183
184       result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
185     }
186
187     return result;
188   }
189
190
191   @Override
192   public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
193
194     //Initialize operation result with a failure codes / fault string
195     OperationResult opResult = new OperationResult();
196     opResult.setResultCode(500);
197     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
198
199     // Grab the current time so we can use it to generate a metrics log.
200     MdcOverride override = getStartTime(new MdcOverride());
201
202     String fullUrl = getFullUrl("/" + indexName + "/", false);
203     HttpURLConnection conn = initializeConnection(fullUrl);
204
205     logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
206
207     try {
208       conn.setRequestMethod("DELETE");
209     } catch (ProtocolException e) {
210       shutdownConnection(conn);
211       throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
212     }
213
214     handleResponse(conn, opResult);
215
216     // Generate a metrics log so we can track how long the operation took.
217     metricsLogger.info(SearchDbMsgs.DELETE_INDEX_TIME,
218         new LogFields()
219             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
220             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
221         override,
222         indexName);
223
224     shutdownConnection(conn);
225
226     return opResult;
227   }
228
229
230   private OperationResult checkConnection() throws Exception {
231
232     String fullUrl = getFullUrl("/_cluster/health", false);
233     URL url = null;
234     HttpURLConnection conn = null;
235
236     url = new URL(fullUrl);
237     conn = (HttpURLConnection) url.openConnection();
238     conn.setRequestMethod("GET");
239     conn.setDoOutput(true);
240     logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
241
242     int resultCode = conn.getResponseCode();
243     logger.debug("getClusterHealth() response Code : " + resultCode);
244     OperationResult opResult = new OperationResult();
245     opResult.setResultCode(resultCode);
246
247     shutdownConnection(conn);
248
249     return opResult;
250   }
251
252   private String getFullUrl(String resourceUrl, boolean isSecure) {
253
254     final String host = config.getIpAddress();
255     final String port = config.getHttpPort();
256
257     if (isSecure) {
258       return String.format("https://%s:%s%s", host, port, resourceUrl);
259     } else {
260       return String.format("http://%s:%s%s", host, port, resourceUrl);
261     }
262   }
263
264   private void shutdownConnection(HttpURLConnection connection) {
265     if (connection == null) {
266       return;
267     }
268
269     InputStream inputstream = null;
270     OutputStream outputstream = null;
271
272     try {
273       inputstream = connection.getInputStream();
274     } catch (IOException e) {
275       logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
276     } finally {
277       if (inputstream != null) {
278         try {
279           inputstream.close();
280         } catch (IOException e) {
281           logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
282               e.getLocalizedMessage());
283         }
284       }
285     }
286
287     try {
288       outputstream = connection.getOutputStream();
289     } catch (IOException e) {
290       logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
291     } finally {
292       if (outputstream != null) {
293         try {
294           outputstream.close();
295         } catch (IOException e) {
296           logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
297               e.getLocalizedMessage());
298         }
299       }
300     }
301
302     connection.disconnect();
303   }
304
305   //@Override
306   protected OperationResult createTable(String indexName, String typeName,
307                                         String indexSettings, String indexMappings)
308       throws DocumentStoreOperationException {
309
310     if (indexSettings == null) {
311       logger.debug("No settings provided.");
312     }
313
314     if (indexMappings == null) {
315       logger.debug("No mappings provided.");
316     }
317
318     OperationResult opResult = new OperationResult();
319
320     // Initialize operation result with a failure codes / fault string
321     opResult.setResultCode(500);
322     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
323
324     // Grab the current time so we can use it to generate a metrics log.
325     MdcOverride override = getStartTime(new MdcOverride());
326
327     String fullUrl = getFullUrl("/" + indexName + "/", false);
328     HttpURLConnection conn = initializeConnection(fullUrl);
329
330     try {
331       conn.setRequestMethod("PUT");
332     } catch (ProtocolException e) {
333       shutdownConnection(conn);
334       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
335     }
336
337     StringBuilder sb = new StringBuilder(128);
338     sb.append("{ \"settings\" : ");
339     sb.append(indexSettings);
340     sb.append(",");
341
342     sb.append("\"mappings\" : {");
343     sb.append("\"" + typeName + "\" :");
344     sb.append(indexMappings);
345     sb.append("}}");
346
347     attachContent(conn, sb.toString());
348
349     logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
350     logger.debug("Request content: " + sb.toString());
351
352     handleResponse(conn, opResult);
353
354     shutdownConnection(conn);
355
356     // Generate a metrics log so we can track how long the operation took.
357     metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
358         new LogFields()
359             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
360             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
361         override,
362         indexName);
363
364     return opResult;
365   }
366
367   @Override
368   public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document)
369       throws DocumentStoreOperationException {
370     if (document.getId() == null || document.getId().isEmpty()) {
371       return createDocumentWithoutId(indexName, document);
372     } else {
373       return createDocumentWithId(indexName, document);
374     }
375   }
376
377   private DocumentOperationResult createDocumentWithId(String indexName,
378                                                        DocumentStoreDataEntity document)
379       throws DocumentStoreOperationException {
380     // check if the document already exists
381     DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
382
383
384     if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
385       if (opResult.getResultCode() == Status.OK.getStatusCode()) {
386         opResult.setFailureCause("A document with the same id already exists.");
387       } else {
388         opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
389       }
390       opResult.setResultCode(Status.CONFLICT.getStatusCode());
391       return opResult;
392     }
393
394     opResult = new DocumentOperationResult();
395     // Initialize operation result with a failure codes / fault string
396     opResult.setResultCode(500);
397     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
398
399     // Grab the current time so we can use it to generate a metrics log.
400     MdcOverride override = getStartTime(new MdcOverride());
401
402     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE
403         + "/" + document.getId(), false);
404     HttpURLConnection conn = initializeConnection(fullUrl);
405
406     try {
407       conn.setRequestMethod("PUT");
408     } catch (ProtocolException e) {
409       shutdownConnection(conn);
410       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
411     }
412
413     attachDocument(conn, document);
414
415     logger.debug("Sending 'PUT' request to: " + conn.getURL());
416
417     handleResponse(conn, opResult);
418     buildDocumentResult(opResult, indexName);
419
420     // Generate a metrics log so we can track how long the operation took.
421     metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
422         new LogFields()
423             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
424             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
425         override,
426         indexName);
427
428     shutdownConnection(conn);
429
430     return opResult;
431
432   }
433
434   private DocumentOperationResult createDocumentWithoutId(String indexName,
435                                                           DocumentStoreDataEntity document)
436       throws DocumentStoreOperationException {
437
438     DocumentOperationResult response = new DocumentOperationResult();
439     // Initialize operation result with a failure codes / fault string
440     response.setResultCode(500);
441     response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
442
443     // Grab the current time so we can use it to generate a metrics log.
444     MdcOverride override = getStartTime(new MdcOverride());
445
446     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
447     HttpURLConnection conn = initializeConnection(fullUrl);
448
449     try {
450       conn.setRequestMethod("POST");
451     } catch (ProtocolException e) {
452       shutdownConnection(conn);
453       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
454     }
455
456     attachDocument(conn, document);
457
458     logger.debug("Sending 'POST' request to: " + conn.getURL());
459
460     handleResponse(conn, response);
461     buildDocumentResult(response, indexName);
462
463     // Generate a metrics log so we can track how long the operation took.
464     metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
465         new LogFields()
466             .setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
467             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
468         override,
469         indexName);
470
471     shutdownConnection(conn);
472
473     return response;
474   }
475
476   private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
477       throws DocumentStoreOperationException {
478     conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
479     conn.setRequestProperty("Connection", "Close");
480
481     attachContent(conn, doc.getContentInJson());
482   }
483
484   private DocumentOperationResult checkDocumentExistence(String indexName,
485                                                          String docId)
486       throws DocumentStoreOperationException {
487     DocumentOperationResult opResult = new DocumentOperationResult();
488
489     // Initialize operation result with a failure codes / fault string
490     opResult.setResultCode(500);
491
492     // Grab the current time so we can use it to generate a metrics log.
493     MdcOverride override = getStartTime(new MdcOverride());
494
495     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
496     HttpURLConnection conn = initializeConnection(fullUrl);
497
498     try {
499       conn.setRequestMethod("HEAD");
500     } catch (ProtocolException e) {
501       shutdownConnection(conn);
502       throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
503     }
504
505     logger.debug("Sending 'HEAD' request to: " + conn.getURL());
506
507     int resultCode;
508     try {
509       resultCode = conn.getResponseCode();
510     } catch (IOException e) {
511       shutdownConnection(conn);
512       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
513     }
514
515     logger.debug("Response Code : " + resultCode);
516
517     opResult.setResultCode(resultCode);
518
519     // Generate a metrics log so we can track how long the operation took.
520     metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
521         new LogFields()
522             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
523             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
524         override,
525         indexName,
526         docId);
527
528     shutdownConnection(conn);
529
530     return opResult;
531   }
532
533   @Override
534   public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document)
535       throws DocumentStoreOperationException {
536     DocumentOperationResult opResult = new DocumentOperationResult();
537
538     // Initialize operation result with a failure codes / fault string
539     opResult.setResultCode(500);
540     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
541
542     // Grab the current time so we can use it to generate a metrics log.
543     MdcOverride override = getStartTime(new MdcOverride());
544
545     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
546         + "?version=" + document.getVersion(), false);
547     HttpURLConnection conn = initializeConnection(fullUrl);
548
549     try {
550       conn.setRequestMethod("PUT");
551     } catch (ProtocolException e) {
552       shutdownConnection(conn);
553       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
554     }
555
556     attachDocument(conn, document);
557
558     logger.debug("Sending 'PUT' request to: " + conn.getURL());
559
560     handleResponse(conn, opResult);
561     buildDocumentResult(opResult, indexName);
562
563     // Generate a metrics log so we can track how long the operation took.
564     metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
565         new LogFields()
566             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
567             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
568         override,
569         indexName,
570         document.getId());
571
572     shutdownConnection(conn);
573
574     return opResult;
575   }
576
577   @Override
578   public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
579       throws DocumentStoreOperationException {
580     DocumentOperationResult opResult = new DocumentOperationResult();
581
582     // Initialize operation result with a failure codes / fault string
583     opResult.setResultCode(500);
584     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
585
586     // Grab the current time so we can use it to generate a metrics log.
587     MdcOverride override = getStartTime(new MdcOverride());
588
589     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
590         + "?version=" + document.getVersion(), false);
591     HttpURLConnection conn = initializeConnection(fullUrl);
592
593     try {
594       conn.setRequestMethod("DELETE");
595     } catch (ProtocolException e) {
596       shutdownConnection(conn);
597       throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
598     }
599
600     logger.debug("\nSending 'DELETE' request to " + conn.getURL());
601
602     handleResponse(conn, opResult);
603     buildDocumentResult(opResult, indexName);
604     //supress the etag and url in response for delete as they are not required
605     if (opResult.getDocument() != null) {
606       opResult.getDocument().setEtag(null);
607       opResult.getDocument().setUrl(null);
608     }
609
610     // Generate a metrics log so we can track how long the operation took.
611     metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
612         new LogFields()
613             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
614             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
615         override,
616         indexName,
617         document.getId());
618
619     shutdownConnection(conn);
620
621     return opResult;
622   }
623
624   @Override
625   public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
626       throws DocumentStoreOperationException {
627     DocumentOperationResult opResult = new DocumentOperationResult();
628
629     // Initialize operation result with a failure codes / fault string
630     opResult.setResultCode(500);
631     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
632
633     // Grab the current time so we can use it to generate a metrics log.
634     MdcOverride override = getStartTime(new MdcOverride());
635
636     String fullUrl = null;
637     if (document.getVersion() == null) {
638       fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
639     } else {
640       fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
641           + "?version=" + document.getVersion(), false);
642     }
643     HttpURLConnection conn = initializeConnection(fullUrl);
644
645     logger.debug("\nSending 'GET' request to: " + conn.getURL());
646
647     handleResponse(conn, opResult);
648     buildDocumentResult(opResult, indexName);
649
650     // Generate a metrics log so we can track how long the operation took.
651     metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
652         new LogFields()
653             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
654             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
655         override,
656         indexName,
657         document.getId());
658
659     shutdownConnection(conn);
660
661     return opResult;
662   }
663
664   public SearchOperationResult search(String indexName, String queryString)
665       throws DocumentStoreOperationException {
666     SearchOperationResult opResult = new SearchOperationResult();
667
668     // Initialize operation result with a failure codes / fault string
669     opResult.setResultCode(500);
670     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
671
672     String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
673
674     // Grab the current time so we can use it to generate a metrics log.
675     MdcOverride override = getStartTime(new MdcOverride());
676
677     HttpURLConnection conn = initializeConnection(fullUrl);
678
679     try {
680       conn.setRequestMethod("GET");
681     } catch (ProtocolException e) {
682       shutdownConnection(conn);
683       throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
684     }
685
686     logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
687
688     handleResponse(conn, opResult);
689     buildSearchResult(opResult, indexName);
690
691
692     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
693         new LogFields()
694             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
695             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
696         override,
697         indexName,
698         queryString);
699
700     return opResult;
701   }
702
703   public SearchOperationResult searchWithPayload(String indexName, String query)
704       throws DocumentStoreOperationException {
705     SearchOperationResult opResult = new SearchOperationResult();
706
707     if (logger.isDebugEnabled()) {
708       logger.debug("Querying index: " + indexName + " with query string: " + query);
709     }
710
711     // Initialize operation result with a failure codes / fault string
712     opResult.setResultCode(500);
713     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
714
715     String fullUrl = getFullUrl("/" + indexName + "/_search", false);
716
717     // Grab the current time so we can use it to generate a metrics log.
718     MdcOverride override = getStartTime(new MdcOverride());
719
720     HttpURLConnection conn = initializeConnection(fullUrl);
721
722     try {
723       conn.setRequestMethod("POST");
724     } catch (ProtocolException e) {
725       shutdownConnection(conn);
726       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
727     }
728
729     attachContent(conn, query);
730
731     logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
732     logger.debug("Request body =  Elasticsearch query = " + query);
733
734     handleResponse(conn, opResult);
735     buildSearchResult(opResult, indexName);
736
737     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
738         new LogFields()
739             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
740             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
741         override,
742         indexName,
743         query);
744
745     shutdownConnection(conn);
746
747     return opResult;
748   }
749
750   private void attachContent(HttpURLConnection conn, String content)
751       throws DocumentStoreOperationException {
752     OutputStream outputStream = null;
753     OutputStreamWriter out = null;
754
755     try {
756       outputStream = conn.getOutputStream();
757     } catch (IOException e) {
758       shutdownConnection(conn);
759       throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
760     }
761
762     out = new OutputStreamWriter(outputStream);
763
764     try {
765       out.write(content);
766       out.close();
767     } catch (IOException e) {
768       shutdownConnection(conn);
769       throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
770     }
771   }
772
773   private HttpURLConnection initializeConnection(String fullUrl)
774       throws DocumentStoreOperationException {
775     URL url = null;
776     HttpURLConnection conn = null;
777
778     try {
779       url = new URL(fullUrl);
780     } catch (MalformedURLException e) {
781       throw new DocumentStoreOperationException("Error building a URL with " + url, e);
782     }
783
784     try {
785       conn = (HttpURLConnection) url.openConnection();
786       conn.setDoOutput(true);
787     } catch (IOException e) {
788       shutdownConnection(conn);
789       throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
790     }
791
792     return conn;
793   }
794
795   private void handleResponse(HttpURLConnection conn, OperationResult opResult)
796       throws DocumentStoreOperationException {
797     int resultCode = 200;
798
799     try {
800       resultCode = conn.getResponseCode();
801     } catch (IOException e) {
802       shutdownConnection(conn);
803       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
804     }
805
806     logger.debug("Response Code : " + resultCode);
807
808     InputStream inputStream = null;
809
810     if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
811       inputStream = conn.getErrorStream();
812     } else {
813       try {
814         inputStream = conn.getInputStream();
815       } catch (IOException e) {
816         shutdownConnection(conn);
817         throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
818       }
819     }
820
821     InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
822     BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
823
824     StringBuilder result = new StringBuilder(128);
825     String string = null;
826
827     try {
828       while ((string = bufferedreader.readLine()) != null) {
829         result.append(string).append("\n");
830       }
831     } catch (IOException e) {
832       shutdownConnection(conn);
833       throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
834     }
835
836     if (resultCode == Status.CONFLICT.getStatusCode()) {
837       opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode());
838     } else {
839       opResult.setResultCode(resultCode);
840     }
841     if (logger.isDebugEnabled()) {
842       logger.debug("Raw result string from ElasticSearch = " + result.toString());
843     }
844     opResult.setResult(result.toString());
845     opResult.setResultVersion(extractVersion(result.toString()));
846   }
847
848   private String extractVersion(String result) throws DocumentStoreOperationException {
849
850     JSONParser parser = new JSONParser();
851     String version = null;
852     try {
853       JSONObject root = (JSONObject) parser.parse(result);
854       if (root.get("_version") != null) {
855         version = root.get("_version").toString();
856       }
857
858     } catch (ParseException e) {
859
860       // Not all responses from ElasticSearch include a version, so
861       // if we don't get one back, just return an empty string rather
862       // than trigger a false failure.
863       version = "";
864     }
865     return version;
866   }
867
868   /**
869    * This convenience method gets the current system time and stores
870    * it in an attribute in the supplied {@link MdcOverride} object so
871    * that it can be used later by the metrics logger.
872    *
873    * @param override - The {@link MdcOverride} object to update.
874    * @return - The supplied {@link MdcOverride} object.
875    */
876   private MdcOverride getStartTime(MdcOverride override) {
877
878     // Grab the current time...
879     long startTimeInMs = System.currentTimeMillis();
880
881     // ...and add it as an attribute to the supplied MDC Override
882     // object.
883     SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
884     override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
885
886     // Return the MdcOverride object that we were passed.
887     // This looks odd, but it allows us to do stuff like:
888     //
889     //    MdcOverride ov = getStartTime(new MdcOverride())
890     //
891     // which is quite handy, but also allows us to pass in an existing
892     // MdcOverride object which already has some attributes set.
893     return override;
894   }
895
896   private boolean isSuccess(OperationResult result) {
897
898     return isSuccessCode(result.getResultCode());
899   }
900
901
902   private boolean isSuccessCode(int statusCode) {
903     return ((statusCode >= 200) && (statusCode < 300));
904   }
905
906
907   @Override
908   public OperationResult performBulkOperations(BulkRequest[] requests)
909       throws DocumentStoreOperationException {
910
911     if (logger.isDebugEnabled()) {
912       String dbgString = "ESController: performBulkOperations - Operations: ";
913
914       for (BulkRequest request : requests) {
915         dbgString += "[" + request.toString() + "] ";
916       }
917
918       logger.debug(dbgString);
919     }
920
921     // Grab the current time so we can use it to generate a metrics log.
922     MdcOverride override = getStartTime(new MdcOverride());
923
924     // Parse the supplied set of operations.
925     // Iterate over the list of operations which we were provided and
926     // translate them into a format that ElasticSearh understands.
927     int opCount = 0;
928     StringBuilder esOperationSet = new StringBuilder(128);
929     List<ElasticSearchResultItem> rejected = new ArrayList<ElasticSearchResultItem>();
930     for (BulkRequest request : requests) {
931
932       // Convert the request to the syntax ElasticSearch likes.
933       if (buildEsOperation(request, esOperationSet, rejected)) {
934         opCount++;
935       }
936     }
937
938     ElasticSearchBulkOperationResult opResult = null;
939     if (opCount > 0) {
940
941       // Open an HTTP connection to the ElasticSearch back end.
942       String fullUrl = getFullUrl("/_bulk", false);
943       URL url;
944       HttpURLConnection conn;
945       try {
946
947         url = new URL(fullUrl);
948         conn = (HttpURLConnection) url.openConnection();
949         conn.setRequestMethod("PUT");
950         conn.setDoOutput(true);
951         conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
952         conn.setRequestProperty("Connection", "Close");
953
954       } catch (IOException e) {
955
956         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
957         if (logger.isDebugEnabled()) {
958           logger.debug(Throwables.getStackTraceAsString(e));
959         }
960
961         throw new DocumentStoreOperationException("Failed to open connection to document store.  Cause: "
962             + e.getMessage(), e);
963       }
964
965       StringBuilder bulkResult = new StringBuilder(128);
966       try {
967         // Create an output stream to write our request to.
968         OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
969         ;
970
971         if (logger.isDebugEnabled()) {
972           logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
973           logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n",
974               "\\n"));
975         }
976
977         // Write the resulting request string to our output stream. (this sends the request to ES?)
978         out.write(esOperationSet.toString());
979         out.close();
980
981         // Open an input stream on our connection in order to read back the results.
982         InputStream is = conn.getInputStream();
983         InputStreamReader inputstreamreader = new InputStreamReader(is);
984         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
985
986         // Read the contents of the input stream into our result string...
987         String esResponseString = null;
988
989         while ((esResponseString = bufferedreader.readLine()) != null) {
990           bulkResult.append(esResponseString).append("\n");
991         }
992
993       } catch (IOException e) {
994
995         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
996         if (logger.isDebugEnabled()) {
997           StringWriter sw = new StringWriter();
998           e.printStackTrace(new PrintWriter(sw));
999           logger.debug(sw.toString());
1000         }
1001
1002         throw new DocumentStoreOperationException("Failure interacting with document store.  Cause: "
1003             + e.getMessage(), e);
1004       }
1005
1006       if (logger.isDebugEnabled()) {
1007         logger.debug("ESController: Received result string from ElasticSearch: = "
1008             + bulkResult.toString());
1009       }
1010
1011       // ...and marshal the resulting string into a Java object.
1012       try {
1013         opResult = marshallEsBulkResult(bulkResult.toString());
1014
1015       } catch (IOException e) {
1016
1017         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1018         if (logger.isDebugEnabled()) {
1019           logger.debug(Throwables.getStackTraceAsString(e));
1020         }
1021
1022         throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: "
1023             + e.getMessage(), e);
1024       }
1025     }
1026
1027     // Finally, build the operation result and return it to the caller.
1028     OperationResult result = new OperationResult();
1029     result.setResultCode(207);
1030     result.setResult(buildGenericBulkResultSet(opResult, rejected));
1031
1032     // In the success case we don't want the entire result string to be
1033     // dumped into the metrics log, so concatenate it.
1034     String resultStringForMetricsLog = result.getResult();
1035     if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1036       resultStringForMetricsLog = resultStringForMetricsLog.substring(0,
1037           Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1038     }
1039
1040     metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1041         new LogFields()
1042             .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1043             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1044         override);
1045
1046     return result;
1047   }
1048
1049
1050   /**
1051    * This method converts a {@link BulkRequest} object into a json structure
1052    * which can be understood by ElasticSearch.
1053    *
1054    * @param request - The request to be performed.
1055    * @param sb      - The string builder to append the json data to
1056    * @throws DocumentStoreOperationException
1057    */
1058   private boolean buildEsOperation(BulkRequest request, StringBuilder sb,
1059                                    List<ElasticSearchResultItem> fails)
1060       throws DocumentStoreOperationException {
1061
1062     boolean retVal = true;
1063     OperationResult indexExistsResult = null;
1064
1065     // What kind of operation are we performing?
1066     switch (request.getOperationType()) {
1067
1068       // Create a new document.
1069       case CREATE:
1070
1071         // Make sure that we were supplied a document payload.
1072         if (request.getOperation().getDocument() == null) {
1073
1074           fails.add(generateRejectionEntry(request.getOperationType(),
1075               "Missing document payload",
1076               request.getIndex(),
1077               request.getId(),
1078               400,
1079               request.getOperation().getMetaData().getUrl()));
1080           return false;
1081         }
1082
1083         // Make sure that the supplied document URL is formatted
1084         // correctly.
1085         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1086           fails.add(generateRejectionEntry(request.getOperationType(),
1087               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1088               request.getIndex(),
1089               "",
1090               400,
1091               request.getOperation().getMetaData().getUrl()));
1092           return false;
1093         }
1094
1095         // Validate that the specified index actually exists before we
1096         // try to perform the create.
1097         if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1098
1099           fails.add(generateRejectionEntry(request.getOperationType(),
1100               "Specified resource does not exist: "
1101                   + request.getOperation().getMetaData().getUrl(),
1102               request.getIndex(),
1103               request.getId(),
1104               404,
1105               request.getOperation().getMetaData().getUrl()));
1106           return false;
1107         }
1108
1109         // If we were supplied an id for the new document, then
1110         // include it in the bulk operation to Elastic Search
1111         if (request.getId() == null) {
1112
1113           sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE,
1114               request.getIndex(),
1115               DEFAULT_TYPE));
1116
1117           // Otherwise, we just leave that parameter off and ElasticSearch
1118           // will generate one for us.
1119         } else {
1120           sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE,
1121               request.getIndex(),
1122               DEFAULT_TYPE,
1123               request.getId()));
1124         }
1125
1126         try {
1127           // Append the document that we want to create.
1128           sb.append(request.getOperation().getDocument().toJson()).append("\n");
1129         } catch (JsonProcessingException e) {
1130           throw new DocumentStoreOperationException("Failure parsing document to json", e);
1131         }
1132
1133         break;
1134
1135       // Update an existing document.
1136       case UPDATE:
1137
1138         // Make sure that we were supplied a document payload.
1139         if (request.getOperation().getDocument() == null) {
1140
1141           fails.add(generateRejectionEntry(request.getOperationType(),
1142               "Missing document payload",
1143               request.getIndex(),
1144               request.getId(),
1145               400,
1146               request.getOperation().getMetaData().getUrl()));
1147           return false;
1148         }
1149
1150         // Make sure that the supplied document URL is formatted
1151         // correctly.
1152         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1153           fails.add(generateRejectionEntry(request.getOperationType(),
1154               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1155               request.getIndex(),
1156               "",
1157               400,
1158               request.getOperation().getMetaData().getUrl()));
1159           return false;
1160         }
1161
1162         // Validate that the specified index actually exists before we
1163         // try to perform the update.
1164         if (!indexExists(request.getIndex())) {
1165
1166           fails.add(generateRejectionEntry(request.getOperationType(),
1167               "Specified resource does not exist: "
1168                   + request.getOperation().getMetaData().getUrl(),
1169               request.getIndex(),
1170               request.getId(),
1171               404,
1172               request.getOperation().getMetaData().getUrl()));
1173           return false;
1174         }
1175
1176         // Validate that the document we are trying to update actually
1177         // exists before we try to perform the update.
1178         if (!documentExists(request.getIndex(), request.getId())) {
1179
1180           fails.add(generateRejectionEntry(request.getOperationType(),
1181               "Specified resource does not exist: "
1182                   + request.getOperation().getMetaData().getUrl(),
1183               request.getIndex(),
1184               request.getId(),
1185               404,
1186               request.getOperation().getMetaData().getUrl()));
1187           return false;
1188         }
1189
1190         // It is mandatory that a version be supplied for an update operation,
1191         // so validate that now.
1192         if (request.getOperation().getMetaData().getEtag() == null) {
1193
1194           fails.add(generateRejectionEntry(request.getOperationType(),
1195               "Missing mandatory ETag field",
1196               request.getIndex(),
1197               request.getId(),
1198               400,
1199               request.getOperation().getMetaData().getUrl()));
1200           return false;
1201         }
1202
1203         // Generate the update request...
1204         sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE,
1205             request.getIndex(),
1206             DEFAULT_TYPE,
1207             request.getId(),
1208             request.getOperation().getMetaData().getEtag()));
1209
1210         // ...and append the document that we want to update.
1211         try {
1212           sb.append(request.getOperation().getDocument().toJson()).append("\n");
1213         } catch (JsonProcessingException e) {
1214           throw new DocumentStoreOperationException("Failure parsing document to json", e);
1215         }
1216         break;
1217
1218       // Delete an existing document.
1219       case DELETE:
1220
1221         // Make sure that the supplied document URL is formatted
1222         // correctly.
1223         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1224           fails.add(generateRejectionEntry(request.getOperationType(),
1225               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1226               request.getIndex(),
1227               "",
1228               400,
1229               request.getOperation().getMetaData().getUrl()));
1230           return false;
1231         }
1232
1233         // Validate that the specified index actually exists before we
1234         // try to perform the delete.
1235         if (!indexExists(request.getIndex())) {
1236
1237           fails.add(generateRejectionEntry(request.getOperationType(),
1238               "Specified resource does not exist: "
1239                   + request.getOperation().getMetaData().getUrl(),
1240               request.getIndex(),
1241               request.getId(),
1242               404,
1243               request.getOperation().getMetaData().getUrl()));
1244           return false;
1245         }
1246
1247         // Validate that the document we are trying to update actually
1248         // exists before we try to perform the delete.
1249         if (!documentExists(request.getIndex(), request.getId())) {
1250
1251           fails.add(generateRejectionEntry(request.getOperationType(),
1252               "Specified resource does not exist: "
1253                   + request.getOperation().getMetaData().getUrl(),
1254               request.getIndex(),
1255               request.getId(),
1256               404,
1257               request.getOperation().getMetaData().getUrl()));
1258           return false;
1259         }
1260
1261         // It is mandatory that a version be supplied for a delete operation,
1262         // so validate that now.
1263         if (request.getOperation().getMetaData().getEtag() == null) {
1264
1265           fails.add(generateRejectionEntry(request.getOperationType(),
1266               "Missing mandatory ETag field",
1267               request.getIndex(),
1268               request.getId(),
1269               400,
1270               request.getOperation().getMetaData().getUrl()));
1271           return false;
1272         }
1273
1274         // Generate the delete request.
1275         sb.append(String.format(BULK_DELETE_TEMPLATE,
1276             request.getIndex(),
1277             DEFAULT_TYPE,
1278             request.getId(),
1279             request.getOperation().getMetaData().getEtag()));
1280         break;
1281       default:
1282     }
1283
1284     return retVal;
1285   }
1286
1287   private boolean indexExists(String index) throws DocumentStoreOperationException {
1288
1289     OperationResult indexExistsResult = checkIndexExistence(index);
1290
1291     return ((indexExistsResult.getResultCode() >= 200)
1292         && (indexExistsResult.getResultCode() < 300));
1293   }
1294
1295   private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1296
1297     OperationResult docExistsResult = checkDocumentExistence(index, id);
1298
1299     return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1300   }
1301
1302   /**
1303    * This method constructs a status entry for a bulk operation which has
1304    * been rejected before even sending it to the document store.
1305    *
1306    * @param rejectReason - A message describing why the operation was rejected.
1307    * @param anId         - The identifier associated with the document being
1308    *                     acted on.
1309    * @param statusCode  - An HTTP status code.
1310    * @return - A result set item.
1311    */
1312   private ElasticSearchResultItem generateRejectionEntry(OperationType opType,
1313                                                          String rejectReason,
1314                                                          String index,
1315                                                          String anId,
1316                                                          int statusCode,
1317                                                          String originalUrl) {
1318
1319     ElasticSearchError err = new ElasticSearchError();
1320     err.setReason(rejectReason);
1321
1322     ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1323     op.setIndex(index);
1324     op.setId(anId);
1325     op.setStatus(statusCode);
1326     op.setError(err);
1327     op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1328
1329     ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1330
1331     switch (opType) {
1332       case CREATE:
1333         rejectionResult.setCreate(op);
1334         break;
1335       case UPDATE:
1336         rejectionResult.setIndex(op);
1337         break;
1338       case DELETE:
1339         rejectionResult.setDelete(op);
1340         break;
1341       default:
1342     }
1343
1344     return rejectionResult;
1345   }
1346
1347
1348   /**
1349    * This method takes the json structure returned from ElasticSearch in
1350    * response to a bulk operations request and marshals it into a Java
1351    * object.
1352    *
1353    * @param jsonResult - The bulk operations response returned from
1354    *                   ElasticSearch.
1355    * @return - The marshalled response.
1356    * @throws JsonParseException
1357    * @throws JsonMappingException
1358    * @throws IOException
1359    */
1360   private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult)
1361       throws JsonParseException, JsonMappingException, IOException {
1362
1363     if (jsonResult != null) {
1364       if (logger.isDebugEnabled()) {
1365         logger.debug("ESController: Marshalling ES result set from json: "
1366             + jsonResult.replaceAll("\n", ""));
1367       }
1368
1369       ObjectMapper mapper = new ObjectMapper();
1370       mapper.setSerializationInclusion(Include.NON_EMPTY);
1371
1372       return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1373     }
1374
1375     return null;
1376   }
1377
1378
1379   /**
1380    * This method takes the marshalled ElasticSearch bulk response and
1381    * converts it into a generic response payload.
1382    *
1383    * @param esResult - ElasticSearch bulk operations response.
1384    * @return - A generic result set.
1385    */
1386   private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1387                                            List<ElasticSearchResultItem> rejectedOps) {
1388
1389     int totalOps = 0;
1390     int totalSuccess = 0;
1391     int totalFails = 0;
1392
1393     if (logger.isDebugEnabled()) {
1394
1395       logger.debug("ESController: Build generic result set.  ES Results: "
1396           + ((esResult != null) ? esResult.toString() : "[]")
1397           + " Rejected Ops: " + rejectedOps.toString());
1398     }
1399
1400     // Build a combined list of result items from the results returned
1401     // from ElasticSearch and the list of operations that we rejected
1402     // without sending to ElasticSearch.
1403     List<ElasticSearchResultItem> combinedResults = new ArrayList<ElasticSearchResultItem>();
1404     if (esResult != null) {
1405       combinedResults.addAll(Arrays.asList(esResult.getItems()));
1406     }
1407     combinedResults.addAll(rejectedOps);
1408
1409     // Iterate over the individual results in the resulting result set.
1410     StringBuilder resultsBuilder = new StringBuilder();
1411     AtomicBoolean firstItem = new AtomicBoolean(true);
1412     for (ElasticSearchResultItem item : combinedResults) {
1413
1414       // Increment the operation counts.
1415       totalOps++;
1416       if (isSuccessCode(item.operationStatus().getStatus())) {
1417         totalSuccess++;
1418       } else {
1419         totalFails++;
1420       }
1421
1422       // Prepend a comma to our response string unless this it the
1423       // first result in the set.
1424       if (!firstItem.compareAndSet(true, false)) {
1425         resultsBuilder.append(", ");
1426       }
1427
1428       // Append the current result as a generic json structure.
1429       resultsBuilder.append(item.toJson());
1430     }
1431
1432     // Now, build the result string and return it.
1433     String responseBody = "{ \"total_operations\": " + totalOps + ", "
1434         + "\"total_success\": " + totalSuccess + ", "
1435         + "\"total_fails\": " + totalFails + ", "
1436         + "\"results\": ["
1437         + resultsBuilder.toString()
1438         + "]}";
1439
1440     return responseBody;
1441   }
1442
1443
1444   /**
1445    * This method queryies ElasticSearch to determine if the supplied
1446    * index is present in the document store.
1447    *
1448    * @param indexName - The index to look for.
1449    * @return - An operation result indicating the success or failure of
1450    * the check.
1451    * @throws DocumentStoreOperationException
1452    */
1453   public OperationResult checkIndexExistence(String indexName)
1454       throws DocumentStoreOperationException {
1455
1456     // Initialize operation result with a failure codes / fault string
1457     OperationResult opResult = new OperationResult();
1458     opResult.setResultCode(500);
1459
1460     // Grab the current time so we can use it to generate a metrics log.
1461     MdcOverride override = getStartTime(new MdcOverride());
1462
1463     String fullUrl = getFullUrl("/" + indexName, false);
1464     HttpURLConnection conn = initializeConnection(fullUrl);
1465
1466     try {
1467       conn.setRequestMethod("HEAD");
1468
1469     } catch (ProtocolException e) {
1470       shutdownConnection(conn);
1471       throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1472     }
1473
1474     logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1475
1476     int resultCode;
1477     try {
1478       resultCode = conn.getResponseCode();
1479     } catch (IOException e) {
1480       shutdownConnection(conn);
1481       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1482     }
1483     logger.debug("Response Code : " + resultCode);
1484
1485     opResult.setResultCode(resultCode);
1486
1487     // Generate a metrics log so we can track how long the operation took.
1488     metricsLogger.info(SearchDbMsgs.CHECK_INDEX_TIME,
1489         new LogFields()
1490             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1491             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1492         override,
1493         indexName);
1494
1495     shutdownConnection(conn);
1496
1497     return opResult;
1498   }
1499
1500
1501   private void buildDocumentResult(DocumentOperationResult result, String index)
1502       throws DocumentStoreOperationException {
1503
1504     JSONParser parser = new JSONParser();
1505     JSONObject root;
1506     try {
1507       root = (JSONObject) parser.parse(result.getResult());
1508
1509       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1510         // Success response object
1511         Document doc = new Document();
1512         doc.setEtag(result.getResultVersion());
1513         doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1514
1515         doc.setContent((JSONObject) root.get("_source"));
1516         result.setDocument(doc);
1517
1518       } else {
1519         // Error response object
1520         JSONObject error = (JSONObject) root.get("error");
1521         if (error != null) {
1522           result.setError(new ErrorResult(error.get("type").toString(),
1523               error.get("reason").toString()));
1524         }
1525
1526       }
1527     } catch (Exception e) {
1528       throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1529           + result.getResult());
1530     }
1531
1532
1533   }
1534
1535   private String buildDocumentResponseUrl(String index, String id) {
1536     return ApiUtils.buildDocumentUri(index, id);
1537   }
1538
1539   private void buildSearchResult(SearchOperationResult result, String index)
1540       throws DocumentStoreOperationException {
1541
1542     JSONParser parser = new JSONParser();
1543     JSONObject root;
1544
1545     try {
1546       root = (JSONObject) parser.parse(result.getResult());
1547       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1548         JSONObject hits = (JSONObject) root.get("hits");
1549         JSONArray hitArray = (JSONArray) hits.get("hits");
1550         SearchHits searchHits = new SearchHits();
1551         searchHits.setTotalHits(hits.get("total").toString());
1552         ArrayList<SearchHit> searchHitArray = new ArrayList<SearchHit>();
1553
1554         for (int i = 0; i < hitArray.size(); i++) {
1555           JSONObject hit = (JSONObject) hitArray.get(i);
1556           SearchHit searchHit = new SearchHit();
1557           searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1558           Document doc = new Document();
1559           if (hit.get("_version") != null) {
1560             doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1561           }
1562
1563           doc.setUrl(buildDocumentResponseUrl(index, (hit.get("_id") != null)
1564               ? hit.get("_id").toString() : ""));
1565           doc.setContent((JSONObject) hit.get("_source"));
1566           searchHit.setDocument(doc);
1567           searchHitArray.add(searchHit);
1568         }
1569         searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1570         result.setSearchResult(searchHits);
1571
1572         JSONObject aggregations = (JSONObject) root.get("aggregations");
1573         if (aggregations != null) {
1574           AggregationResult[] aggResults =
1575               AggregationParsingUtil.parseAggregationResults(aggregations);
1576           AggregationResults aggs = new AggregationResults();
1577           aggs.setAggregations(aggResults);
1578           result.setAggregationResult(aggs);
1579         }
1580
1581         // success
1582       } else {
1583         JSONObject error = (JSONObject) root.get("error");
1584         if (error != null) {
1585           result.setError(new ErrorResult(error.get("type").toString(),
1586               error.get("reason").toString()));
1587         }
1588       }
1589     } catch (Exception e) {
1590       throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1591           + result.getResult());
1592     }
1593
1594   }
1595
1596 }