00f66e0aa65cd7773400ae2de32e8c808224f756
[aai/search-data-service.git] / src / main / java / org / onap / aai / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
22
23 import com.att.aft.dme2.internal.google.common.base.Throwables;
24 import com.fasterxml.jackson.annotation.JsonInclude.Include;
25 import com.fasterxml.jackson.core.JsonParseException;
26 import com.fasterxml.jackson.core.JsonProcessingException;
27 import com.fasterxml.jackson.databind.JsonMappingException;
28 import com.fasterxml.jackson.databind.ObjectMapper;
29 import edu.emory.mathcs.backport.java.util.Arrays;
30 import org.json.simple.JSONArray;
31 import org.json.simple.JSONObject;
32 import org.json.simple.parser.JSONParser;
33 import org.json.simple.parser.ParseException;
34 import org.onap.aai.sa.rest.AnalysisConfiguration;
35 import org.onap.aai.sa.rest.ApiUtils;
36 import org.onap.aai.sa.rest.BulkRequest;
37 import org.onap.aai.sa.rest.BulkRequest.OperationType;
38 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
39 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
40 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
41 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
42 import org.onap.aai.sa.searchdbabstraction.entity.Document;
43 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
44 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
45 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
46 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
47 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
48 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
49 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
50 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
51 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
52 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
53 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
54 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
55 import org.onap.aai.cl.api.LogFields;
56 import org.onap.aai.cl.api.LogLine;
57 import org.onap.aai.cl.api.Logger;
58 import org.onap.aai.cl.eelf.LoggerFactory;
59 import org.onap.aai.cl.mdc.MdcContext;
60 import org.onap.aai.cl.mdc.MdcOverride;
61 import org.onap.aai.sa.rest.DocumentSchema;
62
63 import java.io.BufferedReader;
64 import java.io.File;
65 import java.io.FileInputStream;
66 import java.io.IOException;
67 import java.io.InputStream;
68 import java.io.InputStreamReader;
69 import java.io.OutputStream;
70 import java.io.OutputStreamWriter;
71 import java.io.PrintWriter;
72 import java.io.StringWriter;
73 import java.net.HttpURLConnection;
74 import java.net.MalformedURLException;
75 import java.net.ProtocolException;
76 import java.net.URL;
77 import java.text.SimpleDateFormat;
78 import java.util.ArrayList;
79 import java.util.List;
80 import java.util.Properties;
81 import java.util.concurrent.atomic.AtomicBoolean;
82 import javax.ws.rs.core.Response.Status;
83
84 /**
85  * This class has the Elasticsearch implementation of the DB operations defined in
86  * DocumentStoreInterface.
87  */
88 public class ElasticSearchHttpController implements DocumentStoreInterface {
89
90   private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
91       "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
92   private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
93       "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
94   private static final String BULK_IMPORT_INDEX_TEMPLATE =
95       "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
96   private static final String BULK_DELETE_TEMPLATE =
97       "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
98
99   private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
100       "Internal Error: ElasticSearch operation fault occurred";
101   private static final Logger logger =
102       LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
103   private static final Logger metricsLogger =
104       LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
105   private final ElasticSearchConfig config;
106
107   private static final String DEFAULT_TYPE = "default";
108
109   private static ElasticSearchHttpController instance = null;
110
111   protected AnalysisConfiguration analysisConfig;
112
113   public static ElasticSearchHttpController getInstance() {
114
115     synchronized (ElasticSearchHttpController.class) {
116
117       if (instance == null) {
118
119         Properties properties = new Properties();
120         File file = new File(SearchDbConstants.ES_CONFIG_FILE);
121         try {
122           properties.load(new FileInputStream(file));
123         } catch (Exception e) {
124           logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL,
125               "ElasticSearchHTTPController.getInstance", e.getLocalizedMessage());
126         }
127
128         ElasticSearchConfig config = new ElasticSearchConfig(properties);
129         instance = new ElasticSearchHttpController(config);
130       }
131     }
132
133     return instance;
134   }
135
136   public ElasticSearchHttpController(ElasticSearchConfig config) {
137     this.config = config;
138     analysisConfig = new AnalysisConfiguration();
139
140     try {
141       logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
142       checkConnection();
143       logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
144     } catch (Exception e) {
145       logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, getFullUrl("", false),
146           e.getMessage());
147     }
148   }
149
150   public AnalysisConfiguration getAnalysisConfig() {
151     return analysisConfig;
152   }
153
154   @Override
155   public OperationResult createIndex(String index, DocumentSchema documentSchema) {
156
157     OperationResult result = new OperationResult();
158     result.setResultCode(500);
159
160     try {
161
162       // Submit the request to ElasticSearch to create the index using a
163       // default document type.
164       result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
165           DocumentSchemaUtil.generateDocumentMappings(documentSchema));
166
167       // ElasticSearch will return us a 200 code on success when we
168       // want to report a 201, so translate the result here.
169       result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
170       if (isSuccess(result)) {
171         result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
172         // result.setResult("{\"index\": \"" + index + ", \"type\": \""
173         // + DEFAULT_TYPE + "\"}");
174       }
175
176     } catch (DocumentStoreOperationException e) {
177
178       result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
179     }
180
181     return result;
182   }
183
184   @Override
185   public OperationResult createDynamicIndex(String index, String dynamicSchema) {
186     OperationResult result = new OperationResult();
187     result.setResultCode(500);
188
189     try {
190       result = createTable(index, dynamicSchema);
191
192       // ElasticSearch will return us a 200 code on success when we
193       // want to report a 201, so translate the result here.
194       result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
195       if (isSuccess(result)) {
196         result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
197       }
198     } catch (DocumentStoreOperationException e) {
199       result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
200     }
201
202     return result;
203   }
204
205   @Override
206   public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
207
208     // Initialize operation result with a failure codes / fault string
209     OperationResult opResult = new OperationResult();
210     opResult.setResultCode(500);
211     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
212
213     // Grab the current time so we can use it to generate a metrics log.
214     MdcOverride override = getStartTime(new MdcOverride());
215
216     String fullUrl = getFullUrl("/" + indexName + "/", false);
217     HttpURLConnection conn = initializeConnection(fullUrl);
218
219     logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
220
221     try {
222       conn.setRequestMethod("DELETE");
223     } catch (ProtocolException e) {
224       shutdownConnection(conn);
225       throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
226     }
227
228     handleResponse(conn, opResult);
229
230     // Generate a metrics log so we can track how long the operation took.
231     metricsLogger.info(SearchDbMsgs.DELETE_INDEX_TIME,
232         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
233             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
234         override, indexName);
235
236     shutdownConnection(conn);
237
238     return opResult;
239   }
240
241   private OperationResult checkConnection() throws Exception {
242
243     String fullUrl = getFullUrl("/_cluster/health", false);
244     URL url = null;
245     HttpURLConnection conn = null;
246
247     url = new URL(fullUrl);
248     conn = (HttpURLConnection) url.openConnection();
249     conn.setRequestMethod("GET");
250     conn.setDoOutput(true);
251     logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
252
253     int resultCode = conn.getResponseCode();
254     logger.debug("getClusterHealth() response Code : " + resultCode);
255     OperationResult opResult = new OperationResult();
256     opResult.setResultCode(resultCode);
257
258     shutdownConnection(conn);
259
260     return opResult;
261   }
262
263   private String getFullUrl(String resourceUrl, boolean isSecure) {
264
265     final String host = config.getIpAddress();
266     final String port = config.getHttpPort();
267
268     if (isSecure) {
269       return String.format("https://%s:%s%s", host, port, resourceUrl);
270     } else {
271       return String.format("http://%s:%s%s", host, port, resourceUrl);
272     }
273   }
274
275   private void shutdownConnection(HttpURLConnection connection) {
276     if (connection == null) {
277       return;
278     }
279
280     InputStream inputstream = null;
281     OutputStream outputstream = null;
282
283     try {
284       inputstream = connection.getInputStream();
285     } catch (IOException e) {
286       logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
287           e.getLocalizedMessage());
288     } finally {
289       if (inputstream != null) {
290         try {
291           inputstream.close();
292         } catch (IOException e) {
293           logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
294               e.getLocalizedMessage());
295         }
296       }
297     }
298
299     try {
300       outputstream = connection.getOutputStream();
301     } catch (IOException e) {
302       logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
303           e.getLocalizedMessage());
304     } finally {
305       if (outputstream != null) {
306         try {
307           outputstream.close();
308         } catch (IOException e) {
309           logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
310               e.getLocalizedMessage());
311         }
312       }
313     }
314
315     connection.disconnect();
316   }
317
318   // @Override
319   protected OperationResult createTable(String indexName, String typeName, String indexSettings,
320       String indexMappings) throws DocumentStoreOperationException {
321
322     if (indexSettings == null) {
323       logger.debug("No settings provided.");
324     }
325
326     if (indexMappings == null) {
327       logger.debug("No mappings provided.");
328     }
329
330     OperationResult opResult = new OperationResult();
331
332     // Initialize operation result with a failure codes / fault string
333     opResult.setResultCode(500);
334     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
335
336     // Grab the current time so we can use it to generate a metrics log.
337     MdcOverride override = getStartTime(new MdcOverride());
338
339     String fullUrl = getFullUrl("/" + indexName + "/", false);
340     HttpURLConnection conn = initializeConnection(fullUrl);
341
342     try {
343       conn.setRequestMethod("PUT");
344     } catch (ProtocolException e) {
345       shutdownConnection(conn);
346       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
347     }
348
349     StringBuilder sb = new StringBuilder(128);
350     sb.append("{ \"settings\" : ");
351     sb.append(indexSettings);
352     sb.append(",");
353
354     sb.append("\"mappings\" : {");
355     sb.append("\"" + typeName + "\" :");
356     sb.append(indexMappings);
357     sb.append("}}");
358
359     attachContent(conn, sb.toString());
360
361     logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
362     logger.debug("Request content: " + sb.toString());
363
364     handleResponse(conn, opResult);
365
366     shutdownConnection(conn);
367
368     // Generate a metrics log so we can track how long the operation took.
369     metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
370         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
371             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
372         override, indexName);
373
374     return opResult;
375   }
376
377   /**
378    * Will send the passed in JSON payload to Elasticsearch using the provided index name in an
379    * attempt to create the index.
380    * 
381    * @param indexName - The name of the index to be created
382    * @param settingsAndMappings - The actual JSON object that will define the index
383    * @return - The operation result of writing into Elasticsearch
384    * @throws DocumentStoreOperationException
385    */
386   protected OperationResult createTable(String indexName, String settingsAndMappings)
387       throws DocumentStoreOperationException {
388     OperationResult result = new OperationResult();
389     result.setResultCode(500);
390     result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
391
392     // Grab the current time so we can use it to generate a metrics log.
393     MdcOverride override = getStartTime(new MdcOverride());
394
395     String fullUrl = getFullUrl("/" + indexName + "/", false);
396     HttpURLConnection conn = initializeConnection(fullUrl);
397
398     try {
399       conn.setRequestMethod("PUT");
400     } catch (ProtocolException e) {
401       shutdownConnection(conn);
402       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
403     }
404
405     attachContent(conn, settingsAndMappings);
406     handleResponse(conn, result);
407
408     // Generate a metrics log so we can track how long the operation took.
409     metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME,
410         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
411             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()),
412         override, indexName);
413
414     shutdownConnection(conn);
415
416     return result;
417   }
418
419   @Override
420   public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
421       boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
422
423     if (!allowImplicitIndexCreation) {
424
425       // Before we do anything, make sure that the specified index
426       // actually exists in the
427       // document store - we don't want to rely on ElasticSearch to fail
428       // the document
429       // create because it could be configured to implicitly create a
430       // non-existent index,
431       // which can lead to hard-to-debug behaviour with queries down the
432       // road.
433       OperationResult indexExistsResult = checkIndexExistence(indexName);
434       if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
435
436         DocumentOperationResult opResult = new DocumentOperationResult();
437         opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
438         opResult.setResult("Document Index '" + indexName + "' does not exist.");
439         opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
440         return opResult;
441       }
442     }
443
444     if (document.getId() == null || document.getId().isEmpty()) {
445       return createDocumentWithoutId(indexName, document);
446     } else {
447       return createDocumentWithId(indexName, document);
448     }
449   }
450
451   private DocumentOperationResult createDocumentWithId(String indexName,
452       DocumentStoreDataEntity document) throws DocumentStoreOperationException {
453     // check if the document already exists
454     DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
455
456     if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
457       if (opResult.getResultCode() == Status.OK.getStatusCode()) {
458         opResult.setFailureCause("A document with the same id already exists.");
459       } else {
460         opResult.setFailureCause(
461             "Failed to verify a document with the specified id does not already exist.");
462       }
463       opResult.setResultCode(Status.CONFLICT.getStatusCode());
464       return opResult;
465     }
466
467     opResult = new DocumentOperationResult();
468     // Initialize operation result with a failure codes / fault string
469     opResult.setResultCode(500);
470     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
471
472     // Grab the current time so we can use it to generate a metrics log.
473     MdcOverride override = getStartTime(new MdcOverride());
474
475     String fullUrl =
476         getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
477     HttpURLConnection conn = initializeConnection(fullUrl);
478
479     try {
480       conn.setRequestMethod("PUT");
481     } catch (ProtocolException e) {
482       shutdownConnection(conn);
483       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
484     }
485
486     attachDocument(conn, document);
487
488     logger.debug("Sending 'PUT' request to: " + conn.getURL());
489
490     handleResponse(conn, opResult);
491     buildDocumentResult(opResult, indexName);
492
493     // Generate a metrics log so we can track how long the operation took.
494     metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
495         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
496             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
497         override, indexName);
498
499     shutdownConnection(conn);
500
501     return opResult;
502
503   }
504
505   private DocumentOperationResult createDocumentWithoutId(String indexName,
506       DocumentStoreDataEntity document) throws DocumentStoreOperationException {
507
508     DocumentOperationResult response = new DocumentOperationResult();
509     // Initialize operation result with a failure codes / fault string
510     response.setResultCode(500);
511     response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
512
513     // Grab the current time so we can use it to generate a metrics log.
514     MdcOverride override = getStartTime(new MdcOverride());
515
516     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
517     HttpURLConnection conn = initializeConnection(fullUrl);
518
519     try {
520       conn.setRequestMethod("POST");
521     } catch (ProtocolException e) {
522       shutdownConnection(conn);
523       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
524     }
525
526     attachDocument(conn, document);
527
528     logger.debug("Sending 'POST' request to: " + conn.getURL());
529
530     handleResponse(conn, response);
531     buildDocumentResult(response, indexName);
532
533     // Generate a metrics log so we can track how long the operation took.
534     metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
535         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
536             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
537         override, indexName);
538
539     shutdownConnection(conn);
540
541     return response;
542   }
543
544   private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
545       throws DocumentStoreOperationException {
546     conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
547     conn.setRequestProperty("Connection", "Close");
548
549     attachContent(conn, doc.getContentInJson());
550   }
551
552   private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
553       throws DocumentStoreOperationException {
554     DocumentOperationResult opResult = new DocumentOperationResult();
555
556     // Initialize operation result with a failure codes / fault string
557     opResult.setResultCode(500);
558
559     // Grab the current time so we can use it to generate a metrics log.
560     MdcOverride override = getStartTime(new MdcOverride());
561
562     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
563     HttpURLConnection conn = initializeConnection(fullUrl);
564
565     try {
566       conn.setRequestMethod("HEAD");
567     } catch (ProtocolException e) {
568       shutdownConnection(conn);
569       throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
570     }
571
572     logger.debug("Sending 'HEAD' request to: " + conn.getURL());
573
574     int resultCode;
575     try {
576       resultCode = conn.getResponseCode();
577     } catch (IOException e) {
578       shutdownConnection(conn);
579       throw new DocumentStoreOperationException(
580           "Failed to get the response code from the connection.", e);
581     }
582
583     logger.debug("Response Code : " + resultCode);
584
585     opResult.setResultCode(resultCode);
586
587     // Generate a metrics log so we can track how long the operation took.
588     metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
589         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
590             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
591         override, indexName, docId);
592
593     shutdownConnection(conn);
594
595     return opResult;
596   }
597
598   @Override
599   public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
600       boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
601
602     if (!allowImplicitIndexCreation) {
603
604       // Before we do anything, make sure that the specified index
605       // actually exists in the
606       // document store - we don't want to rely on ElasticSearch to fail
607       // the document
608       // create because it could be configured to implicitly create a
609       // non-existent index,
610       // which can lead to hard-to-debug behaviour with queries down the
611       // road.
612       OperationResult indexExistsResult = checkIndexExistence(indexName);
613       if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
614
615         DocumentOperationResult opResult = new DocumentOperationResult();
616         opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
617         opResult.setResult("Document Index '" + indexName + "' does not exist.");
618         opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
619         return opResult;
620       }
621     }
622
623     DocumentOperationResult opResult = new DocumentOperationResult();
624
625     // Initialize operation result with a failure codes / fault string
626     opResult.setResultCode(500);
627     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
628
629     // Grab the current time so we can use it to generate a metrics log.
630     MdcOverride override = getStartTime(new MdcOverride());
631
632     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
633         + "?version=" + document.getVersion(), false);
634     HttpURLConnection conn = initializeConnection(fullUrl);
635
636     try {
637       conn.setRequestMethod("PUT");
638     } catch (ProtocolException e) {
639       shutdownConnection(conn);
640       throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
641     }
642
643     attachDocument(conn, document);
644
645     logger.debug("Sending 'PUT' request to: " + conn.getURL());
646
647     handleResponse(conn, opResult);
648     buildDocumentResult(opResult, indexName);
649
650     // Generate a metrics log so we can track how long the operation took.
651     metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
652         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
653             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
654         override, indexName, document.getId());
655
656     shutdownConnection(conn);
657
658     return opResult;
659   }
660
661   @Override
662   public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
663       throws DocumentStoreOperationException {
664     DocumentOperationResult opResult = new DocumentOperationResult();
665
666     // Initialize operation result with a failure codes / fault string
667     opResult.setResultCode(500);
668     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
669
670     // Grab the current time so we can use it to generate a metrics log.
671     MdcOverride override = getStartTime(new MdcOverride());
672
673     String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
674         + "?version=" + document.getVersion(), false);
675     HttpURLConnection conn = initializeConnection(fullUrl);
676
677     try {
678       conn.setRequestMethod("DELETE");
679     } catch (ProtocolException e) {
680       shutdownConnection(conn);
681       throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
682     }
683
684     logger.debug("\nSending 'DELETE' request to " + conn.getURL());
685
686     handleResponse(conn, opResult);
687     buildDocumentResult(opResult, indexName);
688     // supress the etag and url in response for delete as they are not
689     // required
690     if (opResult.getDocument() != null) {
691       opResult.getDocument().setEtag(null);
692       opResult.getDocument().setUrl(null);
693     }
694
695     // Generate a metrics log so we can track how long the operation took.
696     metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
697         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
698             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
699         override, indexName, document.getId());
700
701     shutdownConnection(conn);
702
703     return opResult;
704   }
705
706   @Override
707   public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
708       throws DocumentStoreOperationException {
709     DocumentOperationResult opResult = new DocumentOperationResult();
710
711     // Initialize operation result with a failure codes / fault string
712     opResult.setResultCode(500);
713     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
714
715     // Grab the current time so we can use it to generate a metrics log.
716     MdcOverride override = getStartTime(new MdcOverride());
717
718     String fullUrl = null;
719     if (document.getVersion() == null) {
720       fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
721     } else {
722       fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId()
723           + "?version=" + document.getVersion(), false);
724     }
725     HttpURLConnection conn = initializeConnection(fullUrl);
726
727     logger.debug("\nSending 'GET' request to: " + conn.getURL());
728
729     handleResponse(conn, opResult);
730     buildDocumentResult(opResult, indexName);
731
732     // Generate a metrics log so we can track how long the operation took.
733     metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
734         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
735             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
736         override, indexName, document.getId());
737
738     shutdownConnection(conn);
739
740     return opResult;
741   }
742
743   public SearchOperationResult search(String indexName, String queryString)
744       throws DocumentStoreOperationException {
745     SearchOperationResult opResult = new SearchOperationResult();
746
747     // Initialize operation result with a failure codes / fault string
748     opResult.setResultCode(500);
749     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
750
751     String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
752
753     // Grab the current time so we can use it to generate a metrics log.
754     MdcOverride override = getStartTime(new MdcOverride());
755
756     HttpURLConnection conn = initializeConnection(fullUrl);
757
758     try {
759       conn.setRequestMethod("GET");
760     } catch (ProtocolException e) {
761       shutdownConnection(conn);
762       throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
763     }
764
765     logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
766
767     handleResponse(conn, opResult);
768     buildSearchResult(opResult, indexName);
769
770     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
771         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
772             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
773         override, indexName, queryString);
774
775     return opResult;
776   }
777
778   public SearchOperationResult searchWithPayload(String indexName, String query)
779       throws DocumentStoreOperationException {
780     SearchOperationResult opResult = new SearchOperationResult();
781
782     if (logger.isDebugEnabled()) {
783       logger.debug("Querying index: " + indexName + " with query string: " + query);
784     }
785
786     // Initialize operation result with a failure codes / fault string
787     opResult.setResultCode(500);
788     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
789
790     String fullUrl = getFullUrl("/" + indexName + "/_search", false);
791
792     // Grab the current time so we can use it to generate a metrics log.
793     MdcOverride override = getStartTime(new MdcOverride());
794
795     HttpURLConnection conn = initializeConnection(fullUrl);
796
797     try {
798       conn.setRequestMethod("POST");
799     } catch (ProtocolException e) {
800       shutdownConnection(conn);
801       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
802     }
803
804     attachContent(conn, query);
805
806     logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
807     logger.debug("Request body =  Elasticsearch query = " + query);
808
809     handleResponse(conn, opResult);
810     buildSearchResult(opResult, indexName);
811
812     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
813         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
814             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
815         override, indexName, query);
816
817     shutdownConnection(conn);
818
819     return opResult;
820   }
821
822   public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
823       throws DocumentStoreOperationException {
824
825     SearchOperationResult opResult = new SearchOperationResult();
826
827     if (logger.isDebugEnabled()) {
828       logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
829     }
830
831     // Initialize operation result with a failure codes / fault string
832     opResult.setResultCode(500);
833     opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
834
835     String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
836
837     // Grab the current time so we can use it to generate a metrics log.
838     MdcOverride override = getStartTime(new MdcOverride());
839
840     HttpURLConnection conn = initializeConnection(fullUrl);
841
842     try {
843       conn.setRequestMethod("POST");
844     } catch (ProtocolException e) {
845       shutdownConnection(conn);
846       throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
847     }
848
849     attachContent(conn, query);
850
851     logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
852     logger.debug("Request body =  Elasticsearch query = " + query);
853
854     handleResponse(conn, opResult);
855     buildSuggestResult(opResult, indexName);
856
857     metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
858         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
859             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
860         override, indexName, query);
861
862     shutdownConnection(conn);
863
864     return opResult;
865   }
866
867   private void attachContent(HttpURLConnection conn, String content)
868       throws DocumentStoreOperationException {
869     OutputStream outputStream = null;
870     OutputStreamWriter out = null;
871
872     try {
873       outputStream = conn.getOutputStream();
874     } catch (IOException e) {
875       shutdownConnection(conn);
876       throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
877     }
878
879     out = new OutputStreamWriter(outputStream);
880
881     try {
882       out.write(content);
883       out.close();
884     } catch (IOException e) {
885       shutdownConnection(conn);
886       throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
887     }
888   }
889
890   private HttpURLConnection initializeConnection(String fullUrl)
891       throws DocumentStoreOperationException {
892     URL url = null;
893     HttpURLConnection conn = null;
894
895     try {
896       url = new URL(fullUrl);
897     } catch (MalformedURLException e) {
898       throw new DocumentStoreOperationException("Error building a URL with " + url, e);
899     }
900
901     try {
902       conn = (HttpURLConnection) url.openConnection();
903       conn.setDoOutput(true);
904     } catch (IOException e) {
905       shutdownConnection(conn);
906       throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
907     }
908
909     return conn;
910   }
911
912   private void handleResponse(HttpURLConnection conn, OperationResult opResult)
913       throws DocumentStoreOperationException {
914     int resultCode = 200;
915
916     try {
917       resultCode = conn.getResponseCode();
918     } catch (IOException e) {
919       shutdownConnection(conn);
920       throw new DocumentStoreOperationException(
921           "Failed to get the response code from the connection.", e);
922     }
923
924     logger.debug("Response Code : " + resultCode);
925
926     InputStream inputStream = null;
927
928     if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response
929                                                      // indicates success
930       inputStream = conn.getErrorStream();
931     } else {
932       try {
933         inputStream = conn.getInputStream();
934       } catch (IOException e) {
935         shutdownConnection(conn);
936         throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
937       }
938     }
939
940     InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
941     BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
942
943     StringBuilder result = new StringBuilder(128);
944     String string = null;
945
946     try {
947       while ((string = bufferedreader.readLine()) != null) {
948         result.append(string).append("\n");
949       }
950     } catch (IOException e) {
951       shutdownConnection(conn);
952       throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
953     }
954
955     if (resultCode == Status.CONFLICT.getStatusCode()) {
956       opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode());
957     } else {
958       opResult.setResultCode(resultCode);
959     }
960     if (logger.isDebugEnabled()) {
961       logger.debug("Raw result string from ElasticSearch = " + result.toString());
962     }
963     opResult.setResult(result.toString());
964     opResult.setResultVersion(extractVersion(result.toString()));
965   }
966
967   private String extractVersion(String result) throws DocumentStoreOperationException {
968
969     JSONParser parser = new JSONParser();
970     String version = null;
971     try {
972       JSONObject root = (JSONObject) parser.parse(result);
973       if (root.get("_version") != null) {
974         version = root.get("_version").toString();
975       }
976
977     } catch (ParseException e) {
978
979       // Not all responses from ElasticSearch include a version, so
980       // if we don't get one back, just return an empty string rather
981       // than trigger a false failure.
982       version = "";
983     }
984     return version;
985   }
986
987   /**
988    * This convenience method gets the current system time and stores it in an attribute in the
989    * supplied {@link MdcOverride} object so that it can be used later by the metrics logger.
990    *
991    * @param override - The {@link MdcOverride} object to update.
992    * @return - The supplied {@link MdcOverride} object.
993    */
994   private MdcOverride getStartTime(MdcOverride override) {
995
996     // Grab the current time...
997     long startTimeInMs = System.currentTimeMillis();
998
999     // ...and add it as an attribute to the supplied MDC Override
1000     // object.
1001     SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
1002     override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
1003
1004     // Return the MdcOverride object that we were passed.
1005     // This looks odd, but it allows us to do stuff like:
1006     //
1007     // MdcOverride ov = getStartTime(new MdcOverride())
1008     //
1009     // which is quite handy, but also allows us to pass in an existing
1010     // MdcOverride object which already has some attributes set.
1011     return override;
1012   }
1013
1014   private boolean isSuccess(OperationResult result) {
1015
1016     return isSuccessCode(result.getResultCode());
1017   }
1018
1019   private boolean isSuccessCode(int statusCode) {
1020     return ((statusCode >= 200) && (statusCode < 300));
1021   }
1022
1023   @Override
1024   public OperationResult performBulkOperations(BulkRequest[] requests)
1025       throws DocumentStoreOperationException {
1026
1027     if (logger.isDebugEnabled()) {
1028       String dbgString = "ESController: performBulkOperations - Operations: ";
1029
1030       for (BulkRequest request : requests) {
1031         dbgString += "[" + request.toString() + "] ";
1032       }
1033
1034       logger.debug(dbgString);
1035     }
1036
1037     // Grab the current time so we can use it to generate a metrics log.
1038     MdcOverride override = getStartTime(new MdcOverride());
1039
1040     // Parse the supplied set of operations.
1041     // Iterate over the list of operations which we were provided and
1042     // translate them into a format that ElasticSearh understands.
1043     int opCount = 0;
1044     StringBuilder esOperationSet = new StringBuilder(128);
1045     List<ElasticSearchResultItem> rejected = new ArrayList<ElasticSearchResultItem>();
1046     for (BulkRequest request : requests) {
1047
1048       // Convert the request to the syntax ElasticSearch likes.
1049       if (buildEsOperation(request, esOperationSet, rejected)) {
1050         opCount++;
1051       }
1052     }
1053
1054     ElasticSearchBulkOperationResult opResult = null;
1055     if (opCount > 0) {
1056
1057       // Open an HTTP connection to the ElasticSearch back end.
1058       String fullUrl = getFullUrl("/_bulk", false);
1059       URL url;
1060       HttpURLConnection conn;
1061       try {
1062
1063         url = new URL(fullUrl);
1064         conn = (HttpURLConnection) url.openConnection();
1065         conn.setRequestMethod("PUT");
1066         conn.setDoOutput(true);
1067         conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
1068         conn.setRequestProperty("Connection", "Close");
1069
1070       } catch (IOException e) {
1071
1072         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1073         if (logger.isDebugEnabled()) {
1074           logger.debug(Throwables.getStackTraceAsString(e));
1075         }
1076
1077         throw new DocumentStoreOperationException(
1078             "Failed to open connection to document store.  Cause: " + e.getMessage(), e);
1079       }
1080
1081       StringBuilder bulkResult = new StringBuilder(128);
1082       try {
1083         // Create an output stream to write our request to.
1084         OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());;
1085
1086         if (logger.isDebugEnabled()) {
1087           logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1088           logger.debug(
1089               "ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
1090         }
1091
1092         // Write the resulting request string to our output stream.
1093         // (this sends the request to ES?)
1094         out.write(esOperationSet.toString());
1095         out.close();
1096
1097         // Open an input stream on our connection in order to read back
1098         // the results.
1099         InputStream is = conn.getInputStream();
1100         InputStreamReader inputstreamreader = new InputStreamReader(is);
1101         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1102
1103         // Read the contents of the input stream into our result
1104         // string...
1105         String esResponseString = null;
1106
1107         while ((esResponseString = bufferedreader.readLine()) != null) {
1108           bulkResult.append(esResponseString).append("\n");
1109         }
1110
1111       } catch (IOException e) {
1112
1113         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1114         if (logger.isDebugEnabled()) {
1115           StringWriter sw = new StringWriter();
1116           e.printStackTrace(new PrintWriter(sw));
1117           logger.debug(sw.toString());
1118         }
1119
1120         throw new DocumentStoreOperationException(
1121             "Failure interacting with document store.  Cause: " + e.getMessage(), e);
1122       }
1123
1124       if (logger.isDebugEnabled()) {
1125         logger.debug(
1126             "ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
1127       }
1128
1129       // ...and marshal the resulting string into a Java object.
1130       try {
1131         opResult = marshallEsBulkResult(bulkResult.toString());
1132
1133       } catch (IOException e) {
1134
1135         logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1136         if (logger.isDebugEnabled()) {
1137           logger.debug(Throwables.getStackTraceAsString(e));
1138         }
1139
1140         throw new DocumentStoreOperationException(
1141             "Failed to marshal response body.  Cause: " + e.getMessage(), e);
1142       }
1143     }
1144
1145     // Finally, build the operation result and return it to the caller.
1146     OperationResult result = new OperationResult();
1147     result.setResultCode(207);
1148     result.setResult(buildGenericBulkResultSet(opResult, rejected));
1149
1150     // In the success case we don't want the entire result string to be
1151     // dumped into the metrics log, so concatenate it.
1152     String resultStringForMetricsLog = result.getResult();
1153     if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1154       resultStringForMetricsLog =
1155           resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85))
1156               + "...";
1157     }
1158
1159     metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1160         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1161             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1162         override);
1163
1164     return result;
1165   }
1166
1167   /**
1168    * This method converts a {@link BulkRequest} object into a json structure which can be understood
1169    * by ElasticSearch.
1170    *
1171    * @param request - The request to be performed.
1172    * @param sb - The string builder to append the json data to
1173    * @throws DocumentStoreOperationException
1174    */
1175   private boolean buildEsOperation(BulkRequest request, StringBuilder sb,
1176       List<ElasticSearchResultItem> fails) throws DocumentStoreOperationException {
1177
1178     boolean retVal = true;
1179     OperationResult indexExistsResult = null;
1180
1181     // What kind of operation are we performing?
1182     switch (request.getOperationType()) {
1183
1184       // Create a new document.
1185       case CREATE:
1186
1187         // Make sure that we were supplied a document payload.
1188         if (request.getOperation().getDocument() == null) {
1189
1190           fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1191               request.getIndex(), request.getId(), 400,
1192               request.getOperation().getMetaData().getUrl()));
1193           return false;
1194         }
1195
1196         // Make sure that the supplied document URL is formatted
1197         // correctly.
1198         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1199           fails.add(generateRejectionEntry(request.getOperationType(),
1200               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1201               request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1202           return false;
1203         }
1204
1205         // Validate that the specified index actually exists before we
1206         // try to perform the create.
1207         if (!indexExists(
1208             ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1209
1210           fails
1211               .add(generateRejectionEntry(request.getOperationType(),
1212                   "Specified resource does not exist: "
1213                       + request.getOperation().getMetaData().getUrl(),
1214                   request.getIndex(), request.getId(), 404,
1215                   request.getOperation().getMetaData().getUrl()));
1216           return false;
1217         }
1218
1219         // If we were supplied an id for the new document, then
1220         // include it in the bulk operation to Elastic Search
1221         if (request.getId() == null) {
1222
1223           sb.append(
1224               String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
1225
1226           // Otherwise, we just leave that parameter off and ElasticSearch
1227           // will generate one for us.
1228         } else {
1229           sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
1230               request.getId()));
1231         }
1232
1233         try {
1234           // Append the document that we want to create.
1235           sb.append(request.getOperation().getDocument().toJson()).append("\n");
1236         } catch (JsonProcessingException e) {
1237           throw new DocumentStoreOperationException("Failure parsing document to json", e);
1238         }
1239
1240         break;
1241
1242       // Update an existing document.
1243       case UPDATE:
1244
1245         // Make sure that we were supplied a document payload.
1246         if (request.getOperation().getDocument() == null) {
1247
1248           fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1249               request.getIndex(), request.getId(), 400,
1250               request.getOperation().getMetaData().getUrl()));
1251           return false;
1252         }
1253
1254         // Make sure that the supplied document URL is formatted
1255         // correctly.
1256         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1257           fails.add(generateRejectionEntry(request.getOperationType(),
1258               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1259               request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1260           return false;
1261         }
1262
1263         // Validate that the specified index actually exists before we
1264         // try to perform the update.
1265         if (!indexExists(request.getIndex())) {
1266
1267           fails
1268               .add(generateRejectionEntry(request.getOperationType(),
1269                   "Specified resource does not exist: "
1270                       + request.getOperation().getMetaData().getUrl(),
1271                   request.getIndex(), request.getId(), 404,
1272                   request.getOperation().getMetaData().getUrl()));
1273           return false;
1274         }
1275
1276         // Validate that the document we are trying to update actually
1277         // exists before we try to perform the update.
1278         if (!documentExists(request.getIndex(), request.getId())) {
1279
1280           fails
1281               .add(generateRejectionEntry(request.getOperationType(),
1282                   "Specified resource does not exist: "
1283                       + request.getOperation().getMetaData().getUrl(),
1284                   request.getIndex(), request.getId(), 404,
1285                   request.getOperation().getMetaData().getUrl()));
1286           return false;
1287         }
1288
1289         // It is mandatory that a version be supplied for an update
1290         // operation,
1291         // so validate that now.
1292         if (request.getOperation().getMetaData().getEtag() == null) {
1293
1294           fails.add(generateRejectionEntry(request.getOperationType(),
1295               "Missing mandatory ETag field", request.getIndex(), request.getId(), 400,
1296               request.getOperation().getMetaData().getUrl()));
1297           return false;
1298         }
1299
1300         // Generate the update request...
1301         sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
1302             request.getId(), request.getOperation().getMetaData().getEtag()));
1303
1304         // ...and append the document that we want to update.
1305         try {
1306           sb.append(request.getOperation().getDocument().toJson()).append("\n");
1307         } catch (JsonProcessingException e) {
1308           throw new DocumentStoreOperationException("Failure parsing document to json", e);
1309         }
1310         break;
1311
1312       // Delete an existing document.
1313       case DELETE:
1314
1315         // Make sure that the supplied document URL is formatted
1316         // correctly.
1317         if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1318           fails.add(generateRejectionEntry(request.getOperationType(),
1319               "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1320               request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1321           return false;
1322         }
1323
1324         // Validate that the specified index actually exists before we
1325         // try to perform the delete.
1326         if (!indexExists(request.getIndex())) {
1327
1328           fails
1329               .add(generateRejectionEntry(request.getOperationType(),
1330                   "Specified resource does not exist: "
1331                       + request.getOperation().getMetaData().getUrl(),
1332                   request.getIndex(), request.getId(), 404,
1333                   request.getOperation().getMetaData().getUrl()));
1334           return false;
1335         }
1336
1337         // Validate that the document we are trying to update actually
1338         // exists before we try to perform the delete.
1339         if (!documentExists(request.getIndex(), request.getId())) {
1340
1341           fails
1342               .add(generateRejectionEntry(request.getOperationType(),
1343                   "Specified resource does not exist: "
1344                       + request.getOperation().getMetaData().getUrl(),
1345                   request.getIndex(), request.getId(), 404,
1346                   request.getOperation().getMetaData().getUrl()));
1347           return false;
1348         }
1349
1350         // It is mandatory that a version be supplied for a delete
1351         // operation,
1352         // so validate that now.
1353         if (request.getOperation().getMetaData().getEtag() == null) {
1354
1355           fails.add(generateRejectionEntry(request.getOperationType(),
1356               "Missing mandatory ETag field", request.getIndex(), request.getId(), 400,
1357               request.getOperation().getMetaData().getUrl()));
1358           return false;
1359         }
1360
1361         // Generate the delete request.
1362         sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
1363             request.getId(), request.getOperation().getMetaData().getEtag()));
1364         break;
1365       default:
1366     }
1367
1368     return retVal;
1369   }
1370
1371   private boolean indexExists(String index) throws DocumentStoreOperationException {
1372
1373     OperationResult indexExistsResult = checkIndexExistence(index);
1374
1375     return ((indexExistsResult.getResultCode() >= 200)
1376         && (indexExistsResult.getResultCode() < 300));
1377   }
1378
1379   private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1380
1381     OperationResult docExistsResult = checkDocumentExistence(index, id);
1382
1383     return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1384   }
1385
1386   /**
1387    * This method constructs a status entry for a bulk operation which has been rejected before even
1388    * sending it to the document store.
1389    *
1390    * @param rejectReason - A message describing why the operation was rejected.
1391    * @param anId - The identifier associated with the document being acted on.
1392    * @param statusCode - An HTTP status code.
1393    * @return - A result set item.
1394    */
1395   private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason,
1396       String index, String anId, int statusCode, String originalUrl) {
1397
1398     ElasticSearchError err = new ElasticSearchError();
1399     err.setReason(rejectReason);
1400
1401     ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1402     op.setIndex(index);
1403     op.setId(anId);
1404     op.setStatus(statusCode);
1405     op.setError(err);
1406     op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1407
1408     ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1409
1410     switch (opType) {
1411       case CREATE:
1412         rejectionResult.setCreate(op);
1413         break;
1414       case UPDATE:
1415         rejectionResult.setIndex(op);
1416         break;
1417       case DELETE:
1418         rejectionResult.setDelete(op);
1419         break;
1420       default:
1421     }
1422
1423     return rejectionResult;
1424   }
1425
1426   /**
1427    * This method takes the json structure returned from ElasticSearch in response to a bulk
1428    * operations request and marshals it into a Java object.
1429    *
1430    * @param jsonResult - The bulk operations response returned from ElasticSearch.
1431    * @return - The marshalled response.
1432    * @throws JsonParseException
1433    * @throws JsonMappingException
1434    * @throws IOException
1435    */
1436   private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult)
1437       throws JsonParseException, JsonMappingException, IOException {
1438
1439     if (jsonResult != null) {
1440       if (logger.isDebugEnabled()) {
1441         logger.debug("ESController: Marshalling ES result set from json: "
1442             + jsonResult.replaceAll("\n", ""));
1443       }
1444
1445       ObjectMapper mapper = new ObjectMapper();
1446       mapper.setSerializationInclusion(Include.NON_EMPTY);
1447
1448       return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1449     }
1450
1451     return null;
1452   }
1453
1454   /**
1455    * This method takes the marshalled ElasticSearch bulk response and converts it into a generic
1456    * response payload.
1457    *
1458    * @param esResult - ElasticSearch bulk operations response.
1459    * @return - A generic result set.
1460    */
1461   private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1462       List<ElasticSearchResultItem> rejectedOps) {
1463
1464     int totalOps = 0;
1465     int totalSuccess = 0;
1466     int totalFails = 0;
1467
1468     if (logger.isDebugEnabled()) {
1469
1470       logger.debug("ESController: Build generic result set.  ES Results: "
1471           + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: "
1472           + rejectedOps.toString());
1473     }
1474
1475     // Build a combined list of result items from the results returned
1476     // from ElasticSearch and the list of operations that we rejected
1477     // without sending to ElasticSearch.
1478     List<ElasticSearchResultItem> combinedResults = new ArrayList<ElasticSearchResultItem>();
1479     if (esResult != null) {
1480       combinedResults.addAll(Arrays.asList(esResult.getItems()));
1481     }
1482     combinedResults.addAll(rejectedOps);
1483
1484     // Iterate over the individual results in the resulting result set.
1485     StringBuilder resultsBuilder = new StringBuilder();
1486     AtomicBoolean firstItem = new AtomicBoolean(true);
1487     for (ElasticSearchResultItem item : combinedResults) {
1488
1489       // Increment the operation counts.
1490       totalOps++;
1491       if (isSuccessCode(item.operationStatus().getStatus())) {
1492         totalSuccess++;
1493       } else {
1494         totalFails++;
1495       }
1496
1497       // Prepend a comma to our response string unless this it the
1498       // first result in the set.
1499       if (!firstItem.compareAndSet(true, false)) {
1500         resultsBuilder.append(", ");
1501       }
1502
1503       // Append the current result as a generic json structure.
1504       resultsBuilder.append(item.toJson());
1505     }
1506
1507     // Now, build the result string and return it.
1508     String responseBody = "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": "
1509         + totalSuccess + ", " + "\"total_fails\": " + totalFails + ", " + "\"results\": ["
1510         + resultsBuilder.toString() + "]}";
1511
1512     return responseBody;
1513   }
1514
1515   /**
1516    * This method queryies ElasticSearch to determine if the supplied index is present in the
1517    * document store.
1518    *
1519    * @param indexName - The index to look for.
1520    * @return - An operation result indicating the success or failure of the check.
1521    * @throws DocumentStoreOperationException
1522    */
1523   public OperationResult checkIndexExistence(String indexName)
1524       throws DocumentStoreOperationException {
1525
1526     // Initialize operation result with a failure codes / fault string
1527     OperationResult opResult = new OperationResult();
1528     opResult.setResultCode(500);
1529
1530     // Grab the current time so we can use it to generate a metrics log.
1531     MdcOverride override = getStartTime(new MdcOverride());
1532
1533     String fullUrl = getFullUrl("/" + indexName, false);
1534     HttpURLConnection conn = initializeConnection(fullUrl);
1535
1536     try {
1537       conn.setRequestMethod("HEAD");
1538
1539     } catch (ProtocolException e) {
1540       shutdownConnection(conn);
1541       throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1542     }
1543
1544     logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1545
1546     int resultCode;
1547     try {
1548       resultCode = conn.getResponseCode();
1549     } catch (IOException e) {
1550       shutdownConnection(conn);
1551       throw new DocumentStoreOperationException(
1552           "Failed to get the response code from the connection.", e);
1553     }
1554     logger.debug("Response Code : " + resultCode);
1555
1556     opResult.setResultCode(resultCode);
1557
1558     // Generate a metrics log so we can track how long the operation took.
1559     metricsLogger.info(SearchDbMsgs.CHECK_INDEX_TIME,
1560         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1561             .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1562         override, indexName);
1563
1564     shutdownConnection(conn);
1565
1566     return opResult;
1567   }
1568
1569   private void buildDocumentResult(DocumentOperationResult result, String index)
1570       throws DocumentStoreOperationException {
1571
1572     JSONParser parser = new JSONParser();
1573     JSONObject root;
1574     try {
1575       root = (JSONObject) parser.parse(result.getResult());
1576
1577       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1578         // Success response object
1579         Document doc = new Document();
1580         doc.setEtag(result.getResultVersion());
1581         doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1582
1583         doc.setContent((JSONObject) root.get("_source"));
1584         result.setDocument(doc);
1585
1586       } else {
1587         // Error response object
1588         JSONObject error = (JSONObject) root.get("error");
1589         if (error != null) {
1590           result.setError(
1591               new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1592         }
1593
1594       }
1595     } catch (Exception e) {
1596       throw new DocumentStoreOperationException(
1597           "Failed to parse Elastic Search response." + result.getResult());
1598     }
1599
1600   }
1601
1602   private String buildDocumentResponseUrl(String index, String id) {
1603     return ApiUtils.buildDocumentUri(index, id);
1604   }
1605
1606   private void buildSearchResult(SearchOperationResult result, String index)
1607       throws DocumentStoreOperationException {
1608
1609     JSONParser parser = new JSONParser();
1610     JSONObject root;
1611
1612     try {
1613       root = (JSONObject) parser.parse(result.getResult());
1614       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1615         JSONObject hits = (JSONObject) root.get("hits");
1616         JSONArray hitArray = (JSONArray) hits.get("hits");
1617         SearchHits searchHits = new SearchHits();
1618         searchHits.setTotalHits(hits.get("total").toString());
1619         ArrayList<SearchHit> searchHitArray = new ArrayList<SearchHit>();
1620
1621         for (int i = 0; i < hitArray.size(); i++) {
1622           JSONObject hit = (JSONObject) hitArray.get(i);
1623           SearchHit searchHit = new SearchHit();
1624           searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1625           Document doc = new Document();
1626           if (hit.get("_version") != null) {
1627             doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1628           }
1629
1630           doc.setUrl(buildDocumentResponseUrl(index,
1631               (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1632           doc.setContent((JSONObject) hit.get("_source"));
1633           searchHit.setDocument(doc);
1634           searchHitArray.add(searchHit);
1635         }
1636         searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1637         result.setSearchResult(searchHits);
1638
1639         JSONObject aggregations = (JSONObject) root.get("aggregations");
1640         if (aggregations != null) {
1641           AggregationResult[] aggResults =
1642               AggregationParsingUtil.parseAggregationResults(aggregations);
1643           AggregationResults aggs = new AggregationResults();
1644           aggs.setAggregations(aggResults);
1645           result.setAggregationResult(aggs);
1646         }
1647
1648         // success
1649       } else {
1650         JSONObject error = (JSONObject) root.get("error");
1651         if (error != null) {
1652           result.setError(
1653               new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1654         }
1655       }
1656     } catch (Exception e) {
1657       throw new DocumentStoreOperationException(
1658           "Failed to parse Elastic Search response." + result.getResult());
1659     }
1660
1661   }
1662
1663   private void buildSuggestResult(SearchOperationResult result, String index)
1664       throws DocumentStoreOperationException {
1665
1666     JSONParser parser = new JSONParser();
1667     JSONObject root;
1668     try {
1669       root = (JSONObject) parser.parse(result.getResult());
1670       if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1671         JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
1672         JSONObject hitdata = (JSONObject) hitArray.get(0);
1673         JSONArray optionsArray = (JSONArray) hitdata.get("options");
1674         SuggestHits suggestHits = new SuggestHits();
1675         suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
1676
1677         ArrayList<SuggestHit> suggestHitArray = new ArrayList<SuggestHit>();
1678
1679         for (int i = 0; i < optionsArray.size(); i++) {
1680           JSONObject hit = (JSONObject) optionsArray.get(i);
1681
1682           SuggestHit suggestHit = new SuggestHit();
1683           suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
1684           suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
1685           Document doc = new Document();
1686           if (hit.get("_version") != null) {
1687             doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1688           }
1689           doc.setUrl(buildDocumentResponseUrl(index,
1690               (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1691
1692           doc.setContent((JSONObject) hit.get("payload"));
1693           suggestHit.setDocument(doc);
1694           suggestHitArray.add(suggestHit);
1695         }
1696         suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
1697         result.setSuggestResult(suggestHits);
1698
1699         JSONObject aggregations = (JSONObject) root.get("aggregations");
1700         if (aggregations != null) {
1701           AggregationResult[] aggResults =
1702               AggregationParsingUtil.parseAggregationResults(aggregations);
1703           AggregationResults aggs = new AggregationResults();
1704           aggs.setAggregations(aggResults);
1705           result.setAggregationResult(aggs);
1706         }
1707
1708         // success
1709       } else {
1710         JSONObject error = (JSONObject) root.get("error");
1711         if (error != null) {
1712           result.setError(
1713               new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1714         }
1715       }
1716     } catch (Exception e) {
1717       throw new DocumentStoreOperationException(
1718           "Failed to parse Elastic Search response." + result.getResult());
1719     }
1720
1721   }
1722
1723 }