626ea8c24d29f3bb0c4b0ae7abffd1b2a84564c5
[aai/search-data-service.git] / src / main / java / org / onap / aai / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
22
23 import com.google.common.base.Throwables;
24 import com.fasterxml.jackson.annotation.JsonInclude.Include;
25 import com.fasterxml.jackson.core.JsonParseException;
26 import com.fasterxml.jackson.core.JsonProcessingException;
27 import com.fasterxml.jackson.databind.JsonMappingException;
28 import com.fasterxml.jackson.databind.ObjectMapper;
29 import org.json.simple.JSONArray;
30 import org.json.simple.JSONObject;
31 import org.json.simple.parser.JSONParser;
32 import org.json.simple.parser.ParseException;
33 import org.onap.aai.sa.rest.AnalysisConfiguration;
34 import org.onap.aai.sa.rest.ApiUtils;
35 import org.onap.aai.sa.rest.BulkRequest;
36 import org.onap.aai.sa.rest.BulkRequest.OperationType;
37 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
38 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
39 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
40 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
41 import org.onap.aai.sa.searchdbabstraction.entity.Document;
42 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
43 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
44 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
45 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
46 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
47 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
48 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
49 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
50 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
51 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
52 import org.onap.aai.cl.api.LogFields;
53 import org.onap.aai.cl.api.LogLine;
54 import org.onap.aai.cl.api.Logger;
55 import org.onap.aai.cl.eelf.LoggerFactory;
56 import org.onap.aai.cl.mdc.MdcContext;
57 import org.onap.aai.cl.mdc.MdcOverride;
58 import org.onap.aai.sa.rest.DocumentSchema;
59 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
60 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
61
62 import java.io.BufferedReader;
63 import java.io.File;
64 import java.io.FileInputStream;
65 import java.io.IOException;
66 import java.io.InputStream;
67 import java.io.InputStreamReader;
68 import java.io.OutputStream;
69 import java.io.OutputStreamWriter;
70 import java.io.PrintWriter;
71 import java.io.StringWriter;
72 import java.net.HttpURLConnection;
73 import java.net.MalformedURLException;
74 import java.net.ProtocolException;
75 import java.net.URL;
76 import java.text.SimpleDateFormat;
77 import java.util.ArrayList;
78 import java.util.Arrays;
79 import java.util.List;
80 import java.util.Properties;
81 import java.util.concurrent.atomic.AtomicBoolean;
82
83 import org.springframework.http.HttpStatus;
84
85
86
87 /**
88  * This class has the Elasticsearch implementation of the
89  * DB operations defined in DocumentStoreInterface.
90  */
91 public class ElasticSearchHttpController implements DocumentStoreInterface {
92
93   private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
94       "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
95   private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
96       "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
97   private static final String BULK_IMPORT_INDEX_TEMPLATE =
98       "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
99   private static final String BULK_DELETE_TEMPLATE =
100       "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
101
102   private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
103       "Internal Error: ElasticSearch operation fault occurred";
104   private static final Logger logger = LoggerFactory.getInstance()
105       .getLogger(ElasticSearchHttpController.class.getName());
106   private static final Logger metricsLogger = LoggerFactory.getInstance()
107       .getMetricsLogger(ElasticSearchHttpController.class.getName());
108   private final ElasticSearchConfig config;
109
110   private static final String DEFAULT_TYPE = "default";
111
112   private static ElasticSearchHttpController instance = null;
113
114   protected AnalysisConfiguration analysisConfig;
115
116   public static ElasticSearchHttpController getInstance() {
117
118     synchronized (ElasticSearchHttpController.class) {
119
120       if (instance == null) {
121
122         Properties properties = new Properties();
123         File file = new File(SearchDbConstants.ES_CONFIG_FILE);
124         try {
125           properties.load(new FileInputStream(file));
126         } catch (Exception e) {
127           logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL,
128             "ElasticSearchHTTPController.getInstance",
129             e.getLocalizedMessage());
130         }
131
132         ElasticSearchConfig config = new ElasticSearchConfig(properties);
133         instance = new ElasticSearchHttpController(config);
134       }
135     }
136
137     return instance;
138   }
139
140   public ElasticSearchHttpController(ElasticSearchConfig config) {
141     this.config = config;
142     analysisConfig = new AnalysisConfiguration();
143
144     try {
145       logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
146       checkConnection();
147       logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
148     } catch (Exception e) {
149       logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e,
150           getFullUrl("", false), e.getMessage());
151     }
152   }
153
154
155   public AnalysisConfiguration getAnalysisConfig() {
156     return analysisConfig;
157   }
158
159   @Override
160   public OperationResult createIndex(String index, DocumentSchema documentSchema) {
161
162     OperationResult result = new OperationResult();
163     result.setResultCode(500);
164
165     try {
166
167       // Submit the request to ElasticSearch to create the index using a
168       // default document type.
169       result = createTable(index,
170           DEFAULT_TYPE,
171           analysisConfig.getEsIndexSettings(),
172           DocumentSchemaUtil.generateDocumentMappings(documentSchema));
173
174       // ElasticSearch will return us a 200 code on success when we
175       // want to report a 201, so translate the result here.
176       result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
177       if (isSuccess(result)) {
178         result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
179         //result.setResult("{\"index\": \"" + index + ", \"type\": \"" + DEFAULT_TYPE + "\"}");
180       }
181
182     } catch (DocumentStoreOperationException e) {
183
184       result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
185     }
186
187     return result;
188   }
189
190   @Override
191   public OperationResult createDynamicIndex(String index, String dynamicSchema) {
192     OperationResult result = new OperationResult();
193     result.setResultCode(500);
194
195     try {
196       result = createTable(index, dynamicSchema);
197
198       // ElasticSearch will return us a 200 code on success when we
199       // want to report a 201, so translate the result here.
200       result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
201       if (isSuccess(result)) {
202         result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
203       }
204     } catch (DocumentStoreOperationException e) {
205       result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
206     }
207
208     return result;
209   }
210
211
212   @Override
213   public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
214
215     //Initialize operation result with a failure codes / fault string
216     OperationResult opResult = new OperationResult();
217     opResult.setResultCode(500);
218     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
219
220     // Grab the current time so we can use it to generate a metrics log.
221     MdcOverride override = getStartTime(new MdcOverride());
222
223     String fullUrl = getFullUrl("/" + indexName + "/", false);
224     HttpURLConnection conn = initializeConnection(fullUrl);
225
226     logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
227
228     try {
229       conn.setRequestMethod("DELETE");
230     } catch (ProtocolException e) {
231       shutdownConnection(conn);
232       throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
233     }
234
235     handleResponse(conn, opResult);
236
237     // Generate a metrics log so we can track how long the operation took.
238     metricsLogger.info(SearchDbMsgs.DELETE_INDEX_TIME,
239         new LogFields()
240             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
241             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
242         override,
243         indexName);
244
245     shutdownConnection(conn);
246
247     return opResult;
248   }
249
250
251   private OperationResult checkConnection() throws Exception {
252
253     String fullUrl = getFullUrl("/_cluster/health", false);
254     URL url = null;
255     HttpURLConnection conn = null;
256
257     url = new URL(fullUrl);
258     conn = (HttpURLConnection) url.openConnection();
259     conn.setRequestMethod("GET");
260     conn.setDoOutput(true);
261     logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
262
263     int resultCode = conn.getResponseCode();
264     logger.debug("getClusterHealth() response Code : " + resultCode);
265     OperationResult opResult = new OperationResult();
266     opResult.setResultCode(resultCode);
267
268     shutdownConnection(conn);
269
270     return opResult;
271   }
272
273   private String getFullUrl(String resourceUrl, boolean isSecure) {
274
275     final String host = config.getIpAddress();
276     final String port = config.getHttpPort();
277
278     if (isSecure) {
279       return String.format("https://%s:%s%s", host, port, resourceUrl);
280     } else {
281       return String.format("http://%s:%s%s", host, port, resourceUrl);
282     }
283   }
284
285   private void shutdownConnection(HttpURLConnection connection) {
286     if (connection == null) {
287       return;
288     }
289
290     InputStream inputstream = null;
291     OutputStream outputstream = null;
292
293     try {
294       inputstream = connection.getInputStream();
295     } catch (IOException e) {
296       logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
297     } finally {
298       if (inputstream != null) {
299         try {
300           inputstream.close();
301         } catch (IOException e) {
302           logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
303               e.getLocalizedMessage());
304         }
305       }
306     }
307
308     try {
309       outputstream = connection.getOutputStream();
310     } catch (IOException e) {
311       logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
312     } finally {
313       if (outputstream != null) {
314         try {
315           outputstream.close();
316         } catch (IOException e) {
317           logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
318               e.getLocalizedMessage());
319         }
320       }
321     }
322
323     connection.disconnect();
324   }
325
326   //@Override
327   protected OperationResult createTable(String indexName, String typeName,
328                                         String indexSettings, String indexMappings)
329       throws DocumentStoreOperationException {
330
331     if (indexSettings == null) {
332       logger.debug("No settings provided.");
333     }
334
335     if (indexMappings == null) {
336       logger.debug("No mappings provided.");
337     }
338
339     OperationResult opResult = new OperationResult();
340
341     // Initialize operation result with a failure codes / fault string
342     opResult.setResultCode(500);
343     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
344
345     // Grab the current time so we can use it to generate a metrics log.
346     MdcOverride override = getStartTime(new MdcOverride());
347
348     String fullUrl = getFullUrl("/" + indexName + "/", false);
349     HttpURLConnection conn = initializeConnection(fullUrl);
350
351     try {
352       conn.setRequestMethod("PUT");
353     } catch (ProtocolException e) {
354       shutdownConnection(conn);
355       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
356     }
357
358     StringBuilder sb = new StringBuilder(128);
359     sb.append("{ \"settings\" : ");
360     sb.append(indexSettings);
361     sb.append(",");
362
363     sb.append("\"mappings\" : {");
364     sb.append("\"" + typeName + "\" :");
365     sb.append(indexMappings);
366     sb.append("}}");
367
368     attachContent(conn, sb.toString());
369
370     logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
371     logger.debug("Request content: " + sb.toString());
372
373     handleResponse(conn, opResult);
374
375     shutdownConnection(conn);
376
377     // Generate a metrics log so we can track how long the operation took.
378     metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
379         new LogFields()
380             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
381             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
382         override,
383         indexName);
384
385     return opResult;
386   }
387
388   /**
389    * Will send the passed in JSON payload to Elasticsearch using the
390    * provided index name in an attempt to create the index.
391    *
392    * @param indexName - The name of the index to be created
393    * @param settingsAndMappings - The actual JSON object that will define the index
394    * @return - The operation result of writing into Elasticsearch
395    * @throws DocumentStoreOperationException
396    */
397   protected OperationResult createTable(String indexName, String settingsAndMappings) throws DocumentStoreOperationException {
398     OperationResult result = new OperationResult();
399     result.setResultCode(500);
400     result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
401
402     // Grab the current time so we can use it to generate a metrics log.
403     MdcOverride override = getStartTime(new MdcOverride());
404
405     String fullUrl = getFullUrl("/" + indexName + "/", false);
406     HttpURLConnection conn = initializeConnection(fullUrl);
407
408     try {
409       conn.setRequestMethod("PUT");
410     } catch (ProtocolException e) {
411       shutdownConnection(conn);
412       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
413     }
414
415     attachContent(conn, settingsAndMappings);
416     handleResponse(conn, result);
417
418     // Generate a metrics log so we can track how long the operation took.
419     metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
420             new LogFields()
421                     .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
422                     .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()),
423             override,
424             indexName);
425
426     return result;
427   }
428
429   @Override
430   public DocumentOperationResult createDocument(String                  indexName,
431                                                 DocumentStoreDataEntity document,
432                                                 boolean                 allowImplicitIndexCreation)
433       throws DocumentStoreOperationException {
434
435     if(!allowImplicitIndexCreation) {
436
437       // Before we do anything, make sure that the specified index actually exists in the
438       // document store - we don't want to rely on ElasticSearch to fail the document
439       // create because it could be configured to implicitly create a non-existent index,
440       // which can lead to hard-to-debug behaviour with queries down the road.
441       OperationResult indexExistsResult = checkIndexExistence(indexName);
442       if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
443
444         DocumentOperationResult opResult = new DocumentOperationResult();
445         opResult.setResultCode(HttpStatus.NOT_FOUND.value());
446         opResult.setResult("Document Index '" + indexName + "' does not exist.");
447         opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
448         return opResult;
449       }
450     }
451
452     if (document.getId() == null || document.getId().isEmpty()) {
453       return createDocumentWithoutId(indexName, document);
454     } else {
455       return createDocumentWithId(indexName, document);
456     }
457   }
458
459   private DocumentOperationResult createDocumentWithId(String indexName,
460                                                        DocumentStoreDataEntity document)
461       throws DocumentStoreOperationException {
462     // check if the document already exists
463     DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
464
465
466     if (opResult.getResultCode() != HttpStatus.NOT_FOUND.value()) {
467       if (opResult.getResultCode() == HttpStatus.CONFLICT.value()) {
468         opResult.setFailureCause("A document with the same id already exists.");
469       } else {
470         opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
471       }
472       opResult.setResultCode(HttpStatus.CONFLICT.value());
473       return opResult;
474     }
475
476     opResult = new DocumentOperationResult();
477     // Initialize operation result with a failure codes / fault string
478     opResult.setResultCode(500);
479     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
480
481     // Grab the current time so we can use it to generate a metrics log.
482     MdcOverride override = getStartTime(new MdcOverride());
483
484     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE
485         + "/" + document.getId(), false);
486     HttpURLConnection conn = initializeConnection(fullUrl);
487
488     try {
489       conn.setRequestMethod("PUT");
490     } catch (ProtocolException e) {
491       shutdownConnection(conn);
492       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
493     }
494
495     attachDocument(conn, document);
496
497     logger.debug("Sending 'PUT' request to: " + conn.getURL());
498
499     handleResponse(conn, opResult);
500     buildDocumentResult(opResult, indexName);
501
502     // Generate a metrics log so we can track how long the operation took.
503     metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
504         new LogFields()
505             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
506             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
507         override,
508         indexName);
509
510     shutdownConnection(conn);
511
512     return opResult;
513
514   }
515
516   private DocumentOperationResult createDocumentWithoutId(String indexName,
517                                                           DocumentStoreDataEntity document)
518       throws DocumentStoreOperationException {
519
520     DocumentOperationResult response = new DocumentOperationResult();
521     // Initialize operation result with a failure codes / fault string
522     response.setResultCode(500);
523     response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
524
525     // Grab the current time so we can use it to generate a metrics log.
526     MdcOverride override = getStartTime(new MdcOverride());
527
528     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
529     HttpURLConnection conn = initializeConnection(fullUrl);
530
531     try {
532       conn.setRequestMethod("POST");
533     } catch (ProtocolException e) {
534       shutdownConnection(conn);
535       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
536     }
537
538     attachDocument(conn, document);
539
540     logger.debug("Sending 'POST' request to: " + conn.getURL());
541
542     handleResponse(conn, response);
543     buildDocumentResult(response, indexName);
544
545     // Generate a metrics log so we can track how long the operation took.
546     metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
547         new LogFields()
548             .setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
549             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
550         override,
551         indexName);
552
553     shutdownConnection(conn);
554
555     return response;
556   }
557
558   private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
559       throws DocumentStoreOperationException {
560     conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
561     conn.setRequestProperty("Connection", "Close");
562
563     attachContent(conn, doc.getContentInJson());
564   }
565
566   private DocumentOperationResult checkDocumentExistence(String indexName,
567                                                          String docId)
568       throws DocumentStoreOperationException {
569     DocumentOperationResult opResult = new DocumentOperationResult();
570
571     // Initialize operation result with a failure codes / fault string
572     opResult.setResultCode(500);
573
574     // Grab the current time so we can use it to generate a metrics log.
575     MdcOverride override = getStartTime(new MdcOverride());
576
577     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
578     HttpURLConnection conn = initializeConnection(fullUrl);
579
580     try {
581       conn.setRequestMethod("HEAD");
582     } catch (ProtocolException e) {
583       shutdownConnection(conn);
584       throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
585     }
586
587     logger.debug("Sending 'HEAD' request to: " + conn.getURL());
588
589     int resultCode;
590     try {
591       resultCode = conn.getResponseCode();
592     } catch (IOException e) {
593       shutdownConnection(conn);
594       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
595     }
596
597     logger.debug("Response Code : " + resultCode);
598
599     opResult.setResultCode(resultCode);
600
601     // Generate a metrics log so we can track how long the operation took.
602     metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
603         new LogFields()
604             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
605             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
606         override,
607         indexName,
608         docId);
609
610     shutdownConnection(conn);
611
612     return opResult;
613   }
614
615   @Override
616   public DocumentOperationResult updateDocument(String                  indexName,
617                                                 DocumentStoreDataEntity document,
618                                                 boolean                 allowImplicitIndexCreation)
619       throws DocumentStoreOperationException {
620
621     if(!allowImplicitIndexCreation) {
622
623       // Before we do anything, make sure that the specified index actually exists in the
624       // document store - we don't want to rely on ElasticSearch to fail the document
625       // create because it could be configured to implicitly create a non-existent index,
626       // which can lead to hard-to-debug behaviour with queries down the road.
627       OperationResult indexExistsResult = checkIndexExistence(indexName);
628       if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
629
630         DocumentOperationResult opResult = new DocumentOperationResult();
631         opResult.setResultCode(HttpStatus.NOT_FOUND.value());
632         opResult.setResult("Document Index '" + indexName + "' does not exist.");
633         opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
634         return opResult;
635       }
636     }
637
638     DocumentOperationResult opResult = new DocumentOperationResult();
639
640     // Initialize operation result with a failure codes / fault string
641     opResult.setResultCode(500);
642     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
643
644     // Grab the current time so we can use it to generate a metrics log.
645     MdcOverride override = getStartTime(new MdcOverride());
646
647     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
648         + "?version=" + document.getVersion(), false);
649     HttpURLConnection conn = initializeConnection(fullUrl);
650
651     try {
652       conn.setRequestMethod("PUT");
653     } catch (ProtocolException e) {
654       shutdownConnection(conn);
655       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
656     }
657
658     attachDocument(conn, document);
659
660     logger.debug("Sending 'PUT' request to: " + conn.getURL());
661
662     handleResponse(conn, opResult);
663     buildDocumentResult(opResult, indexName);
664
665     // Generate a metrics log so we can track how long the operation took.
666     metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
667         new LogFields()
668             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
669             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
670         override,
671         indexName,
672         document.getId());
673
674     shutdownConnection(conn);
675
676     return opResult;
677   }
678
679   @Override
680   public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
681       throws DocumentStoreOperationException {
682     DocumentOperationResult opResult = new DocumentOperationResult();
683
684     // Initialize operation result with a failure codes / fault string
685     opResult.setResultCode(500);
686     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
687
688     // Grab the current time so we can use it to generate a metrics log.
689     MdcOverride override = getStartTime(new MdcOverride());
690
691     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
692         + "?version=" + document.getVersion(), false);
693     HttpURLConnection conn = initializeConnection(fullUrl);
694
695     try {
696       conn.setRequestMethod("DELETE");
697     } catch (ProtocolException e) {
698       shutdownConnection(conn);
699       throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
700     }
701
702     logger.debug("\nSending 'DELETE' request to " + conn.getURL());
703
704     handleResponse(conn, opResult);
705     buildDocumentResult(opResult, indexName);
706     //supress the etag and url in response for delete as they are not required
707     if (opResult.getDocument() != null) {
708       opResult.getDocument().setEtag(null);
709       opResult.getDocument().setUrl(null);
710     }
711
712     // Generate a metrics log so we can track how long the operation took.
713     metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
714         new LogFields()
715             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
716             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
717         override,
718         indexName,
719         document.getId());
720
721     shutdownConnection(conn);
722
723     return opResult;
724   }
725
726   @Override
727   public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
728       throws DocumentStoreOperationException {
729     DocumentOperationResult opResult = new DocumentOperationResult();
730
731     // Initialize operation result with a failure codes / fault string
732     opResult.setResultCode(500);
733     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
734
735     // Grab the current time so we can use it to generate a metrics log.
736     MdcOverride override = getStartTime(new MdcOverride());
737
738     String fullUrl = null;
739     if (document.getVersion() == null) {
740       fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
741     } else {
742       fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
743           + "?version=" + document.getVersion(), false);
744     }
745     HttpURLConnection conn = initializeConnection(fullUrl);
746
747     logger.debug("\nSending 'GET' request to: " + conn.getURL());
748
749     handleResponse(conn, opResult);
750     buildDocumentResult(opResult, indexName);
751
752     // Generate a metrics log so we can track how long the operation took.
753     metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
754         new LogFields()
755             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
756             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
757         override,
758         indexName,
759         document.getId());
760
761     shutdownConnection(conn);
762
763     return opResult;
764   }
765
766   public SearchOperationResult search(String indexName, String queryString)
767       throws DocumentStoreOperationException {
768     SearchOperationResult opResult = new SearchOperationResult();
769
770     // Initialize operation result with a failure codes / fault string
771     opResult.setResultCode(500);
772     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
773
774     String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
775
776     // Grab the current time so we can use it to generate a metrics log.
777     MdcOverride override = getStartTime(new MdcOverride());
778
779     HttpURLConnection conn = initializeConnection(fullUrl);
780
781     try {
782       conn.setRequestMethod("GET");
783     } catch (ProtocolException e) {
784       shutdownConnection(conn);
785       throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
786     }
787
788     logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
789
790     handleResponse(conn, opResult);
791     buildSearchResult(opResult, indexName);
792
793
794     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
795         new LogFields()
796             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
797             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
798         override,
799         indexName,
800         queryString);
801
802     return opResult;
803   }
804
805   public SearchOperationResult searchWithPayload(String indexName, String query)
806       throws DocumentStoreOperationException {
807     SearchOperationResult opResult = new SearchOperationResult();
808
809     if (logger.isDebugEnabled()) {
810       logger.debug("Querying index: " + indexName + " with query string: " + query);
811     }
812
813     // Initialize operation result with a failure codes / fault string
814     opResult.setResultCode(500);
815     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
816
817     String fullUrl = getFullUrl("/" + indexName + "/_search", false);
818
819     // Grab the current time so we can use it to generate a metrics log.
820     MdcOverride override = getStartTime(new MdcOverride());
821
822     HttpURLConnection conn = initializeConnection(fullUrl);
823
824     try {
825       conn.setRequestMethod("POST");
826     } catch (ProtocolException e) {
827       shutdownConnection(conn);
828       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
829     }
830
831     attachContent(conn, query);
832
833     logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
834     logger.debug("Request body =  Elasticsearch query = " + query);
835
836     handleResponse(conn, opResult);
837     buildSearchResult(opResult, indexName);
838
839     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
840         new LogFields()
841             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
842             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
843         override,
844         indexName,
845         query);
846
847     shutdownConnection(conn);
848
849     return opResult;
850   }
851
852
853   public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
854           throws DocumentStoreOperationException {
855
856     SearchOperationResult opResult = new SearchOperationResult();
857
858     if (logger.isDebugEnabled()) {
859       logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
860     }
861
862     // Initialize operation result with a failure codes / fault string
863     opResult.setResultCode(500);
864     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
865
866     String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
867
868     // Grab the current time so we can use it to generate a metrics log.
869     MdcOverride override = getStartTime(new MdcOverride());
870
871     HttpURLConnection conn = initializeConnection(fullUrl);
872
873     try {
874       conn.setRequestMethod("POST");
875     } catch (ProtocolException e) {
876       shutdownConnection(conn);
877       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
878     }
879
880     attachContent(conn, query);
881
882     logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
883     logger.debug("Request body =  Elasticsearch query = " + query);
884
885     handleResponse(conn, opResult);
886     buildSuggestResult(opResult, indexName);
887
888     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
889             new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
890                     .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
891             override, indexName, query);
892
893     shutdownConnection(conn);
894
895     return opResult;
896   }
897
898   private void attachContent(HttpURLConnection conn, String content)
899       throws DocumentStoreOperationException {
900     OutputStream outputStream = null;
901     OutputStreamWriter out = null;
902
903     try {
904       outputStream = conn.getOutputStream();
905     } catch (IOException e) {
906       shutdownConnection(conn);
907       throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
908     }
909
910     out = new OutputStreamWriter(outputStream);
911
912     try {
913       out.write(content);
914       out.close();
915     } catch (IOException e) {
916       shutdownConnection(conn);
917       throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
918     }
919   }
920
921   private HttpURLConnection initializeConnection(String fullUrl)
922       throws DocumentStoreOperationException {
923     URL url = null;
924     HttpURLConnection conn = null;
925
926     try {
927       url = new URL(fullUrl);
928     } catch (MalformedURLException e) {
929       throw new DocumentStoreOperationException("Error building a URL with " + url, e);
930     }
931
932     try {
933       conn = (HttpURLConnection) url.openConnection();
934       conn.setDoOutput(true);
935     } catch (IOException e) {
936       shutdownConnection(conn);
937       throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
938     }
939
940     return conn;
941   }
942
943   private void handleResponse(HttpURLConnection conn, OperationResult opResult)
944       throws DocumentStoreOperationException {
945     int resultCode = 200;
946
947     try {
948       resultCode = conn.getResponseCode();
949     } catch (IOException e) {
950       shutdownConnection(conn);
951       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
952     }
953
954     logger.debug("Response Code : " + resultCode);
955
956     InputStream inputStream = null;
957
958     if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
959       inputStream = conn.getErrorStream();
960     } else {
961       try {
962         inputStream = conn.getInputStream();
963       } catch (IOException e) {
964         shutdownConnection(conn);
965         throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
966       }
967     }
968
969     InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
970     BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
971
972     StringBuilder result = new StringBuilder(128);
973     String string = null;
974
975     try {
976       while ((string = bufferedreader.readLine()) != null) {
977         result.append(string).append("\n");
978       }
979     } catch (IOException e) {
980       shutdownConnection(conn);
981       throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
982     }
983
984     if (resultCode == HttpStatus.CONFLICT.value()) {
985       opResult.setResultCode(HttpStatus.PRECONDITION_FAILED.value());
986     } else {
987       opResult.setResultCode(resultCode);
988     }
989     if (logger.isDebugEnabled()) {
990       logger.debug("Raw result string from ElasticSearch = " + result.toString());
991     }
992     opResult.setResult(result.toString());
993     opResult.setResultVersion(extractVersion(result.toString()));
994   }
995
996   private String extractVersion(String result) throws DocumentStoreOperationException {
997
998     JSONParser parser = new JSONParser();
999     String version = null;
1000     try {
1001       JSONObject root = (JSONObject) parser.parse(result);
1002       if (root.get("_version") != null) {
1003         version = root.get("_version").toString();
1004       }
1005
1006     } catch (ParseException e) {
1007
1008       // Not all responses from ElasticSearch include a version, so
1009       // if we don't get one back, just return an empty string rather
1010       // than trigger a false failure.
1011       version = "";
1012     }
1013     return version;
1014   }
1015
1016   /**
1017    * This convenience method gets the current system time and stores
1018    * it in an attribute in the supplied {@link MdcOverride} object so
1019    * that it can be used later by the metrics logger.
1020    *
1021    * @param override - The {@link MdcOverride} object to update.
1022    * @return - The supplied {@link MdcOverride} object.
1023    */
1024   private MdcOverride getStartTime(MdcOverride override) {
1025
1026     // Grab the current time...
1027     long startTimeInMs = System.currentTimeMillis();
1028
1029     // ...and add it as an attribute to the supplied MDC Override
1030     // object.
1031     SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
1032     override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
1033
1034     // Return the MdcOverride object that we were passed.
1035     // This looks odd, but it allows us to do stuff like:
1036     //
1037     //    MdcOverride ov = getStartTime(new MdcOverride())
1038     //
1039     // which is quite handy, but also allows us to pass in an existing
1040     // MdcOverride object which already has some attributes set.
1041     return override;
1042   }
1043
1044   private boolean isSuccess(OperationResult result) {
1045
1046     return isSuccessCode(result.getResultCode());
1047   }
1048
1049
1050   private boolean isSuccessCode(int statusCode) {
1051     return ((statusCode >= 200) && (statusCode < 300));
1052   }
1053
1054
1055   @Override
1056   public OperationResult performBulkOperations(BulkRequest[] requests)
1057       throws DocumentStoreOperationException {
1058
1059     if (logger.isDebugEnabled()) {
1060       String dbgString = "ESController: performBulkOperations - Operations: ";
1061
1062       for (BulkRequest request : requests) {
1063         dbgString += "[" + request.toString() + "] ";
1064       }
1065
1066       logger.debug(dbgString);
1067     }
1068
1069     // Grab the current time so we can use it to generate a metrics log.
1070     MdcOverride override = getStartTime(new MdcOverride());
1071
1072     // Parse the supplied set of operations.
1073     // Iterate over the list of operations which we were provided and
1074     // translate them into a format that ElasticSearh understands.
1075     int opCount = 0;
1076     StringBuilder esOperationSet = new StringBuilder(128);
1077     List<ElasticSearchResultItem> rejected = new ArrayList<ElasticSearchResultItem>();
1078     for (BulkRequest request : requests) {
1079
1080       // Convert the request to the syntax ElasticSearch likes.
1081       if (buildEsOperation(request, esOperationSet, rejected)) {
1082         opCount++;
1083       }
1084     }
1085
1086     ElasticSearchBulkOperationResult opResult = null;
1087     if (opCount > 0) {
1088
1089       // Open an HTTP connection to the ElasticSearch back end.
1090       String fullUrl = getFullUrl("/_bulk", false);
1091       URL url;
1092       HttpURLConnection conn;
1093       try {
1094
1095         url = new URL(fullUrl);
1096         conn = (HttpURLConnection) url.openConnection();
1097         conn.setRequestMethod("PUT");
1098         conn.setDoOutput(true);
1099         conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
1100         conn.setRequestProperty("Connection", "Close");
1101
1102       } catch (IOException e) {
1103
1104         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1105         if (logger.isDebugEnabled()) {
1106           logger.debug(Throwables.getStackTraceAsString(e));
1107         }
1108
1109         throw new DocumentStoreOperationException("Failed to open connection to document store.  Cause: "
1110             + e.getMessage(), e);
1111       }
1112
1113       StringBuilder bulkResult = new StringBuilder(128);
1114       try {
1115         // Create an output stream to write our request to.
1116         OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1117         ;
1118
1119         if (logger.isDebugEnabled()) {
1120           logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1121           logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n",
1122               "\\n"));
1123         }
1124
1125         // Write the resulting request string to our output stream. (this sends the request to ES?)
1126         out.write(esOperationSet.toString());
1127         out.close();
1128
1129         // Open an input stream on our connection in order to read back the results.
1130         InputStream is = conn.getInputStream();
1131         InputStreamReader inputstreamreader = new InputStreamReader(is);
1132         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1133
1134         // Read the contents of the input stream into our result string...
1135         String esResponseString = null;
1136
1137         while ((esResponseString = bufferedreader.readLine()) != null) {
1138           bulkResult.append(esResponseString).append("\n");
1139         }
1140
1141       } catch (IOException e) {
1142
1143         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1144         if (logger.isDebugEnabled()) {
1145           StringWriter sw = new StringWriter();
1146           e.printStackTrace(new PrintWriter(sw));
1147           logger.debug(sw.toString());
1148         }
1149
1150         throw new DocumentStoreOperationException("Failure interacting with document store.  Cause: "
1151             + e.getMessage(), e);
1152       }
1153
1154       if (logger.isDebugEnabled()) {
1155         logger.debug("ESController: Received result string from ElasticSearch: = "
1156             + bulkResult.toString());
1157       }
1158
1159       // ...and marshal the resulting string into a Java object.
1160       try {
1161         opResult = marshallEsBulkResult(bulkResult.toString());
1162
1163       } catch (IOException e) {
1164
1165         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1166         if (logger.isDebugEnabled()) {
1167           logger.debug(Throwables.getStackTraceAsString(e));
1168         }
1169
1170         throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: "
1171             + e.getMessage(), e);
1172       }
1173     }
1174
1175     // Finally, build the operation result and return it to the caller.
1176     OperationResult result = new OperationResult();
1177     result.setResultCode(207);
1178     result.setResult(buildGenericBulkResultSet(opResult, rejected));
1179
1180     // In the success case we don't want the entire result string to be
1181     // dumped into the metrics log, so concatenate it.
1182     String resultStringForMetricsLog = result.getResult();
1183     if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1184       resultStringForMetricsLog = resultStringForMetricsLog.substring(0,
1185           Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1186     }
1187
1188     metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1189         new LogFields()
1190             .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1191             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1192         override);
1193
1194     return result;
1195   }
1196
1197
1198   /**
1199    * This method converts a {@link BulkRequest} object into a json structure
1200    * which can be understood by ElasticSearch.
1201    *
1202    * @param request - The request to be performed.
1203    * @param sb      - The string builder to append the json data to
1204    * @throws DocumentStoreOperationException
1205    */
1206   private boolean buildEsOperation(BulkRequest request, StringBuilder sb,
1207                                    List<ElasticSearchResultItem> fails)
1208       throws DocumentStoreOperationException {
1209
1210     boolean retVal = true;
1211     OperationResult indexExistsResult = null;
1212
1213     // What kind of operation are we performing?
1214     switch (request.getOperationType()) {
1215
1216       // Create a new document.
1217       case CREATE:
1218
1219         // Make sure that we were supplied a document payload.
1220         if (request.getOperation().getDocument() == null) {
1221
1222           fails.add(generateRejectionEntry(request.getOperationType(),
1223               "Missing document payload",
1224               request.getIndex(),
1225               request.getId(),
1226               400,
1227               request.getOperation().getMetaData().getUrl()));
1228           return false;
1229         }
1230
1231         // Make sure that the supplied document URL is formatted
1232         // correctly.
1233         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1234           fails.add(generateRejectionEntry(request.getOperationType(),
1235               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1236               request.getIndex(),
1237               "",
1238               400,
1239               request.getOperation().getMetaData().getUrl()));
1240           return false;
1241         }
1242
1243         // Validate that the specified index actually exists before we
1244         // try to perform the create.
1245         if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1246
1247           fails.add(generateRejectionEntry(request.getOperationType(),
1248               "Specified resource does not exist: "
1249                   + request.getOperation().getMetaData().getUrl(),
1250               request.getIndex(),
1251               request.getId(),
1252               404,
1253               request.getOperation().getMetaData().getUrl()));
1254           return false;
1255         }
1256
1257         // If we were supplied an id for the new document, then
1258         // include it in the bulk operation to Elastic Search
1259         if (request.getId() == null) {
1260
1261           sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE,
1262               request.getIndex(),
1263               DEFAULT_TYPE));
1264
1265           // Otherwise, we just leave that parameter off and ElasticSearch
1266           // will generate one for us.
1267         } else {
1268           sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE,
1269               request.getIndex(),
1270               DEFAULT_TYPE,
1271               request.getId()));
1272         }
1273
1274         try {
1275           // Append the document that we want to create.
1276           sb.append(request.getOperation().getDocument().toJson()).append("\n");
1277         } catch (JsonProcessingException e) {
1278           throw new DocumentStoreOperationException("Failure parsing document to json", e);
1279         }
1280
1281         break;
1282
1283       // Update an existing document.
1284       case UPDATE:
1285
1286         // Make sure that we were supplied a document payload.
1287         if (request.getOperation().getDocument() == null) {
1288
1289           fails.add(generateRejectionEntry(request.getOperationType(),
1290               "Missing document payload",
1291               request.getIndex(),
1292               request.getId(),
1293               400,
1294               request.getOperation().getMetaData().getUrl()));
1295           return false;
1296         }
1297
1298         // Make sure that the supplied document URL is formatted
1299         // correctly.
1300         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1301           fails.add(generateRejectionEntry(request.getOperationType(),
1302               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1303               request.getIndex(),
1304               "",
1305               400,
1306               request.getOperation().getMetaData().getUrl()));
1307           return false;
1308         }
1309
1310         // Validate that the specified index actually exists before we
1311         // try to perform the update.
1312         if (!indexExists(request.getIndex())) {
1313
1314           fails.add(generateRejectionEntry(request.getOperationType(),
1315               "Specified resource does not exist: "
1316                   + request.getOperation().getMetaData().getUrl(),
1317               request.getIndex(),
1318               request.getId(),
1319               404,
1320               request.getOperation().getMetaData().getUrl()));
1321           return false;
1322         }
1323
1324         // Validate that the document we are trying to update actually
1325         // exists before we try to perform the update.
1326         if (!documentExists(request.getIndex(), request.getId())) {
1327
1328           fails.add(generateRejectionEntry(request.getOperationType(),
1329               "Specified resource does not exist: "
1330                   + request.getOperation().getMetaData().getUrl(),
1331               request.getIndex(),
1332               request.getId(),
1333               404,
1334               request.getOperation().getMetaData().getUrl()));
1335           return false;
1336         }
1337
1338         // It is mandatory that a version be supplied for an update operation,
1339         // so validate that now.
1340         if (request.getOperation().getMetaData().getEtag() == null) {
1341
1342           fails.add(generateRejectionEntry(request.getOperationType(),
1343               "Missing mandatory ETag field",
1344               request.getIndex(),
1345               request.getId(),
1346               400,
1347               request.getOperation().getMetaData().getUrl()));
1348           return false;
1349         }
1350
1351         // Generate the update request...
1352         sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE,
1353             request.getIndex(),
1354             DEFAULT_TYPE,
1355             request.getId(),
1356             request.getOperation().getMetaData().getEtag()));
1357
1358         // ...and append the document that we want to update.
1359         try {
1360           sb.append(request.getOperation().getDocument().toJson()).append("\n");
1361         } catch (JsonProcessingException e) {
1362           throw new DocumentStoreOperationException("Failure parsing document to json", e);
1363         }
1364         break;
1365
1366       // Delete an existing document.
1367       case DELETE:
1368
1369         // Make sure that the supplied document URL is formatted
1370         // correctly.
1371         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1372           fails.add(generateRejectionEntry(request.getOperationType(),
1373               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1374               request.getIndex(),
1375               "",
1376               400,
1377               request.getOperation().getMetaData().getUrl()));
1378           return false;
1379         }
1380
1381         // Validate that the specified index actually exists before we
1382         // try to perform the delete.
1383         if (!indexExists(request.getIndex())) {
1384
1385           fails.add(generateRejectionEntry(request.getOperationType(),
1386               "Specified resource does not exist: "
1387                   + request.getOperation().getMetaData().getUrl(),
1388               request.getIndex(),
1389               request.getId(),
1390               404,
1391               request.getOperation().getMetaData().getUrl()));
1392           return false;
1393         }
1394
1395         // Validate that the document we are trying to update actually
1396         // exists before we try to perform the delete.
1397         if (!documentExists(request.getIndex(), request.getId())) {
1398
1399           fails.add(generateRejectionEntry(request.getOperationType(),
1400               "Specified resource does not exist: "
1401                   + request.getOperation().getMetaData().getUrl(),
1402               request.getIndex(),
1403               request.getId(),
1404               404,
1405               request.getOperation().getMetaData().getUrl()));
1406           return false;
1407         }
1408
1409         // It is mandatory that a version be supplied for a delete operation,
1410         // so validate that now.
1411         if (request.getOperation().getMetaData().getEtag() == null) {
1412
1413           fails.add(generateRejectionEntry(request.getOperationType(),
1414               "Missing mandatory ETag field",
1415               request.getIndex(),
1416               request.getId(),
1417               400,
1418               request.getOperation().getMetaData().getUrl()));
1419           return false;
1420         }
1421
1422         // Generate the delete request.
1423         sb.append(String.format(BULK_DELETE_TEMPLATE,
1424             request.getIndex(),
1425             DEFAULT_TYPE,
1426             request.getId(),
1427             request.getOperation().getMetaData().getEtag()));
1428         break;
1429       default:
1430     }
1431
1432     return retVal;
1433   }
1434
1435   private boolean indexExists(String index) throws DocumentStoreOperationException {
1436
1437     OperationResult indexExistsResult = checkIndexExistence(index);
1438
1439     return ((indexExistsResult.getResultCode() >= 200)
1440         && (indexExistsResult.getResultCode() < 300));
1441   }
1442
1443   private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1444
1445     OperationResult docExistsResult = checkDocumentExistence(index, id);
1446
1447     return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1448   }
1449
1450   /**
1451    * This method constructs a status entry for a bulk operation which has
1452    * been rejected before even sending it to the document store.
1453    *
1454    * @param rejectReason - A message describing why the operation was rejected.
1455    * @param anId         - The identifier associated with the document being
1456    *                     acted on.
1457    * @param statusCode  - An HTTP status code.
1458    * @return - A result set item.
1459    */
1460   private ElasticSearchResultItem generateRejectionEntry(OperationType opType,
1461                                                          String rejectReason,
1462                                                          String index,
1463                                                          String anId,
1464                                                          int statusCode,
1465                                                          String originalUrl) {
1466
1467     ElasticSearchError err = new ElasticSearchError();
1468     err.setReason(rejectReason);
1469
1470     ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1471     op.setIndex(index);
1472     op.setId(anId);
1473     op.setStatus(statusCode);
1474     op.setError(err);
1475     op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1476
1477     ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1478
1479     switch (opType) {
1480       case CREATE:
1481         rejectionResult.setCreate(op);
1482         break;
1483       case UPDATE:
1484         rejectionResult.setIndex(op);
1485         break;
1486       case DELETE:
1487         rejectionResult.setDelete(op);
1488         break;
1489       default:
1490     }
1491
1492     return rejectionResult;
1493   }
1494
1495
1496   /**
1497    * This method takes the json structure returned from ElasticSearch in
1498    * response to a bulk operations request and marshals it into a Java
1499    * object.
1500    *
1501    * @param jsonResult - The bulk operations response returned from
1502    *                   ElasticSearch.
1503    * @return - The marshalled response.
1504    * @throws JsonParseException
1505    * @throws JsonMappingException
1506    * @throws IOException
1507    */
1508   private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult)
1509       throws JsonParseException, JsonMappingException, IOException {
1510
1511     if (jsonResult != null) {
1512       if (logger.isDebugEnabled()) {
1513         logger.debug("ESController: Marshalling ES result set from json: "
1514             + jsonResult.replaceAll("\n", ""));
1515       }
1516
1517       ObjectMapper mapper = new ObjectMapper();
1518       mapper.setSerializationInclusion(Include.NON_EMPTY);
1519
1520       return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1521     }
1522
1523     return null;
1524   }
1525
1526
1527   /**
1528    * This method takes the marshalled ElasticSearch bulk response and
1529    * converts it into a generic response payload.
1530    *
1531    * @param esResult - ElasticSearch bulk operations response.
1532    * @return - A generic result set.
1533    */
1534   private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1535                                            List<ElasticSearchResultItem> rejectedOps) {
1536
1537     int totalOps = 0;
1538     int totalSuccess = 0;
1539     int totalFails = 0;
1540
1541     if (logger.isDebugEnabled()) {
1542
1543       logger.debug("ESController: Build generic result set.  ES Results: "
1544           + ((esResult != null) ? esResult.toString() : "[]")
1545           + " Rejected Ops: " + rejectedOps.toString());
1546     }
1547
1548     // Build a combined list of result items from the results returned
1549     // from ElasticSearch and the list of operations that we rejected
1550     // without sending to ElasticSearch.
1551     List<ElasticSearchResultItem> combinedResults = new ArrayList<ElasticSearchResultItem>();
1552     if (esResult != null) {
1553       combinedResults.addAll(Arrays.asList(esResult.getItems()));
1554     }
1555     combinedResults.addAll(rejectedOps);
1556
1557     // Iterate over the individual results in the resulting result set.
1558     StringBuilder resultsBuilder = new StringBuilder();
1559     AtomicBoolean firstItem = new AtomicBoolean(true);
1560     for (ElasticSearchResultItem item : combinedResults) {
1561
1562       // Increment the operation counts.
1563       totalOps++;
1564       if (isSuccessCode(item.operationStatus().getStatus())) {
1565         totalSuccess++;
1566       } else {
1567         totalFails++;
1568       }
1569
1570       // Prepend a comma to our response string unless this it the
1571       // first result in the set.
1572       if (!firstItem.compareAndSet(true, false)) {
1573         resultsBuilder.append(", ");
1574       }
1575
1576       // Append the current result as a generic json structure.
1577       resultsBuilder.append(item.toJson());
1578     }
1579
1580     // Now, build the result string and return it.
1581     String responseBody = "{ \"total_operations\": " + totalOps + ", "
1582         + "\"total_success\": " + totalSuccess + ", "
1583         + "\"total_fails\": " + totalFails + ", "
1584         + "\"results\": ["
1585         + resultsBuilder.toString()
1586         + "]}";
1587
1588     return responseBody;
1589   }
1590
1591
1592   /**
1593    * This method queryies ElasticSearch to determine if the supplied
1594    * index is present in the document store.
1595    *
1596    * @param indexName - The index to look for.
1597    * @return - An operation result indicating the success or failure of
1598    * the check.
1599    * @throws DocumentStoreOperationException
1600    */
1601   public OperationResult checkIndexExistence(String indexName)
1602       throws DocumentStoreOperationException {
1603
1604     // Initialize operation result with a failure codes / fault string
1605     OperationResult opResult = new OperationResult();
1606     opResult.setResultCode(500);
1607
1608     // Grab the current time so we can use it to generate a metrics log.
1609     MdcOverride override = getStartTime(new MdcOverride());
1610
1611     String fullUrl = getFullUrl("/" + indexName, false);
1612     HttpURLConnection conn = initializeConnection(fullUrl);
1613
1614     try {
1615       conn.setRequestMethod("HEAD");
1616
1617     } catch (ProtocolException e) {
1618       shutdownConnection(conn);
1619       throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1620     }
1621
1622     logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1623
1624     int resultCode;
1625     try {
1626       resultCode = conn.getResponseCode();
1627     } catch (IOException e) {
1628       shutdownConnection(conn);
1629       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1630     }
1631     logger.debug("Response Code : " + resultCode);
1632
1633     opResult.setResultCode(resultCode);
1634
1635     // Generate a metrics log so we can track how long the operation took.
1636     metricsLogger.info(SearchDbMsgs.CHECK_INDEX_TIME,
1637         new LogFields()
1638             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1639             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1640         override,
1641         indexName);
1642
1643     shutdownConnection(conn);
1644
1645     return opResult;
1646   }
1647
1648
1649   private void buildDocumentResult(DocumentOperationResult result, String index)
1650       throws DocumentStoreOperationException {
1651
1652     JSONParser parser = new JSONParser();
1653     JSONObject root;
1654     try {
1655       root = (JSONObject) parser.parse(result.getResult());
1656
1657       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1658         // Success response object
1659         Document doc = new Document();
1660         doc.setEtag(result.getResultVersion());
1661         doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1662
1663         doc.setContent((JSONObject) root.get("_source"));
1664         result.setDocument(doc);
1665
1666       } else {
1667         // Error response object
1668         JSONObject error = (JSONObject) root.get("error");
1669         if (error != null) {
1670           result.setError(new ErrorResult(error.get("type").toString(),
1671               error.get("reason").toString()));
1672         }
1673
1674       }
1675     } catch (Exception e) {
1676       throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1677           + result.getResult());
1678     }
1679
1680
1681   }
1682
1683   private String buildDocumentResponseUrl(String index, String id) {
1684     return ApiUtils.buildDocumentUri(index, id);
1685   }
1686
1687   private void buildSearchResult(SearchOperationResult result, String index)
1688       throws DocumentStoreOperationException {
1689
1690     JSONParser parser = new JSONParser();
1691     JSONObject root;
1692
1693     try {
1694       root = (JSONObject) parser.parse(result.getResult());
1695       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1696         JSONObject hits = (JSONObject) root.get("hits");
1697         JSONArray hitArray = (JSONArray) hits.get("hits");
1698         SearchHits searchHits = new SearchHits();
1699         searchHits.setTotalHits(hits.get("total").toString());
1700         ArrayList<SearchHit> searchHitArray = new ArrayList<SearchHit>();
1701
1702         for (int i = 0; i < hitArray.size(); i++) {
1703           JSONObject hit = (JSONObject) hitArray.get(i);
1704           SearchHit searchHit = new SearchHit();
1705           searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1706           Document doc = new Document();
1707           if (hit.get("_version") != null) {
1708             doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1709           }
1710
1711           doc.setUrl(buildDocumentResponseUrl(index, (hit.get("_id") != null)
1712               ? hit.get("_id").toString() : ""));
1713           doc.setContent((JSONObject) hit.get("_source"));
1714           searchHit.setDocument(doc);
1715           searchHitArray.add(searchHit);
1716         }
1717         searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1718         result.setSearchResult(searchHits);
1719
1720         JSONObject aggregations = (JSONObject) root.get("aggregations");
1721         if (aggregations != null) {
1722           AggregationResult[] aggResults =
1723               AggregationParsingUtil.parseAggregationResults(aggregations);
1724           AggregationResults aggs = new AggregationResults();
1725           aggs.setAggregations(aggResults);
1726           result.setAggregationResult(aggs);
1727         }
1728
1729         // success
1730       } else {
1731         JSONObject error = (JSONObject) root.get("error");
1732         if (error != null) {
1733           result.setError(new ErrorResult(error.get("type").toString(),
1734               error.get("reason").toString()));
1735         }
1736       }
1737     } catch (Exception e) {
1738       throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1739           + result.getResult());
1740     }
1741
1742   }
1743
1744   private void buildSuggestResult(SearchOperationResult result, String index)
1745           throws DocumentStoreOperationException {
1746
1747     JSONParser parser = new JSONParser ();
1748     JSONObject root;
1749     try {
1750       root = (JSONObject) parser.parse ( result.getResult () );
1751       if (result.getResultCode () >= 200 && result.getResultCode () <= 299) {
1752         JSONArray hitArray = (JSONArray) root.get ( "suggest-vnf" );
1753         JSONObject hitdata = (JSONObject) hitArray.get ( 0 );
1754         JSONArray optionsArray = (JSONArray) hitdata.get ( "options" );
1755         SuggestHits suggestHits = new SuggestHits ();
1756         suggestHits.setTotalHits ( String.valueOf ( optionsArray.size () ) );
1757
1758         ArrayList<SuggestHit> suggestHitArray = new ArrayList<SuggestHit> ();
1759
1760         for (int i = 0; i < optionsArray.size (); i++) {
1761           JSONObject hit = (JSONObject) optionsArray.get ( i );
1762
1763           SuggestHit suggestHit = new SuggestHit ();
1764           suggestHit.setScore ( (hit.get ( "score" ) != null) ? hit.get ( "score" ).toString () : "" );
1765           suggestHit.setText ( (hit.get ( "text" ) != null) ? hit.get ( "text" ).toString () : "" );
1766           Document doc = new Document ();
1767           if (hit.get ( "_version" ) != null) {
1768             doc.setEtag ( (hit.get ( "_version" ) != null) ? hit.get ( "_version" ).toString () : "" );
1769           }
1770           doc.setUrl ( buildDocumentResponseUrl ( index,
1771                   (hit.get ( "_id" ) != null) ? hit.get ( "_id" ).toString () : "" ) );
1772
1773           doc.setContent ( (JSONObject) hit.get ( "payload" ) );
1774           suggestHit.setDocument ( doc );
1775           suggestHitArray.add ( suggestHit );
1776         }
1777         suggestHits.setHits ( suggestHitArray.toArray ( new SuggestHit[suggestHitArray.size ()] ) );
1778         result.setSuggestResult ( suggestHits );
1779
1780         JSONObject aggregations = (JSONObject) root.get ( "aggregations" );
1781         if (aggregations != null) {
1782           AggregationResult[] aggResults =
1783                   AggregationParsingUtil.parseAggregationResults ( aggregations );
1784           AggregationResults aggs = new AggregationResults ();
1785           aggs.setAggregations ( aggResults );
1786           result.setAggregationResult ( aggs );
1787         }
1788
1789         // success
1790       } else {
1791         JSONObject error = (JSONObject) root.get ( "error" );
1792         if (error != null) {
1793           result.setError (
1794                   new ErrorResult ( error.get ( "type" ).toString (), error.get ( "reason" ).toString () ) );
1795         }
1796       }
1797     } catch (Exception e) {
1798       throw new DocumentStoreOperationException (
1799               "Failed to parse Elastic Search response." + result.getResult () );
1800     }
1801   }
1802
1803
1804   }