841f4775ec67003ae3af9629a032aab3558b671b
[aai/search-data-service.git] / src / main / java / org / onap / aai / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
23
24 import com.fasterxml.jackson.annotation.JsonInclude.Include;
25 import com.fasterxml.jackson.core.JsonParseException;
26 import com.fasterxml.jackson.core.JsonProcessingException;
27 import com.fasterxml.jackson.databind.JsonMappingException;
28 import com.fasterxml.jackson.databind.ObjectMapper;
29 import com.google.common.base.Throwables;
30 import java.io.BufferedReader;
31 import java.io.File;
32 import java.io.FileInputStream;
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.io.InputStreamReader;
36 import java.io.OutputStream;
37 import java.io.OutputStreamWriter;
38 import java.io.PrintWriter;
39 import java.io.StringWriter;
40 import java.net.HttpURLConnection;
41 import java.net.MalformedURLException;
42 import java.net.ProtocolException;
43 import java.net.URL;
44 import java.text.SimpleDateFormat;
45 import java.util.ArrayList;
46 import java.util.Arrays;
47 import java.util.List;
48 import java.util.Properties;
49 import java.util.concurrent.atomic.AtomicBoolean;
50 import org.json.simple.JSONArray;
51 import org.json.simple.JSONObject;
52 import org.json.simple.parser.JSONParser;
53 import org.json.simple.parser.ParseException;
54 import org.onap.aai.cl.api.LogFields;
55 import org.onap.aai.cl.api.LogLine;
56 import org.onap.aai.cl.api.Logger;
57 import org.onap.aai.cl.eelf.LoggerFactory;
58 import org.onap.aai.cl.mdc.MdcContext;
59 import org.onap.aai.cl.mdc.MdcOverride;
60 import org.onap.aai.sa.rest.AnalysisConfiguration;
61 import org.onap.aai.sa.rest.ApiUtils;
62 import org.onap.aai.sa.rest.BulkRequest;
63 import org.onap.aai.sa.rest.BulkRequest.OperationType;
64 import org.onap.aai.sa.rest.DocumentSchema;
65 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
66 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
67 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
68 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
69 import org.onap.aai.sa.searchdbabstraction.entity.Document;
70 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
71 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
72 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
73 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
74 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
75 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
76 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
77 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
78 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
79 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
80 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
81 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
82 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
83 import org.springframework.http.HttpStatus;
84
85 /**
86  * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface.
87  */
88 public class ElasticSearchHttpController implements DocumentStoreInterface {
89
90     private static ElasticSearchHttpController instance = null;
91
92     private static final Logger logger =
93             LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
94     private static final Logger metricsLogger =
95             LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
96
97     private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
98             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
99     private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
100             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
101     private static final String BULK_IMPORT_INDEX_TEMPLATE =
102             "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
103     private static final String BULK_DELETE_TEMPLATE =
104             "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
105
106     private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
107             "Internal Error: ElasticSearch operation fault occurred";
108     private final ElasticSearchConfig config;
109
110     private static final String DEFAULT_TYPE = "default";
111
112     protected AnalysisConfiguration analysisConfig;
113
114     public static ElasticSearchHttpController getInstance() {
115
116         synchronized (ElasticSearchHttpController.class) {
117
118             if (instance == null) {
119
120                 Properties properties = new Properties();
121                 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
122                 try {
123                     properties.load(new FileInputStream(file));
124                 } catch (Exception e) {
125                     logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance",
126                             e.getLocalizedMessage());
127                 }
128
129                 ElasticSearchConfig config = new ElasticSearchConfig(properties);
130                 instance = new ElasticSearchHttpController(config);
131             }
132         }
133
134         return instance;
135     }
136
137     public ElasticSearchHttpController(ElasticSearchConfig config) {
138         this.config = config;
139         analysisConfig = new AnalysisConfiguration();
140
141         try {
142             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
143             checkConnection();
144             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
145         } catch (Exception e) {
146             logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, getFullUrl("", false),
147                     e.getMessage());
148         }
149     }
150
151
152     public AnalysisConfiguration getAnalysisConfig() {
153         return analysisConfig;
154     }
155
156     @Override
157     public OperationResult createIndex(String index, DocumentSchema documentSchema) {
158
159         OperationResult result = new OperationResult();
160         result.setResultCode(500);
161
162         try {
163
164             // Submit the request to ElasticSearch to create the index using a
165             // default document type.
166             result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
167                     DocumentSchemaUtil.generateDocumentMappings(documentSchema));
168
169             // ElasticSearch will return us a 200 code on success when we
170             // want to report a 201, so translate the result here.
171             result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
172             if (isSuccess(result)) {
173                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
174             }
175
176         } catch (DocumentStoreOperationException | IOException e) {
177
178             result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
179         }
180
181         return result;
182     }
183
184     @Override
185     public OperationResult createDynamicIndex(String index, String dynamicSchema) {
186         OperationResult result = new OperationResult();
187         result.setResultCode(500);
188
189         try {
190             result = createTable(index, dynamicSchema);
191
192             // ElasticSearch will return us a 200 code on success when we
193             // want to report a 201, so translate the result here.
194             result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode());
195             if (isSuccess(result)) {
196                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
197             }
198         } catch (DocumentStoreOperationException e) {
199             result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
200         }
201
202         return result;
203     }
204
205
206     @Override
207     public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
208
209         // Initialize operation result with a failure codes / fault string
210         OperationResult opResult = new OperationResult();
211         opResult.setResultCode(500);
212         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
213
214         // Grab the current time so we can use it to generate a metrics log.
215         MdcOverride override = getStartTime(new MdcOverride());
216
217         String fullUrl = getFullUrl("/" + indexName + "/", false);
218         HttpURLConnection conn = initializeConnection(fullUrl);
219
220         logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
221
222         try {
223             conn.setRequestMethod("DELETE");
224         } catch (ProtocolException e) {
225             shutdownConnection(conn);
226             throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
227         }
228
229         handleResponse(conn, opResult);
230
231         // Generate a metrics log so we can track how long the operation took.
232         metricsLogger
233                 .info(SearchDbMsgs.DELETE_INDEX_TIME,
234                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
235                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
236                         override, indexName);
237
238         shutdownConnection(conn);
239
240         return opResult;
241     }
242
243
244     private OperationResult checkConnection() throws Exception {
245
246         String fullUrl = getFullUrl("/_cluster/health", false);
247         URL url = null;
248         HttpURLConnection conn = null;
249
250         url = new URL(fullUrl);
251         conn = (HttpURLConnection) url.openConnection();
252         conn.setRequestMethod("GET");
253         conn.setDoOutput(true);
254         logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
255
256         int resultCode = conn.getResponseCode();
257         logger.debug("getClusterHealth() response Code : " + resultCode);
258         OperationResult opResult = new OperationResult();
259         opResult.setResultCode(resultCode);
260
261         shutdownConnection(conn);
262
263         return opResult;
264     }
265
266     private String getFullUrl(String resourceUrl, boolean isSecure) {
267
268         final String host = config.getIpAddress();
269         final String port = config.getHttpPort();
270
271         if (isSecure) {
272             return String.format("https://%s:%s%s", host, port, resourceUrl);
273         } else {
274             return String.format("http://%s:%s%s", host, port, resourceUrl);
275         }
276     }
277
278     private void shutdownConnection(HttpURLConnection connection) {
279         if (connection == null) {
280             return;
281         }
282
283         InputStream inputstream = null;
284         OutputStream outputstream = null;
285
286         try {
287             inputstream = connection.getInputStream();
288         } catch (IOException e) {
289             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
290         } finally {
291             if (inputstream != null) {
292                 try {
293                     inputstream.close();
294                 } catch (IOException e) {
295                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
296                             e.getLocalizedMessage());
297                 }
298             }
299         }
300
301         try {
302             outputstream = connection.getOutputStream();
303         } catch (IOException e) {
304             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
305         } finally {
306             if (outputstream != null) {
307                 try {
308                     outputstream.close();
309                 } catch (IOException e) {
310                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
311                             e.getLocalizedMessage());
312                 }
313             }
314         }
315
316         connection.disconnect();
317     }
318
319     // @Override
320     protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings)
321             throws DocumentStoreOperationException {
322
323         if (indexSettings == null) {
324             logger.debug("No settings provided.");
325         }
326
327         if (indexMappings == null) {
328             logger.debug("No mappings provided.");
329         }
330
331         OperationResult opResult = new OperationResult();
332
333         // Initialize operation result with a failure codes / fault string
334         opResult.setResultCode(500);
335         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
336
337         // Grab the current time so we can use it to generate a metrics log.
338         MdcOverride override = getStartTime(new MdcOverride());
339
340         String fullUrl = getFullUrl("/" + indexName + "/", false);
341         HttpURLConnection conn = initializeConnection(fullUrl);
342
343         try {
344             conn.setRequestMethod("PUT");
345             conn.setRequestProperty("Content-Type", "application/json");
346         } catch (ProtocolException e) {
347             shutdownConnection(conn);
348             throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
349         }
350
351         StringBuilder sb = new StringBuilder(128);
352         sb.append("{ \"settings\" : ");
353         sb.append(indexSettings);
354         sb.append(",");
355
356         sb.append("\"mappings\" : {");
357         sb.append("\"" + typeName + "\" :");
358         sb.append(indexMappings);
359         sb.append("}}");
360
361         try {
362             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
363         } catch (IOException e) {
364             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
365             throw new DocumentStoreOperationException(e.getMessage(), e);
366         }
367
368         logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
369         logger.debug("Request content: " + sb.toString());
370
371         handleResponse(conn, opResult);
372
373         shutdownConnection(conn);
374
375         // Generate a metrics log so we can track how long the operation took.
376         metricsLogger
377                 .info(SearchDbMsgs.CREATE_INDEX_TIME,
378                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
379                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
380                         override, indexName);
381
382         return opResult;
383     }
384
385     /**
386      * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the
387      * index.
388      *
389      * @param indexName - The name of the index to be created
390      * @param settingsAndMappings - The actual JSON object that will define the index
391      * @return - The operation result of writing into Elasticsearch
392      * @throws DocumentStoreOperationException
393      */
394     protected OperationResult createTable(String indexName, String settingsAndMappings)
395             throws DocumentStoreOperationException {
396         OperationResult result = new OperationResult();
397         result.setResultCode(500);
398         result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
399
400         // Grab the current time so we can use it to generate a metrics log.
401         MdcOverride override = getStartTime(new MdcOverride());
402
403         String fullUrl = getFullUrl("/" + indexName + "/", false);
404         HttpURLConnection conn = initializeConnection(fullUrl);
405
406         try {
407             conn.setRequestMethod("PUT");
408             conn.setRequestProperty("Content-Type", "application/json");
409         } catch (ProtocolException e) {
410             shutdownConnection(conn);
411             throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
412         }
413
414         try {
415             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
416         } catch (IOException e) {
417             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
418             throw new DocumentStoreOperationException(e.getMessage());
419         }
420         handleResponse(conn, result);
421
422         // Generate a metrics log so we can track how long the operation took.
423         metricsLogger
424                 .info(SearchDbMsgs.CREATE_INDEX_TIME,
425                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
426                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()),
427                         override, indexName);
428
429         return result;
430     }
431
432     @Override
433     public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
434             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
435
436         if (!allowImplicitIndexCreation) {
437
438             // Before we do anything, make sure that the specified index actually exists in the
439             // document store - we don't want to rely on ElasticSearch to fail the document
440             // create because it could be configured to implicitly create a non-existent index,
441             // which can lead to hard-to-debug behaviour with queries down the road.
442             OperationResult indexExistsResult = checkIndexExistence(indexName);
443             if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
444
445                 DocumentOperationResult opResult = new DocumentOperationResult();
446                 opResult.setResultCode(HttpStatus.NOT_FOUND.value());
447                 opResult.setResult("Document Index '" + indexName + "' does not exist.");
448                 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
449                 return opResult;
450             }
451         }
452
453         if (document.getId() == null || document.getId().isEmpty()) {
454             return createDocumentWithoutId(indexName, document);
455         } else {
456             return createDocumentWithId(indexName, document);
457         }
458     }
459
460     private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
461             throws DocumentStoreOperationException {
462         // check if the document already exists
463         DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
464
465
466         if (opResult.getResultCode() != HttpStatus.NOT_FOUND.value()) {
467             if (opResult.getResultCode() == HttpStatus.CONFLICT.value()) {
468                 opResult.setFailureCause("A document with the same id already exists.");
469             } else {
470                 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
471             }
472             opResult.setResultCode(HttpStatus.CONFLICT.value());
473             return opResult;
474         }
475
476         opResult = new DocumentOperationResult();
477         // Initialize operation result with a failure codes / fault string
478         opResult.setResultCode(500);
479         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
480
481         // Grab the current time so we can use it to generate a metrics log.
482         MdcOverride override = getStartTime(new MdcOverride());
483
484         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
485         HttpURLConnection conn = initializeConnection(fullUrl);
486
487         try {
488             conn.setRequestMethod("PUT");
489         } catch (ProtocolException e) {
490             shutdownConnection(conn);
491             throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
492         }
493
494         attachDocument(conn, document);
495
496         logger.debug("Sending 'PUT' request to: " + conn.getURL());
497
498         handleResponse(conn, opResult);
499         buildDocumentResult(opResult, indexName);
500
501         // Generate a metrics log so we can track how long the operation took.
502         metricsLogger
503                 .info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
504                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
505                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
506                         override, indexName);
507
508         shutdownConnection(conn);
509
510         return opResult;
511
512     }
513
514     private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
515             throws DocumentStoreOperationException {
516
517         DocumentOperationResult response = new DocumentOperationResult();
518         // Initialize operation result with a failure codes / fault string
519         response.setResultCode(500);
520         response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
521
522         // Grab the current time so we can use it to generate a metrics log.
523         MdcOverride override = getStartTime(new MdcOverride());
524
525         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
526         HttpURLConnection conn = initializeConnection(fullUrl);
527
528         try {
529             conn.setRequestMethod("POST");
530         } catch (ProtocolException e) {
531             shutdownConnection(conn);
532             throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
533         }
534
535         attachDocument(conn, document);
536
537         logger.debug("Sending 'POST' request to: " + conn.getURL());
538
539         handleResponse(conn, response);
540         buildDocumentResult(response, indexName);
541
542         // Generate a metrics log so we can track how long the operation took.
543         metricsLogger
544                 .info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
545                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
546                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
547                         override, indexName);
548
549         shutdownConnection(conn);
550
551         return response;
552     }
553
554     private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
555             throws DocumentStoreOperationException {
556         conn.setRequestProperty("Content-Type", "application/json");
557         conn.setRequestProperty("Connection", "Close");
558         attachContent(conn, doc.getContentInJson());
559     }
560
561     private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
562             throws DocumentStoreOperationException {
563         DocumentOperationResult opResult = new DocumentOperationResult();
564
565         // Initialize operation result with a failure codes / fault string
566         opResult.setResultCode(500);
567
568         // Grab the current time so we can use it to generate a metrics log.
569         MdcOverride override = getStartTime(new MdcOverride());
570
571         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
572         HttpURLConnection conn = initializeConnection(fullUrl);
573
574         try {
575             conn.setRequestMethod("HEAD");
576         } catch (ProtocolException e) {
577             shutdownConnection(conn);
578             throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
579         }
580
581         logger.debug("Sending 'HEAD' request to: " + conn.getURL());
582
583         int resultCode;
584         try {
585             resultCode = conn.getResponseCode();
586         } catch (IOException e) {
587             shutdownConnection(conn);
588             throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
589         }
590
591         logger.debug("Response Code : " + resultCode);
592
593         opResult.setResultCode(resultCode);
594
595         // Generate a metrics log so we can track how long the operation took.
596         metricsLogger
597                 .info(SearchDbMsgs.GET_DOCUMENT_TIME,
598                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
599                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
600                         override, indexName, docId);
601
602         shutdownConnection(conn);
603
604         return opResult;
605     }
606
607     @Override
608     public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
609             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
610
611         if (!allowImplicitIndexCreation) {
612             // Before we do anything, make sure that the specified index actually exists in the
613             // document store - we don't want to rely on ElasticSearch to fail the document
614             // create because it could be configured to implicitly create a non-existent index,
615             // which can lead to hard-to-debug behaviour with queries down the road.
616             OperationResult indexExistsResult = checkIndexExistence(indexName);
617             if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) {
618
619                 DocumentOperationResult opResult = new DocumentOperationResult();
620                 opResult.setResultCode(HttpStatus.NOT_FOUND.value());
621                 opResult.setResult("Document Index '" + indexName + "' does not exist.");
622                 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
623                 return opResult;
624             }
625         }
626
627         DocumentOperationResult opResult = new DocumentOperationResult();
628
629         // Initialize operation result with a failure codes / fault string
630         opResult.setResultCode(500);
631         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
632
633         // Grab the current time so we can use it to generate a metrics log.
634         MdcOverride override = getStartTime(new MdcOverride());
635
636         String fullUrl = getFullUrl(
637                 "/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + "?version=" + document.getVersion(),
638                 false);
639         HttpURLConnection conn = initializeConnection(fullUrl);
640
641         try {
642             conn.setRequestMethod("PUT");
643             conn.setRequestProperty("Content-Type", "application/json");
644         } catch (ProtocolException e) {
645             shutdownConnection(conn);
646             throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
647         }
648
649         attachDocument(conn, document);
650
651         logger.debug("Sending 'PUT' request to: " + conn.getURL());
652
653         handleResponse(conn, opResult);
654         buildDocumentResult(opResult, indexName);
655
656         // Generate a metrics log so we can track how long the operation took.
657         metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
658                 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
659                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
660                 override, indexName, document.getId());
661
662         shutdownConnection(conn);
663
664         return opResult;
665     }
666
667     @Override
668     public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
669             throws DocumentStoreOperationException {
670         DocumentOperationResult opResult = new DocumentOperationResult();
671
672         // Initialize operation result with a failure codes / fault string
673         opResult.setResultCode(500);
674         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
675
676         // Grab the current time so we can use it to generate a metrics log.
677         MdcOverride override = getStartTime(new MdcOverride());
678
679         String fullUrl = getFullUrl(
680                 "/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + "?version=" + document.getVersion(),
681                 false);
682         HttpURLConnection conn = initializeConnection(fullUrl);
683
684         try {
685             conn.setRequestMethod("DELETE");
686         } catch (ProtocolException e) {
687             shutdownConnection(conn);
688             throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
689         }
690
691         logger.debug("\nSending 'DELETE' request to " + conn.getURL());
692
693         handleResponse(conn, opResult);
694         buildDocumentResult(opResult, indexName);
695         // supress the etag and url in response for delete as they are not required
696         if (opResult.getDocument() != null) {
697             opResult.getDocument().setEtag(null);
698             opResult.getDocument().setUrl(null);
699         }
700
701         // Generate a metrics log so we can track how long the operation took.
702         metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
703                 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
704                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
705                 override, indexName, document.getId());
706
707         shutdownConnection(conn);
708
709         return opResult;
710     }
711
712     @Override
713     public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
714             throws DocumentStoreOperationException {
715         DocumentOperationResult opResult = new DocumentOperationResult();
716
717         // Initialize operation result with a failure codes / fault string
718         opResult.setResultCode(500);
719         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
720
721         // Grab the current time so we can use it to generate a metrics log.
722         MdcOverride override = getStartTime(new MdcOverride());
723
724         String fullUrl = null;
725         if (document.getVersion() == null) {
726             fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
727         } else {
728             fullUrl = getFullUrl(
729                     "/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + "?version=" + document.getVersion(),
730                     false);
731         }
732         HttpURLConnection conn = initializeConnection(fullUrl);
733
734         logger.debug("\nSending 'GET' request to: " + conn.getURL());
735
736         handleResponse(conn, opResult);
737         buildDocumentResult(opResult, indexName);
738
739         // Generate a metrics log so we can track how long the operation took.
740         metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
741                 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
742                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
743                 override, indexName, document.getId());
744
745         shutdownConnection(conn);
746
747         return opResult;
748     }
749
750     @Override
751     public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
752         SearchOperationResult opResult = new SearchOperationResult();
753
754         // Initialize operation result with a failure codes / fault string
755         opResult.setResultCode(500);
756         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
757
758         String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
759
760         // Grab the current time so we can use it to generate a metrics log.
761         MdcOverride override = getStartTime(new MdcOverride());
762
763         HttpURLConnection conn = initializeConnection(fullUrl);
764
765         try {
766             conn.setRequestMethod("GET");
767         } catch (ProtocolException e) {
768             shutdownConnection(conn);
769             throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
770         }
771
772         logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
773
774         handleResponse(conn, opResult);
775         buildSearchResult(opResult, indexName);
776
777
778         metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
779                 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
780                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
781                 override, indexName, queryString);
782
783         return opResult;
784     }
785
786     @Override
787     public SearchOperationResult searchWithPayload(String indexName, String query)
788             throws DocumentStoreOperationException {
789         SearchOperationResult opResult = new SearchOperationResult();
790
791         if (logger.isDebugEnabled()) {
792             logger.debug("Querying index: " + indexName + " with query string: " + query);
793         }
794
795         // Initialize operation result with a failure codes / fault string
796         opResult.setResultCode(500);
797         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
798
799         String fullUrl = getFullUrl("/" + indexName + "/_search", false);
800
801         // Grab the current time so we can use it to generate a metrics log.
802         MdcOverride override = getStartTime(new MdcOverride());
803
804         HttpURLConnection conn = initializeConnection(fullUrl);
805
806         try {
807             conn.setRequestMethod("POST");
808             conn.setRequestProperty("Content-Type", "application/json");
809         } catch (ProtocolException e) {
810             shutdownConnection(conn);
811             throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
812         }
813
814         attachContent(conn, query);
815
816         logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
817         logger.debug("Request body =  Elasticsearch query = " + query);
818
819         handleResponse(conn, opResult);
820         buildSearchResult(opResult, indexName);
821
822         metricsLogger
823                 .info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
824                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
825                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
826                         override, indexName, query);
827
828         shutdownConnection(conn);
829
830         return opResult;
831     }
832
833
834     @Override
835     public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
836             throws DocumentStoreOperationException {
837
838         SearchOperationResult opResult = new SearchOperationResult();
839
840         if (logger.isDebugEnabled()) {
841             logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
842         }
843
844         // Initialize operation result with a failure codes / fault string
845         opResult.setResultCode(500);
846         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
847
848         String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
849
850         // Grab the current time so we can use it to generate a metrics log.
851         MdcOverride override = getStartTime(new MdcOverride());
852
853         HttpURLConnection conn = initializeConnection(fullUrl);
854
855         try {
856             conn.setRequestMethod("POST");
857             conn.setRequestProperty("Content-Type", "application/json");
858         } catch (ProtocolException e) {
859             shutdownConnection(conn);
860             throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
861         }
862
863         attachContent(conn, query);
864
865         logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
866         logger.debug("Request body =  Elasticsearch query = " + query);
867
868         handleResponse(conn, opResult);
869         buildSuggestResult(opResult, indexName);
870
871         metricsLogger
872                 .info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
873                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
874                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
875                         override, indexName, query);
876
877         shutdownConnection(conn);
878
879         return opResult;
880     }
881
882     private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException {
883         OutputStream outputStream = null;
884         OutputStreamWriter out = null;
885
886         try {
887             outputStream = conn.getOutputStream();
888         } catch (IOException e) {
889             shutdownConnection(conn);
890             throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
891         }
892
893         out = new OutputStreamWriter(outputStream);
894
895         try {
896             out.write(content);
897             out.close();
898         } catch (IOException e) {
899             shutdownConnection(conn);
900             throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
901         }
902     }
903
904     private HttpURLConnection initializeConnection(String fullUrl) throws DocumentStoreOperationException {
905         URL url = null;
906         HttpURLConnection conn = null;
907
908         try {
909             url = new URL(fullUrl);
910         } catch (MalformedURLException e) {
911             throw new DocumentStoreOperationException("Error building a URL with " + url, e);
912         }
913
914         try {
915             conn = (HttpURLConnection) url.openConnection();
916             conn.setDoOutput(true);
917         } catch (IOException e) {
918             shutdownConnection(conn);
919             throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
920         }
921
922         return conn;
923     }
924
925     private void handleResponse(HttpURLConnection conn, OperationResult opResult)
926             throws DocumentStoreOperationException {
927         int resultCode = 200;
928
929         try {
930             resultCode = conn.getResponseCode();
931         } catch (IOException e) {
932             shutdownConnection(conn);
933             throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
934         }
935
936         logger.debug("Response Code : " + resultCode);
937
938         InputStream inputStream = null;
939
940         if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success
941             inputStream = conn.getErrorStream();
942         } else {
943             try {
944                 inputStream = conn.getInputStream();
945             } catch (IOException e) {
946                 shutdownConnection(conn);
947                 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
948             }
949         }
950
951         InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
952         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
953
954         StringBuilder result = new StringBuilder(128);
955         String string = null;
956
957         try {
958             while ((string = bufferedreader.readLine()) != null) {
959                 result.append(string).append("\n");
960             }
961         } catch (IOException e) {
962             shutdownConnection(conn);
963             throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
964         }
965
966         if (resultCode == HttpStatus.CONFLICT.value()) {
967             opResult.setResultCode(HttpStatus.PRECONDITION_FAILED.value());
968         } else {
969             opResult.setResultCode(resultCode);
970         }
971         if (logger.isDebugEnabled()) {
972             logger.debug("Raw result string from ElasticSearch = " + result.toString());
973         }
974         opResult.setResult(result.toString());
975         opResult.setResultVersion(extractVersion(result.toString()));
976     }
977
978     private String extractVersion(String result) {
979         JSONParser parser = new JSONParser();
980         String version = null;
981         try {
982             JSONObject root = (JSONObject) parser.parse(result);
983             if (root.get("_version") != null) {
984                 version = root.get("_version").toString();
985             }
986         } catch (ParseException e) {
987             // Not all responses from ElasticSearch include a version, so
988             // if we don't get one back, just return an empty string rather
989             // than trigger a false failure.
990             version = "";
991         }
992         return version;
993     }
994
995     /**
996      * This convenience method gets the current system time and stores it in an attribute in the supplied
997      * {@link MdcOverride} object so that it can be used later by the metrics logger.
998      *
999      * @param override - The {@link MdcOverride} object to update.
1000      * @return - The supplied {@link MdcOverride} object.
1001      */
1002     private MdcOverride getStartTime(MdcOverride override) {
1003
1004         // Grab the current time...
1005         long startTimeInMs = System.currentTimeMillis();
1006
1007         // ...and add it as an attribute to the supplied MDC Override
1008         // object.
1009         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
1010         override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
1011
1012         // Return the MdcOverride object that we were passed.
1013         // This looks odd, but it allows us to do stuff like:
1014         //
1015         // MdcOverride ov = getStartTime(new MdcOverride())
1016         //
1017         // which is quite handy, but also allows us to pass in an existing
1018         // MdcOverride object which already has some attributes set.
1019         return override;
1020     }
1021
1022     private boolean isSuccess(OperationResult result) {
1023
1024         return isSuccessCode(result.getResultCode());
1025     }
1026
1027
1028     private boolean isSuccessCode(int statusCode) {
1029         return ((statusCode >= 200) && (statusCode < 300));
1030     }
1031
1032
1033     @Override
1034     public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
1035
1036         if (logger.isDebugEnabled()) {
1037             String dbgString = "ESController: performBulkOperations - Operations: ";
1038
1039             for (BulkRequest request : requests) {
1040                 dbgString += "[" + request.toString() + "] ";
1041             }
1042
1043             logger.debug(dbgString);
1044         }
1045
1046         // Grab the current time so we can use it to generate a metrics log.
1047         MdcOverride override = getStartTime(new MdcOverride());
1048
1049         // Parse the supplied set of operations.
1050         // Iterate over the list of operations which we were provided and
1051         // translate them into a format that ElasticSearh understands.
1052         int opCount = 0;
1053         StringBuilder esOperationSet = new StringBuilder(128);
1054         List<ElasticSearchResultItem> rejected = new ArrayList<>();
1055         for (BulkRequest request : requests) {
1056
1057             // Convert the request to the syntax ElasticSearch likes.
1058             if (buildEsOperation(request, esOperationSet, rejected)) {
1059                 opCount++;
1060             }
1061         }
1062
1063         ElasticSearchBulkOperationResult opResult = null;
1064         if (opCount > 0) {
1065
1066             // Open an HTTP connection to the ElasticSearch back end.
1067             String fullUrl = getFullUrl("/_bulk", false);
1068             URL url;
1069             HttpURLConnection conn;
1070             try {
1071
1072                 url = new URL(fullUrl);
1073                 conn = (HttpURLConnection) url.openConnection();
1074                 conn.setRequestMethod("PUT");
1075                 conn.setDoOutput(true);
1076                 conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
1077                 conn.setRequestProperty("Connection", "Close");
1078
1079             } catch (IOException e) {
1080
1081                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1082                 if (logger.isDebugEnabled()) {
1083                     logger.debug(Throwables.getStackTraceAsString(e));
1084                 }
1085
1086                 throw new DocumentStoreOperationException(
1087                         "Failed to open connection to document store.  Cause: " + e.getMessage(), e);
1088             }
1089
1090             StringBuilder bulkResult = new StringBuilder(128);
1091             try {
1092                 // Create an output stream to write our request to.
1093                 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1094
1095                 if (logger.isDebugEnabled()) {
1096                     logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1097                     logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
1098                 }
1099
1100                 // Write the resulting request string to our output stream. (this sends the request to ES?)
1101                 out.write(esOperationSet.toString());
1102                 out.close();
1103
1104                 // Open an input stream on our connection in order to read back the results.
1105                 InputStream is = conn.getInputStream();
1106                 InputStreamReader inputstreamreader = new InputStreamReader(is);
1107                 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1108
1109                 // Read the contents of the input stream into our result string...
1110                 String esResponseString = null;
1111
1112                 while ((esResponseString = bufferedreader.readLine()) != null) {
1113                     bulkResult.append(esResponseString).append("\n");
1114                 }
1115
1116             } catch (IOException e) {
1117
1118                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1119                 if (logger.isDebugEnabled()) {
1120                     StringWriter sw = new StringWriter();
1121                     e.printStackTrace(new PrintWriter(sw));
1122                     logger.debug(sw.toString());
1123                 }
1124
1125                 throw new DocumentStoreOperationException(
1126                         "Failure interacting with document store.  Cause: " + e.getMessage(), e);
1127             }
1128
1129             if (logger.isDebugEnabled()) {
1130                 logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
1131             }
1132
1133             // ...and marshal the resulting string into a Java object.
1134             try {
1135                 opResult = marshallEsBulkResult(bulkResult.toString());
1136
1137             } catch (IOException e) {
1138
1139                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1140                 if (logger.isDebugEnabled()) {
1141                     logger.debug(Throwables.getStackTraceAsString(e));
1142                 }
1143
1144                 throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: " + e.getMessage(),
1145                         e);
1146             }
1147         }
1148
1149         // Finally, build the operation result and return it to the caller.
1150         OperationResult result = new OperationResult();
1151         result.setResultCode(207);
1152         result.setResult(buildGenericBulkResultSet(opResult, rejected));
1153
1154         // In the success case we don't want the entire result string to be
1155         // dumped into the metrics log, so concatenate it.
1156         String resultStringForMetricsLog = result.getResult();
1157         if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) {
1158             resultStringForMetricsLog =
1159                     resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1160         }
1161
1162         metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1163                 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1164                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1165                 override);
1166
1167         return result;
1168     }
1169
1170
1171     /**
1172      * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch.
1173      *
1174      * @param request - The request to be performed.
1175      * @param sb - The string builder to append the json data to
1176      * @throws DocumentStoreOperationException
1177      */
1178     private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails)
1179             throws DocumentStoreOperationException {
1180
1181         boolean retVal = true;
1182         // What kind of operation are we performing?
1183         switch (request.getOperationType()) {
1184
1185             // Create a new document.
1186             case CREATE:
1187
1188                 // Make sure that we were supplied a document payload.
1189                 if (request.getOperation().getDocument() == null) {
1190
1191                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1192                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1193                     return false;
1194                 }
1195
1196                 // Make sure that the supplied document URL is formatted
1197                 // correctly.
1198                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1199                     fails.add(generateRejectionEntry(request.getOperationType(),
1200                             "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1201                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1202                     return false;
1203                 }
1204
1205                 // Validate that the specified index actually exists before we
1206                 // try to perform the create.
1207                 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1208
1209                     fails.add(generateRejectionEntry(request.getOperationType(),
1210                             "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1211                             request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1212                     return false;
1213                 }
1214
1215                 // If we were supplied an id for the new document, then
1216                 // include it in the bulk operation to Elastic Search
1217                 if (request.getId() == null) {
1218
1219                     sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
1220
1221                     // Otherwise, we just leave that parameter off and ElasticSearch
1222                     // will generate one for us.
1223                 } else {
1224                     sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
1225                             request.getId()));
1226                 }
1227
1228                 try {
1229                     // Append the document that we want to create.
1230                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
1231                 } catch (JsonProcessingException e) {
1232                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
1233                 }
1234
1235                 break;
1236
1237             // Update an existing document.
1238             case UPDATE:
1239
1240                 // Make sure that we were supplied a document payload.
1241                 if (request.getOperation().getDocument() == null) {
1242
1243                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1244                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1245                     return false;
1246                 }
1247
1248                 // Make sure that the supplied document URL is formatted
1249                 // correctly.
1250                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1251                     fails.add(generateRejectionEntry(request.getOperationType(),
1252                             "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1253                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1254                     return false;
1255                 }
1256
1257                 // Validate that the specified index actually exists before we
1258                 // try to perform the update.
1259                 if (!indexExists(request.getIndex())) {
1260
1261                     fails.add(generateRejectionEntry(request.getOperationType(),
1262                             "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1263                             request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1264                     return false;
1265                 }
1266
1267                 // Validate that the document we are trying to update actually
1268                 // exists before we try to perform the update.
1269                 if (!documentExists(request.getIndex(), request.getId())) {
1270
1271                     fails.add(generateRejectionEntry(request.getOperationType(),
1272                             "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1273                             request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1274                     return false;
1275                 }
1276
1277                 // It is mandatory that a version be supplied for an update operation,
1278                 // so validate that now.
1279                 if (request.getOperation().getMetaData().getEtag() == null) {
1280
1281                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1282                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1283                     return false;
1284                 }
1285
1286                 // Generate the update request...
1287                 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1288                         request.getOperation().getMetaData().getEtag()));
1289
1290                 // ...and append the document that we want to update.
1291                 try {
1292                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
1293                 } catch (JsonProcessingException e) {
1294                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
1295                 }
1296                 break;
1297
1298             // Delete an existing document.
1299             case DELETE:
1300
1301                 // Make sure that the supplied document URL is formatted
1302                 // correctly.
1303                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1304                     fails.add(generateRejectionEntry(request.getOperationType(),
1305                             "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1306                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1307                     return false;
1308                 }
1309
1310                 // Validate that the specified index actually exists before we
1311                 // try to perform the delete.
1312                 if (!indexExists(request.getIndex())) {
1313
1314                     fails.add(generateRejectionEntry(request.getOperationType(),
1315                             "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1316                             request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1317                     return false;
1318                 }
1319
1320                 // Validate that the document we are trying to update actually
1321                 // exists before we try to perform the delete.
1322                 if (!documentExists(request.getIndex(), request.getId())) {
1323
1324                     fails.add(generateRejectionEntry(request.getOperationType(),
1325                             "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1326                             request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1327                     return false;
1328                 }
1329
1330                 // It is mandatory that a version be supplied for a delete operation,
1331                 // so validate that now.
1332                 if (request.getOperation().getMetaData().getEtag() == null) {
1333
1334                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1335                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1336                     return false;
1337                 }
1338
1339                 // Generate the delete request.
1340                 sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1341                         request.getOperation().getMetaData().getEtag()));
1342                 break;
1343             default:
1344         }
1345
1346         return retVal;
1347     }
1348
1349     private boolean indexExists(String index) throws DocumentStoreOperationException {
1350
1351         OperationResult indexExistsResult = checkIndexExistence(index);
1352
1353         return ((indexExistsResult.getResultCode() >= 200) && (indexExistsResult.getResultCode() < 300));
1354     }
1355
1356     private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1357
1358         OperationResult docExistsResult = checkDocumentExistence(index, id);
1359
1360         return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300));
1361     }
1362
1363     /**
1364      * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the
1365      * document store.
1366      *
1367      * @param rejectReason - A message describing why the operation was rejected.
1368      * @param anId - The identifier associated with the document being acted on.
1369      * @param statusCode - An HTTP status code.
1370      * @return - A result set item.
1371      */
1372     private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index,
1373             String anId, int statusCode, String originalUrl) {
1374
1375         ElasticSearchError err = new ElasticSearchError();
1376         err.setReason(rejectReason);
1377
1378         ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1379         op.setIndex(index);
1380         op.setId(anId);
1381         op.setStatus(statusCode);
1382         op.setError(err);
1383         op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1384
1385         ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1386
1387         switch (opType) {
1388             case CREATE:
1389                 rejectionResult.setCreate(op);
1390                 break;
1391             case UPDATE:
1392                 rejectionResult.setIndex(op);
1393                 break;
1394             case DELETE:
1395                 rejectionResult.setDelete(op);
1396                 break;
1397             default:
1398         }
1399
1400         return rejectionResult;
1401     }
1402
1403     /**
1404      * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and
1405      * marshals it into a Java object.
1406      *
1407      * @param jsonResult - The bulk operations response returned from ElasticSearch.
1408      * @return - The marshalled response.
1409      * @throws JsonParseException
1410      * @throws JsonMappingException
1411      * @throws IOException
1412      */
1413     private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException {
1414         if (jsonResult != null) {
1415             if (logger.isDebugEnabled()) {
1416                 logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", ""));
1417             }
1418
1419             ObjectMapper mapper = new ObjectMapper();
1420             mapper.setSerializationInclusion(Include.NON_EMPTY);
1421
1422             return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1423         }
1424
1425         return null;
1426     }
1427
1428     /**
1429      * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload.
1430      *
1431      * @param esResult - ElasticSearch bulk operations response.
1432      * @return - A generic result set.
1433      */
1434     private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1435             List<ElasticSearchResultItem> rejectedOps) {
1436         int totalOps = 0;
1437         int totalSuccess = 0;
1438         int totalFails = 0;
1439
1440         if (logger.isDebugEnabled()) {
1441
1442             logger.debug("ESController: Build generic result set.  ES Results: "
1443                     + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString());
1444         }
1445
1446         // Build a combined list of result items from the results returned
1447         // from ElasticSearch and the list of operations that we rejected
1448         // without sending to ElasticSearch.
1449         List<ElasticSearchResultItem> combinedResults = new ArrayList<>();
1450         if (esResult != null) {
1451             combinedResults.addAll(Arrays.asList(esResult.getItems()));
1452         }
1453         combinedResults.addAll(rejectedOps);
1454
1455         // Iterate over the individual results in the resulting result set.
1456         StringBuilder resultsBuilder = new StringBuilder();
1457         AtomicBoolean firstItem = new AtomicBoolean(true);
1458         for (ElasticSearchResultItem item : combinedResults) {
1459
1460             // Increment the operation counts.
1461             totalOps++;
1462             if (isSuccessCode(item.operationStatus().getStatus())) {
1463                 totalSuccess++;
1464             } else {
1465                 totalFails++;
1466             }
1467
1468             // Prepend a comma to our response string unless this it the
1469             // first result in the set.
1470             if (!firstItem.compareAndSet(true, false)) {
1471                 resultsBuilder.append(", ");
1472             }
1473
1474             // Append the current result as a generic json structure.
1475             resultsBuilder.append(item.toJson());
1476         }
1477
1478         return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", "
1479                 + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}";
1480     }
1481
1482
1483     /**
1484      * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
1485      *
1486      * @param indexName - The index to look for.
1487      * @return - An operation result indicating the success or failure of the check.
1488      * @throws DocumentStoreOperationException
1489      */
1490     public OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
1491
1492         // Initialize operation result with a failure codes / fault string
1493         OperationResult opResult = new OperationResult();
1494         opResult.setResultCode(500);
1495
1496         // Grab the current time so we can use it to generate a metrics log.
1497         MdcOverride override = getStartTime(new MdcOverride());
1498
1499         String fullUrl = getFullUrl("/" + indexName, false);
1500         HttpURLConnection conn = initializeConnection(fullUrl);
1501
1502         try {
1503             conn.setRequestMethod("HEAD");
1504
1505         } catch (ProtocolException e) {
1506             shutdownConnection(conn);
1507             throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1508         }
1509
1510         logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1511
1512         int resultCode;
1513         try {
1514             resultCode = conn.getResponseCode();
1515         } catch (IOException e) {
1516             shutdownConnection(conn);
1517             throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1518         }
1519         logger.debug("Response Code : " + resultCode);
1520
1521         opResult.setResultCode(resultCode);
1522
1523         // Generate a metrics log so we can track how long the operation took.
1524         metricsLogger
1525                 .info(SearchDbMsgs.CHECK_INDEX_TIME,
1526                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1527                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1528                         override, indexName);
1529
1530         shutdownConnection(conn);
1531
1532         return opResult;
1533     }
1534
1535
1536     private void buildDocumentResult(DocumentOperationResult result, String index)
1537             throws DocumentStoreOperationException {
1538
1539         JSONParser parser = new JSONParser();
1540         JSONObject root;
1541         try {
1542             root = (JSONObject) parser.parse(result.getResult());
1543
1544             if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1545                 // Success response object
1546                 Document doc = new Document();
1547                 doc.setEtag(result.getResultVersion());
1548                 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1549
1550                 doc.setContent((JSONObject) root.get("_source"));
1551                 result.setDocument(doc);
1552
1553             } else {
1554                 // Error response object
1555                 JSONObject error = (JSONObject) root.get("error");
1556                 if (error != null) {
1557                     result.setError(new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1558                 }
1559
1560             }
1561         } catch (Exception e) {
1562             throw new DocumentStoreOperationException("Failed to parse Elastic Search response." + result.getResult());
1563         }
1564
1565
1566     }
1567
1568     private String buildDocumentResponseUrl(String index, String id) {
1569         return ApiUtils.buildDocumentUri(index, id);
1570     }
1571
1572     private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1573
1574         JSONParser parser = new JSONParser();
1575         JSONObject root;
1576
1577         try {
1578             root = (JSONObject) parser.parse(result.getResult());
1579             if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1580                 JSONObject hits = (JSONObject) root.get("hits");
1581                 JSONArray hitArray = (JSONArray) hits.get("hits");
1582                 SearchHits searchHits = new SearchHits();
1583                 searchHits.setTotalHits(hits.get("total").toString());
1584                 ArrayList<SearchHit> searchHitArray = new ArrayList<>();
1585
1586                 for (int i = 0; i < hitArray.size(); i++) {
1587                     JSONObject hit = (JSONObject) hitArray.get(i);
1588                     SearchHit searchHit = new SearchHit();
1589                     searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1590                     Document doc = new Document();
1591                     if (hit.get("_version") != null) {
1592                         doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1593                     }
1594
1595                     doc.setUrl(
1596                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1597                     doc.setContent((JSONObject) hit.get("_source"));
1598                     searchHit.setDocument(doc);
1599                     searchHitArray.add(searchHit);
1600                 }
1601                 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1602                 result.setSearchResult(searchHits);
1603
1604                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1605                 if (aggregations != null) {
1606                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1607                     AggregationResults aggs = new AggregationResults();
1608                     aggs.setAggregations(aggResults);
1609                     result.setAggregationResult(aggs);
1610                 }
1611
1612                 // success
1613             } else {
1614                 JSONObject error = (JSONObject) root.get("error");
1615                 if (error != null) {
1616                     result.setError(new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1617                 }
1618             }
1619         } catch (Exception e) {
1620             throw new DocumentStoreOperationException("Failed to parse Elastic Search response." + result.getResult());
1621         }
1622
1623     }
1624
1625     private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1626         JSONParser parser = new JSONParser();
1627         JSONObject root;
1628         try {
1629             root = (JSONObject) parser.parse(result.getResult());
1630             if (result.getResultCode() >= 200 && result.getResultCode() <= 299) {
1631                 JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
1632                 JSONObject hitdata = (JSONObject) hitArray.get(0);
1633                 JSONArray optionsArray = (JSONArray) hitdata.get("options");
1634                 SuggestHits suggestHits = new SuggestHits();
1635                 suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
1636
1637                 ArrayList<SuggestHit> suggestHitArray = new ArrayList<>();
1638
1639                 for (int i = 0; i < optionsArray.size(); i++) {
1640                     JSONObject hit = (JSONObject) optionsArray.get(i);
1641
1642                     SuggestHit suggestHit = new SuggestHit();
1643                     suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
1644                     suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
1645                     Document doc = new Document();
1646                     if (hit.get("_version") != null) {
1647                         doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1648                     }
1649                     doc.setUrl(
1650                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1651
1652                     doc.setContent((JSONObject) hit.get("payload"));
1653                     suggestHit.setDocument(doc);
1654                     suggestHitArray.add(suggestHit);
1655                 }
1656                 suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
1657                 result.setSuggestResult(suggestHits);
1658
1659                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1660                 if (aggregations != null) {
1661                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1662                     AggregationResults aggs = new AggregationResults();
1663                     aggs.setAggregations(aggResults);
1664                     result.setAggregationResult(aggs);
1665                 }
1666
1667                 // success
1668             } else {
1669                 JSONObject error = (JSONObject) root.get("error");
1670                 if (error != null) {
1671                     result.setError(new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1672                 }
1673             }
1674         } catch (Exception e) {
1675             throw new DocumentStoreOperationException("Failed to parse Elastic Search response." + result.getResult());
1676         }
1677     }
1678 }