98a254c836af4bf67570a0d878342899465c1592
[aai/search-data-service.git] / src / main / java / org / onap / aai / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
22
23 import com.google.common.base.Throwables;
24 import com.fasterxml.jackson.annotation.JsonInclude.Include;
25 import com.fasterxml.jackson.core.JsonParseException;
26 import com.fasterxml.jackson.core.JsonProcessingException;
27 import com.fasterxml.jackson.databind.JsonMappingException;
28 import com.fasterxml.jackson.databind.ObjectMapper;
29 import org.json.simple.JSONArray;
30 import org.json.simple.JSONObject;
31 import org.json.simple.parser.JSONParser;
32 import org.json.simple.parser.ParseException;
33 import org.onap.aai.sa.rest.AnalysisConfiguration;
34 import org.onap.aai.sa.rest.ApiUtils;
35 import org.onap.aai.sa.rest.BulkRequest;
36 import org.onap.aai.sa.rest.BulkRequest.OperationType;
37 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
38 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
39 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
40 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
41 import org.onap.aai.sa.searchdbabstraction.entity.Document;
42 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
43 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
44 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
45 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
46 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
47 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
48 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
49 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
50 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
51 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
52 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
53 import org.onap.aai.cl.api.LogFields;
54 import org.onap.aai.cl.api.LogLine;
55 import org.onap.aai.cl.api.Logger;
56 import org.onap.aai.cl.eelf.LoggerFactory;
57 import org.onap.aai.cl.mdc.MdcContext;
58 import org.onap.aai.cl.mdc.MdcOverride;
59 import org.onap.aai.sa.rest.DocumentSchema;
60 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
61 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
62
63 import java.io.BufferedReader;
64 import java.io.File;
65 import java.io.FileInputStream;
66 import java.io.IOException;
67 import java.io.InputStream;
68 import java.io.InputStreamReader;
69 import java.io.OutputStream;
70 import java.io.OutputStreamWriter;
71 import java.io.PrintWriter;
72 import java.io.StringWriter;
73 import java.net.HttpURLConnection;
74 import java.net.MalformedURLException;
75 import java.net.ProtocolException;
76 import java.net.URL;
77 import java.text.SimpleDateFormat;
78 import java.util.ArrayList;
79 import java.util.Arrays;
80 import java.util.List;
81 import java.util.Properties;
82 import java.util.concurrent.atomic.AtomicBoolean;
83
84 import org.springframework.http.HttpStatus;
85
86
87
88 /**
89  * This class has the Elasticsearch implementation of the
90  * DB operations defined in DocumentStoreInterface.
91  */
92 public class ElasticSearchHttpController implements DocumentStoreInterface {
93
94   private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
95       "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
96   private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
97       "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
98   private static final String BULK_IMPORT_INDEX_TEMPLATE =
99       "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
100   private static final String BULK_DELETE_TEMPLATE =
101       "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
102
103   private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
104       "Internal Error: ElasticSearch operation fault occurred";
105   private static final Logger logger = LoggerFactory.getInstance()
106       .getLogger(ElasticSearchHttpController.class.getName());
107   private static final Logger metricsLogger = LoggerFactory.getInstance()
108       .getMetricsLogger(ElasticSearchHttpController.class.getName());
109   private final ElasticSearchConfig config;
110
111   private static final String DEFAULT_TYPE = "default";
112
113   private static ElasticSearchHttpController instance = null;
114
115   protected AnalysisConfiguration analysisConfig;
116
117   public static ElasticSearchHttpController getInstance() {
118
119     synchronized (ElasticSearchHttpController.class) {
120
121       if (instance == null) {
122
123         Properties properties = new Properties();
124         File file = new File(SearchDbConstants.ES_CONFIG_FILE);
125         try {
126           properties.load(new FileInputStream(file));
127         } catch (Exception e) {
128           logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL,
129             "ElasticSearchHTTPController.getInstance",
130             e.getLocalizedMessage());
131         }
132
133         ElasticSearchConfig config = new ElasticSearchConfig(properties);
134         instance = new ElasticSearchHttpController(config);
135       }
136     }
137
138     return instance;
139   }
140
141   public ElasticSearchHttpController(ElasticSearchConfig config) {
142     this.config = config;
143     analysisConfig = new AnalysisConfiguration();
144
145     try {
146       logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
147       checkConnection();
148       logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
149     } catch (Exception e) {
150       logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e,
151           getFullUrl("", false), e.getMessage());
152     }
153   }
154
155
156   public AnalysisConfiguration getAnalysisConfig() {
157     return analysisConfig;
158   }
159
160   @Override
161   public OperationResult createIndex(String index, DocumentSchema documentSchema) {
162
163     OperationResult result = new OperationResult();
164     result.setResultCode(500);
165
166     try {
167
168       // Submit the request to ElasticSearch to create the index using a
169       // default document type.
170       result = createTable(index,
171           DEFAULT_TYPE,
172           analysisConfig.getEsIndexSettings(),
173           DocumentSchemaUtil.generateDocumentMappings(documentSchema));
174
175       // ElasticSearch will return us a 200 code on success when we
176       // want to report a 201, so translate the result here.
177       result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
178       if (isSuccess(result)) {
179         result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
180         //result.setResult("{\"index\": \"" + index + ", \"type\": \"" + DEFAULT_TYPE + "\"}");
181       }
182
183     } catch (DocumentStoreOperationException | IOException e) {
184
185       result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
186     }
187
188     return result;
189   }
190
191   @Override
192   public OperationResult createDynamicIndex(String index, String dynamicSchema) {
193     OperationResult result = new OperationResult();
194     result.setResultCode(500);
195
196     try {
197       result = createTable(index, dynamicSchema);
198
199       // ElasticSearch will return us a 200 code on success when we
200       // want to report a 201, so translate the result here.
201       result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
202       if (isSuccess(result)) {
203         result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
204       }
205     } catch (DocumentStoreOperationException e) {
206       result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
207     }
208
209     return result;
210   }
211
212
213   @Override
214   public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
215
216     //Initialize operation result with a failure codes / fault string
217     OperationResult opResult = new OperationResult();
218     opResult.setResultCode(500);
219     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
220
221     // Grab the current time so we can use it to generate a metrics log.
222     MdcOverride override = getStartTime(new MdcOverride());
223
224     String fullUrl = getFullUrl("/" + indexName + "/", false);
225     HttpURLConnection conn = initializeConnection(fullUrl);
226
227     logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
228
229     try {
230       conn.setRequestMethod("DELETE");
231     } catch (ProtocolException e) {
232       shutdownConnection(conn);
233       throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
234     }
235
236     handleResponse(conn, opResult);
237
238     // Generate a metrics log so we can track how long the operation took.
239     metricsLogger.info(SearchDbMsgs.DELETE_INDEX_TIME,
240         new LogFields()
241             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
242             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
243         override,
244         indexName);
245
246     shutdownConnection(conn);
247
248     return opResult;
249   }
250
251
252   private OperationResult checkConnection() throws Exception {
253
254     String fullUrl = getFullUrl("/_cluster/health", false);
255     URL url = null;
256     HttpURLConnection conn = null;
257
258     url = new URL(fullUrl);
259     conn = (HttpURLConnection) url.openConnection();
260     conn.setRequestMethod("GET");
261     conn.setDoOutput(true);
262     logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
263
264     int resultCode = conn.getResponseCode();
265     logger.debug("getClusterHealth() response Code : " + resultCode);
266     OperationResult opResult = new OperationResult();
267     opResult.setResultCode(resultCode);
268
269     shutdownConnection(conn);
270
271     return opResult;
272   }
273
274   private String getFullUrl(String resourceUrl, boolean isSecure) {
275
276     final String host = config.getIpAddress();
277     final String port = config.getHttpPort();
278
279     if (isSecure) {
280       return String.format("https://%s:%s%s", host, port, resourceUrl);
281     } else {
282       return String.format("http://%s:%s%s", host, port, resourceUrl);
283     }
284   }
285
286   private void shutdownConnection(HttpURLConnection connection) {
287     if (connection == null) {
288       return;
289     }
290
291     InputStream inputstream = null;
292     OutputStream outputstream = null;
293
294     try {
295       inputstream = connection.getInputStream();
296     } catch (IOException e) {
297       logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
298     } finally {
299       if (inputstream != null) {
300         try {
301           inputstream.close();
302         } catch (IOException e) {
303           logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
304               e.getLocalizedMessage());
305         }
306       }
307     }
308
309     try {
310       outputstream = connection.getOutputStream();
311     } catch (IOException e) {
312       logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
313     } finally {
314       if (outputstream != null) {
315         try {
316           outputstream.close();
317         } catch (IOException e) {
318           logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
319               e.getLocalizedMessage());
320         }
321       }
322     }
323
324     connection.disconnect();
325   }
326
327   //@Override
328   protected OperationResult createTable(String indexName, String typeName,
329                                         String indexSettings, String indexMappings)
330       throws DocumentStoreOperationException {
331
332     if (indexSettings == null) {
333       logger.debug("No settings provided.");
334     }
335
336     if (indexMappings == null) {
337       logger.debug("No mappings provided.");
338     }
339
340     OperationResult opResult = new OperationResult();
341
342     // Initialize operation result with a failure codes / fault string
343     opResult.setResultCode(500);
344     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
345
346     // Grab the current time so we can use it to generate a metrics log.
347     MdcOverride override = getStartTime(new MdcOverride());
348
349     String fullUrl = getFullUrl("/" + indexName + "/", false);
350     HttpURLConnection conn = initializeConnection(fullUrl);
351
352     try {
353       conn.setRequestMethod("PUT");
354       conn.setRequestProperty("Content-Type", "application/json");
355     } catch (ProtocolException e) {
356       shutdownConnection(conn);
357       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
358     }
359
360     StringBuilder sb = new StringBuilder(128);
361     sb.append("{ \"settings\" : ");
362     sb.append(indexSettings);
363     sb.append(",");
364
365     sb.append("\"mappings\" : {");
366     sb.append("\"" + typeName + "\" :");
367     sb.append(indexMappings);
368     sb.append("}}");
369
370     try {
371         attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
372     } catch(IOException e) {
373         logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
374         throw new DocumentStoreOperationException(e.getMessage(), e);
375     }
376
377     logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
378     logger.debug("Request content: " + sb.toString());
379
380     handleResponse(conn, opResult);
381
382     shutdownConnection(conn);
383
384     // Generate a metrics log so we can track how long the operation took.
385     metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
386         new LogFields()
387             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
388             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
389         override,
390         indexName);
391
392     return opResult;
393   }
394
395   /**
396    * Will send the passed in JSON payload to Elasticsearch using the
397    * provided index name in an attempt to create the index.
398    *
399    * @param indexName - The name of the index to be created
400    * @param settingsAndMappings - The actual JSON object that will define the index
401    * @return - The operation result of writing into Elasticsearch
402    * @throws DocumentStoreOperationException
403    */
404   protected OperationResult createTable(String indexName, String settingsAndMappings) throws DocumentStoreOperationException {
405     OperationResult result = new OperationResult();
406     result.setResultCode(500);
407     result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
408
409     // Grab the current time so we can use it to generate a metrics log.
410     MdcOverride override = getStartTime(new MdcOverride());
411
412     String fullUrl = getFullUrl("/" + indexName + "/", false);
413     HttpURLConnection conn = initializeConnection(fullUrl);
414
415     try {
416       conn.setRequestMethod("PUT");
417       conn.setRequestProperty("Content-Type", "application/json");
418     } catch (ProtocolException e) {
419       shutdownConnection(conn);
420       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
421     }
422
423     try {
424         attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
425     } catch(IOException e) {
426         logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
427         throw new DocumentStoreOperationException(e.getMessage());
428     }
429     handleResponse(conn, result);
430
431     // Generate a metrics log so we can track how long the operation took.
432     metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
433             new LogFields()
434                     .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
435                     .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()),
436             override,
437             indexName);
438
439     return result;
440   }
441
442   @Override
443   public DocumentOperationResult createDocument(String                  indexName,
444                                                 DocumentStoreDataEntity document,
445                                                 boolean                 allowImplicitIndexCreation)
446       throws DocumentStoreOperationException {
447
448     if(!allowImplicitIndexCreation) {
449
450       // Before we do anything, make sure that the specified index actually exists in the
451       // document store - we don't want to rely on ElasticSearch to fail the document
452       // create because it could be configured to implicitly create a non-existent index,
453       // which can lead to hard-to-debug behaviour with queries down the road.
454       OperationResult indexExistsResult = checkIndexExistence(indexName);
455       if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
456
457         DocumentOperationResult opResult = new DocumentOperationResult();
458         opResult.setResultCode(HttpStatus.NOT_FOUND.value());
459         opResult.setResult("Document Index '" + indexName + "' does not exist.");
460         opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
461         return opResult;
462       }
463     }
464
465     if (document.getId() == null || document.getId().isEmpty()) {
466       return createDocumentWithoutId(indexName, document);
467     } else {
468       return createDocumentWithId(indexName, document);
469     }
470   }
471
472   private DocumentOperationResult createDocumentWithId(String indexName,
473                                                        DocumentStoreDataEntity document)
474       throws DocumentStoreOperationException {
475     // check if the document already exists
476     DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
477
478
479     if (opResult.getResultCode() != HttpStatus.NOT_FOUND.value()) {
480       if (opResult.getResultCode() == HttpStatus.CONFLICT.value()) {
481         opResult.setFailureCause("A document with the same id already exists.");
482       } else {
483         opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
484       }
485       opResult.setResultCode(HttpStatus.CONFLICT.value());
486       return opResult;
487     }
488
489     opResult = new DocumentOperationResult();
490     // Initialize operation result with a failure codes / fault string
491     opResult.setResultCode(500);
492     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
493
494     // Grab the current time so we can use it to generate a metrics log.
495     MdcOverride override = getStartTime(new MdcOverride());
496
497     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE
498         + "/" + document.getId(), false);
499     HttpURLConnection conn = initializeConnection(fullUrl);
500
501     try {
502       conn.setRequestMethod("PUT");
503     } catch (ProtocolException e) {
504       shutdownConnection(conn);
505       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
506     }
507
508     attachDocument(conn, document);
509
510     logger.debug("Sending 'PUT' request to: " + conn.getURL());
511
512     handleResponse(conn, opResult);
513     buildDocumentResult(opResult, indexName);
514
515     // Generate a metrics log so we can track how long the operation took.
516     metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
517         new LogFields()
518             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
519             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
520         override,
521         indexName);
522
523     shutdownConnection(conn);
524
525     return opResult;
526
527   }
528
529   private DocumentOperationResult createDocumentWithoutId(String indexName,
530                                                           DocumentStoreDataEntity document)
531       throws DocumentStoreOperationException {
532
533     DocumentOperationResult response = new DocumentOperationResult();
534     // Initialize operation result with a failure codes / fault string
535     response.setResultCode(500);
536     response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
537
538     // Grab the current time so we can use it to generate a metrics log.
539     MdcOverride override = getStartTime(new MdcOverride());
540
541     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
542     HttpURLConnection conn = initializeConnection(fullUrl);
543
544     try {
545       conn.setRequestMethod("POST");
546     } catch (ProtocolException e) {
547       shutdownConnection(conn);
548       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
549     }
550
551     attachDocument(conn, document);
552
553     logger.debug("Sending 'POST' request to: " + conn.getURL());
554
555     handleResponse(conn, response);
556     buildDocumentResult(response, indexName);
557
558     // Generate a metrics log so we can track how long the operation took.
559     metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
560         new LogFields()
561             .setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
562             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
563         override,
564         indexName);
565
566     shutdownConnection(conn);
567
568     return response;
569   }
570
571   private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
572       throws DocumentStoreOperationException {
573 //    conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
574     conn.setRequestProperty("Content-Type", "application/json");
575     conn.setRequestProperty("Connection", "Close");
576     attachContent(conn, doc.getContentInJson());
577   }
578
579   private DocumentOperationResult checkDocumentExistence(String indexName,
580                                                          String docId)
581       throws DocumentStoreOperationException {
582     DocumentOperationResult opResult = new DocumentOperationResult();
583
584     // Initialize operation result with a failure codes / fault string
585     opResult.setResultCode(500);
586
587     // Grab the current time so we can use it to generate a metrics log.
588     MdcOverride override = getStartTime(new MdcOverride());
589
590     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
591     HttpURLConnection conn = initializeConnection(fullUrl);
592
593     try {
594       conn.setRequestMethod("HEAD");
595     } catch (ProtocolException e) {
596       shutdownConnection(conn);
597       throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
598     }
599
600     logger.debug("Sending 'HEAD' request to: " + conn.getURL());
601
602     int resultCode;
603     try {
604       resultCode = conn.getResponseCode();
605     } catch (IOException e) {
606       shutdownConnection(conn);
607       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
608     }
609
610     logger.debug("Response Code : " + resultCode);
611
612     opResult.setResultCode(resultCode);
613
614     // Generate a metrics log so we can track how long the operation took.
615     metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
616         new LogFields()
617             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
618             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
619         override,
620         indexName,
621         docId);
622
623     shutdownConnection(conn);
624
625     return opResult;
626   }
627
628   @Override
629   public DocumentOperationResult updateDocument(String                  indexName,
630                                                 DocumentStoreDataEntity document,
631                                                 boolean                 allowImplicitIndexCreation)
632       throws DocumentStoreOperationException {
633
634     if(!allowImplicitIndexCreation) {
635
636       // Before we do anything, make sure that the specified index actually exists in the
637       // document store - we don't want to rely on ElasticSearch to fail the document
638       // create because it could be configured to implicitly create a non-existent index,
639       // which can lead to hard-to-debug behaviour with queries down the road.
640       OperationResult indexExistsResult = checkIndexExistence(indexName);
641       if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
642
643         DocumentOperationResult opResult = new DocumentOperationResult();
644         opResult.setResultCode(HttpStatus.NOT_FOUND.value());
645         opResult.setResult("Document Index '" + indexName + "' does not exist.");
646         opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
647         return opResult;
648       }
649     }
650
651     DocumentOperationResult opResult = new DocumentOperationResult();
652
653     // Initialize operation result with a failure codes / fault string
654     opResult.setResultCode(500);
655     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
656
657     // Grab the current time so we can use it to generate a metrics log.
658     MdcOverride override = getStartTime(new MdcOverride());
659
660     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
661         + "?version=" + document.getVersion(), false);
662     HttpURLConnection conn = initializeConnection(fullUrl);
663
664     try {
665       conn.setRequestMethod("PUT");
666       conn.setRequestProperty("Content-Type", "application/json");
667     } catch (ProtocolException e) {
668       shutdownConnection(conn);
669       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
670     }
671
672     attachDocument(conn, document);
673
674     logger.debug("Sending 'PUT' request to: " + conn.getURL());
675
676     handleResponse(conn, opResult);
677     buildDocumentResult(opResult, indexName);
678
679     // Generate a metrics log so we can track how long the operation took.
680     metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
681         new LogFields()
682             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
683             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
684         override,
685         indexName,
686         document.getId());
687
688     shutdownConnection(conn);
689
690     return opResult;
691   }
692
693   @Override
694   public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
695       throws DocumentStoreOperationException {
696     DocumentOperationResult opResult = new DocumentOperationResult();
697
698     // Initialize operation result with a failure codes / fault string
699     opResult.setResultCode(500);
700     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
701
702     // Grab the current time so we can use it to generate a metrics log.
703     MdcOverride override = getStartTime(new MdcOverride());
704
705     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
706         + "?version=" + document.getVersion(), false);
707     HttpURLConnection conn = initializeConnection(fullUrl);
708
709     try {
710       conn.setRequestMethod("DELETE");
711     } catch (ProtocolException e) {
712       shutdownConnection(conn);
713       throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
714     }
715
716     logger.debug("\nSending 'DELETE' request to " + conn.getURL());
717
718     handleResponse(conn, opResult);
719     buildDocumentResult(opResult, indexName);
720     //supress the etag and url in response for delete as they are not required
721     if (opResult.getDocument() != null) {
722       opResult.getDocument().setEtag(null);
723       opResult.getDocument().setUrl(null);
724     }
725
726     // Generate a metrics log so we can track how long the operation took.
727     metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
728         new LogFields()
729             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
730             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
731         override,
732         indexName,
733         document.getId());
734
735     shutdownConnection(conn);
736
737     return opResult;
738   }
739
740   @Override
741   public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
742       throws DocumentStoreOperationException {
743     DocumentOperationResult opResult = new DocumentOperationResult();
744
745     // Initialize operation result with a failure codes / fault string
746     opResult.setResultCode(500);
747     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
748
749     // Grab the current time so we can use it to generate a metrics log.
750     MdcOverride override = getStartTime(new MdcOverride());
751
752     String fullUrl = null;
753     if (document.getVersion() == null) {
754       fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
755     } else {
756       fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
757           + "?version=" + document.getVersion(), false);
758     }
759     HttpURLConnection conn = initializeConnection(fullUrl);
760
761     logger.debug("\nSending 'GET' request to: " + conn.getURL());
762
763     handleResponse(conn, opResult);
764     buildDocumentResult(opResult, indexName);
765
766     // Generate a metrics log so we can track how long the operation took.
767     metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
768         new LogFields()
769             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
770             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
771         override,
772         indexName,
773         document.getId());
774
775     shutdownConnection(conn);
776
777     return opResult;
778   }
779
780   public SearchOperationResult search(String indexName, String queryString)
781       throws DocumentStoreOperationException {
782     SearchOperationResult opResult = new SearchOperationResult();
783
784     // Initialize operation result with a failure codes / fault string
785     opResult.setResultCode(500);
786     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
787
788     String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
789
790     // Grab the current time so we can use it to generate a metrics log.
791     MdcOverride override = getStartTime(new MdcOverride());
792
793     HttpURLConnection conn = initializeConnection(fullUrl);
794
795     try {
796       conn.setRequestMethod("GET");
797     } catch (ProtocolException e) {
798       shutdownConnection(conn);
799       throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
800     }
801
802     logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
803
804     handleResponse(conn, opResult);
805     buildSearchResult(opResult, indexName);
806
807
808     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
809         new LogFields()
810             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
811             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
812         override,
813         indexName,
814         queryString);
815
816     return opResult;
817   }
818
819   public SearchOperationResult searchWithPayload(String indexName, String query)
820       throws DocumentStoreOperationException {
821     SearchOperationResult opResult = new SearchOperationResult();
822
823     if (logger.isDebugEnabled()) {
824       logger.debug("Querying index: " + indexName + " with query string: " + query);
825     }
826
827     // Initialize operation result with a failure codes / fault string
828     opResult.setResultCode(500);
829     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
830
831     String fullUrl = getFullUrl("/" + indexName + "/_search", false);
832
833     // Grab the current time so we can use it to generate a metrics log.
834     MdcOverride override = getStartTime(new MdcOverride());
835
836     HttpURLConnection conn = initializeConnection(fullUrl);
837
838     try {
839       conn.setRequestMethod("POST");
840       conn.setRequestProperty("Content-Type", "application/json");
841     } catch (ProtocolException e) {
842       shutdownConnection(conn);
843       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
844     }
845
846     attachContent(conn, query);
847
848     logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
849     logger.debug("Request body =  Elasticsearch query = " + query);
850
851     handleResponse(conn, opResult);
852     buildSearchResult(opResult, indexName);
853
854     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
855         new LogFields()
856             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
857             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
858         override,
859         indexName,
860         query);
861
862     shutdownConnection(conn);
863
864     return opResult;
865   }
866
867
868   public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
869           throws DocumentStoreOperationException {
870
871     SearchOperationResult opResult = new SearchOperationResult();
872
873     if (logger.isDebugEnabled()) {
874       logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
875     }
876
877     // Initialize operation result with a failure codes / fault string
878     opResult.setResultCode(500);
879     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
880
881     String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
882
883     // Grab the current time so we can use it to generate a metrics log.
884     MdcOverride override = getStartTime(new MdcOverride());
885
886     HttpURLConnection conn = initializeConnection(fullUrl);
887
888     try {
889       conn.setRequestMethod("POST");
890       conn.setRequestProperty("Content-Type", "application/json");
891     } catch (ProtocolException e) {
892       shutdownConnection(conn);
893       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
894     }
895
896     attachContent(conn, query);
897
898     logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
899     logger.debug("Request body =  Elasticsearch query = " + query);
900
901     handleResponse(conn, opResult);
902     buildSuggestResult(opResult, indexName);
903
904     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
905             new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
906                     .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
907             override, indexName, query);
908
909     shutdownConnection(conn);
910
911     return opResult;
912   }
913
914   private void attachContent(HttpURLConnection conn, String content)
915       throws DocumentStoreOperationException {
916     OutputStream outputStream = null;
917     OutputStreamWriter out = null;
918
919     try {
920       outputStream = conn.getOutputStream();
921     } catch (IOException e) {
922       shutdownConnection(conn);
923       throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
924     }
925
926     out = new OutputStreamWriter(outputStream);
927
928     try {
929       out.write(content);
930       out.close();
931     } catch (IOException e) {
932       shutdownConnection(conn);
933       throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
934     }
935   }
936
937   private HttpURLConnection initializeConnection(String fullUrl)
938       throws DocumentStoreOperationException {
939     URL url = null;
940     HttpURLConnection conn = null;
941
942     try {
943       url = new URL(fullUrl);
944     } catch (MalformedURLException e) {
945       throw new DocumentStoreOperationException("Error building a URL with " + url, e);
946     }
947
948     try {
949       conn = (HttpURLConnection) url.openConnection();
950       conn.setDoOutput(true);
951     } catch (IOException e) {
952       shutdownConnection(conn);
953       throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
954     }
955
956     return conn;
957   }
958
959   private void handleResponse(HttpURLConnection conn, OperationResult opResult)
960       throws DocumentStoreOperationException {
961     int resultCode = 200;
962
963     try {
964       resultCode = conn.getResponseCode();
965     } catch (IOException e) {
966       shutdownConnection(conn);
967       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
968     }
969
970     logger.debug("Response Code : " + resultCode);
971
972     InputStream inputStream = null;
973
974     if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
975       inputStream = conn.getErrorStream();
976     } else {
977       try {
978         inputStream = conn.getInputStream();
979       } catch (IOException e) {
980         shutdownConnection(conn);
981         throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
982       }
983     }
984
985     InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
986     BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
987
988     StringBuilder result = new StringBuilder(128);
989     String string = null;
990
991     try {
992       while ((string = bufferedreader.readLine()) != null) {
993         result.append(string).append("\n");
994       }
995     } catch (IOException e) {
996       shutdownConnection(conn);
997       throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
998     }
999
1000     if (resultCode == HttpStatus.CONFLICT.value()) {
1001       opResult.setResultCode(HttpStatus.PRECONDITION_FAILED.value());
1002     } else {
1003       opResult.setResultCode(resultCode);
1004     }
1005     if (logger.isDebugEnabled()) {
1006       logger.debug("Raw result string from ElasticSearch = " + result.toString());
1007     }
1008     opResult.setResult(result.toString());
1009     opResult.setResultVersion(extractVersion(result.toString()));
1010   }
1011
1012   private String extractVersion(String result) throws DocumentStoreOperationException {
1013
1014     JSONParser parser = new JSONParser();
1015     String version = null;
1016     try {
1017       JSONObject root = (JSONObject) parser.parse(result);
1018       if (root.get("_version") != null) {
1019         version = root.get("_version").toString();
1020       }
1021
1022     } catch (ParseException e) {
1023
1024       // Not all responses from ElasticSearch include a version, so
1025       // if we don't get one back, just return an empty string rather
1026       // than trigger a false failure.
1027       version = "";
1028     }
1029     return version;
1030   }
1031
1032   /**
1033    * This convenience method gets the current system time and stores
1034    * it in an attribute in the supplied {@link MdcOverride} object so
1035    * that it can be used later by the metrics logger.
1036    *
1037    * @param override - The {@link MdcOverride} object to update.
1038    * @return - The supplied {@link MdcOverride} object.
1039    */
1040   private MdcOverride getStartTime(MdcOverride override) {
1041
1042     // Grab the current time...
1043     long startTimeInMs = System.currentTimeMillis();
1044
1045     // ...and add it as an attribute to the supplied MDC Override
1046     // object.
1047     SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
1048     override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
1049
1050     // Return the MdcOverride object that we were passed.
1051     // This looks odd, but it allows us to do stuff like:
1052     //
1053     //    MdcOverride ov = getStartTime(new MdcOverride())
1054     //
1055     // which is quite handy, but also allows us to pass in an existing
1056     // MdcOverride object which already has some attributes set.
1057     return override;
1058   }
1059
1060   private boolean isSuccess(OperationResult result) {
1061
1062     return isSuccessCode(result.getResultCode());
1063   }
1064
1065
1066   private boolean isSuccessCode(int statusCode) {
1067     return ((statusCode >= 200) && (statusCode < 300));
1068   }
1069
1070
1071   @Override
1072   public OperationResult performBulkOperations(BulkRequest[] requests)
1073       throws DocumentStoreOperationException {
1074
1075     if (logger.isDebugEnabled()) {
1076       String dbgString = "ESController: performBulkOperations - Operations: ";
1077
1078       for (BulkRequest request : requests) {
1079         dbgString += "[" + request.toString() + "] ";
1080       }
1081
1082       logger.debug(dbgString);
1083     }
1084
1085     // Grab the current time so we can use it to generate a metrics log.
1086     MdcOverride override = getStartTime(new MdcOverride());
1087
1088     // Parse the supplied set of operations.
1089     // Iterate over the list of operations which we were provided and
1090     // translate them into a format that ElasticSearh understands.
1091     int opCount = 0;
1092     StringBuilder esOperationSet = new StringBuilder(128);
1093     List<ElasticSearchResultItem> rejected = new ArrayList<ElasticSearchResultItem>();
1094     for (BulkRequest request : requests) {
1095
1096       // Convert the request to the syntax ElasticSearch likes.
1097       if (buildEsOperation(request, esOperationSet, rejected)) {
1098         opCount++;
1099       }
1100     }
1101
1102     ElasticSearchBulkOperationResult opResult = null;
1103     if (opCount > 0) {
1104
1105       // Open an HTTP connection to the ElasticSearch back end.
1106       String fullUrl = getFullUrl("/_bulk", false);
1107       URL url;
1108       HttpURLConnection conn;
1109       try {
1110
1111         url = new URL(fullUrl);
1112         conn = (HttpURLConnection) url.openConnection();
1113         conn.setRequestMethod("PUT");
1114         conn.setDoOutput(true);
1115         conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
1116         conn.setRequestProperty("Connection", "Close");
1117
1118       } catch (IOException e) {
1119
1120         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1121         if (logger.isDebugEnabled()) {
1122           logger.debug(Throwables.getStackTraceAsString(e));
1123         }
1124
1125         throw new DocumentStoreOperationException("Failed to open connection to document store.  Cause: "
1126             + e.getMessage(), e);
1127       }
1128
1129       StringBuilder bulkResult = new StringBuilder(128);
1130       try {
1131         // Create an output stream to write our request to.
1132         OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1133         ;
1134
1135         if (logger.isDebugEnabled()) {
1136           logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1137           logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n",
1138               "\\n"));
1139         }
1140
1141         // Write the resulting request string to our output stream. (this sends the request to ES?)
1142         out.write(esOperationSet.toString());
1143         out.close();
1144
1145         // Open an input stream on our connection in order to read back the results.
1146         InputStream is = conn.getInputStream();
1147         InputStreamReader inputstreamreader = new InputStreamReader(is);
1148         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1149
1150         // Read the contents of the input stream into our result string...
1151         String esResponseString = null;
1152
1153         while ((esResponseString = bufferedreader.readLine()) != null) {
1154           bulkResult.append(esResponseString).append("\n");
1155         }
1156
1157       } catch (IOException e) {
1158
1159         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1160         if (logger.isDebugEnabled()) {
1161           StringWriter sw = new StringWriter();
1162           e.printStackTrace(new PrintWriter(sw));
1163           logger.debug(sw.toString());
1164         }
1165
1166         throw new DocumentStoreOperationException("Failure interacting with document store.  Cause: "
1167             + e.getMessage(), e);
1168       }
1169
1170       if (logger.isDebugEnabled()) {
1171         logger.debug("ESController: Received result string from ElasticSearch: = "
1172             + bulkResult.toString());
1173       }
1174
1175       // ...and marshal the resulting string into a Java object.
1176       try {
1177         opResult = marshallEsBulkResult(bulkResult.toString());
1178
1179       } catch (IOException e) {
1180
1181         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1182         if (logger.isDebugEnabled()) {
1183           logger.debug(Throwables.getStackTraceAsString(e));
1184         }
1185
1186         throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: "
1187             + e.getMessage(), e);
1188       }
1189     }
1190
1191     // Finally, build the operation result and return it to the caller.
1192     OperationResult result = new OperationResult();
1193     result.setResultCode(207);
1194     result.setResult(buildGenericBulkResultSet(opResult, rejected));
1195
1196     // In the success case we don't want the entire result string to be
1197     // dumped into the metrics log, so concatenate it.
1198     String resultStringForMetricsLog = result.getResult();
1199     if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1200       resultStringForMetricsLog = resultStringForMetricsLog.substring(0,
1201           Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1202     }
1203
1204     metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1205         new LogFields()
1206             .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1207             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1208         override);
1209
1210     return result;
1211   }
1212
1213
1214   /**
1215    * This method converts a {@link BulkRequest} object into a json structure
1216    * which can be understood by ElasticSearch.
1217    *
1218    * @param request - The request to be performed.
1219    * @param sb      - The string builder to append the json data to
1220    * @throws DocumentStoreOperationException
1221    */
1222   private boolean buildEsOperation(BulkRequest request, StringBuilder sb,
1223                                    List<ElasticSearchResultItem> fails)
1224       throws DocumentStoreOperationException {
1225
1226     boolean retVal = true;
1227     OperationResult indexExistsResult = null;
1228
1229     // What kind of operation are we performing?
1230     switch (request.getOperationType()) {
1231
1232       // Create a new document.
1233       case CREATE:
1234
1235         // Make sure that we were supplied a document payload.
1236         if (request.getOperation().getDocument() == null) {
1237
1238           fails.add(generateRejectionEntry(request.getOperationType(),
1239               "Missing document payload",
1240               request.getIndex(),
1241               request.getId(),
1242               400,
1243               request.getOperation().getMetaData().getUrl()));
1244           return false;
1245         }
1246
1247         // Make sure that the supplied document URL is formatted
1248         // correctly.
1249         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1250           fails.add(generateRejectionEntry(request.getOperationType(),
1251               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1252               request.getIndex(),
1253               "",
1254               400,
1255               request.getOperation().getMetaData().getUrl()));
1256           return false;
1257         }
1258
1259         // Validate that the specified index actually exists before we
1260         // try to perform the create.
1261         if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1262
1263           fails.add(generateRejectionEntry(request.getOperationType(),
1264               "Specified resource does not exist: "
1265                   + request.getOperation().getMetaData().getUrl(),
1266               request.getIndex(),
1267               request.getId(),
1268               404,
1269               request.getOperation().getMetaData().getUrl()));
1270           return false;
1271         }
1272
1273         // If we were supplied an id for the new document, then
1274         // include it in the bulk operation to Elastic Search
1275         if (request.getId() == null) {
1276
1277           sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE,
1278               request.getIndex(),
1279               DEFAULT_TYPE));
1280
1281           // Otherwise, we just leave that parameter off and ElasticSearch
1282           // will generate one for us.
1283         } else {
1284           sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE,
1285               request.getIndex(),
1286               DEFAULT_TYPE,
1287               request.getId()));
1288         }
1289
1290         try {
1291           // Append the document that we want to create.
1292           sb.append(request.getOperation().getDocument().toJson()).append("\n");
1293         } catch (JsonProcessingException e) {
1294           throw new DocumentStoreOperationException("Failure parsing document to json", e);
1295         }
1296
1297         break;
1298
1299       // Update an existing document.
1300       case UPDATE:
1301
1302         // Make sure that we were supplied a document payload.
1303         if (request.getOperation().getDocument() == null) {
1304
1305           fails.add(generateRejectionEntry(request.getOperationType(),
1306               "Missing document payload",
1307               request.getIndex(),
1308               request.getId(),
1309               400,
1310               request.getOperation().getMetaData().getUrl()));
1311           return false;
1312         }
1313
1314         // Make sure that the supplied document URL is formatted
1315         // correctly.
1316         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1317           fails.add(generateRejectionEntry(request.getOperationType(),
1318               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1319               request.getIndex(),
1320               "",
1321               400,
1322               request.getOperation().getMetaData().getUrl()));
1323           return false;
1324         }
1325
1326         // Validate that the specified index actually exists before we
1327         // try to perform the update.
1328         if (!indexExists(request.getIndex())) {
1329
1330           fails.add(generateRejectionEntry(request.getOperationType(),
1331               "Specified resource does not exist: "
1332                   + request.getOperation().getMetaData().getUrl(),
1333               request.getIndex(),
1334               request.getId(),
1335               404,
1336               request.getOperation().getMetaData().getUrl()));
1337           return false;
1338         }
1339
1340         // Validate that the document we are trying to update actually
1341         // exists before we try to perform the update.
1342         if (!documentExists(request.getIndex(), request.getId())) {
1343
1344           fails.add(generateRejectionEntry(request.getOperationType(),
1345               "Specified resource does not exist: "
1346                   + request.getOperation().getMetaData().getUrl(),
1347               request.getIndex(),
1348               request.getId(),
1349               404,
1350               request.getOperation().getMetaData().getUrl()));
1351           return false;
1352         }
1353
1354         // It is mandatory that a version be supplied for an update operation,
1355         // so validate that now.
1356         if (request.getOperation().getMetaData().getEtag() == null) {
1357
1358           fails.add(generateRejectionEntry(request.getOperationType(),
1359               "Missing mandatory ETag field",
1360               request.getIndex(),
1361               request.getId(),
1362               400,
1363               request.getOperation().getMetaData().getUrl()));
1364           return false;
1365         }
1366
1367         // Generate the update request...
1368         sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE,
1369             request.getIndex(),
1370             DEFAULT_TYPE,
1371             request.getId(),
1372             request.getOperation().getMetaData().getEtag()));
1373
1374         // ...and append the document that we want to update.
1375         try {
1376           sb.append(request.getOperation().getDocument().toJson()).append("\n");
1377         } catch (JsonProcessingException e) {
1378           throw new DocumentStoreOperationException("Failure parsing document to json", e);
1379         }
1380         break;
1381
1382       // Delete an existing document.
1383       case DELETE:
1384
1385         // Make sure that the supplied document URL is formatted
1386         // correctly.
1387         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1388           fails.add(generateRejectionEntry(request.getOperationType(),
1389               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1390               request.getIndex(),
1391               "",
1392               400,
1393               request.getOperation().getMetaData().getUrl()));
1394           return false;
1395         }
1396
1397         // Validate that the specified index actually exists before we
1398         // try to perform the delete.
1399         if (!indexExists(request.getIndex())) {
1400
1401           fails.add(generateRejectionEntry(request.getOperationType(),
1402               "Specified resource does not exist: "
1403                   + request.getOperation().getMetaData().getUrl(),
1404               request.getIndex(),
1405               request.getId(),
1406               404,
1407               request.getOperation().getMetaData().getUrl()));
1408           return false;
1409         }
1410
1411         // Validate that the document we are trying to update actually
1412         // exists before we try to perform the delete.
1413         if (!documentExists(request.getIndex(), request.getId())) {
1414
1415           fails.add(generateRejectionEntry(request.getOperationType(),
1416               "Specified resource does not exist: "
1417                   + request.getOperation().getMetaData().getUrl(),
1418               request.getIndex(),
1419               request.getId(),
1420               404,
1421               request.getOperation().getMetaData().getUrl()));
1422           return false;
1423         }
1424
1425         // It is mandatory that a version be supplied for a delete operation,
1426         // so validate that now.
1427         if (request.getOperation().getMetaData().getEtag() == null) {
1428
1429           fails.add(generateRejectionEntry(request.getOperationType(),
1430               "Missing mandatory ETag field",
1431               request.getIndex(),
1432               request.getId(),
1433               400,
1434               request.getOperation().getMetaData().getUrl()));
1435           return false;
1436         }
1437
1438         // Generate the delete request.
1439         sb.append(String.format(BULK_DELETE_TEMPLATE,
1440             request.getIndex(),
1441             DEFAULT_TYPE,
1442             request.getId(),
1443             request.getOperation().getMetaData().getEtag()));
1444         break;
1445       default:
1446     }
1447
1448     return retVal;
1449   }
1450
1451   private boolean indexExists(String index) throws DocumentStoreOperationException {
1452
1453     OperationResult indexExistsResult = checkIndexExistence(index);
1454
1455     return ((indexExistsResult.getResultCode() >= 200)
1456         && (indexExistsResult.getResultCode() < 300));
1457   }
1458
1459   private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1460
1461     OperationResult docExistsResult = checkDocumentExistence(index, id);
1462
1463     return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1464   }
1465
1466   /**
1467    * This method constructs a status entry for a bulk operation which has
1468    * been rejected before even sending it to the document store.
1469    *
1470    * @param rejectReason - A message describing why the operation was rejected.
1471    * @param anId         - The identifier associated with the document being
1472    *                     acted on.
1473    * @param statusCode  - An HTTP status code.
1474    * @return - A result set item.
1475    */
1476   private ElasticSearchResultItem generateRejectionEntry(OperationType opType,
1477                                                          String rejectReason,
1478                                                          String index,
1479                                                          String anId,
1480                                                          int statusCode,
1481                                                          String originalUrl) {
1482
1483     ElasticSearchError err = new ElasticSearchError();
1484     err.setReason(rejectReason);
1485
1486     ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1487     op.setIndex(index);
1488     op.setId(anId);
1489     op.setStatus(statusCode);
1490     op.setError(err);
1491     op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1492
1493     ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1494
1495     switch (opType) {
1496       case CREATE:
1497         rejectionResult.setCreate(op);
1498         break;
1499       case UPDATE:
1500         rejectionResult.setIndex(op);
1501         break;
1502       case DELETE:
1503         rejectionResult.setDelete(op);
1504         break;
1505       default:
1506     }
1507
1508     return rejectionResult;
1509   }
1510
1511
1512   /**
1513    * This method takes the json structure returned from ElasticSearch in
1514    * response to a bulk operations request and marshals it into a Java
1515    * object.
1516    *
1517    * @param jsonResult - The bulk operations response returned from
1518    *                   ElasticSearch.
1519    * @return - The marshalled response.
1520    * @throws JsonParseException
1521    * @throws JsonMappingException
1522    * @throws IOException
1523    */
1524   private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult)
1525       throws JsonParseException, JsonMappingException, IOException {
1526
1527     if (jsonResult != null) {
1528       if (logger.isDebugEnabled()) {
1529         logger.debug("ESController: Marshalling ES result set from json: "
1530             + jsonResult.replaceAll("\n", ""));
1531       }
1532
1533       ObjectMapper mapper = new ObjectMapper();
1534       mapper.setSerializationInclusion(Include.NON_EMPTY);
1535
1536       return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1537     }
1538
1539     return null;
1540   }
1541
1542
1543   /**
1544    * This method takes the marshalled ElasticSearch bulk response and
1545    * converts it into a generic response payload.
1546    *
1547    * @param esResult - ElasticSearch bulk operations response.
1548    * @return - A generic result set.
1549    */
1550   private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1551                                            List<ElasticSearchResultItem> rejectedOps) {
1552
1553     int totalOps = 0;
1554     int totalSuccess = 0;
1555     int totalFails = 0;
1556
1557     if (logger.isDebugEnabled()) {
1558
1559       logger.debug("ESController: Build generic result set.  ES Results: "
1560           + ((esResult != null) ? esResult.toString() : "[]")
1561           + " Rejected Ops: " + rejectedOps.toString());
1562     }
1563
1564     // Build a combined list of result items from the results returned
1565     // from ElasticSearch and the list of operations that we rejected
1566     // without sending to ElasticSearch.
1567     List<ElasticSearchResultItem> combinedResults = new ArrayList<ElasticSearchResultItem>();
1568     if (esResult != null) {
1569       combinedResults.addAll(Arrays.asList(esResult.getItems()));
1570     }
1571     combinedResults.addAll(rejectedOps);
1572
1573     // Iterate over the individual results in the resulting result set.
1574     StringBuilder resultsBuilder = new StringBuilder();
1575     AtomicBoolean firstItem = new AtomicBoolean(true);
1576     for (ElasticSearchResultItem item : combinedResults) {
1577
1578       // Increment the operation counts.
1579       totalOps++;
1580       if (isSuccessCode(item.operationStatus().getStatus())) {
1581         totalSuccess++;
1582       } else {
1583         totalFails++;
1584       }
1585
1586       // Prepend a comma to our response string unless this it the
1587       // first result in the set.
1588       if (!firstItem.compareAndSet(true, false)) {
1589         resultsBuilder.append(", ");
1590       }
1591
1592       // Append the current result as a generic json structure.
1593       resultsBuilder.append(item.toJson());
1594     }
1595
1596     // Now, build the result string and return it.
1597     String responseBody = "{ \"total_operations\": " + totalOps + ", "
1598         + "\"total_success\": " + totalSuccess + ", "
1599         + "\"total_fails\": " + totalFails + ", "
1600         + "\"results\": ["
1601         + resultsBuilder.toString()
1602         + "]}";
1603
1604     return responseBody;
1605   }
1606
1607
1608   /**
1609    * This method queryies ElasticSearch to determine if the supplied
1610    * index is present in the document store.
1611    *
1612    * @param indexName - The index to look for.
1613    * @return - An operation result indicating the success or failure of
1614    * the check.
1615    * @throws DocumentStoreOperationException
1616    */
1617   public OperationResult checkIndexExistence(String indexName)
1618       throws DocumentStoreOperationException {
1619
1620     // Initialize operation result with a failure codes / fault string
1621     OperationResult opResult = new OperationResult();
1622     opResult.setResultCode(500);
1623
1624     // Grab the current time so we can use it to generate a metrics log.
1625     MdcOverride override = getStartTime(new MdcOverride());
1626
1627     String fullUrl = getFullUrl("/" + indexName, false);
1628     HttpURLConnection conn = initializeConnection(fullUrl);
1629
1630     try {
1631       conn.setRequestMethod("HEAD");
1632
1633     } catch (ProtocolException e) {
1634       shutdownConnection(conn);
1635       throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1636     }
1637
1638     logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1639
1640     int resultCode;
1641     try {
1642       resultCode = conn.getResponseCode();
1643     } catch (IOException e) {
1644       shutdownConnection(conn);
1645       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1646     }
1647     logger.debug("Response Code : " + resultCode);
1648
1649     opResult.setResultCode(resultCode);
1650
1651     // Generate a metrics log so we can track how long the operation took.
1652     metricsLogger.info(SearchDbMsgs.CHECK_INDEX_TIME,
1653         new LogFields()
1654             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1655             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1656         override,
1657         indexName);
1658
1659     shutdownConnection(conn);
1660
1661     return opResult;
1662   }
1663
1664
1665   private void buildDocumentResult(DocumentOperationResult result, String index)
1666       throws DocumentStoreOperationException {
1667
1668     JSONParser parser = new JSONParser();
1669     JSONObject root;
1670     try {
1671       root = (JSONObject) parser.parse(result.getResult());
1672
1673       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1674         // Success response object
1675         Document doc = new Document();
1676         doc.setEtag(result.getResultVersion());
1677         doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1678
1679         doc.setContent((JSONObject) root.get("_source"));
1680         result.setDocument(doc);
1681
1682       } else {
1683         // Error response object
1684         JSONObject error = (JSONObject) root.get("error");
1685         if (error != null) {
1686           result.setError(new ErrorResult(error.get("type").toString(),
1687               error.get("reason").toString()));
1688         }
1689
1690       }
1691     } catch (Exception e) {
1692       throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1693           + result.getResult());
1694     }
1695
1696
1697   }
1698
1699   private String buildDocumentResponseUrl(String index, String id) {
1700     return ApiUtils.buildDocumentUri(index, id);
1701   }
1702
1703   private void buildSearchResult(SearchOperationResult result, String index)
1704       throws DocumentStoreOperationException {
1705
1706     JSONParser parser = new JSONParser();
1707     JSONObject root;
1708
1709     try {
1710       root = (JSONObject) parser.parse(result.getResult());
1711       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1712         JSONObject hits = (JSONObject) root.get("hits");
1713         JSONArray hitArray = (JSONArray) hits.get("hits");
1714         SearchHits searchHits = new SearchHits();
1715         searchHits.setTotalHits(hits.get("total").toString());
1716         ArrayList<SearchHit> searchHitArray = new ArrayList<SearchHit>();
1717
1718         for (int i = 0; i < hitArray.size(); i++) {
1719           JSONObject hit = (JSONObject) hitArray.get(i);
1720           SearchHit searchHit = new SearchHit();
1721           searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1722           Document doc = new Document();
1723           if (hit.get("_version") != null) {
1724             doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1725           }
1726
1727           doc.setUrl(buildDocumentResponseUrl(index, (hit.get("_id") != null)
1728               ? hit.get("_id").toString() : ""));
1729           doc.setContent((JSONObject) hit.get("_source"));
1730           searchHit.setDocument(doc);
1731           searchHitArray.add(searchHit);
1732         }
1733         searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1734         result.setSearchResult(searchHits);
1735
1736         JSONObject aggregations = (JSONObject) root.get("aggregations");
1737         if (aggregations != null) {
1738           AggregationResult[] aggResults =
1739               AggregationParsingUtil.parseAggregationResults(aggregations);
1740           AggregationResults aggs = new AggregationResults();
1741           aggs.setAggregations(aggResults);
1742           result.setAggregationResult(aggs);
1743         }
1744
1745         // success
1746       } else {
1747         JSONObject error = (JSONObject) root.get("error");
1748         if (error != null) {
1749           result.setError(new ErrorResult(error.get("type").toString(),
1750               error.get("reason").toString()));
1751         }
1752       }
1753     } catch (Exception e) {
1754       throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1755           + result.getResult());
1756     }
1757
1758   }
1759
1760   private void buildSuggestResult(SearchOperationResult result, String index)
1761           throws DocumentStoreOperationException {
1762
1763     JSONParser parser = new JSONParser ();
1764     JSONObject root;
1765     try {
1766       root = (JSONObject) parser.parse ( result.getResult () );
1767       if (result.getResultCode () >= 200 && result.getResultCode () <= 299) {
1768         JSONArray hitArray = (JSONArray) root.get ( "suggest-vnf" );
1769         JSONObject hitdata = (JSONObject) hitArray.get ( 0 );
1770         JSONArray optionsArray = (JSONArray) hitdata.get ( "options" );
1771         SuggestHits suggestHits = new SuggestHits ();
1772         suggestHits.setTotalHits ( String.valueOf ( optionsArray.size () ) );
1773
1774         ArrayList<SuggestHit> suggestHitArray = new ArrayList<SuggestHit> ();
1775
1776         for (int i = 0; i < optionsArray.size (); i++) {
1777           JSONObject hit = (JSONObject) optionsArray.get ( i );
1778
1779           SuggestHit suggestHit = new SuggestHit ();
1780           suggestHit.setScore ( (hit.get ( "score" ) != null) ? hit.get ( "score" ).toString () : "" );
1781           suggestHit.setText ( (hit.get ( "text" ) != null) ? hit.get ( "text" ).toString () : "" );
1782           Document doc = new Document ();
1783           if (hit.get ( "_version" ) != null) {
1784             doc.setEtag ( (hit.get ( "_version" ) != null) ? hit.get ( "_version" ).toString () : "" );
1785           }
1786           doc.setUrl ( buildDocumentResponseUrl ( index,
1787                   (hit.get ( "_id" ) != null) ? hit.get ( "_id" ).toString () : "" ) );
1788
1789           doc.setContent ( (JSONObject) hit.get ( "payload" ) );
1790           suggestHit.setDocument ( doc );
1791           suggestHitArray.add ( suggestHit );
1792         }
1793         suggestHits.setHits ( suggestHitArray.toArray ( new SuggestHit[suggestHitArray.size ()] ) );
1794         result.setSuggestResult ( suggestHits );
1795
1796         JSONObject aggregations = (JSONObject) root.get ( "aggregations" );
1797         if (aggregations != null) {
1798           AggregationResult[] aggResults =
1799                   AggregationParsingUtil.parseAggregationResults ( aggregations );
1800           AggregationResults aggs = new AggregationResults ();
1801           aggs.setAggregations ( aggResults );
1802           result.setAggregationResult ( aggs );
1803         }
1804
1805         // success
1806       } else {
1807         JSONObject error = (JSONObject) root.get ( "error" );
1808         if (error != null) {
1809           result.setError (
1810                   new ErrorResult ( error.get ( "type" ).toString (), error.get ( "reason" ).toString () ) );
1811         }
1812       }
1813     } catch (Exception e) {
1814       throw new DocumentStoreOperationException (
1815               "Failed to parse Elastic Search response." + result.getResult () );
1816     }
1817   }
1818
1819
1820   }