Updating Search service to be ES 6.1.2 compliant
[aai/search-data-service.git] / src / main / java / org / onap / aai / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
22
23 import com.google.common.base.Throwables;
24 import com.fasterxml.jackson.annotation.JsonInclude.Include;
25 import com.fasterxml.jackson.core.JsonParseException;
26 import com.fasterxml.jackson.core.JsonProcessingException;
27 import com.fasterxml.jackson.databind.JsonMappingException;
28 import com.fasterxml.jackson.databind.ObjectMapper;
29 import org.json.simple.JSONArray;
30 import org.json.simple.JSONObject;
31 import org.json.simple.parser.JSONParser;
32 import org.json.simple.parser.ParseException;
33 import org.onap.aai.sa.rest.AnalysisConfiguration;
34 import org.onap.aai.sa.rest.ApiUtils;
35 import org.onap.aai.sa.rest.BulkRequest;
36 import org.onap.aai.sa.rest.BulkRequest.OperationType;
37 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
38 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
39 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
40 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
41 import org.onap.aai.sa.searchdbabstraction.entity.Document;
42 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
43 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
44 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
45 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
46 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
47 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
48 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
49 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
50 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
51 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
52 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
53 import org.onap.aai.cl.api.LogFields;
54 import org.onap.aai.cl.api.LogLine;
55 import org.onap.aai.cl.api.Logger;
56 import org.onap.aai.cl.eelf.LoggerFactory;
57 import org.onap.aai.cl.mdc.MdcContext;
58 import org.onap.aai.cl.mdc.MdcOverride;
59 import org.onap.aai.sa.rest.DocumentSchema;
60 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
61 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
62
63 import java.io.BufferedReader;
64 import java.io.File;
65 import java.io.FileInputStream;
66 import java.io.IOException;
67 import java.io.InputStream;
68 import java.io.InputStreamReader;
69 import java.io.OutputStream;
70 import java.io.OutputStreamWriter;
71 import java.io.PrintWriter;
72 import java.io.StringWriter;
73 import java.net.HttpURLConnection;
74 import java.net.MalformedURLException;
75 import java.net.ProtocolException;
76 import java.net.URL;
77 import java.text.SimpleDateFormat;
78 import java.util.ArrayList;
79 import java.util.Arrays;
80 import java.util.List;
81 import java.util.Properties;
82 import java.util.concurrent.atomic.AtomicBoolean;
83
84 import org.springframework.http.HttpStatus;
85
86
87
88 /**
89  * This class has the Elasticsearch implementation of the
90  * DB operations defined in DocumentStoreInterface.
91  */
92 public class ElasticSearchHttpController implements DocumentStoreInterface {
93
94   private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
95       "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
96   private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
97       "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
98   private static final String BULK_IMPORT_INDEX_TEMPLATE =
99       "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
100   private static final String BULK_DELETE_TEMPLATE =
101       "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
102
103   private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
104       "Internal Error: ElasticSearch operation fault occurred";
105   private static final Logger logger = LoggerFactory.getInstance()
106       .getLogger(ElasticSearchHttpController.class.getName());
107   private static final Logger metricsLogger = LoggerFactory.getInstance()
108       .getMetricsLogger(ElasticSearchHttpController.class.getName());
109   private final ElasticSearchConfig config;
110
111   private static final String DEFAULT_TYPE = "default";
112
113   private static ElasticSearchHttpController instance = null;
114
115   protected AnalysisConfiguration analysisConfig;
116
117   public static ElasticSearchHttpController getInstance() {
118
119     synchronized (ElasticSearchHttpController.class) {
120
121       if (instance == null) {
122
123         Properties properties = new Properties();
124         File file = new File(SearchDbConstants.ES_CONFIG_FILE);
125         try {
126           properties.load(new FileInputStream(file));
127         } catch (Exception e) {
128           logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL,
129             "ElasticSearchHTTPController.getInstance",
130             e.getLocalizedMessage());
131         }
132
133         ElasticSearchConfig config = new ElasticSearchConfig(properties);
134         instance = new ElasticSearchHttpController(config);
135       }
136     }
137
138     return instance;
139   }
140
141   public ElasticSearchHttpController(ElasticSearchConfig config) {
142     this.config = config;
143     analysisConfig = new AnalysisConfiguration();
144
145     try {
146       logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
147       checkConnection();
148       logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
149     } catch (Exception e) {
150       logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e,
151           getFullUrl("", false), e.getMessage());
152     }
153   }
154
155
156   public AnalysisConfiguration getAnalysisConfig() {
157     return analysisConfig;
158   }
159
160   @Override
161   public OperationResult createIndex(String index, DocumentSchema documentSchema) {
162
163     OperationResult result = new OperationResult();
164     result.setResultCode(500);
165
166     try {
167
168       // Submit the request to ElasticSearch to create the index using a
169       // default document type.
170       result = createTable(index,
171           DEFAULT_TYPE,
172           analysisConfig.getEsIndexSettings(),
173           DocumentSchemaUtil.generateDocumentMappings(documentSchema));
174
175       // ElasticSearch will return us a 200 code on success when we
176       // want to report a 201, so translate the result here.
177       result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
178       if (isSuccess(result)) {
179         result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
180         //result.setResult("{\"index\": \"" + index + ", \"type\": \"" + DEFAULT_TYPE + "\"}");
181       }
182
183     } catch (DocumentStoreOperationException | IOException e) {
184
185       result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
186     }
187
188     return result;
189   }
190
191   @Override
192   public OperationResult createDynamicIndex(String index, String dynamicSchema) {
193     OperationResult result = new OperationResult();
194     result.setResultCode(500);
195
196     try {
197       result = createTable(index, dynamicSchema);
198
199       // ElasticSearch will return us a 200 code on success when we
200       // want to report a 201, so translate the result here.
201       result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
202       if (isSuccess(result)) {
203         result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
204       }
205     } catch (DocumentStoreOperationException e) {
206       result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
207     }
208
209     return result;
210   }
211
212
213   @Override
214   public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
215
216     //Initialize operation result with a failure codes / fault string
217     OperationResult opResult = new OperationResult();
218     opResult.setResultCode(500);
219     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
220
221     // Grab the current time so we can use it to generate a metrics log.
222     MdcOverride override = getStartTime(new MdcOverride());
223
224     String fullUrl = getFullUrl("/" + indexName + "/", false);
225     HttpURLConnection conn = initializeConnection(fullUrl);
226
227     logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
228
229     try {
230       conn.setRequestMethod("DELETE");
231     } catch (ProtocolException e) {
232       shutdownConnection(conn);
233       throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
234     }
235
236     handleResponse(conn, opResult);
237
238     // Generate a metrics log so we can track how long the operation took.
239     metricsLogger.info(SearchDbMsgs.DELETE_INDEX_TIME,
240         new LogFields()
241             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
242             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
243         override,
244         indexName);
245
246     shutdownConnection(conn);
247
248     return opResult;
249   }
250
251
252   private OperationResult checkConnection() throws Exception {
253
254     String fullUrl = getFullUrl("/_cluster/health", false);
255     URL url = null;
256     HttpURLConnection conn = null;
257
258     url = new URL(fullUrl);
259     conn = (HttpURLConnection) url.openConnection();
260     conn.setRequestMethod("GET");
261     conn.setDoOutput(true);
262     logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
263
264     int resultCode = conn.getResponseCode();
265     logger.debug("getClusterHealth() response Code : " + resultCode);
266     OperationResult opResult = new OperationResult();
267     opResult.setResultCode(resultCode);
268
269     shutdownConnection(conn);
270
271     return opResult;
272   }
273
274   private String getFullUrl(String resourceUrl, boolean isSecure) {
275
276     final String host = config.getIpAddress();
277     final String port = config.getHttpPort();
278
279     if (isSecure) {
280       return String.format("https://%s:%s%s", host, port, resourceUrl);
281     } else {
282       return String.format("http://%s:%s%s", host, port, resourceUrl);
283     }
284   }
285
286   private void shutdownConnection(HttpURLConnection connection) {
287     if (connection == null) {
288       return;
289     }
290
291     InputStream inputstream = null;
292     OutputStream outputstream = null;
293
294     try {
295       inputstream = connection.getInputStream();
296     } catch (IOException e) {
297       logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
298     } finally {
299       if (inputstream != null) {
300         try {
301           inputstream.close();
302         } catch (IOException e) {
303           logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
304               e.getLocalizedMessage());
305         }
306       }
307     }
308
309     try {
310       outputstream = connection.getOutputStream();
311     } catch (IOException e) {
312       logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
313     } finally {
314       if (outputstream != null) {
315         try {
316           outputstream.close();
317         } catch (IOException e) {
318           logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
319               e.getLocalizedMessage());
320         }
321       }
322     }
323
324     connection.disconnect();
325   }
326
327   //@Override
328   protected OperationResult createTable(String indexName, String typeName,
329                                         String indexSettings, String indexMappings)
330       throws DocumentStoreOperationException {
331
332     if (indexSettings == null) {
333       logger.debug("No settings provided.");
334     }
335
336     if (indexMappings == null) {
337       logger.debug("No mappings provided.");
338     }
339
340     OperationResult opResult = new OperationResult();
341
342     // Initialize operation result with a failure codes / fault string
343     opResult.setResultCode(500);
344     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
345
346     // Grab the current time so we can use it to generate a metrics log.
347     MdcOverride override = getStartTime(new MdcOverride());
348
349     String fullUrl = getFullUrl("/" + indexName + "/", false);
350     HttpURLConnection conn = initializeConnection(fullUrl);
351
352     try {
353       conn.setRequestMethod("PUT");
354       conn.setRequestProperty("Content-Type", "application/json");
355     } catch (ProtocolException e) {
356       shutdownConnection(conn);
357       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
358     }
359
360     StringBuilder sb = new StringBuilder(128);
361     sb.append("{ \"settings\" : ");
362     sb.append(indexSettings);
363     sb.append(",");
364
365     sb.append("\"mappings\" : {");
366     sb.append("\"" + typeName + "\" :");
367     sb.append(indexMappings);
368     sb.append("}}");
369
370     try {
371         attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
372     } catch(IOException e) {
373         throw new DocumentStoreOperationException("Error in translating Index payload to make it ES compliant.", e);
374     }
375
376     logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
377     logger.debug("Request content: " + sb.toString());
378
379     handleResponse(conn, opResult);
380
381     shutdownConnection(conn);
382
383     // Generate a metrics log so we can track how long the operation took.
384     metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
385         new LogFields()
386             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
387             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
388         override,
389         indexName);
390
391     return opResult;
392   }
393
394   /**
395    * Will send the passed in JSON payload to Elasticsearch using the
396    * provided index name in an attempt to create the index.
397    *
398    * @param indexName - The name of the index to be created
399    * @param settingsAndMappings - The actual JSON object that will define the index
400    * @return - The operation result of writing into Elasticsearch
401    * @throws DocumentStoreOperationException
402    */
403   protected OperationResult createTable(String indexName, String settingsAndMappings) throws DocumentStoreOperationException {
404     OperationResult result = new OperationResult();
405     result.setResultCode(500);
406     result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
407
408     // Grab the current time so we can use it to generate a metrics log.
409     MdcOverride override = getStartTime(new MdcOverride());
410
411     String fullUrl = getFullUrl("/" + indexName + "/", false);
412     HttpURLConnection conn = initializeConnection(fullUrl);
413
414     try {
415       conn.setRequestMethod("PUT");
416       conn.setRequestProperty("Content-Type", "application/json");
417     } catch (ProtocolException e) {
418       shutdownConnection(conn);
419       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
420     }
421
422     try {
423         attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
424     } catch(IOException e) {
425         throw new DocumentStoreOperationException("Error in translating DynamicIndex payload to make it ES compliant.", e);
426     }
427     handleResponse(conn, result);
428
429     // Generate a metrics log so we can track how long the operation took.
430     metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
431             new LogFields()
432                     .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
433                     .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()),
434             override,
435             indexName);
436
437     return result;
438   }
439
440   @Override
441   public DocumentOperationResult createDocument(String                  indexName,
442                                                 DocumentStoreDataEntity document,
443                                                 boolean                 allowImplicitIndexCreation)
444       throws DocumentStoreOperationException {
445
446     if(!allowImplicitIndexCreation) {
447
448       // Before we do anything, make sure that the specified index actually exists in the
449       // document store - we don't want to rely on ElasticSearch to fail the document
450       // create because it could be configured to implicitly create a non-existent index,
451       // which can lead to hard-to-debug behaviour with queries down the road.
452       OperationResult indexExistsResult = checkIndexExistence(indexName);
453       if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
454
455         DocumentOperationResult opResult = new DocumentOperationResult();
456         opResult.setResultCode(HttpStatus.NOT_FOUND.value());
457         opResult.setResult("Document Index '" + indexName + "' does not exist.");
458         opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
459         return opResult;
460       }
461     }
462
463     if (document.getId() == null || document.getId().isEmpty()) {
464       return createDocumentWithoutId(indexName, document);
465     } else {
466       return createDocumentWithId(indexName, document);
467     }
468   }
469
470   private DocumentOperationResult createDocumentWithId(String indexName,
471                                                        DocumentStoreDataEntity document)
472       throws DocumentStoreOperationException {
473     // check if the document already exists
474     DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
475
476
477     if (opResult.getResultCode() != HttpStatus.NOT_FOUND.value()) {
478       if (opResult.getResultCode() == HttpStatus.CONFLICT.value()) {
479         opResult.setFailureCause("A document with the same id already exists.");
480       } else {
481         opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
482       }
483       opResult.setResultCode(HttpStatus.CONFLICT.value());
484       return opResult;
485     }
486
487     opResult = new DocumentOperationResult();
488     // Initialize operation result with a failure codes / fault string
489     opResult.setResultCode(500);
490     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
491
492     // Grab the current time so we can use it to generate a metrics log.
493     MdcOverride override = getStartTime(new MdcOverride());
494
495     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE
496         + "/" + document.getId(), false);
497     HttpURLConnection conn = initializeConnection(fullUrl);
498
499     try {
500       conn.setRequestMethod("PUT");
501     } catch (ProtocolException e) {
502       shutdownConnection(conn);
503       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
504     }
505
506     attachDocument(conn, document);
507
508     logger.debug("Sending 'PUT' request to: " + conn.getURL());
509
510     handleResponse(conn, opResult);
511     buildDocumentResult(opResult, indexName);
512
513     // Generate a metrics log so we can track how long the operation took.
514     metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
515         new LogFields()
516             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
517             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
518         override,
519         indexName);
520
521     shutdownConnection(conn);
522
523     return opResult;
524
525   }
526
527   private DocumentOperationResult createDocumentWithoutId(String indexName,
528                                                           DocumentStoreDataEntity document)
529       throws DocumentStoreOperationException {
530
531     DocumentOperationResult response = new DocumentOperationResult();
532     // Initialize operation result with a failure codes / fault string
533     response.setResultCode(500);
534     response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
535
536     // Grab the current time so we can use it to generate a metrics log.
537     MdcOverride override = getStartTime(new MdcOverride());
538
539     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
540     HttpURLConnection conn = initializeConnection(fullUrl);
541
542     try {
543       conn.setRequestMethod("POST");
544     } catch (ProtocolException e) {
545       shutdownConnection(conn);
546       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
547     }
548
549     attachDocument(conn, document);
550
551     logger.debug("Sending 'POST' request to: " + conn.getURL());
552
553     handleResponse(conn, response);
554     buildDocumentResult(response, indexName);
555
556     // Generate a metrics log so we can track how long the operation took.
557     metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
558         new LogFields()
559             .setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
560             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
561         override,
562         indexName);
563
564     shutdownConnection(conn);
565
566     return response;
567   }
568
569   private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
570       throws DocumentStoreOperationException {
571 //    conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
572     conn.setRequestProperty("Content-Type", "application/json");
573     conn.setRequestProperty("Connection", "Close");
574     
575     try {
576         attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(doc.getContentInJson()));
577     } catch(IOException e) {
578         throw new DocumentStoreOperationException("Error in translating Document payload to make it ES compliant.", e);
579     }
580   }
581
582   private DocumentOperationResult checkDocumentExistence(String indexName,
583                                                          String docId)
584       throws DocumentStoreOperationException {
585     DocumentOperationResult opResult = new DocumentOperationResult();
586
587     // Initialize operation result with a failure codes / fault string
588     opResult.setResultCode(500);
589
590     // Grab the current time so we can use it to generate a metrics log.
591     MdcOverride override = getStartTime(new MdcOverride());
592
593     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
594     HttpURLConnection conn = initializeConnection(fullUrl);
595
596     try {
597       conn.setRequestMethod("HEAD");
598     } catch (ProtocolException e) {
599       shutdownConnection(conn);
600       throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
601     }
602
603     logger.debug("Sending 'HEAD' request to: " + conn.getURL());
604
605     int resultCode;
606     try {
607       resultCode = conn.getResponseCode();
608     } catch (IOException e) {
609       shutdownConnection(conn);
610       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
611     }
612
613     logger.debug("Response Code : " + resultCode);
614
615     opResult.setResultCode(resultCode);
616
617     // Generate a metrics log so we can track how long the operation took.
618     metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
619         new LogFields()
620             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
621             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
622         override,
623         indexName,
624         docId);
625
626     shutdownConnection(conn);
627
628     return opResult;
629   }
630
631   @Override
632   public DocumentOperationResult updateDocument(String                  indexName,
633                                                 DocumentStoreDataEntity document,
634                                                 boolean                 allowImplicitIndexCreation)
635       throws DocumentStoreOperationException {
636
637     if(!allowImplicitIndexCreation) {
638
639       // Before we do anything, make sure that the specified index actually exists in the
640       // document store - we don't want to rely on ElasticSearch to fail the document
641       // create because it could be configured to implicitly create a non-existent index,
642       // which can lead to hard-to-debug behaviour with queries down the road.
643       OperationResult indexExistsResult = checkIndexExistence(indexName);
644       if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
645
646         DocumentOperationResult opResult = new DocumentOperationResult();
647         opResult.setResultCode(HttpStatus.NOT_FOUND.value());
648         opResult.setResult("Document Index '" + indexName + "' does not exist.");
649         opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
650         return opResult;
651       }
652     }
653
654     DocumentOperationResult opResult = new DocumentOperationResult();
655
656     // Initialize operation result with a failure codes / fault string
657     opResult.setResultCode(500);
658     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
659
660     // Grab the current time so we can use it to generate a metrics log.
661     MdcOverride override = getStartTime(new MdcOverride());
662
663     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
664         + "?version=" + document.getVersion(), false);
665     HttpURLConnection conn = initializeConnection(fullUrl);
666
667     try {
668       conn.setRequestMethod("PUT");
669       conn.setRequestProperty("Content-Type", "application/json");
670     } catch (ProtocolException e) {
671       shutdownConnection(conn);
672       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
673     }
674
675     attachDocument(conn, document);
676
677     logger.debug("Sending 'PUT' request to: " + conn.getURL());
678
679     handleResponse(conn, opResult);
680     buildDocumentResult(opResult, indexName);
681
682     // Generate a metrics log so we can track how long the operation took.
683     metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
684         new LogFields()
685             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
686             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
687         override,
688         indexName,
689         document.getId());
690
691     shutdownConnection(conn);
692
693     return opResult;
694   }
695
696   @Override
697   public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
698       throws DocumentStoreOperationException {
699     DocumentOperationResult opResult = new DocumentOperationResult();
700
701     // Initialize operation result with a failure codes / fault string
702     opResult.setResultCode(500);
703     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
704
705     // Grab the current time so we can use it to generate a metrics log.
706     MdcOverride override = getStartTime(new MdcOverride());
707
708     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
709         + "?version=" + document.getVersion(), false);
710     HttpURLConnection conn = initializeConnection(fullUrl);
711
712     try {
713       conn.setRequestMethod("DELETE");
714     } catch (ProtocolException e) {
715       shutdownConnection(conn);
716       throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
717     }
718
719     logger.debug("\nSending 'DELETE' request to " + conn.getURL());
720
721     handleResponse(conn, opResult);
722     buildDocumentResult(opResult, indexName);
723     //supress the etag and url in response for delete as they are not required
724     if (opResult.getDocument() != null) {
725       opResult.getDocument().setEtag(null);
726       opResult.getDocument().setUrl(null);
727     }
728
729     // Generate a metrics log so we can track how long the operation took.
730     metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
731         new LogFields()
732             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
733             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
734         override,
735         indexName,
736         document.getId());
737
738     shutdownConnection(conn);
739
740     return opResult;
741   }
742
743   @Override
744   public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
745       throws DocumentStoreOperationException {
746     DocumentOperationResult opResult = new DocumentOperationResult();
747
748     // Initialize operation result with a failure codes / fault string
749     opResult.setResultCode(500);
750     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
751
752     // Grab the current time so we can use it to generate a metrics log.
753     MdcOverride override = getStartTime(new MdcOverride());
754
755     String fullUrl = null;
756     if (document.getVersion() == null) {
757       fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
758     } else {
759       fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
760           + "?version=" + document.getVersion(), false);
761     }
762     HttpURLConnection conn = initializeConnection(fullUrl);
763
764     logger.debug("\nSending 'GET' request to: " + conn.getURL());
765
766     handleResponse(conn, opResult);
767     buildDocumentResult(opResult, indexName);
768
769     // Generate a metrics log so we can track how long the operation took.
770     metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
771         new LogFields()
772             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
773             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
774         override,
775         indexName,
776         document.getId());
777
778     shutdownConnection(conn);
779
780     return opResult;
781   }
782
783   public SearchOperationResult search(String indexName, String queryString)
784       throws DocumentStoreOperationException {
785     SearchOperationResult opResult = new SearchOperationResult();
786
787     // Initialize operation result with a failure codes / fault string
788     opResult.setResultCode(500);
789     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
790
791     String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
792
793     // Grab the current time so we can use it to generate a metrics log.
794     MdcOverride override = getStartTime(new MdcOverride());
795
796     HttpURLConnection conn = initializeConnection(fullUrl);
797
798     try {
799       conn.setRequestMethod("GET");
800     } catch (ProtocolException e) {
801       shutdownConnection(conn);
802       throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
803     }
804
805     logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
806
807     handleResponse(conn, opResult);
808     buildSearchResult(opResult, indexName);
809
810
811     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
812         new LogFields()
813             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
814             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
815         override,
816         indexName,
817         queryString);
818
819     return opResult;
820   }
821
822   public SearchOperationResult searchWithPayload(String indexName, String query)
823       throws DocumentStoreOperationException {
824     SearchOperationResult opResult = new SearchOperationResult();
825
826     if (logger.isDebugEnabled()) {
827       logger.debug("Querying index: " + indexName + " with query string: " + query);
828     }
829
830     // Initialize operation result with a failure codes / fault string
831     opResult.setResultCode(500);
832     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
833
834     String fullUrl = getFullUrl("/" + indexName + "/_search", false);
835
836     // Grab the current time so we can use it to generate a metrics log.
837     MdcOverride override = getStartTime(new MdcOverride());
838
839     HttpURLConnection conn = initializeConnection(fullUrl);
840
841     try {
842       conn.setRequestMethod("POST");
843       conn.setRequestProperty("Content-Type", "application/json");
844     } catch (ProtocolException e) {
845       shutdownConnection(conn);
846       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
847     }
848
849     attachContent(conn, query);
850
851     logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
852     logger.debug("Request body =  Elasticsearch query = " + query);
853
854     handleResponse(conn, opResult);
855     buildSearchResult(opResult, indexName);
856
857     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
858         new LogFields()
859             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
860             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
861         override,
862         indexName,
863         query);
864
865     shutdownConnection(conn);
866
867     return opResult;
868   }
869
870
871   public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
872           throws DocumentStoreOperationException {
873
874     SearchOperationResult opResult = new SearchOperationResult();
875
876     if (logger.isDebugEnabled()) {
877       logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
878     }
879
880     // Initialize operation result with a failure codes / fault string
881     opResult.setResultCode(500);
882     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
883
884     String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
885
886     // Grab the current time so we can use it to generate a metrics log.
887     MdcOverride override = getStartTime(new MdcOverride());
888
889     HttpURLConnection conn = initializeConnection(fullUrl);
890
891     try {
892       conn.setRequestMethod("POST");
893       conn.setRequestProperty("Content-Type", "application/json");
894     } catch (ProtocolException e) {
895       shutdownConnection(conn);
896       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
897     }
898
899     attachContent(conn, query);
900
901     logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
902     logger.debug("Request body =  Elasticsearch query = " + query);
903
904     handleResponse(conn, opResult);
905     buildSuggestResult(opResult, indexName);
906
907     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
908             new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
909                     .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
910             override, indexName, query);
911
912     shutdownConnection(conn);
913
914     return opResult;
915   }
916
917   private void attachContent(HttpURLConnection conn, String content)
918       throws DocumentStoreOperationException {
919     OutputStream outputStream = null;
920     OutputStreamWriter out = null;
921
922     try {
923       outputStream = conn.getOutputStream();
924     } catch (IOException e) {
925       shutdownConnection(conn);
926       throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
927     }
928
929     out = new OutputStreamWriter(outputStream);
930
931     try {
932       out.write(content);
933       out.close();
934     } catch (IOException e) {
935       shutdownConnection(conn);
936       throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
937     }
938   }
939
940   private HttpURLConnection initializeConnection(String fullUrl)
941       throws DocumentStoreOperationException {
942     URL url = null;
943     HttpURLConnection conn = null;
944
945     try {
946       url = new URL(fullUrl);
947     } catch (MalformedURLException e) {
948       throw new DocumentStoreOperationException("Error building a URL with " + url, e);
949     }
950
951     try {
952       conn = (HttpURLConnection) url.openConnection();
953       conn.setDoOutput(true);
954     } catch (IOException e) {
955       shutdownConnection(conn);
956       throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
957     }
958
959     return conn;
960   }
961
962   private void handleResponse(HttpURLConnection conn, OperationResult opResult)
963       throws DocumentStoreOperationException {
964     int resultCode = 200;
965
966     try {
967       resultCode = conn.getResponseCode();
968     } catch (IOException e) {
969       shutdownConnection(conn);
970       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
971     }
972
973     logger.debug("Response Code : " + resultCode);
974
975     InputStream inputStream = null;
976
977     if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
978       inputStream = conn.getErrorStream();
979     } else {
980       try {
981         inputStream = conn.getInputStream();
982       } catch (IOException e) {
983         shutdownConnection(conn);
984         throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
985       }
986     }
987
988     InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
989     BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
990
991     StringBuilder result = new StringBuilder(128);
992     String string = null;
993
994     try {
995       while ((string = bufferedreader.readLine()) != null) {
996         result.append(string).append("\n");
997       }
998     } catch (IOException e) {
999       shutdownConnection(conn);
1000       throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
1001     }
1002
1003     if (resultCode == HttpStatus.CONFLICT.value()) {
1004       opResult.setResultCode(HttpStatus.PRECONDITION_FAILED.value());
1005     } else {
1006       opResult.setResultCode(resultCode);
1007     }
1008     if (logger.isDebugEnabled()) {
1009       logger.debug("Raw result string from ElasticSearch = " + result.toString());
1010     }
1011     opResult.setResult(result.toString());
1012     opResult.setResultVersion(extractVersion(result.toString()));
1013   }
1014
1015   private String extractVersion(String result) throws DocumentStoreOperationException {
1016
1017     JSONParser parser = new JSONParser();
1018     String version = null;
1019     try {
1020       JSONObject root = (JSONObject) parser.parse(result);
1021       if (root.get("_version") != null) {
1022         version = root.get("_version").toString();
1023       }
1024
1025     } catch (ParseException e) {
1026
1027       // Not all responses from ElasticSearch include a version, so
1028       // if we don't get one back, just return an empty string rather
1029       // than trigger a false failure.
1030       version = "";
1031     }
1032     return version;
1033   }
1034
1035   /**
1036    * This convenience method gets the current system time and stores
1037    * it in an attribute in the supplied {@link MdcOverride} object so
1038    * that it can be used later by the metrics logger.
1039    *
1040    * @param override - The {@link MdcOverride} object to update.
1041    * @return - The supplied {@link MdcOverride} object.
1042    */
1043   private MdcOverride getStartTime(MdcOverride override) {
1044
1045     // Grab the current time...
1046     long startTimeInMs = System.currentTimeMillis();
1047
1048     // ...and add it as an attribute to the supplied MDC Override
1049     // object.
1050     SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
1051     override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
1052
1053     // Return the MdcOverride object that we were passed.
1054     // This looks odd, but it allows us to do stuff like:
1055     //
1056     //    MdcOverride ov = getStartTime(new MdcOverride())
1057     //
1058     // which is quite handy, but also allows us to pass in an existing
1059     // MdcOverride object which already has some attributes set.
1060     return override;
1061   }
1062
1063   private boolean isSuccess(OperationResult result) {
1064
1065     return isSuccessCode(result.getResultCode());
1066   }
1067
1068
1069   private boolean isSuccessCode(int statusCode) {
1070     return ((statusCode >= 200) && (statusCode < 300));
1071   }
1072
1073
1074   @Override
1075   public OperationResult performBulkOperations(BulkRequest[] requests)
1076       throws DocumentStoreOperationException {
1077
1078     if (logger.isDebugEnabled()) {
1079       String dbgString = "ESController: performBulkOperations - Operations: ";
1080
1081       for (BulkRequest request : requests) {
1082         dbgString += "[" + request.toString() + "] ";
1083       }
1084
1085       logger.debug(dbgString);
1086     }
1087
1088     // Grab the current time so we can use it to generate a metrics log.
1089     MdcOverride override = getStartTime(new MdcOverride());
1090
1091     // Parse the supplied set of operations.
1092     // Iterate over the list of operations which we were provided and
1093     // translate them into a format that ElasticSearh understands.
1094     int opCount = 0;
1095     StringBuilder esOperationSet = new StringBuilder(128);
1096     List<ElasticSearchResultItem> rejected = new ArrayList<ElasticSearchResultItem>();
1097     for (BulkRequest request : requests) {
1098
1099       // Convert the request to the syntax ElasticSearch likes.
1100       if (buildEsOperation(request, esOperationSet, rejected)) {
1101         opCount++;
1102       }
1103     }
1104
1105     ElasticSearchBulkOperationResult opResult = null;
1106     if (opCount > 0) {
1107
1108       // Open an HTTP connection to the ElasticSearch back end.
1109       String fullUrl = getFullUrl("/_bulk", false);
1110       URL url;
1111       HttpURLConnection conn;
1112       try {
1113
1114         url = new URL(fullUrl);
1115         conn = (HttpURLConnection) url.openConnection();
1116         conn.setRequestMethod("PUT");
1117         conn.setDoOutput(true);
1118         conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
1119         conn.setRequestProperty("Connection", "Close");
1120
1121       } catch (IOException e) {
1122
1123         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1124         if (logger.isDebugEnabled()) {
1125           logger.debug(Throwables.getStackTraceAsString(e));
1126         }
1127
1128         throw new DocumentStoreOperationException("Failed to open connection to document store.  Cause: "
1129             + e.getMessage(), e);
1130       }
1131
1132       StringBuilder bulkResult = new StringBuilder(128);
1133       try {
1134         // Create an output stream to write our request to.
1135         OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1136         ;
1137
1138         if (logger.isDebugEnabled()) {
1139           logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1140           logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n",
1141               "\\n"));
1142         }
1143
1144         // Write the resulting request string to our output stream. (this sends the request to ES?)
1145         out.write(esOperationSet.toString());
1146         out.close();
1147
1148         // Open an input stream on our connection in order to read back the results.
1149         InputStream is = conn.getInputStream();
1150         InputStreamReader inputstreamreader = new InputStreamReader(is);
1151         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1152
1153         // Read the contents of the input stream into our result string...
1154         String esResponseString = null;
1155
1156         while ((esResponseString = bufferedreader.readLine()) != null) {
1157           bulkResult.append(esResponseString).append("\n");
1158         }
1159
1160       } catch (IOException e) {
1161
1162         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1163         if (logger.isDebugEnabled()) {
1164           StringWriter sw = new StringWriter();
1165           e.printStackTrace(new PrintWriter(sw));
1166           logger.debug(sw.toString());
1167         }
1168
1169         throw new DocumentStoreOperationException("Failure interacting with document store.  Cause: "
1170             + e.getMessage(), e);
1171       }
1172
1173       if (logger.isDebugEnabled()) {
1174         logger.debug("ESController: Received result string from ElasticSearch: = "
1175             + bulkResult.toString());
1176       }
1177
1178       // ...and marshal the resulting string into a Java object.
1179       try {
1180         opResult = marshallEsBulkResult(bulkResult.toString());
1181
1182       } catch (IOException e) {
1183
1184         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1185         if (logger.isDebugEnabled()) {
1186           logger.debug(Throwables.getStackTraceAsString(e));
1187         }
1188
1189         throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: "
1190             + e.getMessage(), e);
1191       }
1192     }
1193
1194     // Finally, build the operation result and return it to the caller.
1195     OperationResult result = new OperationResult();
1196     result.setResultCode(207);
1197     result.setResult(buildGenericBulkResultSet(opResult, rejected));
1198
1199     // In the success case we don't want the entire result string to be
1200     // dumped into the metrics log, so concatenate it.
1201     String resultStringForMetricsLog = result.getResult();
1202     if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1203       resultStringForMetricsLog = resultStringForMetricsLog.substring(0,
1204           Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1205     }
1206
1207     metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1208         new LogFields()
1209             .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1210             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1211         override);
1212
1213     return result;
1214   }
1215
1216
1217   /**
1218    * This method converts a {@link BulkRequest} object into a json structure
1219    * which can be understood by ElasticSearch.
1220    *
1221    * @param request - The request to be performed.
1222    * @param sb      - The string builder to append the json data to
1223    * @throws DocumentStoreOperationException
1224    */
1225   private boolean buildEsOperation(BulkRequest request, StringBuilder sb,
1226                                    List<ElasticSearchResultItem> fails)
1227       throws DocumentStoreOperationException {
1228
1229     boolean retVal = true;
1230     OperationResult indexExistsResult = null;
1231
1232     // What kind of operation are we performing?
1233     switch (request.getOperationType()) {
1234
1235       // Create a new document.
1236       case CREATE:
1237
1238         // Make sure that we were supplied a document payload.
1239         if (request.getOperation().getDocument() == null) {
1240
1241           fails.add(generateRejectionEntry(request.getOperationType(),
1242               "Missing document payload",
1243               request.getIndex(),
1244               request.getId(),
1245               400,
1246               request.getOperation().getMetaData().getUrl()));
1247           return false;
1248         }
1249
1250         // Make sure that the supplied document URL is formatted
1251         // correctly.
1252         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1253           fails.add(generateRejectionEntry(request.getOperationType(),
1254               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1255               request.getIndex(),
1256               "",
1257               400,
1258               request.getOperation().getMetaData().getUrl()));
1259           return false;
1260         }
1261
1262         // Validate that the specified index actually exists before we
1263         // try to perform the create.
1264         if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1265
1266           fails.add(generateRejectionEntry(request.getOperationType(),
1267               "Specified resource does not exist: "
1268                   + request.getOperation().getMetaData().getUrl(),
1269               request.getIndex(),
1270               request.getId(),
1271               404,
1272               request.getOperation().getMetaData().getUrl()));
1273           return false;
1274         }
1275
1276         // If we were supplied an id for the new document, then
1277         // include it in the bulk operation to Elastic Search
1278         if (request.getId() == null) {
1279
1280           sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE,
1281               request.getIndex(),
1282               DEFAULT_TYPE));
1283
1284           // Otherwise, we just leave that parameter off and ElasticSearch
1285           // will generate one for us.
1286         } else {
1287           sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE,
1288               request.getIndex(),
1289               DEFAULT_TYPE,
1290               request.getId()));
1291         }
1292
1293         try {
1294           // Append the document that we want to create.
1295           sb.append(request.getOperation().getDocument().toJson()).append("\n");
1296         } catch (JsonProcessingException e) {
1297           throw new DocumentStoreOperationException("Failure parsing document to json", e);
1298         }
1299
1300         break;
1301
1302       // Update an existing document.
1303       case UPDATE:
1304
1305         // Make sure that we were supplied a document payload.
1306         if (request.getOperation().getDocument() == null) {
1307
1308           fails.add(generateRejectionEntry(request.getOperationType(),
1309               "Missing document payload",
1310               request.getIndex(),
1311               request.getId(),
1312               400,
1313               request.getOperation().getMetaData().getUrl()));
1314           return false;
1315         }
1316
1317         // Make sure that the supplied document URL is formatted
1318         // correctly.
1319         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1320           fails.add(generateRejectionEntry(request.getOperationType(),
1321               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1322               request.getIndex(),
1323               "",
1324               400,
1325               request.getOperation().getMetaData().getUrl()));
1326           return false;
1327         }
1328
1329         // Validate that the specified index actually exists before we
1330         // try to perform the update.
1331         if (!indexExists(request.getIndex())) {
1332
1333           fails.add(generateRejectionEntry(request.getOperationType(),
1334               "Specified resource does not exist: "
1335                   + request.getOperation().getMetaData().getUrl(),
1336               request.getIndex(),
1337               request.getId(),
1338               404,
1339               request.getOperation().getMetaData().getUrl()));
1340           return false;
1341         }
1342
1343         // Validate that the document we are trying to update actually
1344         // exists before we try to perform the update.
1345         if (!documentExists(request.getIndex(), request.getId())) {
1346
1347           fails.add(generateRejectionEntry(request.getOperationType(),
1348               "Specified resource does not exist: "
1349                   + request.getOperation().getMetaData().getUrl(),
1350               request.getIndex(),
1351               request.getId(),
1352               404,
1353               request.getOperation().getMetaData().getUrl()));
1354           return false;
1355         }
1356
1357         // It is mandatory that a version be supplied for an update operation,
1358         // so validate that now.
1359         if (request.getOperation().getMetaData().getEtag() == null) {
1360
1361           fails.add(generateRejectionEntry(request.getOperationType(),
1362               "Missing mandatory ETag field",
1363               request.getIndex(),
1364               request.getId(),
1365               400,
1366               request.getOperation().getMetaData().getUrl()));
1367           return false;
1368         }
1369
1370         // Generate the update request...
1371         sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE,
1372             request.getIndex(),
1373             DEFAULT_TYPE,
1374             request.getId(),
1375             request.getOperation().getMetaData().getEtag()));
1376
1377         // ...and append the document that we want to update.
1378         try {
1379           sb.append(request.getOperation().getDocument().toJson()).append("\n");
1380         } catch (JsonProcessingException e) {
1381           throw new DocumentStoreOperationException("Failure parsing document to json", e);
1382         }
1383         break;
1384
1385       // Delete an existing document.
1386       case DELETE:
1387
1388         // Make sure that the supplied document URL is formatted
1389         // correctly.
1390         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1391           fails.add(generateRejectionEntry(request.getOperationType(),
1392               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1393               request.getIndex(),
1394               "",
1395               400,
1396               request.getOperation().getMetaData().getUrl()));
1397           return false;
1398         }
1399
1400         // Validate that the specified index actually exists before we
1401         // try to perform the delete.
1402         if (!indexExists(request.getIndex())) {
1403
1404           fails.add(generateRejectionEntry(request.getOperationType(),
1405               "Specified resource does not exist: "
1406                   + request.getOperation().getMetaData().getUrl(),
1407               request.getIndex(),
1408               request.getId(),
1409               404,
1410               request.getOperation().getMetaData().getUrl()));
1411           return false;
1412         }
1413
1414         // Validate that the document we are trying to update actually
1415         // exists before we try to perform the delete.
1416         if (!documentExists(request.getIndex(), request.getId())) {
1417
1418           fails.add(generateRejectionEntry(request.getOperationType(),
1419               "Specified resource does not exist: "
1420                   + request.getOperation().getMetaData().getUrl(),
1421               request.getIndex(),
1422               request.getId(),
1423               404,
1424               request.getOperation().getMetaData().getUrl()));
1425           return false;
1426         }
1427
1428         // It is mandatory that a version be supplied for a delete operation,
1429         // so validate that now.
1430         if (request.getOperation().getMetaData().getEtag() == null) {
1431
1432           fails.add(generateRejectionEntry(request.getOperationType(),
1433               "Missing mandatory ETag field",
1434               request.getIndex(),
1435               request.getId(),
1436               400,
1437               request.getOperation().getMetaData().getUrl()));
1438           return false;
1439         }
1440
1441         // Generate the delete request.
1442         sb.append(String.format(BULK_DELETE_TEMPLATE,
1443             request.getIndex(),
1444             DEFAULT_TYPE,
1445             request.getId(),
1446             request.getOperation().getMetaData().getEtag()));
1447         break;
1448       default:
1449     }
1450
1451     return retVal;
1452   }
1453
1454   private boolean indexExists(String index) throws DocumentStoreOperationException {
1455
1456     OperationResult indexExistsResult = checkIndexExistence(index);
1457
1458     return ((indexExistsResult.getResultCode() >= 200)
1459         && (indexExistsResult.getResultCode() < 300));
1460   }
1461
1462   private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1463
1464     OperationResult docExistsResult = checkDocumentExistence(index, id);
1465
1466     return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1467   }
1468
1469   /**
1470    * This method constructs a status entry for a bulk operation which has
1471    * been rejected before even sending it to the document store.
1472    *
1473    * @param rejectReason - A message describing why the operation was rejected.
1474    * @param anId         - The identifier associated with the document being
1475    *                     acted on.
1476    * @param statusCode  - An HTTP status code.
1477    * @return - A result set item.
1478    */
1479   private ElasticSearchResultItem generateRejectionEntry(OperationType opType,
1480                                                          String rejectReason,
1481                                                          String index,
1482                                                          String anId,
1483                                                          int statusCode,
1484                                                          String originalUrl) {
1485
1486     ElasticSearchError err = new ElasticSearchError();
1487     err.setReason(rejectReason);
1488
1489     ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1490     op.setIndex(index);
1491     op.setId(anId);
1492     op.setStatus(statusCode);
1493     op.setError(err);
1494     op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1495
1496     ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1497
1498     switch (opType) {
1499       case CREATE:
1500         rejectionResult.setCreate(op);
1501         break;
1502       case UPDATE:
1503         rejectionResult.setIndex(op);
1504         break;
1505       case DELETE:
1506         rejectionResult.setDelete(op);
1507         break;
1508       default:
1509     }
1510
1511     return rejectionResult;
1512   }
1513
1514
1515   /**
1516    * This method takes the json structure returned from ElasticSearch in
1517    * response to a bulk operations request and marshals it into a Java
1518    * object.
1519    *
1520    * @param jsonResult - The bulk operations response returned from
1521    *                   ElasticSearch.
1522    * @return - The marshalled response.
1523    * @throws JsonParseException
1524    * @throws JsonMappingException
1525    * @throws IOException
1526    */
1527   private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult)
1528       throws JsonParseException, JsonMappingException, IOException {
1529
1530     if (jsonResult != null) {
1531       if (logger.isDebugEnabled()) {
1532         logger.debug("ESController: Marshalling ES result set from json: "
1533             + jsonResult.replaceAll("\n", ""));
1534       }
1535
1536       ObjectMapper mapper = new ObjectMapper();
1537       mapper.setSerializationInclusion(Include.NON_EMPTY);
1538
1539       return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1540     }
1541
1542     return null;
1543   }
1544
1545
1546   /**
1547    * This method takes the marshalled ElasticSearch bulk response and
1548    * converts it into a generic response payload.
1549    *
1550    * @param esResult - ElasticSearch bulk operations response.
1551    * @return - A generic result set.
1552    */
1553   private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1554                                            List<ElasticSearchResultItem> rejectedOps) {
1555
1556     int totalOps = 0;
1557     int totalSuccess = 0;
1558     int totalFails = 0;
1559
1560     if (logger.isDebugEnabled()) {
1561
1562       logger.debug("ESController: Build generic result set.  ES Results: "
1563           + ((esResult != null) ? esResult.toString() : "[]")
1564           + " Rejected Ops: " + rejectedOps.toString());
1565     }
1566
1567     // Build a combined list of result items from the results returned
1568     // from ElasticSearch and the list of operations that we rejected
1569     // without sending to ElasticSearch.
1570     List<ElasticSearchResultItem> combinedResults = new ArrayList<ElasticSearchResultItem>();
1571     if (esResult != null) {
1572       combinedResults.addAll(Arrays.asList(esResult.getItems()));
1573     }
1574     combinedResults.addAll(rejectedOps);
1575
1576     // Iterate over the individual results in the resulting result set.
1577     StringBuilder resultsBuilder = new StringBuilder();
1578     AtomicBoolean firstItem = new AtomicBoolean(true);
1579     for (ElasticSearchResultItem item : combinedResults) {
1580
1581       // Increment the operation counts.
1582       totalOps++;
1583       if (isSuccessCode(item.operationStatus().getStatus())) {
1584         totalSuccess++;
1585       } else {
1586         totalFails++;
1587       }
1588
1589       // Prepend a comma to our response string unless this it the
1590       // first result in the set.
1591       if (!firstItem.compareAndSet(true, false)) {
1592         resultsBuilder.append(", ");
1593       }
1594
1595       // Append the current result as a generic json structure.
1596       resultsBuilder.append(item.toJson());
1597     }
1598
1599     // Now, build the result string and return it.
1600     String responseBody = "{ \"total_operations\": " + totalOps + ", "
1601         + "\"total_success\": " + totalSuccess + ", "
1602         + "\"total_fails\": " + totalFails + ", "
1603         + "\"results\": ["
1604         + resultsBuilder.toString()
1605         + "]}";
1606
1607     return responseBody;
1608   }
1609
1610
1611   /**
1612    * This method queryies ElasticSearch to determine if the supplied
1613    * index is present in the document store.
1614    *
1615    * @param indexName - The index to look for.
1616    * @return - An operation result indicating the success or failure of
1617    * the check.
1618    * @throws DocumentStoreOperationException
1619    */
1620   public OperationResult checkIndexExistence(String indexName)
1621       throws DocumentStoreOperationException {
1622
1623     // Initialize operation result with a failure codes / fault string
1624     OperationResult opResult = new OperationResult();
1625     opResult.setResultCode(500);
1626
1627     // Grab the current time so we can use it to generate a metrics log.
1628     MdcOverride override = getStartTime(new MdcOverride());
1629
1630     String fullUrl = getFullUrl("/" + indexName, false);
1631     HttpURLConnection conn = initializeConnection(fullUrl);
1632
1633     try {
1634       conn.setRequestMethod("HEAD");
1635
1636     } catch (ProtocolException e) {
1637       shutdownConnection(conn);
1638       throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1639     }
1640
1641     logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1642
1643     int resultCode;
1644     try {
1645       resultCode = conn.getResponseCode();
1646     } catch (IOException e) {
1647       shutdownConnection(conn);
1648       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1649     }
1650     logger.debug("Response Code : " + resultCode);
1651
1652     opResult.setResultCode(resultCode);
1653
1654     // Generate a metrics log so we can track how long the operation took.
1655     metricsLogger.info(SearchDbMsgs.CHECK_INDEX_TIME,
1656         new LogFields()
1657             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1658             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1659         override,
1660         indexName);
1661
1662     shutdownConnection(conn);
1663
1664     return opResult;
1665   }
1666
1667
1668   private void buildDocumentResult(DocumentOperationResult result, String index)
1669       throws DocumentStoreOperationException {
1670
1671     JSONParser parser = new JSONParser();
1672     JSONObject root;
1673     try {
1674       root = (JSONObject) parser.parse(result.getResult());
1675
1676       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1677         // Success response object
1678         Document doc = new Document();
1679         doc.setEtag(result.getResultVersion());
1680         doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1681
1682         doc.setContent((JSONObject) root.get("_source"));
1683         result.setDocument(doc);
1684
1685       } else {
1686         // Error response object
1687         JSONObject error = (JSONObject) root.get("error");
1688         if (error != null) {
1689           result.setError(new ErrorResult(error.get("type").toString(),
1690               error.get("reason").toString()));
1691         }
1692
1693       }
1694     } catch (Exception e) {
1695       throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1696           + result.getResult());
1697     }
1698
1699
1700   }
1701
1702   private String buildDocumentResponseUrl(String index, String id) {
1703     return ApiUtils.buildDocumentUri(index, id);
1704   }
1705
1706   private void buildSearchResult(SearchOperationResult result, String index)
1707       throws DocumentStoreOperationException {
1708
1709     JSONParser parser = new JSONParser();
1710     JSONObject root;
1711
1712     try {
1713       root = (JSONObject) parser.parse(result.getResult());
1714       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1715         JSONObject hits = (JSONObject) root.get("hits");
1716         JSONArray hitArray = (JSONArray) hits.get("hits");
1717         SearchHits searchHits = new SearchHits();
1718         searchHits.setTotalHits(hits.get("total").toString());
1719         ArrayList<SearchHit> searchHitArray = new ArrayList<SearchHit>();
1720
1721         for (int i = 0; i < hitArray.size(); i++) {
1722           JSONObject hit = (JSONObject) hitArray.get(i);
1723           SearchHit searchHit = new SearchHit();
1724           searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1725           Document doc = new Document();
1726           if (hit.get("_version") != null) {
1727             doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1728           }
1729
1730           doc.setUrl(buildDocumentResponseUrl(index, (hit.get("_id") != null)
1731               ? hit.get("_id").toString() : ""));
1732           doc.setContent((JSONObject) hit.get("_source"));
1733           searchHit.setDocument(doc);
1734           searchHitArray.add(searchHit);
1735         }
1736         searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1737         result.setSearchResult(searchHits);
1738
1739         JSONObject aggregations = (JSONObject) root.get("aggregations");
1740         if (aggregations != null) {
1741           AggregationResult[] aggResults =
1742               AggregationParsingUtil.parseAggregationResults(aggregations);
1743           AggregationResults aggs = new AggregationResults();
1744           aggs.setAggregations(aggResults);
1745           result.setAggregationResult(aggs);
1746         }
1747
1748         // success
1749       } else {
1750         JSONObject error = (JSONObject) root.get("error");
1751         if (error != null) {
1752           result.setError(new ErrorResult(error.get("type").toString(),
1753               error.get("reason").toString()));
1754         }
1755       }
1756     } catch (Exception e) {
1757       throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1758           + result.getResult());
1759     }
1760
1761   }
1762
1763   private void buildSuggestResult(SearchOperationResult result, String index)
1764           throws DocumentStoreOperationException {
1765
1766     JSONParser parser = new JSONParser ();
1767     JSONObject root;
1768     try {
1769       root = (JSONObject) parser.parse ( result.getResult () );
1770       if (result.getResultCode () >= 200 && result.getResultCode () <= 299) {
1771         JSONArray hitArray = (JSONArray) root.get ( "suggest-vnf" );
1772         JSONObject hitdata = (JSONObject) hitArray.get ( 0 );
1773         JSONArray optionsArray = (JSONArray) hitdata.get ( "options" );
1774         SuggestHits suggestHits = new SuggestHits ();
1775         suggestHits.setTotalHits ( String.valueOf ( optionsArray.size () ) );
1776
1777         ArrayList<SuggestHit> suggestHitArray = new ArrayList<SuggestHit> ();
1778
1779         for (int i = 0; i < optionsArray.size (); i++) {
1780           JSONObject hit = (JSONObject) optionsArray.get ( i );
1781
1782           SuggestHit suggestHit = new SuggestHit ();
1783           suggestHit.setScore ( (hit.get ( "score" ) != null) ? hit.get ( "score" ).toString () : "" );
1784           suggestHit.setText ( (hit.get ( "text" ) != null) ? hit.get ( "text" ).toString () : "" );
1785           Document doc = new Document ();
1786           if (hit.get ( "_version" ) != null) {
1787             doc.setEtag ( (hit.get ( "_version" ) != null) ? hit.get ( "_version" ).toString () : "" );
1788           }
1789           doc.setUrl ( buildDocumentResponseUrl ( index,
1790                   (hit.get ( "_id" ) != null) ? hit.get ( "_id" ).toString () : "" ) );
1791
1792           doc.setContent ( (JSONObject) hit.get ( "payload" ) );
1793           suggestHit.setDocument ( doc );
1794           suggestHitArray.add ( suggestHit );
1795         }
1796         suggestHits.setHits ( suggestHitArray.toArray ( new SuggestHit[suggestHitArray.size ()] ) );
1797         result.setSuggestResult ( suggestHits );
1798
1799         JSONObject aggregations = (JSONObject) root.get ( "aggregations" );
1800         if (aggregations != null) {
1801           AggregationResult[] aggResults =
1802                   AggregationParsingUtil.parseAggregationResults ( aggregations );
1803           AggregationResults aggs = new AggregationResults ();
1804           aggs.setAggregations ( aggResults );
1805           result.setAggregationResult ( aggs );
1806         }
1807
1808         // success
1809       } else {
1810         JSONObject error = (JSONObject) root.get ( "error" );
1811         if (error != null) {
1812           result.setError (
1813                   new ErrorResult ( error.get ( "type" ).toString (), error.get ( "reason" ).toString () ) );
1814         }
1815       }
1816     } catch (Exception e) {
1817       throw new DocumentStoreOperationException (
1818               "Failed to parse Elastic Search response." + result.getResult () );
1819     }
1820   }
1821
1822
1823   }