019d867a3a60f70bf26801d86703f32a51f89dea
[aai/search-data-service.git] / src / main / java / org / onap / aai / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
22
23 import com.google.common.base.Throwables;
24 import com.fasterxml.jackson.annotation.JsonInclude.Include;
25 import com.fasterxml.jackson.core.JsonParseException;
26 import com.fasterxml.jackson.core.JsonProcessingException;
27 import com.fasterxml.jackson.databind.JsonMappingException;
28 import com.fasterxml.jackson.databind.ObjectMapper;
29 import org.json.simple.JSONArray;
30 import org.json.simple.JSONObject;
31 import org.json.simple.parser.JSONParser;
32 import org.json.simple.parser.ParseException;
33 import org.onap.aai.sa.rest.AnalysisConfiguration;
34 import org.onap.aai.sa.rest.ApiUtils;
35 import org.onap.aai.sa.rest.BulkRequest;
36 import org.onap.aai.sa.rest.BulkRequest.OperationType;
37 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
38 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
39 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
40 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
41 import org.onap.aai.sa.searchdbabstraction.entity.Document;
42 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
43 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
44 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
45 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
46 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
47 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
48 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
49 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
50 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
51 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
52 import org.onap.aai.cl.api.LogFields;
53 import org.onap.aai.cl.api.LogLine;
54 import org.onap.aai.cl.api.Logger;
55 import org.onap.aai.cl.eelf.LoggerFactory;
56 import org.onap.aai.cl.mdc.MdcContext;
57 import org.onap.aai.cl.mdc.MdcOverride;
58 import org.onap.aai.sa.rest.DocumentSchema;
59 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
60 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
61
62 import java.io.BufferedReader;
63 import java.io.File;
64 import java.io.FileInputStream;
65 import java.io.IOException;
66 import java.io.InputStream;
67 import java.io.InputStreamReader;
68 import java.io.OutputStream;
69 import java.io.OutputStreamWriter;
70 import java.io.PrintWriter;
71 import java.io.StringWriter;
72 import java.net.HttpURLConnection;
73 import java.net.MalformedURLException;
74 import java.net.ProtocolException;
75 import java.net.URL;
76 import java.text.SimpleDateFormat;
77 import java.util.ArrayList;
78 import java.util.Arrays;
79 import java.util.List;
80 import java.util.Properties;
81 import java.util.concurrent.atomic.AtomicBoolean;
82 import javax.ws.rs.core.Response.Status;
83
84
85 /**
86  * This class has the Elasticsearch implementation of the
87  * DB operations defined in DocumentStoreInterface.
88  */
89 public class ElasticSearchHttpController implements DocumentStoreInterface {
90
91   private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
92       "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
93   private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
94       "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
95   private static final String BULK_IMPORT_INDEX_TEMPLATE =
96       "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
97   private static final String BULK_DELETE_TEMPLATE =
98       "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
99
100   private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
101       "Internal Error: ElasticSearch operation fault occurred";
102   private static final Logger logger = LoggerFactory.getInstance()
103       .getLogger(ElasticSearchHttpController.class.getName());
104   private static final Logger metricsLogger = LoggerFactory.getInstance()
105       .getMetricsLogger(ElasticSearchHttpController.class.getName());
106   private final ElasticSearchConfig config;
107
108   private static final String DEFAULT_TYPE = "default";
109
110   private static ElasticSearchHttpController instance = null;
111
112   protected AnalysisConfiguration analysisConfig;
113
114   public static ElasticSearchHttpController getInstance() {
115
116     synchronized (ElasticSearchHttpController.class) {
117
118       if (instance == null) {
119
120         Properties properties = new Properties();
121         File file = new File(SearchDbConstants.ES_CONFIG_FILE);
122         try {
123           properties.load(new FileInputStream(file));
124         } catch (Exception e) {
125           logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL,
126             "ElasticSearchHTTPController.getInstance",
127             e.getLocalizedMessage());
128         }
129
130         ElasticSearchConfig config = new ElasticSearchConfig(properties);
131         instance = new ElasticSearchHttpController(config);
132       }
133     }
134
135     return instance;
136   }
137
138   public ElasticSearchHttpController(ElasticSearchConfig config) {
139     this.config = config;
140     analysisConfig = new AnalysisConfiguration();
141
142     try {
143       logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
144       checkConnection();
145       logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
146     } catch (Exception e) {
147       logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e,
148           getFullUrl("", false), e.getMessage());
149     }
150   }
151
152
153   public AnalysisConfiguration getAnalysisConfig() {
154     return analysisConfig;
155   }
156
157   @Override
158   public OperationResult createIndex(String index, DocumentSchema documentSchema) {
159
160     OperationResult result = new OperationResult();
161     result.setResultCode(500);
162
163     try {
164
165       // Submit the request to ElasticSearch to create the index using a
166       // default document type.
167       result = createTable(index,
168           DEFAULT_TYPE,
169           analysisConfig.getEsIndexSettings(),
170           DocumentSchemaUtil.generateDocumentMappings(documentSchema));
171
172       // ElasticSearch will return us a 200 code on success when we
173       // want to report a 201, so translate the result here.
174       result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
175       if (isSuccess(result)) {
176         result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
177         //result.setResult("{\"index\": \"" + index + ", \"type\": \"" + DEFAULT_TYPE + "\"}");
178       }
179
180     } catch (DocumentStoreOperationException e) {
181
182       result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
183     }
184
185     return result;
186   }
187
188   @Override
189   public OperationResult createDynamicIndex(String index, String dynamicSchema) {
190     OperationResult result = new OperationResult();
191     result.setResultCode(500);
192
193     try {
194       result = createTable(index, dynamicSchema);
195
196       // ElasticSearch will return us a 200 code on success when we
197       // want to report a 201, so translate the result here.
198       result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
199       if (isSuccess(result)) {
200         result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
201       }
202     } catch (DocumentStoreOperationException e) {
203       result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
204     }
205
206     return result;
207   }
208
209
210   @Override
211   public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
212
213     //Initialize operation result with a failure codes / fault string
214     OperationResult opResult = new OperationResult();
215     opResult.setResultCode(500);
216     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
217
218     // Grab the current time so we can use it to generate a metrics log.
219     MdcOverride override = getStartTime(new MdcOverride());
220
221     String fullUrl = getFullUrl("/" + indexName + "/", false);
222     HttpURLConnection conn = initializeConnection(fullUrl);
223
224     logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
225
226     try {
227       conn.setRequestMethod("DELETE");
228     } catch (ProtocolException e) {
229       shutdownConnection(conn);
230       throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
231     }
232
233     handleResponse(conn, opResult);
234
235     // Generate a metrics log so we can track how long the operation took.
236     metricsLogger.info(SearchDbMsgs.DELETE_INDEX_TIME,
237         new LogFields()
238             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
239             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
240         override,
241         indexName);
242
243     shutdownConnection(conn);
244
245     return opResult;
246   }
247
248
249   private OperationResult checkConnection() throws Exception {
250
251     String fullUrl = getFullUrl("/_cluster/health", false);
252     URL url = null;
253     HttpURLConnection conn = null;
254
255     url = new URL(fullUrl);
256     conn = (HttpURLConnection) url.openConnection();
257     conn.setRequestMethod("GET");
258     conn.setDoOutput(true);
259     logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
260
261     int resultCode = conn.getResponseCode();
262     logger.debug("getClusterHealth() response Code : " + resultCode);
263     OperationResult opResult = new OperationResult();
264     opResult.setResultCode(resultCode);
265
266     shutdownConnection(conn);
267
268     return opResult;
269   }
270
271   private String getFullUrl(String resourceUrl, boolean isSecure) {
272
273     final String host = config.getIpAddress();
274     final String port = config.getHttpPort();
275
276     if (isSecure) {
277       return String.format("https://%s:%s%s", host, port, resourceUrl);
278     } else {
279       return String.format("http://%s:%s%s", host, port, resourceUrl);
280     }
281   }
282
283   private void shutdownConnection(HttpURLConnection connection) {
284     if (connection == null) {
285       return;
286     }
287
288     InputStream inputstream = null;
289     OutputStream outputstream = null;
290
291     try {
292       inputstream = connection.getInputStream();
293     } catch (IOException e) {
294       logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
295     } finally {
296       if (inputstream != null) {
297         try {
298           inputstream.close();
299         } catch (IOException e) {
300           logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
301               e.getLocalizedMessage());
302         }
303       }
304     }
305
306     try {
307       outputstream = connection.getOutputStream();
308     } catch (IOException e) {
309       logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
310     } finally {
311       if (outputstream != null) {
312         try {
313           outputstream.close();
314         } catch (IOException e) {
315           logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
316               e.getLocalizedMessage());
317         }
318       }
319     }
320
321     connection.disconnect();
322   }
323
324   //@Override
325   protected OperationResult createTable(String indexName, String typeName,
326                                         String indexSettings, String indexMappings)
327       throws DocumentStoreOperationException {
328
329     if (indexSettings == null) {
330       logger.debug("No settings provided.");
331     }
332
333     if (indexMappings == null) {
334       logger.debug("No mappings provided.");
335     }
336
337     OperationResult opResult = new OperationResult();
338
339     // Initialize operation result with a failure codes / fault string
340     opResult.setResultCode(500);
341     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
342
343     // Grab the current time so we can use it to generate a metrics log.
344     MdcOverride override = getStartTime(new MdcOverride());
345
346     String fullUrl = getFullUrl("/" + indexName + "/", false);
347     HttpURLConnection conn = initializeConnection(fullUrl);
348
349     try {
350       conn.setRequestMethod("PUT");
351     } catch (ProtocolException e) {
352       shutdownConnection(conn);
353       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
354     }
355
356     StringBuilder sb = new StringBuilder(128);
357     sb.append("{ \"settings\" : ");
358     sb.append(indexSettings);
359     sb.append(",");
360
361     sb.append("\"mappings\" : {");
362     sb.append("\"" + typeName + "\" :");
363     sb.append(indexMappings);
364     sb.append("}}");
365
366     attachContent(conn, sb.toString());
367
368     logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
369     logger.debug("Request content: " + sb.toString());
370
371     handleResponse(conn, opResult);
372
373     shutdownConnection(conn);
374
375     // Generate a metrics log so we can track how long the operation took.
376     metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
377         new LogFields()
378             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
379             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
380         override,
381         indexName);
382
383     return opResult;
384   }
385
386   /**
387    * Will send the passed in JSON payload to Elasticsearch using the
388    * provided index name in an attempt to create the index.
389    *
390    * @param indexName - The name of the index to be created
391    * @param settingsAndMappings - The actual JSON object that will define the index
392    * @return - The operation result of writing into Elasticsearch
393    * @throws DocumentStoreOperationException
394    */
395   protected OperationResult createTable(String indexName, String settingsAndMappings) throws DocumentStoreOperationException {
396     OperationResult result = new OperationResult();
397     result.setResultCode(500);
398     result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
399
400     // Grab the current time so we can use it to generate a metrics log.
401     MdcOverride override = getStartTime(new MdcOverride());
402
403     String fullUrl = getFullUrl("/" + indexName + "/", false);
404     HttpURLConnection conn = initializeConnection(fullUrl);
405
406     try {
407       conn.setRequestMethod("PUT");
408     } catch (ProtocolException e) {
409       shutdownConnection(conn);
410       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
411     }
412
413     attachContent(conn, settingsAndMappings);
414     handleResponse(conn, result);
415
416     // Generate a metrics log so we can track how long the operation took.
417     metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
418             new LogFields()
419                     .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
420                     .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()),
421             override,
422             indexName);
423
424     return result;
425   }
426
427   @Override
428   public DocumentOperationResult createDocument(String                  indexName,
429                                                 DocumentStoreDataEntity document,
430                                                 boolean                 allowImplicitIndexCreation)
431       throws DocumentStoreOperationException {
432
433     if(!allowImplicitIndexCreation) {
434
435       // Before we do anything, make sure that the specified index actually exists in the
436       // document store - we don't want to rely on ElasticSearch to fail the document
437       // create because it could be configured to implicitly create a non-existent index,
438       // which can lead to hard-to-debug behaviour with queries down the road.
439       OperationResult indexExistsResult = checkIndexExistence(indexName);
440       if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
441
442         DocumentOperationResult opResult = new DocumentOperationResult();
443         opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
444         opResult.setResult("Document Index '" + indexName + "' does not exist.");
445         opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
446         return opResult;
447       }
448     }
449
450     if (document.getId() == null || document.getId().isEmpty()) {
451       return createDocumentWithoutId(indexName, document);
452     } else {
453       return createDocumentWithId(indexName, document);
454     }
455   }
456
457   private DocumentOperationResult createDocumentWithId(String indexName,
458                                                        DocumentStoreDataEntity document)
459       throws DocumentStoreOperationException {
460     // check if the document already exists
461     DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
462
463
464     if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
465       if (opResult.getResultCode() == Status.OK.getStatusCode()) {
466         opResult.setFailureCause("A document with the same id already exists.");
467       } else {
468         opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
469       }
470       opResult.setResultCode(Status.CONFLICT.getStatusCode());
471       return opResult;
472     }
473
474     opResult = new DocumentOperationResult();
475     // Initialize operation result with a failure codes / fault string
476     opResult.setResultCode(500);
477     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
478
479     // Grab the current time so we can use it to generate a metrics log.
480     MdcOverride override = getStartTime(new MdcOverride());
481
482     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE
483         + "/" + document.getId(), false);
484     HttpURLConnection conn = initializeConnection(fullUrl);
485
486     try {
487       conn.setRequestMethod("PUT");
488     } catch (ProtocolException e) {
489       shutdownConnection(conn);
490       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
491     }
492
493     attachDocument(conn, document);
494
495     logger.debug("Sending 'PUT' request to: " + conn.getURL());
496
497     handleResponse(conn, opResult);
498     buildDocumentResult(opResult, indexName);
499
500     // Generate a metrics log so we can track how long the operation took.
501     metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
502         new LogFields()
503             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
504             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
505         override,
506         indexName);
507
508     shutdownConnection(conn);
509
510     return opResult;
511
512   }
513
514   private DocumentOperationResult createDocumentWithoutId(String indexName,
515                                                           DocumentStoreDataEntity document)
516       throws DocumentStoreOperationException {
517
518     DocumentOperationResult response = new DocumentOperationResult();
519     // Initialize operation result with a failure codes / fault string
520     response.setResultCode(500);
521     response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
522
523     // Grab the current time so we can use it to generate a metrics log.
524     MdcOverride override = getStartTime(new MdcOverride());
525
526     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
527     HttpURLConnection conn = initializeConnection(fullUrl);
528
529     try {
530       conn.setRequestMethod("POST");
531     } catch (ProtocolException e) {
532       shutdownConnection(conn);
533       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
534     }
535
536     attachDocument(conn, document);
537
538     logger.debug("Sending 'POST' request to: " + conn.getURL());
539
540     handleResponse(conn, response);
541     buildDocumentResult(response, indexName);
542
543     // Generate a metrics log so we can track how long the operation took.
544     metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
545         new LogFields()
546             .setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
547             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
548         override,
549         indexName);
550
551     shutdownConnection(conn);
552
553     return response;
554   }
555
556   private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
557       throws DocumentStoreOperationException {
558     conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
559     conn.setRequestProperty("Connection", "Close");
560
561     attachContent(conn, doc.getContentInJson());
562   }
563
564   private DocumentOperationResult checkDocumentExistence(String indexName,
565                                                          String docId)
566       throws DocumentStoreOperationException {
567     DocumentOperationResult opResult = new DocumentOperationResult();
568
569     // Initialize operation result with a failure codes / fault string
570     opResult.setResultCode(500);
571
572     // Grab the current time so we can use it to generate a metrics log.
573     MdcOverride override = getStartTime(new MdcOverride());
574
575     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
576     HttpURLConnection conn = initializeConnection(fullUrl);
577
578     try {
579       conn.setRequestMethod("HEAD");
580     } catch (ProtocolException e) {
581       shutdownConnection(conn);
582       throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
583     }
584
585     logger.debug("Sending 'HEAD' request to: " + conn.getURL());
586
587     int resultCode;
588     try {
589       resultCode = conn.getResponseCode();
590     } catch (IOException e) {
591       shutdownConnection(conn);
592       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
593     }
594
595     logger.debug("Response Code : " + resultCode);
596
597     opResult.setResultCode(resultCode);
598
599     // Generate a metrics log so we can track how long the operation took.
600     metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
601         new LogFields()
602             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
603             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
604         override,
605         indexName,
606         docId);
607
608     shutdownConnection(conn);
609
610     return opResult;
611   }
612
613   @Override
614   public DocumentOperationResult updateDocument(String                  indexName,
615                                                 DocumentStoreDataEntity document,
616                                                 boolean                 allowImplicitIndexCreation)
617       throws DocumentStoreOperationException {
618
619     if(!allowImplicitIndexCreation) {
620
621       // Before we do anything, make sure that the specified index actually exists in the
622       // document store - we don't want to rely on ElasticSearch to fail the document
623       // create because it could be configured to implicitly create a non-existent index,
624       // which can lead to hard-to-debug behaviour with queries down the road.
625       OperationResult indexExistsResult = checkIndexExistence(indexName);
626       if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
627
628         DocumentOperationResult opResult = new DocumentOperationResult();
629         opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
630         opResult.setResult("Document Index '" + indexName + "' does not exist.");
631         opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
632         return opResult;
633       }
634     }
635
636     DocumentOperationResult opResult = new DocumentOperationResult();
637
638     // Initialize operation result with a failure codes / fault string
639     opResult.setResultCode(500);
640     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
641
642     // Grab the current time so we can use it to generate a metrics log.
643     MdcOverride override = getStartTime(new MdcOverride());
644
645     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
646         + "?version=" + document.getVersion(), false);
647     HttpURLConnection conn = initializeConnection(fullUrl);
648
649     try {
650       conn.setRequestMethod("PUT");
651     } catch (ProtocolException e) {
652       shutdownConnection(conn);
653       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
654     }
655
656     attachDocument(conn, document);
657
658     logger.debug("Sending 'PUT' request to: " + conn.getURL());
659
660     handleResponse(conn, opResult);
661     buildDocumentResult(opResult, indexName);
662
663     // Generate a metrics log so we can track how long the operation took.
664     metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
665         new LogFields()
666             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
667             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
668         override,
669         indexName,
670         document.getId());
671
672     shutdownConnection(conn);
673
674     return opResult;
675   }
676
677   @Override
678   public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
679       throws DocumentStoreOperationException {
680     DocumentOperationResult opResult = new DocumentOperationResult();
681
682     // Initialize operation result with a failure codes / fault string
683     opResult.setResultCode(500);
684     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
685
686     // Grab the current time so we can use it to generate a metrics log.
687     MdcOverride override = getStartTime(new MdcOverride());
688
689     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
690         + "?version=" + document.getVersion(), false);
691     HttpURLConnection conn = initializeConnection(fullUrl);
692
693     try {
694       conn.setRequestMethod("DELETE");
695     } catch (ProtocolException e) {
696       shutdownConnection(conn);
697       throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
698     }
699
700     logger.debug("\nSending 'DELETE' request to " + conn.getURL());
701
702     handleResponse(conn, opResult);
703     buildDocumentResult(opResult, indexName);
704     //supress the etag and url in response for delete as they are not required
705     if (opResult.getDocument() != null) {
706       opResult.getDocument().setEtag(null);
707       opResult.getDocument().setUrl(null);
708     }
709
710     // Generate a metrics log so we can track how long the operation took.
711     metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
712         new LogFields()
713             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
714             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
715         override,
716         indexName,
717         document.getId());
718
719     shutdownConnection(conn);
720
721     return opResult;
722   }
723
724   @Override
725   public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
726       throws DocumentStoreOperationException {
727     DocumentOperationResult opResult = new DocumentOperationResult();
728
729     // Initialize operation result with a failure codes / fault string
730     opResult.setResultCode(500);
731     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
732
733     // Grab the current time so we can use it to generate a metrics log.
734     MdcOverride override = getStartTime(new MdcOverride());
735
736     String fullUrl = null;
737     if (document.getVersion() == null) {
738       fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
739     } else {
740       fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
741           + "?version=" + document.getVersion(), false);
742     }
743     HttpURLConnection conn = initializeConnection(fullUrl);
744
745     logger.debug("\nSending 'GET' request to: " + conn.getURL());
746
747     handleResponse(conn, opResult);
748     buildDocumentResult(opResult, indexName);
749
750     // Generate a metrics log so we can track how long the operation took.
751     metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
752         new LogFields()
753             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
754             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
755         override,
756         indexName,
757         document.getId());
758
759     shutdownConnection(conn);
760
761     return opResult;
762   }
763
764   public SearchOperationResult search(String indexName, String queryString)
765       throws DocumentStoreOperationException {
766     SearchOperationResult opResult = new SearchOperationResult();
767
768     // Initialize operation result with a failure codes / fault string
769     opResult.setResultCode(500);
770     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
771
772     String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
773
774     // Grab the current time so we can use it to generate a metrics log.
775     MdcOverride override = getStartTime(new MdcOverride());
776
777     HttpURLConnection conn = initializeConnection(fullUrl);
778
779     try {
780       conn.setRequestMethod("GET");
781     } catch (ProtocolException e) {
782       shutdownConnection(conn);
783       throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
784     }
785
786     logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
787
788     handleResponse(conn, opResult);
789     buildSearchResult(opResult, indexName);
790
791
792     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
793         new LogFields()
794             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
795             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
796         override,
797         indexName,
798         queryString);
799
800     return opResult;
801   }
802
803   public SearchOperationResult searchWithPayload(String indexName, String query)
804       throws DocumentStoreOperationException {
805     SearchOperationResult opResult = new SearchOperationResult();
806
807     if (logger.isDebugEnabled()) {
808       logger.debug("Querying index: " + indexName + " with query string: " + query);
809     }
810
811     // Initialize operation result with a failure codes / fault string
812     opResult.setResultCode(500);
813     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
814
815     String fullUrl = getFullUrl("/" + indexName + "/_search", false);
816
817     // Grab the current time so we can use it to generate a metrics log.
818     MdcOverride override = getStartTime(new MdcOverride());
819
820     HttpURLConnection conn = initializeConnection(fullUrl);
821
822     try {
823       conn.setRequestMethod("POST");
824     } catch (ProtocolException e) {
825       shutdownConnection(conn);
826       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
827     }
828
829     attachContent(conn, query);
830
831     logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
832     logger.debug("Request body =  Elasticsearch query = " + query);
833
834     handleResponse(conn, opResult);
835     buildSearchResult(opResult, indexName);
836
837     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
838         new LogFields()
839             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
840             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
841         override,
842         indexName,
843         query);
844
845     shutdownConnection(conn);
846
847     return opResult;
848   }
849
850
851   public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
852           throws DocumentStoreOperationException {
853
854     SearchOperationResult opResult = new SearchOperationResult();
855
856     if (logger.isDebugEnabled()) {
857       logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
858     }
859
860     // Initialize operation result with a failure codes / fault string
861     opResult.setResultCode(500);
862     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
863
864     String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
865
866     // Grab the current time so we can use it to generate a metrics log.
867     MdcOverride override = getStartTime(new MdcOverride());
868
869     HttpURLConnection conn = initializeConnection(fullUrl);
870
871     try {
872       conn.setRequestMethod("POST");
873     } catch (ProtocolException e) {
874       shutdownConnection(conn);
875       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
876     }
877
878     attachContent(conn, query);
879
880     logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
881     logger.debug("Request body =  Elasticsearch query = " + query);
882
883     handleResponse(conn, opResult);
884     buildSuggestResult(opResult, indexName);
885
886     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
887             new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
888                     .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
889             override, indexName, query);
890
891     shutdownConnection(conn);
892
893     return opResult;
894   }
895
896   private void attachContent(HttpURLConnection conn, String content)
897       throws DocumentStoreOperationException {
898     OutputStream outputStream = null;
899     OutputStreamWriter out = null;
900
901     try {
902       outputStream = conn.getOutputStream();
903     } catch (IOException e) {
904       shutdownConnection(conn);
905       throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
906     }
907
908     out = new OutputStreamWriter(outputStream);
909
910     try {
911       out.write(content);
912       out.close();
913     } catch (IOException e) {
914       shutdownConnection(conn);
915       throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
916     }
917   }
918
919   private HttpURLConnection initializeConnection(String fullUrl)
920       throws DocumentStoreOperationException {
921     URL url = null;
922     HttpURLConnection conn = null;
923
924     try {
925       url = new URL(fullUrl);
926     } catch (MalformedURLException e) {
927       throw new DocumentStoreOperationException("Error building a URL with " + url, e);
928     }
929
930     try {
931       conn = (HttpURLConnection) url.openConnection();
932       conn.setDoOutput(true);
933     } catch (IOException e) {
934       shutdownConnection(conn);
935       throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
936     }
937
938     return conn;
939   }
940
941   private void handleResponse(HttpURLConnection conn, OperationResult opResult)
942       throws DocumentStoreOperationException {
943     int resultCode = 200;
944
945     try {
946       resultCode = conn.getResponseCode();
947     } catch (IOException e) {
948       shutdownConnection(conn);
949       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
950     }
951
952     logger.debug("Response Code : " + resultCode);
953
954     InputStream inputStream = null;
955
956     if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
957       inputStream = conn.getErrorStream();
958     } else {
959       try {
960         inputStream = conn.getInputStream();
961       } catch (IOException e) {
962         shutdownConnection(conn);
963         throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
964       }
965     }
966
967     InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
968     BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
969
970     StringBuilder result = new StringBuilder(128);
971     String string = null;
972
973     try {
974       while ((string = bufferedreader.readLine()) != null) {
975         result.append(string).append("\n");
976       }
977     } catch (IOException e) {
978       shutdownConnection(conn);
979       throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
980     }
981
982     if (resultCode == Status.CONFLICT.getStatusCode()) {
983       opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode());
984     } else {
985       opResult.setResultCode(resultCode);
986     }
987     if (logger.isDebugEnabled()) {
988       logger.debug("Raw result string from ElasticSearch = " + result.toString());
989     }
990     opResult.setResult(result.toString());
991     opResult.setResultVersion(extractVersion(result.toString()));
992   }
993
994   private String extractVersion(String result) throws DocumentStoreOperationException {
995
996     JSONParser parser = new JSONParser();
997     String version = null;
998     try {
999       JSONObject root = (JSONObject) parser.parse(result);
1000       if (root.get("_version") != null) {
1001         version = root.get("_version").toString();
1002       }
1003
1004     } catch (ParseException e) {
1005
1006       // Not all responses from ElasticSearch include a version, so
1007       // if we don't get one back, just return an empty string rather
1008       // than trigger a false failure.
1009       version = "";
1010     }
1011     return version;
1012   }
1013
1014   /**
1015    * This convenience method gets the current system time and stores
1016    * it in an attribute in the supplied {@link MdcOverride} object so
1017    * that it can be used later by the metrics logger.
1018    *
1019    * @param override - The {@link MdcOverride} object to update.
1020    * @return - The supplied {@link MdcOverride} object.
1021    */
1022   private MdcOverride getStartTime(MdcOverride override) {
1023
1024     // Grab the current time...
1025     long startTimeInMs = System.currentTimeMillis();
1026
1027     // ...and add it as an attribute to the supplied MDC Override
1028     // object.
1029     SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
1030     override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
1031
1032     // Return the MdcOverride object that we were passed.
1033     // This looks odd, but it allows us to do stuff like:
1034     //
1035     //    MdcOverride ov = getStartTime(new MdcOverride())
1036     //
1037     // which is quite handy, but also allows us to pass in an existing
1038     // MdcOverride object which already has some attributes set.
1039     return override;
1040   }
1041
1042   private boolean isSuccess(OperationResult result) {
1043
1044     return isSuccessCode(result.getResultCode());
1045   }
1046
1047
1048   private boolean isSuccessCode(int statusCode) {
1049     return ((statusCode >= 200) && (statusCode < 300));
1050   }
1051
1052
1053   @Override
1054   public OperationResult performBulkOperations(BulkRequest[] requests)
1055       throws DocumentStoreOperationException {
1056
1057     if (logger.isDebugEnabled()) {
1058       String dbgString = "ESController: performBulkOperations - Operations: ";
1059
1060       for (BulkRequest request : requests) {
1061         dbgString += "[" + request.toString() + "] ";
1062       }
1063
1064       logger.debug(dbgString);
1065     }
1066
1067     // Grab the current time so we can use it to generate a metrics log.
1068     MdcOverride override = getStartTime(new MdcOverride());
1069
1070     // Parse the supplied set of operations.
1071     // Iterate over the list of operations which we were provided and
1072     // translate them into a format that ElasticSearh understands.
1073     int opCount = 0;
1074     StringBuilder esOperationSet = new StringBuilder(128);
1075     List<ElasticSearchResultItem> rejected = new ArrayList<ElasticSearchResultItem>();
1076     for (BulkRequest request : requests) {
1077
1078       // Convert the request to the syntax ElasticSearch likes.
1079       if (buildEsOperation(request, esOperationSet, rejected)) {
1080         opCount++;
1081       }
1082     }
1083
1084     ElasticSearchBulkOperationResult opResult = null;
1085     if (opCount > 0) {
1086
1087       // Open an HTTP connection to the ElasticSearch back end.
1088       String fullUrl = getFullUrl("/_bulk", false);
1089       URL url;
1090       HttpURLConnection conn;
1091       try {
1092
1093         url = new URL(fullUrl);
1094         conn = (HttpURLConnection) url.openConnection();
1095         conn.setRequestMethod("PUT");
1096         conn.setDoOutput(true);
1097         conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
1098         conn.setRequestProperty("Connection", "Close");
1099
1100       } catch (IOException e) {
1101
1102         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1103         if (logger.isDebugEnabled()) {
1104           logger.debug(Throwables.getStackTraceAsString(e));
1105         }
1106
1107         throw new DocumentStoreOperationException("Failed to open connection to document store.  Cause: "
1108             + e.getMessage(), e);
1109       }
1110
1111       StringBuilder bulkResult = new StringBuilder(128);
1112       try {
1113         // Create an output stream to write our request to.
1114         OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1115         ;
1116
1117         if (logger.isDebugEnabled()) {
1118           logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1119           logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n",
1120               "\\n"));
1121         }
1122
1123         // Write the resulting request string to our output stream. (this sends the request to ES?)
1124         out.write(esOperationSet.toString());
1125         out.close();
1126
1127         // Open an input stream on our connection in order to read back the results.
1128         InputStream is = conn.getInputStream();
1129         InputStreamReader inputstreamreader = new InputStreamReader(is);
1130         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1131
1132         // Read the contents of the input stream into our result string...
1133         String esResponseString = null;
1134
1135         while ((esResponseString = bufferedreader.readLine()) != null) {
1136           bulkResult.append(esResponseString).append("\n");
1137         }
1138
1139       } catch (IOException e) {
1140
1141         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1142         if (logger.isDebugEnabled()) {
1143           StringWriter sw = new StringWriter();
1144           e.printStackTrace(new PrintWriter(sw));
1145           logger.debug(sw.toString());
1146         }
1147
1148         throw new DocumentStoreOperationException("Failure interacting with document store.  Cause: "
1149             + e.getMessage(), e);
1150       }
1151
1152       if (logger.isDebugEnabled()) {
1153         logger.debug("ESController: Received result string from ElasticSearch: = "
1154             + bulkResult.toString());
1155       }
1156
1157       // ...and marshal the resulting string into a Java object.
1158       try {
1159         opResult = marshallEsBulkResult(bulkResult.toString());
1160
1161       } catch (IOException e) {
1162
1163         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1164         if (logger.isDebugEnabled()) {
1165           logger.debug(Throwables.getStackTraceAsString(e));
1166         }
1167
1168         throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: "
1169             + e.getMessage(), e);
1170       }
1171     }
1172
1173     // Finally, build the operation result and return it to the caller.
1174     OperationResult result = new OperationResult();
1175     result.setResultCode(207);
1176     result.setResult(buildGenericBulkResultSet(opResult, rejected));
1177
1178     // In the success case we don't want the entire result string to be
1179     // dumped into the metrics log, so concatenate it.
1180     String resultStringForMetricsLog = result.getResult();
1181     if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1182       resultStringForMetricsLog = resultStringForMetricsLog.substring(0,
1183           Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1184     }
1185
1186     metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1187         new LogFields()
1188             .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1189             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1190         override);
1191
1192     return result;
1193   }
1194
1195
1196   /**
1197    * This method converts a {@link BulkRequest} object into a json structure
1198    * which can be understood by ElasticSearch.
1199    *
1200    * @param request - The request to be performed.
1201    * @param sb      - The string builder to append the json data to
1202    * @throws DocumentStoreOperationException
1203    */
1204   private boolean buildEsOperation(BulkRequest request, StringBuilder sb,
1205                                    List<ElasticSearchResultItem> fails)
1206       throws DocumentStoreOperationException {
1207
1208     boolean retVal = true;
1209     OperationResult indexExistsResult = null;
1210
1211     // What kind of operation are we performing?
1212     switch (request.getOperationType()) {
1213
1214       // Create a new document.
1215       case CREATE:
1216
1217         // Make sure that we were supplied a document payload.
1218         if (request.getOperation().getDocument() == null) {
1219
1220           fails.add(generateRejectionEntry(request.getOperationType(),
1221               "Missing document payload",
1222               request.getIndex(),
1223               request.getId(),
1224               400,
1225               request.getOperation().getMetaData().getUrl()));
1226           return false;
1227         }
1228
1229         // Make sure that the supplied document URL is formatted
1230         // correctly.
1231         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1232           fails.add(generateRejectionEntry(request.getOperationType(),
1233               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1234               request.getIndex(),
1235               "",
1236               400,
1237               request.getOperation().getMetaData().getUrl()));
1238           return false;
1239         }
1240
1241         // Validate that the specified index actually exists before we
1242         // try to perform the create.
1243         if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1244
1245           fails.add(generateRejectionEntry(request.getOperationType(),
1246               "Specified resource does not exist: "
1247                   + request.getOperation().getMetaData().getUrl(),
1248               request.getIndex(),
1249               request.getId(),
1250               404,
1251               request.getOperation().getMetaData().getUrl()));
1252           return false;
1253         }
1254
1255         // If we were supplied an id for the new document, then
1256         // include it in the bulk operation to Elastic Search
1257         if (request.getId() == null) {
1258
1259           sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE,
1260               request.getIndex(),
1261               DEFAULT_TYPE));
1262
1263           // Otherwise, we just leave that parameter off and ElasticSearch
1264           // will generate one for us.
1265         } else {
1266           sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE,
1267               request.getIndex(),
1268               DEFAULT_TYPE,
1269               request.getId()));
1270         }
1271
1272         try {
1273           // Append the document that we want to create.
1274           sb.append(request.getOperation().getDocument().toJson()).append("\n");
1275         } catch (JsonProcessingException e) {
1276           throw new DocumentStoreOperationException("Failure parsing document to json", e);
1277         }
1278
1279         break;
1280
1281       // Update an existing document.
1282       case UPDATE:
1283
1284         // Make sure that we were supplied a document payload.
1285         if (request.getOperation().getDocument() == null) {
1286
1287           fails.add(generateRejectionEntry(request.getOperationType(),
1288               "Missing document payload",
1289               request.getIndex(),
1290               request.getId(),
1291               400,
1292               request.getOperation().getMetaData().getUrl()));
1293           return false;
1294         }
1295
1296         // Make sure that the supplied document URL is formatted
1297         // correctly.
1298         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1299           fails.add(generateRejectionEntry(request.getOperationType(),
1300               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1301               request.getIndex(),
1302               "",
1303               400,
1304               request.getOperation().getMetaData().getUrl()));
1305           return false;
1306         }
1307
1308         // Validate that the specified index actually exists before we
1309         // try to perform the update.
1310         if (!indexExists(request.getIndex())) {
1311
1312           fails.add(generateRejectionEntry(request.getOperationType(),
1313               "Specified resource does not exist: "
1314                   + request.getOperation().getMetaData().getUrl(),
1315               request.getIndex(),
1316               request.getId(),
1317               404,
1318               request.getOperation().getMetaData().getUrl()));
1319           return false;
1320         }
1321
1322         // Validate that the document we are trying to update actually
1323         // exists before we try to perform the update.
1324         if (!documentExists(request.getIndex(), request.getId())) {
1325
1326           fails.add(generateRejectionEntry(request.getOperationType(),
1327               "Specified resource does not exist: "
1328                   + request.getOperation().getMetaData().getUrl(),
1329               request.getIndex(),
1330               request.getId(),
1331               404,
1332               request.getOperation().getMetaData().getUrl()));
1333           return false;
1334         }
1335
1336         // It is mandatory that a version be supplied for an update operation,
1337         // so validate that now.
1338         if (request.getOperation().getMetaData().getEtag() == null) {
1339
1340           fails.add(generateRejectionEntry(request.getOperationType(),
1341               "Missing mandatory ETag field",
1342               request.getIndex(),
1343               request.getId(),
1344               400,
1345               request.getOperation().getMetaData().getUrl()));
1346           return false;
1347         }
1348
1349         // Generate the update request...
1350         sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE,
1351             request.getIndex(),
1352             DEFAULT_TYPE,
1353             request.getId(),
1354             request.getOperation().getMetaData().getEtag()));
1355
1356         // ...and append the document that we want to update.
1357         try {
1358           sb.append(request.getOperation().getDocument().toJson()).append("\n");
1359         } catch (JsonProcessingException e) {
1360           throw new DocumentStoreOperationException("Failure parsing document to json", e);
1361         }
1362         break;
1363
1364       // Delete an existing document.
1365       case DELETE:
1366
1367         // Make sure that the supplied document URL is formatted
1368         // correctly.
1369         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1370           fails.add(generateRejectionEntry(request.getOperationType(),
1371               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1372               request.getIndex(),
1373               "",
1374               400,
1375               request.getOperation().getMetaData().getUrl()));
1376           return false;
1377         }
1378
1379         // Validate that the specified index actually exists before we
1380         // try to perform the delete.
1381         if (!indexExists(request.getIndex())) {
1382
1383           fails.add(generateRejectionEntry(request.getOperationType(),
1384               "Specified resource does not exist: "
1385                   + request.getOperation().getMetaData().getUrl(),
1386               request.getIndex(),
1387               request.getId(),
1388               404,
1389               request.getOperation().getMetaData().getUrl()));
1390           return false;
1391         }
1392
1393         // Validate that the document we are trying to update actually
1394         // exists before we try to perform the delete.
1395         if (!documentExists(request.getIndex(), request.getId())) {
1396
1397           fails.add(generateRejectionEntry(request.getOperationType(),
1398               "Specified resource does not exist: "
1399                   + request.getOperation().getMetaData().getUrl(),
1400               request.getIndex(),
1401               request.getId(),
1402               404,
1403               request.getOperation().getMetaData().getUrl()));
1404           return false;
1405         }
1406
1407         // It is mandatory that a version be supplied for a delete operation,
1408         // so validate that now.
1409         if (request.getOperation().getMetaData().getEtag() == null) {
1410
1411           fails.add(generateRejectionEntry(request.getOperationType(),
1412               "Missing mandatory ETag field",
1413               request.getIndex(),
1414               request.getId(),
1415               400,
1416               request.getOperation().getMetaData().getUrl()));
1417           return false;
1418         }
1419
1420         // Generate the delete request.
1421         sb.append(String.format(BULK_DELETE_TEMPLATE,
1422             request.getIndex(),
1423             DEFAULT_TYPE,
1424             request.getId(),
1425             request.getOperation().getMetaData().getEtag()));
1426         break;
1427       default:
1428     }
1429
1430     return retVal;
1431   }
1432
1433   private boolean indexExists(String index) throws DocumentStoreOperationException {
1434
1435     OperationResult indexExistsResult = checkIndexExistence(index);
1436
1437     return ((indexExistsResult.getResultCode() >= 200)
1438         && (indexExistsResult.getResultCode() < 300));
1439   }
1440
1441   private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1442
1443     OperationResult docExistsResult = checkDocumentExistence(index, id);
1444
1445     return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1446   }
1447
1448   /**
1449    * This method constructs a status entry for a bulk operation which has
1450    * been rejected before even sending it to the document store.
1451    *
1452    * @param rejectReason - A message describing why the operation was rejected.
1453    * @param anId         - The identifier associated with the document being
1454    *                     acted on.
1455    * @param statusCode  - An HTTP status code.
1456    * @return - A result set item.
1457    */
1458   private ElasticSearchResultItem generateRejectionEntry(OperationType opType,
1459                                                          String rejectReason,
1460                                                          String index,
1461                                                          String anId,
1462                                                          int statusCode,
1463                                                          String originalUrl) {
1464
1465     ElasticSearchError err = new ElasticSearchError();
1466     err.setReason(rejectReason);
1467
1468     ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1469     op.setIndex(index);
1470     op.setId(anId);
1471     op.setStatus(statusCode);
1472     op.setError(err);
1473     op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1474
1475     ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1476
1477     switch (opType) {
1478       case CREATE:
1479         rejectionResult.setCreate(op);
1480         break;
1481       case UPDATE:
1482         rejectionResult.setIndex(op);
1483         break;
1484       case DELETE:
1485         rejectionResult.setDelete(op);
1486         break;
1487       default:
1488     }
1489
1490     return rejectionResult;
1491   }
1492
1493
1494   /**
1495    * This method takes the json structure returned from ElasticSearch in
1496    * response to a bulk operations request and marshals it into a Java
1497    * object.
1498    *
1499    * @param jsonResult - The bulk operations response returned from
1500    *                   ElasticSearch.
1501    * @return - The marshalled response.
1502    * @throws JsonParseException
1503    * @throws JsonMappingException
1504    * @throws IOException
1505    */
1506   private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult)
1507       throws JsonParseException, JsonMappingException, IOException {
1508
1509     if (jsonResult != null) {
1510       if (logger.isDebugEnabled()) {
1511         logger.debug("ESController: Marshalling ES result set from json: "
1512             + jsonResult.replaceAll("\n", ""));
1513       }
1514
1515       ObjectMapper mapper = new ObjectMapper();
1516       mapper.setSerializationInclusion(Include.NON_EMPTY);
1517
1518       return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1519     }
1520
1521     return null;
1522   }
1523
1524
1525   /**
1526    * This method takes the marshalled ElasticSearch bulk response and
1527    * converts it into a generic response payload.
1528    *
1529    * @param esResult - ElasticSearch bulk operations response.
1530    * @return - A generic result set.
1531    */
1532   private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1533                                            List<ElasticSearchResultItem> rejectedOps) {
1534
1535     int totalOps = 0;
1536     int totalSuccess = 0;
1537     int totalFails = 0;
1538
1539     if (logger.isDebugEnabled()) {
1540
1541       logger.debug("ESController: Build generic result set.  ES Results: "
1542           + ((esResult != null) ? esResult.toString() : "[]")
1543           + " Rejected Ops: " + rejectedOps.toString());
1544     }
1545
1546     // Build a combined list of result items from the results returned
1547     // from ElasticSearch and the list of operations that we rejected
1548     // without sending to ElasticSearch.
1549     List<ElasticSearchResultItem> combinedResults = new ArrayList<ElasticSearchResultItem>();
1550     if (esResult != null) {
1551       combinedResults.addAll(Arrays.asList(esResult.getItems()));
1552     }
1553     combinedResults.addAll(rejectedOps);
1554
1555     // Iterate over the individual results in the resulting result set.
1556     StringBuilder resultsBuilder = new StringBuilder();
1557     AtomicBoolean firstItem = new AtomicBoolean(true);
1558     for (ElasticSearchResultItem item : combinedResults) {
1559
1560       // Increment the operation counts.
1561       totalOps++;
1562       if (isSuccessCode(item.operationStatus().getStatus())) {
1563         totalSuccess++;
1564       } else {
1565         totalFails++;
1566       }
1567
1568       // Prepend a comma to our response string unless this it the
1569       // first result in the set.
1570       if (!firstItem.compareAndSet(true, false)) {
1571         resultsBuilder.append(", ");
1572       }
1573
1574       // Append the current result as a generic json structure.
1575       resultsBuilder.append(item.toJson());
1576     }
1577
1578     // Now, build the result string and return it.
1579     String responseBody = "{ \"total_operations\": " + totalOps + ", "
1580         + "\"total_success\": " + totalSuccess + ", "
1581         + "\"total_fails\": " + totalFails + ", "
1582         + "\"results\": ["
1583         + resultsBuilder.toString()
1584         + "]}";
1585
1586     return responseBody;
1587   }
1588
1589
1590   /**
1591    * This method queryies ElasticSearch to determine if the supplied
1592    * index is present in the document store.
1593    *
1594    * @param indexName - The index to look for.
1595    * @return - An operation result indicating the success or failure of
1596    * the check.
1597    * @throws DocumentStoreOperationException
1598    */
1599   public OperationResult checkIndexExistence(String indexName)
1600       throws DocumentStoreOperationException {
1601
1602     // Initialize operation result with a failure codes / fault string
1603     OperationResult opResult = new OperationResult();
1604     opResult.setResultCode(500);
1605
1606     // Grab the current time so we can use it to generate a metrics log.
1607     MdcOverride override = getStartTime(new MdcOverride());
1608
1609     String fullUrl = getFullUrl("/" + indexName, false);
1610     HttpURLConnection conn = initializeConnection(fullUrl);
1611
1612     try {
1613       conn.setRequestMethod("HEAD");
1614
1615     } catch (ProtocolException e) {
1616       shutdownConnection(conn);
1617       throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1618     }
1619
1620     logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1621
1622     int resultCode;
1623     try {
1624       resultCode = conn.getResponseCode();
1625     } catch (IOException e) {
1626       shutdownConnection(conn);
1627       throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1628     }
1629     logger.debug("Response Code : " + resultCode);
1630
1631     opResult.setResultCode(resultCode);
1632
1633     // Generate a metrics log so we can track how long the operation took.
1634     metricsLogger.info(SearchDbMsgs.CHECK_INDEX_TIME,
1635         new LogFields()
1636             .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1637             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1638         override,
1639         indexName);
1640
1641     shutdownConnection(conn);
1642
1643     return opResult;
1644   }
1645
1646
1647   private void buildDocumentResult(DocumentOperationResult result, String index)
1648       throws DocumentStoreOperationException {
1649
1650     JSONParser parser = new JSONParser();
1651     JSONObject root;
1652     try {
1653       root = (JSONObject) parser.parse(result.getResult());
1654
1655       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1656         // Success response object
1657         Document doc = new Document();
1658         doc.setEtag(result.getResultVersion());
1659         doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1660
1661         doc.setContent((JSONObject) root.get("_source"));
1662         result.setDocument(doc);
1663
1664       } else {
1665         // Error response object
1666         JSONObject error = (JSONObject) root.get("error");
1667         if (error != null) {
1668           result.setError(new ErrorResult(error.get("type").toString(),
1669               error.get("reason").toString()));
1670         }
1671
1672       }
1673     } catch (Exception e) {
1674       throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1675           + result.getResult());
1676     }
1677
1678
1679   }
1680
1681   private String buildDocumentResponseUrl(String index, String id) {
1682     return ApiUtils.buildDocumentUri(index, id);
1683   }
1684
1685   private void buildSearchResult(SearchOperationResult result, String index)
1686       throws DocumentStoreOperationException {
1687
1688     JSONParser parser = new JSONParser();
1689     JSONObject root;
1690
1691     try {
1692       root = (JSONObject) parser.parse(result.getResult());
1693       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1694         JSONObject hits = (JSONObject) root.get("hits");
1695         JSONArray hitArray = (JSONArray) hits.get("hits");
1696         SearchHits searchHits = new SearchHits();
1697         searchHits.setTotalHits(hits.get("total").toString());
1698         ArrayList<SearchHit> searchHitArray = new ArrayList<SearchHit>();
1699
1700         for (int i = 0; i < hitArray.size(); i++) {
1701           JSONObject hit = (JSONObject) hitArray.get(i);
1702           SearchHit searchHit = new SearchHit();
1703           searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1704           Document doc = new Document();
1705           if (hit.get("_version") != null) {
1706             doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1707           }
1708
1709           doc.setUrl(buildDocumentResponseUrl(index, (hit.get("_id") != null)
1710               ? hit.get("_id").toString() : ""));
1711           doc.setContent((JSONObject) hit.get("_source"));
1712           searchHit.setDocument(doc);
1713           searchHitArray.add(searchHit);
1714         }
1715         searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1716         result.setSearchResult(searchHits);
1717
1718         JSONObject aggregations = (JSONObject) root.get("aggregations");
1719         if (aggregations != null) {
1720           AggregationResult[] aggResults =
1721               AggregationParsingUtil.parseAggregationResults(aggregations);
1722           AggregationResults aggs = new AggregationResults();
1723           aggs.setAggregations(aggResults);
1724           result.setAggregationResult(aggs);
1725         }
1726
1727         // success
1728       } else {
1729         JSONObject error = (JSONObject) root.get("error");
1730         if (error != null) {
1731           result.setError(new ErrorResult(error.get("type").toString(),
1732               error.get("reason").toString()));
1733         }
1734       }
1735     } catch (Exception e) {
1736       throw new DocumentStoreOperationException("Failed to parse Elastic Search response."
1737           + result.getResult());
1738     }
1739
1740   }
1741
1742   private void buildSuggestResult(SearchOperationResult result, String index)
1743           throws DocumentStoreOperationException {
1744
1745     JSONParser parser = new JSONParser ();
1746     JSONObject root;
1747     try {
1748       root = (JSONObject) parser.parse ( result.getResult () );
1749       if (result.getResultCode () >= 200 && result.getResultCode () <= 299) {
1750         JSONArray hitArray = (JSONArray) root.get ( "suggest-vnf" );
1751         JSONObject hitdata = (JSONObject) hitArray.get ( 0 );
1752         JSONArray optionsArray = (JSONArray) hitdata.get ( "options" );
1753         SuggestHits suggestHits = new SuggestHits ();
1754         suggestHits.setTotalHits ( String.valueOf ( optionsArray.size () ) );
1755
1756         ArrayList<SuggestHit> suggestHitArray = new ArrayList<SuggestHit> ();
1757
1758         for (int i = 0; i < optionsArray.size (); i++) {
1759           JSONObject hit = (JSONObject) optionsArray.get ( i );
1760
1761           SuggestHit suggestHit = new SuggestHit ();
1762           suggestHit.setScore ( (hit.get ( "score" ) != null) ? hit.get ( "score" ).toString () : "" );
1763           suggestHit.setText ( (hit.get ( "text" ) != null) ? hit.get ( "text" ).toString () : "" );
1764           Document doc = new Document ();
1765           if (hit.get ( "_version" ) != null) {
1766             doc.setEtag ( (hit.get ( "_version" ) != null) ? hit.get ( "_version" ).toString () : "" );
1767           }
1768           doc.setUrl ( buildDocumentResponseUrl ( index,
1769                   (hit.get ( "_id" ) != null) ? hit.get ( "_id" ).toString () : "" ) );
1770
1771           doc.setContent ( (JSONObject) hit.get ( "payload" ) );
1772           suggestHit.setDocument ( doc );
1773           suggestHitArray.add ( suggestHit );
1774         }
1775         suggestHits.setHits ( suggestHitArray.toArray ( new SuggestHit[suggestHitArray.size ()] ) );
1776         result.setSuggestResult ( suggestHits );
1777
1778         JSONObject aggregations = (JSONObject) root.get ( "aggregations" );
1779         if (aggregations != null) {
1780           AggregationResult[] aggResults =
1781                   AggregationParsingUtil.parseAggregationResults ( aggregations );
1782           AggregationResults aggs = new AggregationResults ();
1783           aggs.setAggregations ( aggResults );
1784           result.setAggregationResult ( aggs );
1785         }
1786
1787         // success
1788       } else {
1789         JSONObject error = (JSONObject) root.get ( "error" );
1790         if (error != null) {
1791           result.setError (
1792                   new ErrorResult ( error.get ( "type" ).toString (), error.get ( "reason" ).toString () ) );
1793         }
1794       }
1795     } catch (Exception e) {
1796       throw new DocumentStoreOperationException (
1797               "Failed to parse Elastic Search response." + result.getResult () );
1798     }
1799   }
1800
1801
1802   }