a93274e025dc7f3f3a517ea028df4afca5d7953d
[aai/search-data-service.git] / src / main / java / org / onap / aai / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
23
24 import com.fasterxml.jackson.annotation.JsonInclude.Include;
25 import com.fasterxml.jackson.core.JsonParseException;
26 import com.fasterxml.jackson.core.JsonProcessingException;
27 import com.fasterxml.jackson.databind.JsonMappingException;
28 import com.fasterxml.jackson.databind.ObjectMapper;
29 import com.google.common.base.Throwables;
30 import java.io.BufferedReader;
31 import java.io.File;
32 import java.io.FileInputStream;
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.io.InputStreamReader;
36 import java.io.OutputStream;
37 import java.io.OutputStreamWriter;
38 import java.io.PrintWriter;
39 import java.io.StringWriter;
40 import java.net.HttpURLConnection;
41 import java.net.MalformedURLException;
42 import java.net.ProtocolException;
43 import java.net.URL;
44 import java.text.SimpleDateFormat;
45 import java.util.ArrayList;
46 import java.util.Arrays;
47 import java.util.List;
48 import java.util.Properties;
49 import java.util.concurrent.atomic.AtomicBoolean;
50 import org.json.simple.JSONArray;
51 import org.json.simple.JSONObject;
52 import org.json.simple.parser.JSONParser;
53 import org.json.simple.parser.ParseException;
54 import org.onap.aai.cl.api.LogFields;
55 import org.onap.aai.cl.api.LogLine;
56 import org.onap.aai.cl.api.Logger;
57 import org.onap.aai.cl.eelf.LoggerFactory;
58 import org.onap.aai.cl.mdc.MdcContext;
59 import org.onap.aai.cl.mdc.MdcOverride;
60 import org.onap.aai.sa.rest.AnalysisConfiguration;
61 import org.onap.aai.sa.rest.ApiUtils;
62 import org.onap.aai.sa.rest.BulkRequest;
63 import org.onap.aai.sa.rest.BulkRequest.OperationType;
64 import org.onap.aai.sa.rest.DocumentSchema;
65 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
66 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
67 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
68 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
69 import org.onap.aai.sa.searchdbabstraction.entity.Document;
70 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
71 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
72 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
73 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
74 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
75 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
76 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
77 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
78 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
79 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
80 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
81 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
82 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
83 import org.springframework.http.HttpStatus;
84
85 /**
86  * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface.
87  */
88 public class ElasticSearchHttpController implements DocumentStoreInterface {
89
90     private static ElasticSearchHttpController instance = null;
91
92     private static final Logger logger =
93             LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
94     private static final Logger metricsLogger =
95             LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
96
97     private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
98             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
99     private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
100             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
101     private static final String BULK_IMPORT_INDEX_TEMPLATE =
102             "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
103     private static final String BULK_DELETE_TEMPLATE =
104             "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
105
106     private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
107             "Internal Error: ElasticSearch operation fault occurred";
108     private final ElasticSearchConfig config;
109
110     private static final String DEFAULT_TYPE = "default";
111
112     protected AnalysisConfiguration analysisConfig;
113
114     public static ElasticSearchHttpController getInstance() {
115
116         synchronized (ElasticSearchHttpController.class) {
117
118             if (instance == null) {
119
120                 Properties properties = new Properties();
121                 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
122                 try {
123                     properties.load(new FileInputStream(file));
124                 } catch (Exception e) {
125                     logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance",
126                             e.getLocalizedMessage());
127                 }
128
129                 ElasticSearchConfig config = new ElasticSearchConfig(properties);
130                 instance = new ElasticSearchHttpController(config);
131             }
132         }
133
134         return instance;
135     }
136
137     public ElasticSearchHttpController(ElasticSearchConfig config) {
138         this.config = config;
139         analysisConfig = new AnalysisConfiguration();
140
141         try {
142             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
143             checkConnection();
144             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
145         } catch (Exception e) {
146             logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, getFullUrl("", false),
147                     e.getMessage());
148         }
149     }
150
151
152     public AnalysisConfiguration getAnalysisConfig() {
153         return analysisConfig;
154     }
155
156     @Override
157     public OperationResult createIndex(String index, DocumentSchema documentSchema) {
158
159         OperationResult result = new OperationResult();
160         result.setResultCode(500);
161
162         try {
163
164             // Submit the request to ElasticSearch to create the index using a
165             // default document type.
166             result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
167                     DocumentSchemaUtil.generateDocumentMappings(documentSchema));
168
169             // ElasticSearch will return us a 200 code on success when we
170             // want to report a 201, so translate the result here.
171             result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
172             if (isSuccess(result)) {
173                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
174             }
175
176         } catch (DocumentStoreOperationException | IOException e) {
177
178             result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
179         }
180
181         return result;
182     }
183
184     @Override
185     public OperationResult createDynamicIndex(String index, String dynamicSchema) {
186         OperationResult result = new OperationResult();
187         result.setResultCode(500);
188
189         try {
190             result = createTable(index, dynamicSchema);
191
192             // ElasticSearch will return us a 200 code on success when we
193             // want to report a 201, so translate the result here.
194             result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
195             if (isSuccess(result)) {
196                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
197             }
198         } catch (DocumentStoreOperationException e) {
199             result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
200         }
201
202         return result;
203     }
204
205
206     @Override
207     public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
208
209         // Initialize operation result with a failure codes / fault string
210         OperationResult opResult = new OperationResult();
211         opResult.setResultCode(500);
212         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
213
214         // Grab the current time so we can use it to generate a metrics log.
215         MdcOverride override = getStartTime(new MdcOverride());
216
217         String fullUrl = getFullUrl("/" + indexName + "/", false);
218         HttpURLConnection conn = initializeConnection(fullUrl);
219
220         logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
221
222         try {
223             conn.setRequestMethod("DELETE");
224         } catch (ProtocolException e) {
225             shutdownConnection(conn);
226             throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
227         }
228
229         handleResponse(conn, opResult);
230
231         // Generate a metrics log so we can track how long the operation took.
232         metricsLogger
233                 .info(SearchDbMsgs.DELETE_INDEX_TIME,
234                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
235                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
236                         override, indexName);
237
238         shutdownConnection(conn);
239
240         return opResult;
241     }
242
243
244     private OperationResult checkConnection() throws Exception {
245
246         String fullUrl = getFullUrl("/_cluster/health", false);
247         URL url = null;
248         HttpURLConnection conn = null;
249
250         url = new URL(fullUrl);
251         conn = (HttpURLConnection) url.openConnection();
252         conn.setRequestMethod("GET");
253         conn.setDoOutput(true);
254         logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
255
256         int resultCode = conn.getResponseCode();
257         logger.debug("getClusterHealth() response Code : " + resultCode);
258         OperationResult opResult = new OperationResult();
259         opResult.setResultCode(resultCode);
260
261         shutdownConnection(conn);
262
263         return opResult;
264     }
265
266     private String getFullUrl(String resourceUrl, boolean isSecure) {
267
268         final String host = config.getIpAddress();
269         final String port = config.getHttpPort();
270
271         if (isSecure) {
272             return String.format("https://%s:%s%s", host, port, resourceUrl);
273         } else {
274             return String.format("http://%s:%s%s", host, port, resourceUrl);
275         }
276     }
277
278     private void shutdownConnection(HttpURLConnection connection) {
279         if (connection == null) {
280             return;
281         }
282
283         InputStream inputstream = null;
284         OutputStream outputstream = null;
285
286         try {
287             inputstream = connection.getInputStream();
288         } catch (IOException e) {
289             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
290         } finally {
291             if (inputstream != null) {
292                 try {
293                     inputstream.close();
294                 } catch (IOException e) {
295                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
296                             e.getLocalizedMessage());
297                 }
298             }
299         }
300
301         try {
302             outputstream = connection.getOutputStream();
303         } catch (IOException e) {
304             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
305         } finally {
306             if (outputstream != null) {
307                 try {
308                     outputstream.close();
309                 } catch (IOException e) {
310                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
311                             e.getLocalizedMessage());
312                 }
313             }
314         }
315
316         connection.disconnect();
317     }
318
319     // @Override
320     protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings)
321             throws DocumentStoreOperationException {
322
323         if (indexSettings == null) {
324             logger.debug("No settings provided.");
325         }
326
327         if (indexMappings == null) {
328             logger.debug("No mappings provided.");
329         }
330
331         OperationResult opResult = new OperationResult();
332
333         // Initialize operation result with a failure codes / fault string
334         opResult.setResultCode(500);
335         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
336
337         // Grab the current time so we can use it to generate a metrics log.
338         MdcOverride override = getStartTime(new MdcOverride());
339
340         String fullUrl = getFullUrl("/" + indexName + "/", false);
341         HttpURLConnection conn = initializeConnection(fullUrl);
342
343         try {
344             conn.setRequestMethod("PUT");
345         } catch (ProtocolException e) {
346             shutdownConnection(conn);
347             throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
348         }
349
350         StringBuilder sb = new StringBuilder(128);
351         sb.append("{ \"settings\" : ");
352         sb.append(indexSettings);
353         sb.append(",");
354
355         sb.append("\"mappings\" : {");
356         sb.append("\"" + typeName + "\" :");
357         sb.append(indexMappings);
358         sb.append("}}");
359
360         try {
361             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
362         } catch (IOException e) {
363             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
364             throw new DocumentStoreOperationException(e.getMessage(), e);
365         }
366
367         logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
368         logger.debug("Request content: " + sb.toString());
369
370         handleResponse(conn, opResult);
371
372         shutdownConnection(conn);
373
374         // Generate a metrics log so we can track how long the operation took.
375         metricsLogger
376                 .info(SearchDbMsgs.CREATE_INDEX_TIME,
377                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
378                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
379                         override, indexName);
380
381         return opResult;
382     }
383
384     /**
385      * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the
386      * index.
387      *
388      * @param indexName - The name of the index to be created
389      * @param settingsAndMappings - The actual JSON object that will define the index
390      * @return - The operation result of writing into Elasticsearch
391      * @throws DocumentStoreOperationException
392      */
393     protected OperationResult createTable(String indexName, String settingsAndMappings)
394             throws DocumentStoreOperationException {
395         OperationResult result = new OperationResult();
396         result.setResultCode(500);
397         result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
398
399         // Grab the current time so we can use it to generate a metrics log.
400         MdcOverride override = getStartTime(new MdcOverride());
401
402         String fullUrl = getFullUrl("/" + indexName + "/", false);
403         HttpURLConnection conn = initializeConnection(fullUrl);
404
405         try {
406             conn.setRequestMethod("PUT");
407         } catch (ProtocolException e) {
408             shutdownConnection(conn);
409             throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
410         }
411
412         try {
413             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
414         } catch (IOException e) {
415             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
416             throw new DocumentStoreOperationException(e.getMessage());
417         }
418         handleResponse(conn, result);
419
420         // Generate a metrics log so we can track how long the operation took.
421         metricsLogger
422                 .info(SearchDbMsgs.CREATE_INDEX_TIME,
423                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
424                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()),
425                         override, indexName);
426
427         return result;
428     }
429
430     @Override
431     public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
432             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
433
434         if (!allowImplicitIndexCreation) {
435
436             // Before we do anything, make sure that the specified index actually exists in the
437             // document store - we don't want to rely on ElasticSearch to fail the document
438             // create because it could be configured to implicitly create a non-existent index,
439             // which can lead to hard-to-debug behaviour with queries down the road.
440             OperationResult indexExistsResult = checkIndexExistence(indexName);
441             if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
442
443                 DocumentOperationResult opResult = new DocumentOperationResult();
444                 opResult.setResultCode(HttpStatus.NOT_FOUND.value());
445                 opResult.setResult("Document Index '" + indexName + "' does not exist.");
446                 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
447                 return opResult;
448             }
449         }
450
451         if (document.getId() == null || document.getId().isEmpty()) {
452             return createDocumentWithoutId(indexName, document);
453         } else {
454             return createDocumentWithId(indexName, document);
455         }
456     }
457
458     private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
459             throws DocumentStoreOperationException {
460         // check if the document already exists
461         DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
462
463
464         if (opResult.getResultCode() != HttpStatus.NOT_FOUND.value()) {
465             if (opResult.getResultCode() == HttpStatus.CONFLICT.value()) {
466                 opResult.setFailureCause("A document with the same id already exists.");
467             } else {
468                 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
469             }
470             opResult.setResultCode(HttpStatus.CONFLICT.value());
471             return opResult;
472         }
473
474         opResult = new DocumentOperationResult();
475         // Initialize operation result with a failure codes / fault string
476         opResult.setResultCode(500);
477         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
478
479         // Grab the current time so we can use it to generate a metrics log.
480         MdcOverride override = getStartTime(new MdcOverride());
481
482         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
483         HttpURLConnection conn = initializeConnection(fullUrl);
484
485         try {
486             conn.setRequestMethod("PUT");
487         } catch (ProtocolException e) {
488             shutdownConnection(conn);
489             throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
490         }
491
492         attachDocument(conn, document);
493
494         logger.debug("Sending 'PUT' request to: " + conn.getURL());
495
496         handleResponse(conn, opResult);
497         buildDocumentResult(opResult, indexName);
498
499         // Generate a metrics log so we can track how long the operation took.
500         metricsLogger
501                 .info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
502                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
503                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
504                         override, indexName);
505
506         shutdownConnection(conn);
507
508         return opResult;
509
510     }
511
512     private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
513             throws DocumentStoreOperationException {
514
515         DocumentOperationResult response = new DocumentOperationResult();
516         // Initialize operation result with a failure codes / fault string
517         response.setResultCode(500);
518         response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
519
520         // Grab the current time so we can use it to generate a metrics log.
521         MdcOverride override = getStartTime(new MdcOverride());
522
523         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
524         HttpURLConnection conn = initializeConnection(fullUrl);
525
526         try {
527             conn.setRequestMethod("POST");
528         } catch (ProtocolException e) {
529             shutdownConnection(conn);
530             throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
531         }
532
533         attachDocument(conn, document);
534
535         logger.debug("Sending 'POST' request to: " + conn.getURL());
536
537         handleResponse(conn, response);
538         buildDocumentResult(response, indexName);
539
540         // Generate a metrics log so we can track how long the operation took.
541         metricsLogger
542                 .info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
543                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
544                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
545                         override, indexName);
546
547         shutdownConnection(conn);
548
549         return response;
550     }
551
552     private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
553             throws DocumentStoreOperationException {
554         conn.setRequestProperty("Connection", "Close");
555         attachContent(conn, doc.getContentInJson());
556     }
557
558     private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
559             throws DocumentStoreOperationException {
560         DocumentOperationResult opResult = new DocumentOperationResult();
561
562         // Initialize operation result with a failure codes / fault string
563         opResult.setResultCode(500);
564
565         // Grab the current time so we can use it to generate a metrics log.
566         MdcOverride override = getStartTime(new MdcOverride());
567
568         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
569         HttpURLConnection conn = initializeConnection(fullUrl);
570
571         try {
572             conn.setRequestMethod("HEAD");
573         } catch (ProtocolException e) {
574             shutdownConnection(conn);
575             throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
576         }
577
578         logger.debug("Sending 'HEAD' request to: " + conn.getURL());
579
580         int resultCode;
581         try {
582             resultCode = conn.getResponseCode();
583         } catch (IOException e) {
584             shutdownConnection(conn);
585             throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
586         }
587
588         logger.debug("Response Code : " + resultCode);
589
590         opResult.setResultCode(resultCode);
591
592         // Generate a metrics log so we can track how long the operation took.
593         metricsLogger
594                 .info(SearchDbMsgs.GET_DOCUMENT_TIME,
595                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
596                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
597                         override, indexName, docId);
598
599         shutdownConnection(conn);
600
601         return opResult;
602     }
603
604     @Override
605     public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
606             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
607
608         if (!allowImplicitIndexCreation) {
609             // Before we do anything, make sure that the specified index actually exists in the
610             // document store - we don't want to rely on ElasticSearch to fail the document
611             // create because it could be configured to implicitly create a non-existent index,
612             // which can lead to hard-to-debug behaviour with queries down the road.
613             OperationResult indexExistsResult = checkIndexExistence(indexName);
614             if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
615
616                 DocumentOperationResult opResult = new DocumentOperationResult();
617                 opResult.setResultCode(HttpStatus.NOT_FOUND.value());
618                 opResult.setResult("Document Index '" + indexName + "' does not exist.");
619                 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
620                 return opResult;
621             }
622         }
623
624         DocumentOperationResult opResult = new DocumentOperationResult();
625
626         // Initialize operation result with a failure codes / fault string
627         opResult.setResultCode(500);
628         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
629
630         // Grab the current time so we can use it to generate a metrics log.
631         MdcOverride override = getStartTime(new MdcOverride());
632
633         String fullUrl = getFullUrl(
634                 "/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + "?version=" + document.getVersion(),
635                 false);
636         HttpURLConnection conn = initializeConnection(fullUrl);
637
638         try {
639             conn.setRequestMethod("PUT");
640         } catch (ProtocolException e) {
641             shutdownConnection(conn);
642             throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
643         }
644
645         attachDocument(conn, document);
646
647         logger.debug("Sending 'PUT' request to: " + conn.getURL());
648
649         handleResponse(conn, opResult);
650         buildDocumentResult(opResult, indexName);
651
652         // Generate a metrics log so we can track how long the operation took.
653         metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
654                 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
655                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
656                 override, indexName, document.getId());
657
658         shutdownConnection(conn);
659
660         return opResult;
661     }
662
663     @Override
664     public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
665             throws DocumentStoreOperationException {
666         DocumentOperationResult opResult = new DocumentOperationResult();
667
668         // Initialize operation result with a failure codes / fault string
669         opResult.setResultCode(500);
670         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
671
672         // Grab the current time so we can use it to generate a metrics log.
673         MdcOverride override = getStartTime(new MdcOverride());
674
675         String fullUrl = getFullUrl(
676                 "/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + "?version=" + document.getVersion(),
677                 false);
678         HttpURLConnection conn = initializeConnection(fullUrl);
679
680         try {
681             conn.setRequestMethod("DELETE");
682         } catch (ProtocolException e) {
683             shutdownConnection(conn);
684             throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
685         }
686
687         logger.debug("\nSending 'DELETE' request to " + conn.getURL());
688
689         handleResponse(conn, opResult);
690         buildDocumentResult(opResult, indexName);
691         // supress the etag and url in response for delete as they are not required
692         if (opResult.getDocument() != null) {
693             opResult.getDocument().setEtag(null);
694             opResult.getDocument().setUrl(null);
695         }
696
697         // Generate a metrics log so we can track how long the operation took.
698         metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
699                 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
700                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
701                 override, indexName, document.getId());
702
703         shutdownConnection(conn);
704
705         return opResult;
706     }
707
708     @Override
709     public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
710             throws DocumentStoreOperationException {
711         DocumentOperationResult opResult = new DocumentOperationResult();
712
713         // Initialize operation result with a failure codes / fault string
714         opResult.setResultCode(500);
715         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
716
717         // Grab the current time so we can use it to generate a metrics log.
718         MdcOverride override = getStartTime(new MdcOverride());
719
720         String fullUrl = null;
721         if (document.getVersion() == null) {
722             fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
723         } else {
724             fullUrl = getFullUrl(
725                     "/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + "?version=" + document.getVersion(),
726                     false);
727         }
728         HttpURLConnection conn = initializeConnection(fullUrl);
729
730         logger.debug("\nSending 'GET' request to: " + conn.getURL());
731
732         handleResponse(conn, opResult);
733         buildDocumentResult(opResult, indexName);
734
735         // Generate a metrics log so we can track how long the operation took.
736         metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
737                 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
738                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
739                 override, indexName, document.getId());
740
741         shutdownConnection(conn);
742
743         return opResult;
744     }
745
746     @Override
747     public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
748         SearchOperationResult opResult = new SearchOperationResult();
749
750         // Initialize operation result with a failure codes / fault string
751         opResult.setResultCode(500);
752         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
753
754         String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
755
756         // Grab the current time so we can use it to generate a metrics log.
757         MdcOverride override = getStartTime(new MdcOverride());
758
759         HttpURLConnection conn = initializeConnection(fullUrl);
760
761         try {
762             conn.setRequestMethod("GET");
763         } catch (ProtocolException e) {
764             shutdownConnection(conn);
765             throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
766         }
767
768         logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
769
770         handleResponse(conn, opResult);
771         buildSearchResult(opResult, indexName);
772
773
774         metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
775                 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
776                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
777                 override, indexName, queryString);
778
779         return opResult;
780     }
781
782     @Override
783     public SearchOperationResult searchWithPayload(String indexName, String query)
784             throws DocumentStoreOperationException {
785         SearchOperationResult opResult = new SearchOperationResult();
786
787         if (logger.isDebugEnabled()) {
788             logger.debug("Querying index: " + indexName + " with query string: " + query);
789         }
790
791         // Initialize operation result with a failure codes / fault string
792         opResult.setResultCode(500);
793         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
794
795         String fullUrl = getFullUrl("/" + indexName + "/_search", false);
796
797         // Grab the current time so we can use it to generate a metrics log.
798         MdcOverride override = getStartTime(new MdcOverride());
799
800         HttpURLConnection conn = initializeConnection(fullUrl);
801
802         try {
803             conn.setRequestMethod("POST");
804         } catch (ProtocolException e) {
805             shutdownConnection(conn);
806             throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
807         }
808
809         attachContent(conn, query);
810
811         logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
812         logger.debug("Request body =  Elasticsearch query = " + query);
813
814         handleResponse(conn, opResult);
815         buildSearchResult(opResult, indexName);
816
817         metricsLogger
818                 .info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
819                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
820                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
821                         override, indexName, query);
822
823         shutdownConnection(conn);
824
825         return opResult;
826     }
827
828
829     @Override
830     public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
831             throws DocumentStoreOperationException {
832
833         SearchOperationResult opResult = new SearchOperationResult();
834
835         if (logger.isDebugEnabled()) {
836             logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
837         }
838
839         // Initialize operation result with a failure codes / fault string
840         opResult.setResultCode(500);
841         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
842
843         String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
844
845         // Grab the current time so we can use it to generate a metrics log.
846         MdcOverride override = getStartTime(new MdcOverride());
847
848         HttpURLConnection conn = initializeConnection(fullUrl);
849
850         try {
851             conn.setRequestMethod("POST");
852         } catch (ProtocolException e) {
853             shutdownConnection(conn);
854             throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
855         }
856
857         attachContent(conn, query);
858
859         logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
860         logger.debug("Request body =  Elasticsearch query = " + query);
861
862         handleResponse(conn, opResult);
863         buildSuggestResult(opResult, indexName);
864
865         metricsLogger
866                 .info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
867                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
868                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
869                         override, indexName, query);
870
871         shutdownConnection(conn);
872
873         return opResult;
874     }
875
876     private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException {
877         OutputStream outputStream = null;
878         OutputStreamWriter out = null;
879
880         try {
881             outputStream = conn.getOutputStream();
882         } catch (IOException e) {
883             shutdownConnection(conn);
884             throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
885         }
886
887         out = new OutputStreamWriter(outputStream);
888
889         try {
890             out.write(content);
891             out.close();
892         } catch (IOException e) {
893             shutdownConnection(conn);
894             throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
895         }
896     }
897
898     private HttpURLConnection initializeConnection(String fullUrl) throws DocumentStoreOperationException {
899         URL url = null;
900         HttpURLConnection conn = null;
901
902         try {
903             url = new URL(fullUrl);
904         } catch (MalformedURLException e) {
905             throw new DocumentStoreOperationException("Error building a URL with " + url, e);
906         }
907
908         try {
909             conn = (HttpURLConnection) url.openConnection();
910             conn.setRequestProperty("Content-Type", "application/json");
911             conn.setDoOutput(true);
912         } catch (IOException e) {
913             shutdownConnection(conn);
914             throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
915         }
916
917         return conn;
918     }
919
920     private void handleResponse(HttpURLConnection conn, OperationResult opResult)
921             throws DocumentStoreOperationException {
922         int resultCode = 200;
923
924         try {
925             resultCode = conn.getResponseCode();
926         } catch (IOException e) {
927             shutdownConnection(conn);
928             throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
929         }
930
931         logger.debug("Response Code : " + resultCode);
932
933         InputStream inputStream = null;
934
935         if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
936             inputStream = conn.getErrorStream();
937         } else {
938             try {
939                 inputStream = conn.getInputStream();
940             } catch (IOException e) {
941                 shutdownConnection(conn);
942                 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
943             }
944         }
945
946         InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
947         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
948
949         StringBuilder result = new StringBuilder(128);
950         String string = null;
951
952         try {
953             while ((string = bufferedreader.readLine()) != null) {
954                 result.append(string).append("\n");
955             }
956         } catch (IOException e) {
957             shutdownConnection(conn);
958             throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
959         }
960
961         if (resultCode == HttpStatus.CONFLICT.value()) {
962             opResult.setResultCode(HttpStatus.PRECONDITION_FAILED.value());
963         } else {
964             opResult.setResultCode(resultCode);
965         }
966         if (logger.isDebugEnabled()) {
967             logger.debug("Raw result string from ElasticSearch = " + result.toString());
968         }
969         opResult.setResult(result.toString());
970         opResult.setResultVersion(extractVersion(result.toString()));
971     }
972
973     private String extractVersion(String result) {
974         JSONParser parser = new JSONParser();
975         String version = null;
976         try {
977             JSONObject root = (JSONObject) parser.parse(result);
978             if (root.get("_version") != null) {
979                 version = root.get("_version").toString();
980             }
981         } catch (ParseException e) {
982             // Not all responses from ElasticSearch include a version, so
983             // if we don't get one back, just return an empty string rather
984             // than trigger a false failure.
985             version = "";
986         }
987         return version;
988     }
989
990     /**
991      * This convenience method gets the current system time and stores it in an attribute in the supplied
992      * {@link MdcOverride} object so that it can be used later by the metrics logger.
993      *
994      * @param override - The {@link MdcOverride} object to update.
995      * @return - The supplied {@link MdcOverride} object.
996      */
997     private MdcOverride getStartTime(MdcOverride override) {
998
999         // Grab the current time...
1000         long startTimeInMs = System.currentTimeMillis();
1001
1002         // ...and add it as an attribute to the supplied MDC Override
1003         // object.
1004         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
1005         override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
1006
1007         // Return the MdcOverride object that we were passed.
1008         // This looks odd, but it allows us to do stuff like:
1009         //
1010         // MdcOverride ov = getStartTime(new MdcOverride())
1011         //
1012         // which is quite handy, but also allows us to pass in an existing
1013         // MdcOverride object which already has some attributes set.
1014         return override;
1015     }
1016
1017     private boolean isSuccess(OperationResult result) {
1018
1019         return isSuccessCode(result.getResultCode());
1020     }
1021
1022
1023     private boolean isSuccessCode(int statusCode) {
1024         return ((statusCode >= 200) && (statusCode < 300));
1025     }
1026
1027
1028     @Override
1029     public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
1030
1031         if (logger.isDebugEnabled()) {
1032             String dbgString = "ESController: performBulkOperations - Operations: ";
1033
1034             for (BulkRequest request : requests) {
1035                 dbgString += "[" + request.toString() + "] ";
1036             }
1037
1038             logger.debug(dbgString);
1039         }
1040
1041         // Grab the current time so we can use it to generate a metrics log.
1042         MdcOverride override = getStartTime(new MdcOverride());
1043
1044         // Parse the supplied set of operations.
1045         // Iterate over the list of operations which we were provided and
1046         // translate them into a format that ElasticSearh understands.
1047         int opCount = 0;
1048         StringBuilder esOperationSet = new StringBuilder(128);
1049         List<ElasticSearchResultItem> rejected = new ArrayList<>();
1050         for (BulkRequest request : requests) {
1051
1052             // Convert the request to the syntax ElasticSearch likes.
1053             if (buildEsOperation(request, esOperationSet, rejected)) {
1054                 opCount++;
1055             }
1056         }
1057
1058         ElasticSearchBulkOperationResult opResult = null;
1059         if (opCount > 0) {
1060
1061             // Open an HTTP connection to the ElasticSearch back end.
1062             String fullUrl = getFullUrl("/_bulk", false);
1063             URL url;
1064             HttpURLConnection conn;
1065             try {
1066
1067                 url = new URL(fullUrl);
1068                 conn = (HttpURLConnection) url.openConnection();
1069                 conn.setRequestMethod("PUT");
1070                 conn.setDoOutput(true);
1071                 conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
1072                 conn.setRequestProperty("Connection", "Close");
1073
1074             } catch (IOException e) {
1075
1076                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1077                 if (logger.isDebugEnabled()) {
1078                     logger.debug(Throwables.getStackTraceAsString(e));
1079                 }
1080
1081                 throw new DocumentStoreOperationException(
1082                         "Failed to open connection to document store.  Cause: " + e.getMessage(), e);
1083             }
1084
1085             StringBuilder bulkResult = new StringBuilder(128);
1086             try {
1087                 // Create an output stream to write our request to.
1088                 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1089
1090                 if (logger.isDebugEnabled()) {
1091                     logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1092                     logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
1093                 }
1094
1095                 // Write the resulting request string to our output stream. (this sends the request to ES?)
1096                 out.write(esOperationSet.toString());
1097                 out.close();
1098
1099                 // Open an input stream on our connection in order to read back the results.
1100                 InputStream is = conn.getInputStream();
1101                 InputStreamReader inputstreamreader = new InputStreamReader(is);
1102                 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1103
1104                 // Read the contents of the input stream into our result string...
1105                 String esResponseString = null;
1106
1107                 while ((esResponseString = bufferedreader.readLine()) != null) {
1108                     bulkResult.append(esResponseString).append("\n");
1109                 }
1110
1111             } catch (IOException e) {
1112
1113                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1114                 if (logger.isDebugEnabled()) {
1115                     StringWriter sw = new StringWriter();
1116                     e.printStackTrace(new PrintWriter(sw));
1117                     logger.debug(sw.toString());
1118                 }
1119
1120                 throw new DocumentStoreOperationException(
1121                         "Failure interacting with document store.  Cause: " + e.getMessage(), e);
1122             }
1123
1124             if (logger.isDebugEnabled()) {
1125                 logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
1126             }
1127
1128             // ...and marshal the resulting string into a Java object.
1129             try {
1130                 opResult = marshallEsBulkResult(bulkResult.toString());
1131
1132             } catch (IOException e) {
1133
1134                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1135                 if (logger.isDebugEnabled()) {
1136                     logger.debug(Throwables.getStackTraceAsString(e));
1137                 }
1138
1139                 throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: " + e.getMessage(),
1140                         e);
1141             }
1142         }
1143
1144         // Finally, build the operation result and return it to the caller.
1145         OperationResult result = new OperationResult();
1146         result.setResultCode(207);
1147         result.setResult(buildGenericBulkResultSet(opResult, rejected));
1148
1149         // In the success case we don't want the entire result string to be
1150         // dumped into the metrics log, so concatenate it.
1151         String resultStringForMetricsLog = result.getResult();
1152         if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1153             resultStringForMetricsLog =
1154                     resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1155         }
1156
1157         metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1158                 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1159                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1160                 override);
1161
1162         return result;
1163     }
1164
1165
1166     /**
1167      * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch.
1168      *
1169      * @param request - The request to be performed.
1170      * @param sb - The string builder to append the json data to
1171      * @throws DocumentStoreOperationException
1172      */
1173     private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails)
1174             throws DocumentStoreOperationException {
1175
1176         boolean retVal = true;
1177         // What kind of operation are we performing?
1178         switch (request.getOperationType()) {
1179
1180             // Create a new document.
1181             case CREATE:
1182
1183                 // Make sure that we were supplied a document payload.
1184                 if (request.getOperation().getDocument() == null) {
1185
1186                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1187                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1188                     return false;
1189                 }
1190
1191                 // Make sure that the supplied document URL is formatted
1192                 // correctly.
1193                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1194                     fails.add(generateRejectionEntry(request.getOperationType(),
1195                             "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1196                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1197                     return false;
1198                 }
1199
1200                 // Validate that the specified index actually exists before we
1201                 // try to perform the create.
1202                 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1203
1204                     fails.add(generateRejectionEntry(request.getOperationType(),
1205                             "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1206                             request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1207                     return false;
1208                 }
1209
1210                 // If we were supplied an id for the new document, then
1211                 // include it in the bulk operation to Elastic Search
1212                 if (request.getId() == null) {
1213
1214                     sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
1215
1216                     // Otherwise, we just leave that parameter off and ElasticSearch
1217                     // will generate one for us.
1218                 } else {
1219                     sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
1220                             request.getId()));
1221                 }
1222
1223                 try {
1224                     // Append the document that we want to create.
1225                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
1226                 } catch (JsonProcessingException e) {
1227                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
1228                 }
1229
1230                 break;
1231
1232             // Update an existing document.
1233             case UPDATE:
1234
1235                 // Make sure that we were supplied a document payload.
1236                 if (request.getOperation().getDocument() == null) {
1237
1238                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1239                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1240                     return false;
1241                 }
1242
1243                 // Make sure that the supplied document URL is formatted
1244                 // correctly.
1245                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1246                     fails.add(generateRejectionEntry(request.getOperationType(),
1247                             "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1248                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1249                     return false;
1250                 }
1251
1252                 // Validate that the specified index actually exists before we
1253                 // try to perform the update.
1254                 if (!indexExists(request.getIndex())) {
1255
1256                     fails.add(generateRejectionEntry(request.getOperationType(),
1257                             "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1258                             request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1259                     return false;
1260                 }
1261
1262                 // Validate that the document we are trying to update actually
1263                 // exists before we try to perform the update.
1264                 if (!documentExists(request.getIndex(), request.getId())) {
1265
1266                     fails.add(generateRejectionEntry(request.getOperationType(),
1267                             "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1268                             request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1269                     return false;
1270                 }
1271
1272                 // It is mandatory that a version be supplied for an update operation,
1273                 // so validate that now.
1274                 if (request.getOperation().getMetaData().getEtag() == null) {
1275
1276                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1277                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1278                     return false;
1279                 }
1280
1281                 // Generate the update request...
1282                 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1283                         request.getOperation().getMetaData().getEtag()));
1284
1285                 // ...and append the document that we want to update.
1286                 try {
1287                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
1288                 } catch (JsonProcessingException e) {
1289                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
1290                 }
1291                 break;
1292
1293             // Delete an existing document.
1294             case DELETE:
1295
1296                 // Make sure that the supplied document URL is formatted
1297                 // correctly.
1298                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1299                     fails.add(generateRejectionEntry(request.getOperationType(),
1300                             "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1301                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1302                     return false;
1303                 }
1304
1305                 // Validate that the specified index actually exists before we
1306                 // try to perform the delete.
1307                 if (!indexExists(request.getIndex())) {
1308
1309                     fails.add(generateRejectionEntry(request.getOperationType(),
1310                             "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1311                             request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1312                     return false;
1313                 }
1314
1315                 // Validate that the document we are trying to update actually
1316                 // exists before we try to perform the delete.
1317                 if (!documentExists(request.getIndex(), request.getId())) {
1318
1319                     fails.add(generateRejectionEntry(request.getOperationType(),
1320                             "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1321                             request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1322                     return false;
1323                 }
1324
1325                 // It is mandatory that a version be supplied for a delete operation,
1326                 // so validate that now.
1327                 if (request.getOperation().getMetaData().getEtag() == null) {
1328
1329                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1330                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1331                     return false;
1332                 }
1333
1334                 // Generate the delete request.
1335                 sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1336                         request.getOperation().getMetaData().getEtag()));
1337                 break;
1338             default:
1339         }
1340
1341         return retVal;
1342     }
1343
1344     private boolean indexExists(String index) throws DocumentStoreOperationException {
1345
1346         OperationResult indexExistsResult = checkIndexExistence(index);
1347
1348         return ((indexExistsResult.getResultCode() >= 200) && (indexExistsResult.getResultCode() < 300));
1349     }
1350
1351     private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1352
1353         OperationResult docExistsResult = checkDocumentExistence(index, id);
1354
1355         return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1356     }
1357
1358     /**
1359      * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the
1360      * document store.
1361      *
1362      * @param rejectReason - A message describing why the operation was rejected.
1363      * @param anId - The identifier associated with the document being acted on.
1364      * @param statusCode - An HTTP status code.
1365      * @return - A result set item.
1366      */
1367     private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index,
1368             String anId, int statusCode, String originalUrl) {
1369
1370         ElasticSearchError err = new ElasticSearchError();
1371         err.setReason(rejectReason);
1372
1373         ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1374         op.setIndex(index);
1375         op.setId(anId);
1376         op.setStatus(statusCode);
1377         op.setError(err);
1378         op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1379
1380         ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1381
1382         switch (opType) {
1383             case CREATE:
1384                 rejectionResult.setCreate(op);
1385                 break;
1386             case UPDATE:
1387                 rejectionResult.setIndex(op);
1388                 break;
1389             case DELETE:
1390                 rejectionResult.setDelete(op);
1391                 break;
1392             default:
1393         }
1394
1395         return rejectionResult;
1396     }
1397
1398     /**
1399      * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and
1400      * marshals it into a Java object.
1401      *
1402      * @param jsonResult - The bulk operations response returned from ElasticSearch.
1403      * @return - The marshalled response.
1404      * @throws JsonParseException
1405      * @throws JsonMappingException
1406      * @throws IOException
1407      */
1408     private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException {
1409         if (jsonResult != null) {
1410             if (logger.isDebugEnabled()) {
1411                 logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", ""));
1412             }
1413
1414             ObjectMapper mapper = new ObjectMapper();
1415             mapper.setSerializationInclusion(Include.NON_EMPTY);
1416
1417             return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1418         }
1419
1420         return null;
1421     }
1422
1423     /**
1424      * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload.
1425      *
1426      * @param esResult - ElasticSearch bulk operations response.
1427      * @return - A generic result set.
1428      */
1429     private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1430             List<ElasticSearchResultItem> rejectedOps) {
1431         int totalOps = 0;
1432         int totalSuccess = 0;
1433         int totalFails = 0;
1434
1435         if (logger.isDebugEnabled()) {
1436
1437             logger.debug("ESController: Build generic result set.  ES Results: "
1438                     + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString());
1439         }
1440
1441         // Build a combined list of result items from the results returned
1442         // from ElasticSearch and the list of operations that we rejected
1443         // without sending to ElasticSearch.
1444         List<ElasticSearchResultItem> combinedResults = new ArrayList<>();
1445         if (esResult != null) {
1446             combinedResults.addAll(Arrays.asList(esResult.getItems()));
1447         }
1448         combinedResults.addAll(rejectedOps);
1449
1450         // Iterate over the individual results in the resulting result set.
1451         StringBuilder resultsBuilder = new StringBuilder();
1452         AtomicBoolean firstItem = new AtomicBoolean(true);
1453         for (ElasticSearchResultItem item : combinedResults) {
1454
1455             // Increment the operation counts.
1456             totalOps++;
1457             if (isSuccessCode(item.operationStatus().getStatus())) {
1458                 totalSuccess++;
1459             } else {
1460                 totalFails++;
1461             }
1462
1463             // Prepend a comma to our response string unless this it the
1464             // first result in the set.
1465             if (!firstItem.compareAndSet(true, false)) {
1466                 resultsBuilder.append(", ");
1467             }
1468
1469             // Append the current result as a generic json structure.
1470             resultsBuilder.append(item.toJson());
1471         }
1472
1473         return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", "
1474                 + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}";
1475     }
1476
1477
1478     /**
1479      * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
1480      *
1481      * @param indexName - The index to look for.
1482      * @return - An operation result indicating the success or failure of the check.
1483      * @throws DocumentStoreOperationException
1484      */
1485     public OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
1486
1487         // Initialize operation result with a failure codes / fault string
1488         OperationResult opResult = new OperationResult();
1489         opResult.setResultCode(500);
1490
1491         // Grab the current time so we can use it to generate a metrics log.
1492         MdcOverride override = getStartTime(new MdcOverride());
1493
1494         String fullUrl = getFullUrl("/" + indexName, false);
1495         HttpURLConnection conn = initializeConnection(fullUrl);
1496
1497         try {
1498             conn.setRequestMethod("HEAD");
1499
1500         } catch (ProtocolException e) {
1501             shutdownConnection(conn);
1502             throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1503         }
1504
1505         logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1506
1507         int resultCode;
1508         try {
1509             resultCode = conn.getResponseCode();
1510         } catch (IOException e) {
1511             shutdownConnection(conn);
1512             throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1513         }
1514         logger.debug("Response Code : " + resultCode);
1515
1516         opResult.setResultCode(resultCode);
1517
1518         // Generate a metrics log so we can track how long the operation took.
1519         metricsLogger
1520                 .info(SearchDbMsgs.CHECK_INDEX_TIME,
1521                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1522                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1523                         override, indexName);
1524
1525         shutdownConnection(conn);
1526
1527         return opResult;
1528     }
1529
1530
1531     private void buildDocumentResult(DocumentOperationResult result, String index)
1532             throws DocumentStoreOperationException {
1533
1534         JSONParser parser = new JSONParser();
1535         JSONObject root;
1536         try {
1537             root = (JSONObject) parser.parse(result.getResult());
1538
1539             if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1540                 // Success response object
1541                 Document doc = new Document();
1542                 doc.setEtag(result.getResultVersion());
1543                 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1544
1545                 doc.setContent((JSONObject) root.get("_source"));
1546                 result.setDocument(doc);
1547
1548             } else {
1549                 // Error response object
1550                 JSONObject error = (JSONObject) root.get("error");
1551                 if (error != null) {
1552                     result.setError(new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1553                 }
1554
1555             }
1556         } catch (Exception e) {
1557             throw new DocumentStoreOperationException("Failed to parse Elastic Search response." + result.getResult());
1558         }
1559
1560
1561     }
1562
1563     private String buildDocumentResponseUrl(String index, String id) {
1564         return ApiUtils.buildDocumentUri(index, id);
1565     }
1566
1567     private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1568
1569         JSONParser parser = new JSONParser();
1570         JSONObject root;
1571
1572         try {
1573             root = (JSONObject) parser.parse(result.getResult());
1574             if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1575                 JSONObject hits = (JSONObject) root.get("hits");
1576                 JSONArray hitArray = (JSONArray) hits.get("hits");
1577                 SearchHits searchHits = new SearchHits();
1578                 searchHits.setTotalHits(hits.get("total").toString());
1579                 ArrayList<SearchHit> searchHitArray = new ArrayList<>();
1580
1581                 for (int i = 0; i < hitArray.size(); i++) {
1582                     JSONObject hit = (JSONObject) hitArray.get(i);
1583                     SearchHit searchHit = new SearchHit();
1584                     searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1585                     Document doc = new Document();
1586                     if (hit.get("_version") != null) {
1587                         doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1588                     }
1589
1590                     doc.setUrl(
1591                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1592                     doc.setContent((JSONObject) hit.get("_source"));
1593                     searchHit.setDocument(doc);
1594                     searchHitArray.add(searchHit);
1595                 }
1596                 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1597                 result.setSearchResult(searchHits);
1598
1599                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1600                 if (aggregations != null) {
1601                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1602                     AggregationResults aggs = new AggregationResults();
1603                     aggs.setAggregations(aggResults);
1604                     result.setAggregationResult(aggs);
1605                 }
1606
1607                 // success
1608             } else {
1609                 JSONObject error = (JSONObject) root.get("error");
1610                 if (error != null) {
1611                     result.setError(new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1612                 }
1613             }
1614         } catch (Exception e) {
1615             throw new DocumentStoreOperationException("Failed to parse Elastic Search response." + result.getResult());
1616         }
1617
1618     }
1619
1620     private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1621         JSONParser parser = new JSONParser();
1622         JSONObject root;
1623         try {
1624             root = (JSONObject) parser.parse(result.getResult());
1625             if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1626                 JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
1627                 JSONObject hitdata = (JSONObject) hitArray.get(0);
1628                 JSONArray optionsArray = (JSONArray) hitdata.get("options");
1629                 SuggestHits suggestHits = new SuggestHits();
1630                 suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
1631
1632                 ArrayList<SuggestHit> suggestHitArray = new ArrayList<>();
1633
1634                 for (int i = 0; i < optionsArray.size(); i++) {
1635                     JSONObject hit = (JSONObject) optionsArray.get(i);
1636
1637                     SuggestHit suggestHit = new SuggestHit();
1638                     suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
1639                     suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
1640                     Document doc = new Document();
1641                     if (hit.get("_version") != null) {
1642                         doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1643                     }
1644                     doc.setUrl(
1645                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1646
1647                     doc.setContent((JSONObject) hit.get("payload"));
1648                     suggestHit.setDocument(doc);
1649                     suggestHitArray.add(suggestHit);
1650                 }
1651                 suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
1652                 result.setSuggestResult(suggestHits);
1653
1654                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1655                 if (aggregations != null) {
1656                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1657                     AggregationResults aggs = new AggregationResults();
1658                     aggs.setAggregations(aggResults);
1659                     result.setAggregationResult(aggs);
1660                 }
1661
1662                 // success
1663             } else {
1664                 JSONObject error = (JSONObject) root.get("error");
1665                 if (error != null) {
1666                     result.setError(new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1667                 }
1668             }
1669         } catch (Exception e) {
1670             throw new DocumentStoreOperationException("Failed to parse Elastic Search response." + result.getResult());
1671         }
1672     }
1673 }