Adding pass through shema creation
[aai/search-data-service.git] / src / main / java / org / onap / aai / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
22
23 import com.att.aft.dme2.internal.google.common.base.Throwables;
24 import com.fasterxml.jackson.annotation.JsonInclude.Include;
25 import com.fasterxml.jackson.core.JsonParseException;
26 import com.fasterxml.jackson.core.JsonProcessingException;
27 import com.fasterxml.jackson.databind.JsonMappingException;
28 import com.fasterxml.jackson.databind.ObjectMapper;
29 import edu.emory.mathcs.backport.java.util.Arrays;
30 import org.json.simple.JSONArray;
31 import org.json.simple.JSONObject;
32 import org.json.simple.parser.JSONParser;
33 import org.json.simple.parser.ParseException;
34 import org.onap.aai.sa.rest.AnalysisConfiguration;
35 import org.onap.aai.sa.rest.ApiUtils;
36 import org.onap.aai.sa.rest.BulkRequest;
37 import org.onap.aai.sa.rest.BulkRequest.OperationType;
38 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
39 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
40 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
41 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
42 import org.onap.aai.sa.searchdbabstraction.entity.Document;
43 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
44 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
45 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
46 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
47 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
48 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
49 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
50 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
51 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
52 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
53 import org.onap.aai.cl.api.LogFields;
54 import org.onap.aai.cl.api.LogLine;
55 import org.onap.aai.cl.api.Logger;
56 import org.onap.aai.cl.eelf.LoggerFactory;
57 import org.onap.aai.cl.mdc.MdcContext;
58 import org.onap.aai.cl.mdc.MdcOverride;
59 import org.onap.aai.sa.rest.DocumentSchema;
60
61 import java.io.BufferedReader;
62 import java.io.File;
63 import java.io.FileInputStream;
64 import java.io.IOException;
65 import java.io.InputStream;
66 import java.io.InputStreamReader;
67 import java.io.OutputStream;
68 import java.io.OutputStreamWriter;
69 import java.io.PrintWriter;
70 import java.io.StringWriter;
71 import java.net.HttpURLConnection;
72 import java.net.MalformedURLException;
73 import java.net.ProtocolException;
74 import java.net.URL;
75 import java.text.SimpleDateFormat;
76 import java.util.ArrayList;
77 import java.util.List;
78 import java.util.Properties;
79 import java.util.concurrent.atomic.AtomicBoolean;
80 import javax.ws.rs.core.Response.Status;
81
82
83 /**
84  * This class has the Elasticsearch implementation of the
85  * DB operations defined in DocumentStoreInterface.
86  */
87 public class ElasticSearchHttpController implements DocumentStoreInterface {
88
89   private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
90       "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
91   private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
92       "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
93   private static final String BULK_IMPORT_INDEX_TEMPLATE =
94       "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
95   private static final String BULK_DELETE_TEMPLATE =
96       "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
97
98   private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
99       "Internal Error: ElasticSearch operation fault occurred";
100   private static final Logger logger = LoggerFactory.getInstance()
101       .getLogger(ElasticSearchHttpController.class.getName());
102   private static final Logger metricsLogger = LoggerFactory.getInstance()
103       .getMetricsLogger(ElasticSearchHttpController.class.getName());
104   private final ElasticSearchConfig config;
105
106   private static final String DEFAULT_TYPE = "default";
107
108   private static ElasticSearchHttpController instance = null;
109
110   protected AnalysisConfiguration analysisConfig;
111
112   public static ElasticSearchHttpController getInstance() {
113
114     synchronized (ElasticSearchHttpController.class) {
115
116       if (instance == null) {
117
118         Properties properties = new Properties();
119         File file = new File(SearchDbConstants.ES_CONFIG_FILE);
120         try {
121           properties.load(new FileInputStream(file));
122         } catch (Exception e) {
123           logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL,
124             "ElasticSearchHTTPController.getInstance",
125             e.getLocalizedMessage());
126         }
127
128         ElasticSearchConfig config = new ElasticSearchConfig(properties);
129         instance = new ElasticSearchHttpController(config);
130       }
131     }
132
133     return instance;
134   }
135
136   public ElasticSearchHttpController(ElasticSearchConfig config) {
137     this.config = config;
138     analysisConfig = new AnalysisConfiguration();
139
140     try {
141       logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
142       checkConnection();
143       logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
144     } catch (Exception e) {
145       logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e,
146           getFullUrl("", false), e.getMessage());
147     }
148   }
149
150
151   public AnalysisConfiguration getAnalysisConfig() {
152     return analysisConfig;
153   }
154
155   @Override
156   public OperationResult createIndex(String index, DocumentSchema documentSchema) {
157
158     OperationResult result = new OperationResult();
159     result.setResultCode(500);
160
161     try {
162
163       // Submit the request to ElasticSearch to create the index using a
164       // default document type.
165       result = createTable(index,
166           DEFAULT_TYPE,
167           analysisConfig.getEsIndexSettings(),
168           DocumentSchemaUtil.generateDocumentMappings(documentSchema));
169
170       // ElasticSearch will return us a 200 code on success when we
171       // want to report a 201, so translate the result here.
172       result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
173       if (isSuccess(result)) {
174         result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
175         //result.setResult("{\"index\": \"" + index + ", \"type\": \"" + DEFAULT_TYPE + "\"}");
176       }
177
178     } catch (DocumentStoreOperationException e) {
179
180       result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
181     }
182
183     return result;
184   }
185
186   @Override
187   public OperationResult createDynamicIndex(String index, String dynamicSchema) {
188     OperationResult result = new OperationResult();
189     result.setResultCode(500);
190     
191     try {
192       result = createTable(index, dynamicSchema);
193       
194       // ElasticSearch will return us a 200 code on success when we
195       // want to report a 201, so translate the result here.
196       result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
197       if (isSuccess(result)) {
198         result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
199       }
200     } catch (DocumentStoreOperationException e) {
201       result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
202     }
203     
204     return result;
205   }
206
207   @Override
208   public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
209
210     //Initialize operation result with a failure codes / fault string
211     OperationResult opResult = new OperationResult();
212     opResult.setResultCode(500);
213     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
214
215     // Grab the current time so we can use it to generate a metrics log.
216     MdcOverride override = getStartTime(new MdcOverride());
217
218     String fullUrl = getFullUrl("/" + indexName + "/", false);
219     HttpURLConnection conn = initializeConnection(fullUrl);
220
221     logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
222
223     try {
224       conn.setRequestMethod("DELETE");
225     } catch (ProtocolException e) {
226       shutdownConnection(conn);
227       throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
228     }
229
230     handleResponse(conn, opResult);
231
232     // Generate a metrics log so we can track how long the operation took.
233     metricsLogger.info(SearchDbMsgs.DELETE_INDEX_TIME,
234         new LogFields()
235             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
236             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
237         override,
238         indexName);
239
240     shutdownConnection(conn);
241
242     return opResult;
243   }
244
245
246   private OperationResult checkConnection() throws Exception {
247
248     String fullUrl = getFullUrl("/_cluster/health", false);
249     URL url = null;
250     HttpURLConnection conn = null;
251
252     url = new URL(fullUrl);
253     conn = (HttpURLConnection) url.openConnection();
254     conn.setRequestMethod("GET");
255     conn.setDoOutput(true);
256     logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
257
258     int resultCode = conn.getResponseCode();
259     logger.debug("getClusterHealth() response Code : " + resultCode);
260     OperationResult opResult = new OperationResult();
261     opResult.setResultCode(resultCode);
262
263     shutdownConnection(conn);
264
265     return opResult;
266   }
267
268   private String getFullUrl(String resourceUrl, boolean isSecure) {
269
270     final String host = config.getIpAddress();
271     final String port = config.getHttpPort();
272
273     if (isSecure) {
274       return String.format("https://%s:%s%s", host, port, resourceUrl);
275     } else {
276       return String.format("http://%s:%s%s", host, port, resourceUrl);
277     }
278   }
279
280   private void shutdownConnection(HttpURLConnection connection) {
281     if (connection == null) {
282       return;
283     }
284
285     InputStream inputstream = null;
286     OutputStream outputstream = null;
287
288     try {
289       inputstream = connection.getInputStream();
290     } catch (IOException e) {
291       logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
292     } finally {
293       if (inputstream != null) {
294         try {
295           inputstream.close();
296         } catch (IOException e) {
297           logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
298               e.getLocalizedMessage());
299         }
300       }
301     }
302
303     try {
304       outputstream = connection.getOutputStream();
305     } catch (IOException e) {
306       logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
307     } finally {
308       if (outputstream != null) {
309         try {
310           outputstream.close();
311         } catch (IOException e) {
312           logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
313               e.getLocalizedMessage());
314         }
315       }
316     }
317
318     connection.disconnect();
319   }
320
321   //@Override
322   protected OperationResult createTable(String indexName, String typeName,
323                                         String indexSettings, String indexMappings)
324       throws DocumentStoreOperationException {
325
326     if (indexSettings == null) {
327       logger.debug("No settings provided.");
328     }
329
330     if (indexMappings == null) {
331       logger.debug("No mappings provided.");
332     }
333
334     OperationResult opResult = new OperationResult();
335
336     // Initialize operation result with a failure codes / fault string
337     opResult.setResultCode(500);
338     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
339
340     // Grab the current time so we can use it to generate a metrics log.
341     MdcOverride override = getStartTime(new MdcOverride());
342
343     String fullUrl = getFullUrl("/" + indexName + "/", false);
344     HttpURLConnection conn = initializeConnection(fullUrl);
345
346     try {
347       conn.setRequestMethod("PUT");
348     } catch (ProtocolException e) {
349       shutdownConnection(conn);
350       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
351     }
352
353     StringBuilder sb = new StringBuilder(128);
354     sb.append("{ \"settings\" : ");
355     sb.append(indexSettings);
356     sb.append(",");
357
358     sb.append("\"mappings\" : {");
359     sb.append("\"" + typeName + "\" :");
360     sb.append(indexMappings);
361     sb.append("}}");
362
363     attachContent(conn, sb.toString());
364
365     logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
366     logger.debug("Request content: " + sb.toString());
367
368     handleResponse(conn, opResult);
369
370     shutdownConnection(conn);
371
372     // Generate a metrics log so we can track how long the operation took.
373     metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
374         new LogFields()
375             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
376             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
377         override,
378         indexName);
379
380     return opResult;
381   }
382
383     /**
384    * Will send the passed in JSON payload to Elasticsearch using the 
385    * provided index name in an attempt to create the index.
386    * 
387    * @param indexName - The name of the index to be created
388    * @param settingsAndMappings - The actual JSON object that will define the index
389    * @return - The operation result of writing into Elasticsearch
390    * @throws DocumentStoreOperationException
391    */
392   protected OperationResult createTable(String indexName, String settingsAndMappings) throws DocumentStoreOperationException {
393     OperationResult result = new OperationResult();
394     result.setResultCode(500);
395     result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
396     
397     // Grab the current time so we can use it to generate a metrics log.
398     MdcOverride override = getStartTime(new MdcOverride());
399     
400     String fullUrl = getFullUrl("/" + indexName + "/", false);
401     HttpURLConnection conn = initializeConnection(fullUrl);
402
403     try {
404       conn.setRequestMethod("PUT");
405     } catch (ProtocolException e) {
406       shutdownConnection(conn);
407       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
408     }
409     
410     attachContent(conn, settingsAndMappings);
411     handleResponse(conn, result);
412     
413     // Generate a metrics log so we can track how long the operation took.
414     metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
415         new LogFields()
416             .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
417             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()),
418         override,
419         indexName);
420     
421     return result;
422   }
423
424   @Override
425   public DocumentOperationResult createDocument(String                  indexName, 
426                                                 DocumentStoreDataEntity document,
427                                                 boolean                 allowImplicitIndexCreation)
428       throws DocumentStoreOperationException {
429     
430     if(!allowImplicitIndexCreation) {
431       
432       // Before we do anything, make sure that the specified index actually exists in the
433       // document store - we don't want to rely on ElasticSearch to fail the document
434       // create because it could be configured to implicitly create a non-existent index,
435       // which can lead to hard-to-debug behaviour with queries down the road.
436       OperationResult indexExistsResult = checkIndexExistence(indexName);
437       if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
438   
439         DocumentOperationResult opResult = new DocumentOperationResult();
440         opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
441         opResult.setResult("Document Index '" + indexName + "' does not exist.");
442         opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
443         return opResult;
444       }
445     }
446     
447     if (document.getId() == null || document.getId().isEmpty()) {
448       return createDocumentWithoutId(indexName, document);
449     } else {
450       return createDocumentWithId(indexName, document);
451     }
452   }
453
454   private DocumentOperationResult createDocumentWithId(String indexName,
455                                                        DocumentStoreDataEntity document)
456       throws DocumentStoreOperationException {
457     // check if the document already exists
458     DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
459
460
461     if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
462       if (opResult.getResultCode() == Status.OK.getStatusCode()) {
463         opResult.setFailureCause("A document with the same id already exists.");
464       } else {
465         opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
466       }
467       opResult.setResultCode(Status.CONFLICT.getStatusCode());
468       return opResult;
469     }
470
471     opResult = new DocumentOperationResult();
472     // Initialize operation result with a failure codes / fault string
473     opResult.setResultCode(500);
474     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
475
476     // Grab the current time so we can use it to generate a metrics log.
477     MdcOverride override = getStartTime(new MdcOverride());
478
479     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE
480         + "/" + document.getId(), false);
481     HttpURLConnection conn = initializeConnection(fullUrl);
482
483     try {
484       conn.setRequestMethod("PUT");
485     } catch (ProtocolException e) {
486       shutdownConnection(conn);
487       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
488     }
489
490     attachDocument(conn, document);
491
492     logger.debug("Sending 'PUT' request to: " + conn.getURL());
493
494     handleResponse(conn, opResult);
495     buildDocumentResult(opResult, indexName);
496
497     // Generate a metrics log so we can track how long the operation took.
498     metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
499         new LogFields()
500             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
501             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
502         override,
503         indexName);
504
505     shutdownConnection(conn);
506
507     return opResult;
508
509   }
510
511   private DocumentOperationResult createDocumentWithoutId(String indexName,
512                                                           DocumentStoreDataEntity document)
513       throws DocumentStoreOperationException {
514
515     DocumentOperationResult response = new DocumentOperationResult();
516     // Initialize operation result with a failure codes / fault string
517     response.setResultCode(500);
518     response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
519
520     // Grab the current time so we can use it to generate a metrics log.
521     MdcOverride override = getStartTime(new MdcOverride());
522
523     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
524     HttpURLConnection conn = initializeConnection(fullUrl);
525
526     try {
527       conn.setRequestMethod("POST");
528     } catch (ProtocolException e) {
529       shutdownConnection(conn);
530       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
531     }
532
533     attachDocument(conn, document);
534
535     logger.debug("Sending 'POST' request to: " + conn.getURL());
536
537     handleResponse(conn, response);
538     buildDocumentResult(response, indexName);
539
540     // Generate a metrics log so we can track how long the operation took.
541     metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
542         new LogFields()
543             .setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
544             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
545         override,
546         indexName);
547
548     shutdownConnection(conn);
549
550     return response;
551   }
552
553   private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
554       throws DocumentStoreOperationException {
555     conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
556     conn.setRequestProperty("Connection", "Close");
557
558     attachContent(conn, doc.getContentInJson());
559   }
560
561   private DocumentOperationResult checkDocumentExistence(String indexName,
562                                                          String docId)
563       throws DocumentStoreOperationException {
564     DocumentOperationResult opResult = new DocumentOperationResult();
565
566     // Initialize operation result with a failure codes / fault string
567     opResult.setResultCode(500);
568
569     // Grab the current time so we can use it to generate a metrics log.
570     MdcOverride override = getStartTime(new MdcOverride());
571
572     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
573     HttpURLConnection conn = initializeConnection(fullUrl);
574
575     try {
576       conn.setRequestMethod("HEAD");
577     } catch (ProtocolException e) {
578       shutdownConnection(conn);
579       throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
580     }
581
582     logger.debug("Sending 'HEAD' request to: " + conn.getURL());
583
584     int resultCode;
585     try {
586       resultCode = conn.getResponseCode();
587     } catch (IOException e) {
588       shutdownConnection(conn);
589       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
590     }
591
592     logger.debug("Response Code : " + resultCode);
593
594     opResult.setResultCode(resultCode);
595
596     // Generate a metrics log so we can track how long the operation took.
597     metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
598         new LogFields()
599             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
600             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
601         override,
602         indexName,
603         docId);
604
605     shutdownConnection(conn);
606
607     return opResult;
608   }
609
610   @Override
611   public DocumentOperationResult updateDocument(String                  indexName, 
612                                                 DocumentStoreDataEntity document,
613                                                 boolean                 allowImplicitIndexCreation)
614       throws DocumentStoreOperationException {
615     
616     if(!allowImplicitIndexCreation) {
617       
618       // Before we do anything, make sure that the specified index actually exists in the
619       // document store - we don't want to rely on ElasticSearch to fail the document
620       // create because it could be configured to implicitly create a non-existent index,
621       // which can lead to hard-to-debug behaviour with queries down the road.
622       OperationResult indexExistsResult = checkIndexExistence(indexName);
623       if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
624   
625         DocumentOperationResult opResult = new DocumentOperationResult();
626         opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
627         opResult.setResult("Document Index '" + indexName + "' does not exist.");
628         opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
629         return opResult;
630       }
631     }
632     
633     DocumentOperationResult opResult = new DocumentOperationResult();
634
635     // Initialize operation result with a failure codes / fault string
636     opResult.setResultCode(500);
637     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
638
639     // Grab the current time so we can use it to generate a metrics log.
640     MdcOverride override = getStartTime(new MdcOverride());
641
642     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
643         + "?version=" + document.getVersion(), false);
644     HttpURLConnection conn = initializeConnection(fullUrl);
645
646     try {
647       conn.setRequestMethod("PUT");
648     } catch (ProtocolException e) {
649       shutdownConnection(conn);
650       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
651     }
652
653     attachDocument(conn, document);
654
655     logger.debug("Sending 'PUT' request to: " + conn.getURL());
656
657     handleResponse(conn, opResult);
658     buildDocumentResult(opResult, indexName);
659
660     // Generate a metrics log so we can track how long the operation took.
661     metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
662         new LogFields()
663             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
664             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
665         override,
666         indexName,
667         document.getId());
668
669     shutdownConnection(conn);
670
671     return opResult;
672   }
673
674   @Override
675   public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
676       throws DocumentStoreOperationException {
677     DocumentOperationResult opResult = new DocumentOperationResult();
678
679     // Initialize operation result with a failure codes / fault string
680     opResult.setResultCode(500);
681     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
682
683     // Grab the current time so we can use it to generate a metrics log.
684     MdcOverride override = getStartTime(new MdcOverride());
685
686     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
687         + "?version=" + document.getVersion(), false);
688     HttpURLConnection conn = initializeConnection(fullUrl);
689
690     try {
691       conn.setRequestMethod("DELETE");
692     } catch (ProtocolException e) {
693       shutdownConnection(conn);
694       throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
695     }
696
697     logger.debug("\nSending 'DELETE' request to " + conn.getURL());
698
699     handleResponse(conn, opResult);
700     buildDocumentResult(opResult, indexName);
701     //supress the etag and url in response for delete as they are not required
702     if (opResult.getDocument() != null) {
703       opResult.getDocument().setEtag(null);
704       opResult.getDocument().setUrl(null);
705     }
706
707     // Generate a metrics log so we can track how long the operation took.
708     metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
709         new LogFields()
710             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
711             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
712         override,
713         indexName,
714         document.getId());
715
716     shutdownConnection(conn);
717
718     return opResult;
719   }
720
721   @Override
722   public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
723       throws DocumentStoreOperationException {
724     DocumentOperationResult opResult = new DocumentOperationResult();
725
726     // Initialize operation result with a failure codes / fault string
727     opResult.setResultCode(500);
728     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
729
730     // Grab the current time so we can use it to generate a metrics log.
731     MdcOverride override = getStartTime(new MdcOverride());
732
733     String fullUrl = null;
734     if (document.getVersion() == null) {
735       fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
736     } else {
737       fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
738           + "?version=" + document.getVersion(), false);
739     }
740     HttpURLConnection conn = initializeConnection(fullUrl);
741
742     logger.debug("\nSending 'GET' request to: " + conn.getURL());
743
744     handleResponse(conn, opResult);
745     buildDocumentResult(opResult, indexName);
746
747     // Generate a metrics log so we can track how long the operation took.
748     metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
749         new LogFields()
750             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
751             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
752         override,
753         indexName,
754         document.getId());
755
756     shutdownConnection(conn);
757
758     return opResult;
759   }
760
761   public SearchOperationResult search(String indexName, String queryString)
762       throws DocumentStoreOperationException {
763     SearchOperationResult opResult = new SearchOperationResult();
764
765     // Initialize operation result with a failure codes / fault string
766     opResult.setResultCode(500);
767     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
768
769     String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
770
771     // Grab the current time so we can use it to generate a metrics log.
772     MdcOverride override = getStartTime(new MdcOverride());
773
774     HttpURLConnection conn = initializeConnection(fullUrl);
775
776     try {
777       conn.setRequestMethod("GET");
778     } catch (ProtocolException e) {
779       shutdownConnection(conn);
780       throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
781     }
782
783     logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
784
785     handleResponse(conn, opResult);
786     buildSearchResult(opResult, indexName);
787
788
789     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
790         new LogFields()
791             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
792             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
793         override,
794         indexName,
795         queryString);
796
797     return opResult;
798   }
799
800   public SearchOperationResult searchWithPayload(String indexName, String query)
801       throws DocumentStoreOperationException {
802     SearchOperationResult opResult = new SearchOperationResult();
803
804     if (logger.isDebugEnabled()) {
805       logger.debug("Querying index: " + indexName + " with query string: " + query);
806     }
807
808     // Initialize operation result with a failure codes / fault string
809     opResult.setResultCode(500);
810     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
811
812     String fullUrl = getFullUrl("/" + indexName + "/_search", false);
813
814     // Grab the current time so we can use it to generate a metrics log.
815     MdcOverride override = getStartTime(new MdcOverride());
816
817     HttpURLConnection conn = initializeConnection(fullUrl);
818
819     try {
820       conn.setRequestMethod("POST");
821     } catch (ProtocolException e) {
822       shutdownConnection(conn);
823       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
824     }
825
826     attachContent(conn, query);
827
828     logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
829     logger.debug("Request body =  Elasticsearch query = " + query);
830
831     handleResponse(conn, opResult);
832     buildSearchResult(opResult, indexName);
833
834     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
835         new LogFields()
836             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
837             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
838         override,
839         indexName,
840         query);
841
842     shutdownConnection(conn);
843
844     return opResult;
845   }
846
847   private void attachContent(HttpURLConnection conn, String content)
848       throws DocumentStoreOperationException {
849     OutputStream outputStream = null;
850     OutputStreamWriter out = null;
851
852     try {
853       outputStream = conn.getOutputStream();
854     } catch (IOException e) {
855       shutdownConnection(conn);
856       throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
857     }
858
859     out = new OutputStreamWriter(outputStream);
860
861     try {
862       out.write(content);
863       out.close();
864     } catch (IOException e) {
865       shutdownConnection(conn);
866       throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
867     }
868   }
869
870   private HttpURLConnection initializeConnection(String fullUrl)
871       throws DocumentStoreOperationException {
872     URL url = null;
873     HttpURLConnection conn = null;
874
875     try {
876       url = new URL(fullUrl);
877     } catch (MalformedURLException e) {
878       throw new DocumentStoreOperationException("Error building a URL with " + url, e);
879     }
880
881     try {
882       conn = (HttpURLConnection) url.openConnection();
883       conn.setDoOutput(true);
884     } catch (IOException e) {
885       shutdownConnection(conn);
886       throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
887     }
888
889     return conn;
890   }
891
892   private void handleResponse(HttpURLConnection conn, OperationResult opResult)
893       throws DocumentStoreOperationException {
894     int resultCode = 200;
895
896     try {
897       resultCode = conn.getResponseCode();
898     } catch (IOException e) {
899       shutdownConnection(conn);
900       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
901     }
902
903     logger.debug("Response Code : " + resultCode);
904
905     InputStream inputStream = null;
906
907     if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
908       inputStream = conn.getErrorStream();
909     } else {
910       try {
911         inputStream = conn.getInputStream();
912       } catch (IOException e) {
913         shutdownConnection(conn);
914         throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
915       }
916     }
917
918     InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
919     BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
920
921     StringBuilder result = new StringBuilder(128);
922     String string = null;
923
924     try {
925       while ((string = bufferedreader.readLine()) != null) {
926         result.append(string).append("\n");
927       }
928     } catch (IOException e) {
929       shutdownConnection(conn);
930       throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
931     }
932
933     if (resultCode == Status.CONFLICT.getStatusCode()) {
934       opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode());
935     } else {
936       opResult.setResultCode(resultCode);
937     }
938     if (logger.isDebugEnabled()) {
939       logger.debug("Raw result string from ElasticSearch = " + result.toString());
940     }
941     opResult.setResult(result.toString());
942     opResult.setResultVersion(extractVersion(result.toString()));
943   }
944
945   private String extractVersion(String result) throws DocumentStoreOperationException {
946
947     JSONParser parser = new JSONParser();
948     String version = null;
949     try {
950       JSONObject root = (JSONObject) parser.parse(result);
951       if (root.get("_version") != null) {
952         version = root.get("_version").toString();
953       }
954
955     } catch (ParseException e) {
956
957       // Not all responses from ElasticSearch include a version, so
958       // if we don't get one back, just return an empty string rather
959       // than trigger a false failure.
960       version = "";
961     }
962     return version;
963   }
964
965   /**
966    * This convenience method gets the current system time and stores
967    * it in an attribute in the supplied {@link MdcOverride} object so
968    * that it can be used later by the metrics logger.
969    *
970    * @param override - The {@link MdcOverride} object to update.
971    * @return - The supplied {@link MdcOverride} object.
972    */
973   private MdcOverride getStartTime(MdcOverride override) {
974
975     // Grab the current time...
976     long startTimeInMs = System.currentTimeMillis();
977
978     // ...and add it as an attribute to the supplied MDC Override
979     // object.
980     SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
981     override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
982
983     // Return the MdcOverride object that we were passed.
984     // This looks odd, but it allows us to do stuff like:
985     //
986     //    MdcOverride ov = getStartTime(new MdcOverride())
987     //
988     // which is quite handy, but also allows us to pass in an existing
989     // MdcOverride object which already has some attributes set.
990     return override;
991   }
992
993   private boolean isSuccess(OperationResult result) {
994
995     return isSuccessCode(result.getResultCode());
996   }
997
998
999   private boolean isSuccessCode(int statusCode) {
1000     return ((statusCode >= 200) && (statusCode < 300));
1001   }
1002
1003
1004   @Override
1005   public OperationResult performBulkOperations(BulkRequest[] requests)
1006       throws DocumentStoreOperationException {
1007
1008     if (logger.isDebugEnabled()) {
1009       String dbgString = "ESController: performBulkOperations - Operations: ";
1010
1011       for (BulkRequest request : requests) {
1012         dbgString += "[" + request.toString() + "] ";
1013       }
1014
1015       logger.debug(dbgString);
1016     }
1017
1018     // Grab the current time so we can use it to generate a metrics log.
1019     MdcOverride override = getStartTime(new MdcOverride());
1020
1021     // Parse the supplied set of operations.
1022     // Iterate over the list of operations which we were provided and
1023     // translate them into a format that ElasticSearh understands.
1024     int opCount = 0;
1025     StringBuilder esOperationSet = new StringBuilder(128);
1026     List<ElasticSearchResultItem> rejected = new ArrayList<ElasticSearchResultItem>();
1027     for (BulkRequest request : requests) {
1028
1029       // Convert the request to the syntax ElasticSearch likes.
1030       if (buildEsOperation(request, esOperationSet, rejected)) {
1031         opCount++;
1032       }
1033     }
1034
1035     ElasticSearchBulkOperationResult opResult = null;
1036     if (opCount > 0) {
1037
1038       // Open an HTTP connection to the ElasticSearch back end.
1039       String fullUrl = getFullUrl("/_bulk", false);
1040       URL url;
1041       HttpURLConnection conn;
1042       try {
1043
1044         url = new URL(fullUrl);
1045         conn = (HttpURLConnection) url.openConnection();
1046         conn.setRequestMethod("PUT");
1047         conn.setDoOutput(true);
1048         conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
1049         conn.setRequestProperty("Connection", "Close");
1050
1051       } catch (IOException e) {
1052
1053         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1054         if (logger.isDebugEnabled()) {
1055           logger.debug(Throwables.getStackTraceAsString(e));
1056         }
1057
1058         throw new DocumentStoreOperationException("Failed to open connection to document store.  Cause: "
1059             + e.getMessage(), e);
1060       }
1061
1062       StringBuilder bulkResult = new StringBuilder(128);
1063       try {
1064         // Create an output stream to write our request to.
1065         OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1066         ;
1067
1068         if (logger.isDebugEnabled()) {
1069           logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1070           logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n",
1071               "\\n"));
1072         }
1073
1074         // Write the resulting request string to our output stream. (this sends the request to ES?)
1075         out.write(esOperationSet.toString());
1076         out.close();
1077
1078         // Open an input stream on our connection in order to read back the results.
1079         InputStream is = conn.getInputStream();
1080         InputStreamReader inputstreamreader = new InputStreamReader(is);
1081         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1082
1083         // Read the contents of the input stream into our result string...
1084         String esResponseString = null;
1085
1086         while ((esResponseString = bufferedreader.readLine()) != null) {
1087           bulkResult.append(esResponseString).append("\n");
1088         }
1089
1090       } catch (IOException e) {
1091
1092         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1093         if (logger.isDebugEnabled()) {
1094           StringWriter sw = new StringWriter();
1095           e.printStackTrace(new PrintWriter(sw));
1096           logger.debug(sw.toString());
1097         }
1098
1099         throw new DocumentStoreOperationException("Failure interacting with document store.  Cause: "
1100             + e.getMessage(), e);
1101       }
1102
1103       if (logger.isDebugEnabled()) {
1104         logger.debug("ESController: Received result string from ElasticSearch: = "
1105             + bulkResult.toString());
1106       }
1107
1108       // ...and marshal the resulting string into a Java object.
1109       try {
1110         opResult = marshallEsBulkResult(bulkResult.toString());
1111
1112       } catch (IOException e) {
1113
1114         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1115         if (logger.isDebugEnabled()) {
1116           logger.debug(Throwables.getStackTraceAsString(e));
1117         }
1118
1119         throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: "
1120             + e.getMessage(), e);
1121       }
1122     }
1123
1124     // Finally, build the operation result and return it to the caller.
1125     OperationResult result = new OperationResult();
1126     result.setResultCode(207);
1127     result.setResult(buildGenericBulkResultSet(opResult, rejected));
1128
1129     // In the success case we don't want the entire result string to be
1130     // dumped into the metrics log, so concatenate it.
1131     String resultStringForMetricsLog = result.getResult();
1132     if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1133       resultStringForMetricsLog = resultStringForMetricsLog.substring(0,
1134           Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1135     }
1136
1137     metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1138         new LogFields()
1139             .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1140             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1141         override);
1142
1143     return result;
1144   }
1145
1146
1147   /**
1148    * This method converts a {@link BulkRequest} object into a json structure
1149    * which can be understood by ElasticSearch.
1150    *
1151    * @param request - The request to be performed.
1152    * @param sb      - The string builder to append the json data to
1153    * @throws DocumentStoreOperationException
1154    */
1155   private boolean buildEsOperation(BulkRequest request, StringBuilder sb,
1156                                    List<ElasticSearchResultItem> fails)
1157       throws DocumentStoreOperationException {
1158
1159     boolean retVal = true;
1160     OperationResult indexExistsResult = null;
1161
1162     // What kind of operation are we performing?
1163     switch (request.getOperationType()) {
1164
1165       // Create a new document.
1166       case CREATE:
1167
1168         // Make sure that we were supplied a document payload.
1169         if (request.getOperation().getDocument() == null) {
1170
1171           fails.add(generateRejectionEntry(request.getOperationType(),
1172               "Missing document payload",
1173               request.getIndex(),
1174               request.getId(),
1175               400,
1176               request.getOperation().getMetaData().getUrl()));
1177           return false;
1178         }
1179
1180         // Make sure that the supplied document URL is formatted
1181         // correctly.
1182         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1183           fails.add(generateRejectionEntry(request.getOperationType(),
1184               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1185               request.getIndex(),
1186               "",
1187               400,
1188               request.getOperation().getMetaData().getUrl()));
1189           return false;
1190         }
1191
1192         // Validate that the specified index actually exists before we
1193         // try to perform the create.
1194         if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1195
1196           fails.add(generateRejectionEntry(request.getOperationType(),
1197               "Specified resource does not exist: "
1198                   + request.getOperation().getMetaData().getUrl(),
1199               request.getIndex(),
1200               request.getId(),
1201               404,
1202               request.getOperation().getMetaData().getUrl()));
1203           return false;
1204         }
1205
1206         // If we were supplied an id for the new document, then
1207         // include it in the bulk operation to Elastic Search
1208         if (request.getId() == null) {
1209
1210           sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE,
1211               request.getIndex(),
1212               DEFAULT_TYPE));
1213
1214           // Otherwise, we just leave that parameter off and ElasticSearch
1215           // will generate one for us.
1216         } else {
1217           sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE,
1218               request.getIndex(),
1219               DEFAULT_TYPE,
1220               request.getId()));
1221         }
1222
1223         try {
1224           // Append the document that we want to create.
1225           sb.append(request.getOperation().getDocument().toJson()).append("\n");
1226         } catch (JsonProcessingException e) {
1227           throw new DocumentStoreOperationException("Failure parsing document to json", e);
1228         }
1229
1230         break;
1231
1232       // Update an existing document.
1233       case UPDATE:
1234
1235         // Make sure that we were supplied a document payload.
1236         if (request.getOperation().getDocument() == null) {
1237
1238           fails.add(generateRejectionEntry(request.getOperationType(),
1239               "Missing document payload",
1240               request.getIndex(),
1241               request.getId(),
1242               400,
1243               request.getOperation().getMetaData().getUrl()));
1244           return false;
1245         }
1246
1247         // Make sure that the supplied document URL is formatted
1248         // correctly.
1249         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1250           fails.add(generateRejectionEntry(request.getOperationType(),
1251               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1252               request.getIndex(),
1253               "",
1254               400,
1255               request.getOperation().getMetaData().getUrl()));
1256           return false;
1257         }
1258
1259         // Validate that the specified index actually exists before we
1260         // try to perform the update.
1261         if (!indexExists(request.getIndex())) {
1262
1263           fails.add(generateRejectionEntry(request.getOperationType(),
1264               "Specified resource does not exist: "
1265                   + request.getOperation().getMetaData().getUrl(),
1266               request.getIndex(),
1267               request.getId(),
1268               404,
1269               request.getOperation().getMetaData().getUrl()));
1270           return false;
1271         }
1272
1273         // Validate that the document we are trying to update actually
1274         // exists before we try to perform the update.
1275         if (!documentExists(request.getIndex(), request.getId())) {
1276
1277           fails.add(generateRejectionEntry(request.getOperationType(),
1278               "Specified resource does not exist: "
1279                   + request.getOperation().getMetaData().getUrl(),
1280               request.getIndex(),
1281               request.getId(),
1282               404,
1283               request.getOperation().getMetaData().getUrl()));
1284           return false;
1285         }
1286
1287         // It is mandatory that a version be supplied for an update operation,
1288         // so validate that now.
1289         if (request.getOperation().getMetaData().getEtag() == null) {
1290
1291           fails.add(generateRejectionEntry(request.getOperationType(),
1292               "Missing mandatory ETag field",
1293               request.getIndex(),
1294               request.getId(),
1295               400,
1296               request.getOperation().getMetaData().getUrl()));
1297           return false;
1298         }
1299
1300         // Generate the update request...
1301         sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE,
1302             request.getIndex(),
1303             DEFAULT_TYPE,
1304             request.getId(),
1305             request.getOperation().getMetaData().getEtag()));
1306
1307         // ...and append the document that we want to update.
1308         try {
1309           sb.append(request.getOperation().getDocument().toJson()).append("\n");
1310         } catch (JsonProcessingException e) {
1311           throw new DocumentStoreOperationException("Failure parsing document to json", e);
1312         }
1313         break;
1314
1315       // Delete an existing document.
1316       case DELETE:
1317
1318         // Make sure that the supplied document URL is formatted
1319         // correctly.
1320         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1321           fails.add(generateRejectionEntry(request.getOperationType(),
1322               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1323               request.getIndex(),
1324               "",
1325               400,
1326               request.getOperation().getMetaData().getUrl()));
1327           return false;
1328         }
1329
1330         // Validate that the specified index actually exists before we
1331         // try to perform the delete.
1332         if (!indexExists(request.getIndex())) {
1333
1334           fails.add(generateRejectionEntry(request.getOperationType(),
1335               "Specified resource does not exist: "
1336                   + request.getOperation().getMetaData().getUrl(),
1337               request.getIndex(),
1338               request.getId(),
1339               404,
1340               request.getOperation().getMetaData().getUrl()));
1341           return false;
1342         }
1343
1344         // Validate that the document we are trying to update actually
1345         // exists before we try to perform the delete.
1346         if (!documentExists(request.getIndex(), request.getId())) {
1347
1348           fails.add(generateRejectionEntry(request.getOperationType(),
1349               "Specified resource does not exist: "
1350                   + request.getOperation().getMetaData().getUrl(),
1351               request.getIndex(),
1352               request.getId(),
1353               404,
1354               request.getOperation().getMetaData().getUrl()));
1355           return false;
1356         }
1357
1358         // It is mandatory that a version be supplied for a delete operation,
1359         // so validate that now.
1360         if (request.getOperation().getMetaData().getEtag() == null) {
1361
1362           fails.add(generateRejectionEntry(request.getOperationType(),
1363               "Missing mandatory ETag field",
1364               request.getIndex(),
1365               request.getId(),
1366               400,
1367               request.getOperation().getMetaData().getUrl()));
1368           return false;
1369         }
1370
1371         // Generate the delete request.
1372         sb.append(String.format(BULK_DELETE_TEMPLATE,
1373             request.getIndex(),
1374             DEFAULT_TYPE,
1375             request.getId(),
1376             request.getOperation().getMetaData().getEtag()));
1377         break;
1378       default:
1379     }
1380
1381     return retVal;
1382   }
1383
1384   private boolean indexExists(String index) throws DocumentStoreOperationException {
1385
1386     OperationResult indexExistsResult = checkIndexExistence(index);
1387
1388     return ((indexExistsResult.getResultCode() >= 200)
1389         && (indexExistsResult.getResultCode() < 300));
1390   }
1391
1392   private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1393
1394     OperationResult docExistsResult = checkDocumentExistence(index, id);
1395
1396     return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1397   }
1398
1399   /**
1400    * This method constructs a status entry for a bulk operation which has
1401    * been rejected before even sending it to the document store.
1402    *
1403    * @param rejectReason - A message describing why the operation was rejected.
1404    * @param anId         - The identifier associated with the document being
1405    *                     acted on.
1406    * @param statusCode  - An HTTP status code.
1407    * @return - A result set item.
1408    */
1409   private ElasticSearchResultItem generateRejectionEntry(OperationType opType,
1410                                                          String rejectReason,
1411                                                          String index,
1412                                                          String anId,
1413                                                          int statusCode,
1414                                                          String originalUrl) {
1415
1416     ElasticSearchError err = new ElasticSearchError();
1417     err.setReason(rejectReason);
1418
1419     ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1420     op.setIndex(index);
1421     op.setId(anId);
1422     op.setStatus(statusCode);
1423     op.setError(err);
1424     op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1425
1426     ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1427
1428     switch (opType) {
1429       case CREATE:
1430         rejectionResult.setCreate(op);
1431         break;
1432       case UPDATE:
1433         rejectionResult.setIndex(op);
1434         break;
1435       case DELETE:
1436         rejectionResult.setDelete(op);
1437         break;
1438       default:
1439     }
1440
1441     return rejectionResult;
1442   }
1443
1444
1445   /**
1446    * This method takes the json structure returned from ElasticSearch in
1447    * response to a bulk operations request and marshals it into a Java
1448    * object.
1449    *
1450    * @param jsonResult - The bulk operations response returned from
1451    *                   ElasticSearch.
1452    * @return - The marshalled response.
1453    * @throws JsonParseException
1454    * @throws JsonMappingException
1455    * @throws IOException
1456    */
1457   private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult)
1458       throws JsonParseException, JsonMappingException, IOException {
1459
1460     if (jsonResult != null) {
1461       if (logger.isDebugEnabled()) {
1462         logger.debug("ESController: Marshalling ES result set from json: "
1463             + jsonResult.replaceAll("\n", ""));
1464       }
1465
1466       ObjectMapper mapper = new ObjectMapper();
1467       mapper.setSerializationInclusion(Include.NON_EMPTY);
1468
1469       return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1470     }
1471
1472     return null;
1473   }
1474
1475
1476   /**
1477    * This method takes the marshalled ElasticSearch bulk response and
1478    * converts it into a generic response payload.
1479    *
1480    * @param esResult - ElasticSearch bulk operations response.
1481    * @return - A generic result set.
1482    */
1483   private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1484                                            List<ElasticSearchResultItem> rejectedOps) {
1485
1486     int totalOps = 0;
1487     int totalSuccess = 0;
1488     int totalFails = 0;
1489
1490     if (logger.isDebugEnabled()) {
1491
1492       logger.debug("ESController: Build generic result set.  ES Results: "
1493           + ((esResult != null) ? esResult.toString() : "[]")
1494           + " Rejected Ops: " + rejectedOps.toString());
1495     }
1496
1497     // Build a combined list of result items from the results returned
1498     // from ElasticSearch and the list of operations that we rejected
1499     // without sending to ElasticSearch.
1500     List<ElasticSearchResultItem> combinedResults = new ArrayList<ElasticSearchResultItem>();
1501     if (esResult != null) {
1502       combinedResults.addAll(Arrays.asList(esResult.getItems()));
1503     }
1504     combinedResults.addAll(rejectedOps);
1505
1506     // Iterate over the individual results in the resulting result set.
1507     StringBuilder resultsBuilder = new StringBuilder();
1508     AtomicBoolean firstItem = new AtomicBoolean(true);
1509     for (ElasticSearchResultItem item : combinedResults) {
1510
1511       // Increment the operation counts.
1512       totalOps++;
1513       if (isSuccessCode(item.operationStatus().getStatus())) {
1514         totalSuccess++;
1515       } else {
1516         totalFails++;
1517       }
1518
1519       // Prepend a comma to our response string unless this it the
1520       // first result in the set.
1521       if (!firstItem.compareAndSet(true, false)) {
1522         resultsBuilder.append(", ");
1523       }
1524
1525       // Append the current result as a generic json structure.
1526       resultsBuilder.append(item.toJson());
1527     }
1528
1529     // Now, build the result string and return it.
1530     String responseBody = "{ \"total_operations\": " + totalOps + ", "
1531         + "\"total_success\": " + totalSuccess + ", "
1532         + "\"total_fails\": " + totalFails + ", "
1533         + "\"results\": ["
1534         + resultsBuilder.toString()
1535         + "]}";
1536
1537     return responseBody;
1538   }
1539
1540
1541   /**
1542    * This method queryies ElasticSearch to determine if the supplied
1543    * index is present in the document store.
1544    *
1545    * @param indexName - The index to look for.
1546    * @return - An operation result indicating the success or failure of
1547    * the check.
1548    * @throws DocumentStoreOperationException
1549    */
1550   public OperationResult checkIndexExistence(String indexName)
1551       throws DocumentStoreOperationException {
1552
1553     // Initialize operation result with a failure codes / fault string
1554     OperationResult opResult = new OperationResult();
1555     opResult.setResultCode(500);
1556
1557     // Grab the current time so we can use it to generate a metrics log.
1558     MdcOverride override = getStartTime(new MdcOverride());
1559
1560     String fullUrl = getFullUrl("/" + indexName, false);
1561     HttpURLConnection conn = initializeConnection(fullUrl);
1562
1563     try {
1564       conn.setRequestMethod("HEAD");
1565
1566     } catch (ProtocolException e) {
1567       shutdownConnection(conn);
1568       throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1569     }
1570
1571     logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1572
1573     int resultCode;
1574     try {
1575       resultCode = conn.getResponseCode();
1576     } catch (IOException e) {
1577       shutdownConnection(conn);
1578       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1579     }
1580     logger.debug("Response Code : " + resultCode);
1581
1582     opResult.setResultCode(resultCode);
1583
1584     // Generate a metrics log so we can track how long the operation took.
1585     metricsLogger.info(SearchDbMsgs.CHECK_INDEX_TIME,
1586         new LogFields()
1587             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1588             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1589         override,
1590         indexName);
1591
1592     shutdownConnection(conn);
1593
1594     return opResult;
1595   }
1596
1597
1598   private void buildDocumentResult(DocumentOperationResult result, String index)
1599       throws DocumentStoreOperationException {
1600
1601     JSONParser parser = new JSONParser();
1602     JSONObject root;
1603     try {
1604       root = (JSONObject) parser.parse(result.getResult());
1605
1606       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1607         // Success response object
1608         Document doc = new Document();
1609         doc.setEtag(result.getResultVersion());
1610         doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1611
1612         doc.setContent((JSONObject) root.get("_source"));
1613         result.setDocument(doc);
1614
1615       } else {
1616         // Error response object
1617         JSONObject error = (JSONObject) root.get("error");
1618         if (error != null) {
1619           result.setError(new ErrorResult(error.get("type").toString(),
1620               error.get("reason").toString()));
1621         }
1622
1623       }
1624     } catch (Exception e) {
1625       throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1626           + result.getResult());
1627     }
1628
1629
1630   }
1631
1632   private String buildDocumentResponseUrl(String index, String id) {
1633     return ApiUtils.buildDocumentUri(index, id);
1634   }
1635
1636   private void buildSearchResult(SearchOperationResult result, String index)
1637       throws DocumentStoreOperationException {
1638
1639     JSONParser parser = new JSONParser();
1640     JSONObject root;
1641
1642     try {
1643       root = (JSONObject) parser.parse(result.getResult());
1644       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1645         JSONObject hits = (JSONObject) root.get("hits");
1646         JSONArray hitArray = (JSONArray) hits.get("hits");
1647         SearchHits searchHits = new SearchHits();
1648         searchHits.setTotalHits(hits.get("total").toString());
1649         ArrayList<SearchHit> searchHitArray = new ArrayList<SearchHit>();
1650
1651         for (int i = 0; i < hitArray.size(); i++) {
1652           JSONObject hit = (JSONObject) hitArray.get(i);
1653           SearchHit searchHit = new SearchHit();
1654           searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1655           Document doc = new Document();
1656           if (hit.get("_version") != null) {
1657             doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1658           }
1659
1660           doc.setUrl(buildDocumentResponseUrl(index, (hit.get("_id") != null)
1661               ? hit.get("_id").toString() : ""));
1662           doc.setContent((JSONObject) hit.get("_source"));
1663           searchHit.setDocument(doc);
1664           searchHitArray.add(searchHit);
1665         }
1666         searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1667         result.setSearchResult(searchHits);
1668
1669         JSONObject aggregations = (JSONObject) root.get("aggregations");
1670         if (aggregations != null) {
1671           AggregationResult[] aggResults =
1672               AggregationParsingUtil.parseAggregationResults(aggregations);
1673           AggregationResults aggs = new AggregationResults();
1674           aggs.setAggregations(aggResults);
1675           result.setAggregationResult(aggs);
1676         }
1677
1678         // success
1679       } else {
1680         JSONObject error = (JSONObject) root.get("error");
1681         if (error != null) {
1682           result.setError(new ErrorResult(error.get("type").toString(),
1683               error.get("reason").toString()));
1684         }
1685       }
1686     } catch (Exception e) {
1687       throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1688           + result.getResult());
1689     }
1690
1691   }
1692
1693 }