Replace 2xx result codes with Java constants
[aai/search-data-service.git] / src / main / java / org / onap / aai / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
23
24 import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
25 import static javax.ws.rs.core.MediaType.APPLICATION_FORM_URLENCODED;
26 import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
27
28 import com.fasterxml.jackson.annotation.JsonInclude.Include;
29 import com.fasterxml.jackson.core.JsonParseException;
30 import com.fasterxml.jackson.core.JsonProcessingException;
31 import com.fasterxml.jackson.databind.JsonMappingException;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33 import com.google.common.base.Throwables;
34 import java.io.BufferedReader;
35 import java.io.File;
36 import java.io.FileInputStream;
37 import java.io.IOException;
38 import java.io.InputStream;
39 import java.io.InputStreamReader;
40 import java.io.OutputStream;
41 import java.io.OutputStreamWriter;
42 import java.io.PrintWriter;
43 import java.io.StringWriter;
44 import java.net.HttpURLConnection;
45 import java.net.MalformedURLException;
46 import java.net.ProtocolException;
47 import java.net.URL;
48 import java.text.SimpleDateFormat;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.List;
52 import java.util.Properties;
53 import java.util.concurrent.atomic.AtomicBoolean;
54 import javax.ws.rs.core.Response.Status;
55 import javax.ws.rs.core.Response.Status.Family;
56 import org.json.simple.JSONArray;
57 import org.json.simple.JSONObject;
58 import org.json.simple.parser.JSONParser;
59 import org.json.simple.parser.ParseException;
60 import org.onap.aai.cl.api.LogFields;
61 import org.onap.aai.cl.api.LogLine;
62 import org.onap.aai.cl.api.Logger;
63 import org.onap.aai.cl.eelf.LoggerFactory;
64 import org.onap.aai.cl.mdc.MdcContext;
65 import org.onap.aai.cl.mdc.MdcOverride;
66 import org.onap.aai.sa.rest.AnalysisConfiguration;
67 import org.onap.aai.sa.rest.ApiUtils;
68 import org.onap.aai.sa.rest.BulkRequest;
69 import org.onap.aai.sa.rest.BulkRequest.OperationType;
70 import org.onap.aai.sa.rest.DocumentSchema;
71 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
72 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
73 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
74 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
75 import org.onap.aai.sa.searchdbabstraction.entity.Document;
76 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
77 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
78 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
79 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
80 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
81 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
82 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
83 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
84 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
85 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
86 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
87 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
88 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
89
90 /**
91  * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface.
92  */
93 public class ElasticSearchHttpController implements DocumentStoreInterface {
94
95     private static ElasticSearchHttpController instance = null;
96
97     private static final Logger logger =
98             LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
99     private static final Logger metricsLogger =
100             LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
101
102     private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
103             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
104     private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
105             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
106     private static final String BULK_IMPORT_INDEX_TEMPLATE =
107             "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
108     private static final String BULK_DELETE_TEMPLATE =
109             "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
110
111     private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT =
112             "Internal Error: ElasticSearch operation fault occurred";
113     private final ElasticSearchConfig config;
114
115     private static final String DEFAULT_TYPE = "default";
116
117     protected AnalysisConfiguration analysisConfig;
118
119     public static ElasticSearchHttpController getInstance() {
120
121         synchronized (ElasticSearchHttpController.class) {
122
123             if (instance == null) {
124                 Properties properties = new Properties();
125                 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
126                 try {
127                     properties.load(new FileInputStream(file));
128                 } catch (Exception e) {
129                     logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance",
130                             e.getLocalizedMessage());
131                 }
132
133                 ElasticSearchConfig config = new ElasticSearchConfig(properties);
134                 instance = new ElasticSearchHttpController(config);
135             }
136         }
137
138         return instance;
139     }
140
141     public ElasticSearchHttpController(ElasticSearchConfig config) {
142         this.config = config;
143         analysisConfig = new AnalysisConfiguration();
144
145         try {
146             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
147             checkConnection();
148             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
149         } catch (Exception e) {
150             logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, getFullUrl("", false),
151                     e.getMessage());
152         }
153     }
154
155
156     public AnalysisConfiguration getAnalysisConfig() {
157         return analysisConfig;
158     }
159
160     @Override
161     public OperationResult createIndex(String index, DocumentSchema documentSchema) {
162         OperationResult result = new OperationResult();
163         result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
164
165         try {
166
167             // Submit the request to ElasticSearch to create the index using a
168             // default document type.
169             result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
170                     DocumentSchemaUtil.generateDocumentMappings(documentSchema));
171
172             // ElasticSearch will return us a 200 code on success when we
173             // want to report a 201, so translate the result here.
174             if (result.getResultCode() == Status.OK.getStatusCode()) {
175                 result.setResultCode(Status.CREATED.getStatusCode());
176             }
177
178             if (isSuccess(result)) {
179                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
180             }
181         } catch (DocumentStoreOperationException | IOException e) {
182             result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
183         }
184
185         return result;
186     }
187
188     @Override
189     public OperationResult createDynamicIndex(String index, String dynamicSchema) {
190         OperationResult result = new OperationResult();
191         result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
192
193         try {
194             result = createTable(index, dynamicSchema);
195
196             // ElasticSearch will return us a 200 code on success when we
197             // want to report a 201, so translate the result here.
198             if (result.getResultCode() == Status.OK.getStatusCode()) {
199                 result.setResultCode(Status.CREATED.getStatusCode());
200             }
201             if (isSuccess(result)) {
202                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
203             }
204         } catch (DocumentStoreOperationException e) {
205             result.setFailureCause("Document store operation failure.  Cause: " + e.getMessage());
206         }
207
208         return result;
209     }
210
211     @Override
212     public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
213
214         // Initialize operation result with a failure codes / fault string
215         OperationResult opResult = new OperationResult();
216         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
217         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
218
219         // Grab the current time so we can use it to generate a metrics log.
220         MdcOverride override = getStartTime(new MdcOverride());
221
222         String fullUrl = getFullUrl("/" + indexName + "/", false);
223         HttpURLConnection conn = initializeConnection(fullUrl);
224
225         logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
226
227         try {
228             conn.setRequestMethod("DELETE");
229         } catch (ProtocolException e) {
230             shutdownConnection(conn);
231             throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
232         }
233
234         handleResponse(conn, opResult);
235
236         // Generate a metrics log so we can track how long the operation took.
237         metricsLogger
238                 .info(SearchDbMsgs.DELETE_INDEX_TIME,
239                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
240                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
241                         override, indexName);
242
243         shutdownConnection(conn);
244
245         return opResult;
246     }
247
248     private OperationResult checkConnection() throws IOException {
249         String fullUrl = getFullUrl("/_cluster/health", false);
250         URL url = null;
251         HttpURLConnection conn = null;
252
253         url = new URL(fullUrl);
254         conn = (HttpURLConnection) url.openConnection();
255         conn.setRequestMethod("GET");
256         conn.setDoOutput(true);
257         logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
258
259         int resultCode = conn.getResponseCode();
260         logger.debug("getClusterHealth() response Code : " + resultCode);
261         OperationResult opResult = new OperationResult();
262         opResult.setResultCode(resultCode);
263
264         shutdownConnection(conn);
265
266         return opResult;
267     }
268
269     private String getFullUrl(String resourceUrl, boolean isSecure) {
270
271         final String host = config.getIpAddress();
272         final String port = config.getHttpPort();
273
274         if (isSecure) {
275             return String.format("https://%s:%s%s", host, port, resourceUrl);
276         } else {
277             return String.format("http://%s:%s%s", host, port, resourceUrl);
278         }
279     }
280
281     private void shutdownConnection(HttpURLConnection connection) {
282         if (connection == null) {
283             return;
284         }
285
286         InputStream inputstream = null;
287         OutputStream outputstream = null;
288
289         try {
290             inputstream = connection.getInputStream();
291         } catch (IOException e) {
292             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
293         } finally {
294             if (inputstream != null) {
295                 try {
296                     inputstream.close();
297                 } catch (IOException e) {
298                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
299                             e.getLocalizedMessage());
300                 }
301             }
302         }
303
304         try {
305             outputstream = connection.getOutputStream();
306         } catch (IOException e) {
307             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage());
308         } finally {
309             if (outputstream != null) {
310                 try {
311                     outputstream.close();
312                 } catch (IOException e) {
313                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection",
314                             e.getLocalizedMessage());
315                 }
316             }
317         }
318
319         connection.disconnect();
320     }
321
322     // @Override
323     protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings)
324             throws DocumentStoreOperationException {
325
326         if (indexSettings == null) {
327             logger.debug("No settings provided.");
328         }
329
330         if (indexMappings == null) {
331             logger.debug("No mappings provided.");
332         }
333
334         OperationResult opResult = new OperationResult();
335
336         // Initialize operation result with a failure codes / fault string
337         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
338         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
339
340         // Grab the current time so we can use it to generate a metrics log.
341         MdcOverride override = getStartTime(new MdcOverride());
342
343         String fullUrl = getFullUrl("/" + indexName + "/", false);
344         HttpURLConnection conn = initializeConnection(fullUrl);
345
346         try {
347             conn.setRequestMethod("PUT");
348         } catch (ProtocolException e) {
349             shutdownConnection(conn);
350             throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
351         }
352
353         StringBuilder sb = new StringBuilder(128);
354         sb.append("{ \"settings\" : ");
355         sb.append(indexSettings);
356         sb.append(",");
357
358         sb.append("\"mappings\" : {");
359         sb.append("\"" + typeName + "\" :");
360         sb.append(indexMappings);
361         sb.append("}}");
362
363         try {
364             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
365         } catch (IOException e) {
366             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
367             throw new DocumentStoreOperationException(e.getMessage(), e);
368         }
369
370         logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
371         logger.debug("Request content: " + sb.toString());
372
373         handleResponse(conn, opResult);
374
375         shutdownConnection(conn);
376
377         // Generate a metrics log so we can track how long the operation took.
378         metricsLogger
379                 .info(SearchDbMsgs.CREATE_INDEX_TIME,
380                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
381                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
382                         override, indexName);
383
384         return opResult;
385     }
386
387     /**
388      * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the
389      * index.
390      *
391      * @param indexName - The name of the index to be created
392      * @param settingsAndMappings - The actual JSON object that will define the index
393      * @return - The operation result of writing into Elasticsearch
394      * @throws DocumentStoreOperationException
395      */
396     protected OperationResult createTable(String indexName, String settingsAndMappings)
397             throws DocumentStoreOperationException {
398         OperationResult result = new OperationResult();
399         result.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
400         result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
401
402         // Grab the current time so we can use it to generate a metrics log.
403         MdcOverride override = getStartTime(new MdcOverride());
404
405         String fullUrl = getFullUrl("/" + indexName + "/", false);
406         HttpURLConnection conn = initializeConnection(fullUrl);
407
408         try {
409             conn.setRequestMethod("PUT");
410         } catch (ProtocolException e) {
411             shutdownConnection(conn);
412             throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
413         }
414
415         try {
416             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
417         } catch (IOException e) {
418             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
419             throw new DocumentStoreOperationException(e.getMessage());
420         }
421         handleResponse(conn, result);
422
423         // Generate a metrics log so we can track how long the operation took.
424         metricsLogger
425                 .info(SearchDbMsgs.CREATE_INDEX_TIME,
426                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
427                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()),
428                         override, indexName);
429
430         return result;
431     }
432
433     @Override
434     public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
435             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
436
437         if (!allowImplicitIndexCreation) {
438             // Before we do anything, make sure that the specified index actually exists in the
439             // document store - we don't want to rely on ElasticSearch to fail the document
440             // create because it could be configured to implicitly create a non-existent index,
441             // which can lead to hard-to-debug behaviour with queries down the road.
442             OperationResult indexExistsResult = checkIndexExistence(indexName);
443             if (!isSuccess(indexExistsResult)) {
444                 DocumentOperationResult opResult = new DocumentOperationResult();
445                 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
446                 opResult.setResult("Document Index '" + indexName + "' does not exist.");
447                 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
448                 return opResult;
449             }
450         }
451
452         if (document.getId() == null || document.getId().isEmpty()) {
453             return createDocumentWithoutId(indexName, document);
454         } else {
455             return createDocumentWithId(indexName, document);
456         }
457     }
458
459     private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
460             throws DocumentStoreOperationException {
461         // check if the document already exists
462         DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
463
464         if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
465             if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) {
466                 opResult.setFailureCause("A document with the same id already exists.");
467             } else {
468                 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
469             }
470             opResult.setResultCode(Status.CONFLICT.getStatusCode());
471             return opResult;
472         }
473
474         opResult = new DocumentOperationResult();
475         // Initialize operation result with a failure codes / fault string
476         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
477         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
478
479         // Grab the current time so we can use it to generate a metrics log.
480         MdcOverride override = getStartTime(new MdcOverride());
481
482         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
483         HttpURLConnection conn = initializeConnection(fullUrl);
484
485         try {
486             conn.setRequestMethod("PUT");
487         } catch (ProtocolException e) {
488             shutdownConnection(conn);
489             throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
490         }
491
492         attachDocument(conn, document);
493
494         logger.debug("Sending 'PUT' request to: " + conn.getURL());
495
496         handleResponse(conn, opResult);
497         buildDocumentResult(opResult, indexName);
498
499         // Generate a metrics log so we can track how long the operation took.
500         metricsLogger
501                 .info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
502                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
503                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
504                         override, indexName);
505
506         shutdownConnection(conn);
507
508         return opResult;
509     }
510
511     private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
512             throws DocumentStoreOperationException {
513
514         DocumentOperationResult response = new DocumentOperationResult();
515         // Initialize operation result with a failure codes / fault string
516         response.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
517         response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
518
519         // Grab the current time so we can use it to generate a metrics log.
520         MdcOverride override = getStartTime(new MdcOverride());
521
522         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
523         HttpURLConnection conn = initializeConnection(fullUrl);
524
525         try {
526             conn.setRequestMethod("POST");
527         } catch (ProtocolException e) {
528             shutdownConnection(conn);
529             throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
530         }
531
532         attachDocument(conn, document);
533
534         logger.debug("Sending 'POST' request to: " + conn.getURL());
535
536         handleResponse(conn, response);
537         buildDocumentResult(response, indexName);
538
539         // Generate a metrics log so we can track how long the operation took.
540         metricsLogger
541                 .info(SearchDbMsgs.CREATE_DOCUMENT_TIME,
542                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode())
543                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()),
544                         override, indexName);
545
546         shutdownConnection(conn);
547
548         return response;
549     }
550
551     private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
552             throws DocumentStoreOperationException {
553         conn.setRequestProperty("Connection", "Close");
554         attachContent(conn, doc.getContentInJson());
555     }
556
557     private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
558             throws DocumentStoreOperationException {
559         DocumentOperationResult opResult = new DocumentOperationResult();
560
561         // Initialize operation result with a failure codes / fault string
562         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
563
564         // Grab the current time so we can use it to generate a metrics log.
565         MdcOverride override = getStartTime(new MdcOverride());
566
567         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
568         HttpURLConnection conn = initializeConnection(fullUrl);
569
570         try {
571             conn.setRequestMethod("HEAD");
572         } catch (ProtocolException e) {
573             shutdownConnection(conn);
574             throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
575         }
576
577         logger.debug("Sending 'HEAD' request to: " + conn.getURL());
578
579         int resultCode;
580         try {
581             resultCode = conn.getResponseCode();
582         } catch (IOException e) {
583             shutdownConnection(conn);
584             throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
585         }
586
587         logger.debug("Response Code : " + resultCode);
588
589         opResult.setResultCode(resultCode);
590
591         // Generate a metrics log so we can track how long the operation took.
592         metricsLogger
593                 .info(SearchDbMsgs.GET_DOCUMENT_TIME,
594                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
595                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
596                         override, indexName, docId);
597
598         shutdownConnection(conn);
599
600         return opResult;
601     }
602
603     @Override
604     public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
605             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
606
607         if (!allowImplicitIndexCreation) {
608             // Before we do anything, make sure that the specified index actually exists in the
609             // document store - we don't want to rely on ElasticSearch to fail the document
610             // create because it could be configured to implicitly create a non-existent index,
611             // which can lead to hard-to-debug behaviour with queries down the road.
612             OperationResult indexExistsResult = checkIndexExistence(indexName);
613             if (!isSuccess(indexExistsResult)) {
614                 DocumentOperationResult opResult = new DocumentOperationResult();
615                 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
616                 opResult.setResult("Document Index '" + indexName + "' does not exist.");
617                 opResult.setFailureCause("Document Index '" + indexName + "' does not exist.");
618                 return opResult;
619             }
620         }
621
622         DocumentOperationResult opResult = new DocumentOperationResult();
623
624         // Initialize operation result with a failure codes / fault string
625         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
626         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
627
628         // Grab the current time so we can use it to generate a metrics log.
629         MdcOverride override = getStartTime(new MdcOverride());
630
631         String fullUrl = getFullUrl(
632                 "/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + "?version=" + document.getVersion(),
633                 false);
634         HttpURLConnection conn = initializeConnection(fullUrl);
635
636         try {
637             conn.setRequestMethod("PUT");
638         } catch (ProtocolException e) {
639             shutdownConnection(conn);
640             throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e);
641         }
642
643         attachDocument(conn, document);
644
645         logger.debug("Sending 'PUT' request to: " + conn.getURL());
646
647         handleResponse(conn, opResult);
648         buildDocumentResult(opResult, indexName);
649
650         // Generate a metrics log so we can track how long the operation took.
651         metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME,
652                 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
653                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
654                 override, indexName, document.getId());
655
656         shutdownConnection(conn);
657
658         return opResult;
659     }
660
661     @Override
662     public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
663             throws DocumentStoreOperationException {
664         DocumentOperationResult opResult = new DocumentOperationResult();
665
666         // Initialize operation result with a failure codes / fault string
667         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
668         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
669
670         // Grab the current time so we can use it to generate a metrics log.
671         MdcOverride override = getStartTime(new MdcOverride());
672
673         String fullUrl = getFullUrl(
674                 "/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + "?version=" + document.getVersion(),
675                 false);
676         HttpURLConnection conn = initializeConnection(fullUrl);
677
678         try {
679             conn.setRequestMethod("DELETE");
680         } catch (ProtocolException e) {
681             shutdownConnection(conn);
682             throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
683         }
684
685         logger.debug("\nSending 'DELETE' request to " + conn.getURL());
686
687         handleResponse(conn, opResult);
688         buildDocumentResult(opResult, indexName);
689         // supress the etag and url in response for delete as they are not required
690         if (opResult.getDocument() != null) {
691             opResult.getDocument().setEtag(null);
692             opResult.getDocument().setUrl(null);
693         }
694
695         // Generate a metrics log so we can track how long the operation took.
696         metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME,
697                 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult())
698                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
699                 override, indexName, document.getId());
700
701         shutdownConnection(conn);
702
703         return opResult;
704     }
705
706     @Override
707     public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
708             throws DocumentStoreOperationException {
709         DocumentOperationResult opResult = new DocumentOperationResult();
710
711         // Initialize operation result with a failure codes / fault string
712         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
713         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
714
715         // Grab the current time so we can use it to generate a metrics log.
716         MdcOverride override = getStartTime(new MdcOverride());
717
718         String fullUrl = null;
719         if (document.getVersion() == null) {
720             fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
721         } else {
722             fullUrl = getFullUrl(
723                     "/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + "?version=" + document.getVersion(),
724                     false);
725         }
726         HttpURLConnection conn = initializeConnection(fullUrl);
727
728         logger.debug("\nSending 'GET' request to: " + conn.getURL());
729
730         handleResponse(conn, opResult);
731         buildDocumentResult(opResult, indexName);
732
733         // Generate a metrics log so we can track how long the operation took.
734         metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME,
735                 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
736                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
737                 override, indexName, document.getId());
738
739         shutdownConnection(conn);
740
741         return opResult;
742     }
743
744     @Override
745     public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
746         SearchOperationResult opResult = new SearchOperationResult();
747
748         // Initialize operation result with a failure codes / fault string
749         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
750         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
751
752         String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
753
754         // Grab the current time so we can use it to generate a metrics log.
755         MdcOverride override = getStartTime(new MdcOverride());
756
757         HttpURLConnection conn = initializeConnection(fullUrl);
758
759         try {
760             conn.setRequestMethod("GET");
761         } catch (ProtocolException e) {
762             shutdownConnection(conn);
763             throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
764         }
765
766         logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
767
768         handleResponse(conn, opResult);
769         buildSearchResult(opResult, indexName);
770
771
772         metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
773                 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
774                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
775                 override, indexName, queryString);
776
777         return opResult;
778     }
779
780     @Override
781     public SearchOperationResult searchWithPayload(String indexName, String query)
782             throws DocumentStoreOperationException {
783         SearchOperationResult opResult = new SearchOperationResult();
784
785         if (logger.isDebugEnabled()) {
786             logger.debug("Querying index: " + indexName + " with query string: " + query);
787         }
788
789         // Initialize operation result with a failure codes / fault string
790         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
791         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
792
793         String fullUrl = getFullUrl("/" + indexName + "/_search", false);
794
795         // Grab the current time so we can use it to generate a metrics log.
796         MdcOverride override = getStartTime(new MdcOverride());
797
798         HttpURLConnection conn = initializeConnection(fullUrl);
799
800         try {
801             conn.setRequestMethod("POST");
802         } catch (ProtocolException e) {
803             shutdownConnection(conn);
804             throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
805         }
806
807         attachContent(conn, query);
808
809         logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
810         logger.debug("Request body =  Elasticsearch query = " + query);
811
812         handleResponse(conn, opResult);
813         buildSearchResult(opResult, indexName);
814
815         metricsLogger
816                 .info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
817                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
818                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
819                         override, indexName, query);
820
821         shutdownConnection(conn);
822
823         return opResult;
824     }
825
826
827     @Override
828     public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
829             throws DocumentStoreOperationException {
830
831         SearchOperationResult opResult = new SearchOperationResult();
832
833         if (logger.isDebugEnabled()) {
834             logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
835         }
836
837         // Initialize operation result with a failure codes / fault string
838         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
839         opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT);
840
841         String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
842
843         // Grab the current time so we can use it to generate a metrics log.
844         MdcOverride override = getStartTime(new MdcOverride());
845
846         HttpURLConnection conn = initializeConnection(fullUrl);
847
848         try {
849             conn.setRequestMethod("POST");
850         } catch (ProtocolException e) {
851             shutdownConnection(conn);
852             throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e);
853         }
854
855         attachContent(conn, query);
856
857         logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
858         logger.debug("Request body =  Elasticsearch query = " + query);
859
860         handleResponse(conn, opResult);
861         buildSuggestResult(opResult, indexName);
862
863         metricsLogger
864                 .info(SearchDbMsgs.QUERY_DOCUMENT_TIME,
865                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
866                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()),
867                         override, indexName, query);
868
869         shutdownConnection(conn);
870
871         return opResult;
872     }
873
874     private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException {
875         OutputStream outputStream = null;
876         OutputStreamWriter out = null;
877
878         try {
879             outputStream = conn.getOutputStream();
880         } catch (IOException e) {
881             shutdownConnection(conn);
882             throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
883         }
884
885         out = new OutputStreamWriter(outputStream);
886
887         try {
888             out.write(content);
889             out.close();
890         } catch (IOException e) {
891             shutdownConnection(conn);
892             throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
893         }
894     }
895
896     private HttpURLConnection initializeConnection(String fullUrl) throws DocumentStoreOperationException {
897         URL url = null;
898         HttpURLConnection conn = null;
899
900         try {
901             url = new URL(fullUrl);
902         } catch (MalformedURLException e) {
903             throw new DocumentStoreOperationException("Error building a URL with " + url, e);
904         }
905
906         try {
907             conn = (HttpURLConnection) url.openConnection();
908             conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON);
909             conn.setDoOutput(true);
910         } catch (IOException e) {
911             shutdownConnection(conn);
912             throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
913         }
914
915         return conn;
916     }
917
918     private void handleResponse(HttpURLConnection conn, OperationResult opResult)
919             throws DocumentStoreOperationException {
920         int resultCode;
921
922         try {
923             resultCode = conn.getResponseCode();
924         } catch (IOException e) {
925             shutdownConnection(conn);
926             throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
927         }
928
929         logger.debug("Response Code : " + resultCode);
930
931         InputStream inputStream = null;
932
933         if (!isSuccessCode(resultCode)) {
934             inputStream = conn.getErrorStream();
935         } else {
936             try {
937                 inputStream = conn.getInputStream();
938             } catch (IOException e) {
939                 shutdownConnection(conn);
940                 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
941             }
942         }
943
944         InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
945         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
946
947         StringBuilder result = new StringBuilder(128);
948         String string = null;
949
950         try {
951             while ((string = bufferedreader.readLine()) != null) {
952                 result.append(string).append("\n");
953             }
954         } catch (IOException e) {
955             shutdownConnection(conn);
956             throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
957         }
958
959         if (resultCode == Status.CONFLICT.getStatusCode()) {
960             opResult.setResultCode(Status.PRECONDITION_FAILED.getStatusCode());
961         } else {
962             opResult.setResultCode(resultCode);
963         }
964         if (logger.isDebugEnabled()) {
965             logger.debug("Raw result string from ElasticSearch = " + result.toString());
966         }
967         opResult.setResult(result.toString());
968         opResult.setResultVersion(extractVersion(result.toString()));
969     }
970
971     private String extractVersion(String result) {
972         JSONParser parser = new JSONParser();
973         String version = null;
974         try {
975             JSONObject root = (JSONObject) parser.parse(result);
976             if (root.get("_version") != null) {
977                 version = root.get("_version").toString();
978             }
979         } catch (ParseException e) {
980             // Not all responses from ElasticSearch include a version, so
981             // if we don't get one back, just return an empty string rather
982             // than trigger a false failure.
983             version = "";
984         }
985         return version;
986     }
987
988     /**
989      * This convenience method gets the current system time and stores it in an attribute in the supplied
990      * {@link MdcOverride} object so that it can be used later by the metrics logger.
991      *
992      * @param override - The {@link MdcOverride} object to update.
993      * @return - The supplied {@link MdcOverride} object.
994      */
995     private MdcOverride getStartTime(MdcOverride override) {
996
997         // Grab the current time...
998         long startTimeInMs = System.currentTimeMillis();
999
1000         // ...and add it as an attribute to the supplied MDC Override
1001         // object.
1002         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
1003         override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
1004
1005         // Return the MdcOverride object that we were passed.
1006         // This looks odd, but it allows us to do stuff like:
1007         //
1008         // MdcOverride ov = getStartTime(new MdcOverride())
1009         //
1010         // which is quite handy, but also allows us to pass in an existing
1011         // MdcOverride object which already has some attributes set.
1012         return override;
1013     }
1014
1015     private boolean isSuccess(OperationResult result) {
1016         return isSuccessCode(result.getResultCode());
1017     }
1018
1019     private boolean isSuccessCode(int statusCode) {
1020         return Family.familyOf(statusCode).equals(Family.SUCCESSFUL);
1021     }
1022
1023     @Override
1024     public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
1025
1026         if (logger.isDebugEnabled()) {
1027             String dbgString = "ESController: performBulkOperations - Operations: ";
1028
1029             for (BulkRequest request : requests) {
1030                 dbgString += "[" + request.toString() + "] ";
1031             }
1032
1033             logger.debug(dbgString);
1034         }
1035
1036         // Grab the current time so we can use it to generate a metrics log.
1037         MdcOverride override = getStartTime(new MdcOverride());
1038
1039         // Parse the supplied set of operations.
1040         // Iterate over the list of operations which we were provided and
1041         // translate them into a format that ElasticSearh understands.
1042         int opCount = 0;
1043         StringBuilder esOperationSet = new StringBuilder(128);
1044         List<ElasticSearchResultItem> rejected = new ArrayList<>();
1045         for (BulkRequest request : requests) {
1046
1047             // Convert the request to the syntax ElasticSearch likes.
1048             if (buildEsOperation(request, esOperationSet, rejected)) {
1049                 opCount++;
1050             }
1051         }
1052
1053         ElasticSearchBulkOperationResult opResult = null;
1054         if (opCount > 0) {
1055
1056             // Open an HTTP connection to the ElasticSearch back end.
1057             String fullUrl = getFullUrl("/_bulk", false);
1058             URL url;
1059             HttpURLConnection conn;
1060             try {
1061
1062                 url = new URL(fullUrl);
1063                 conn = (HttpURLConnection) url.openConnection();
1064                 conn.setRequestMethod("PUT");
1065                 conn.setDoOutput(true);
1066                 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_FORM_URLENCODED);
1067                 conn.setRequestProperty("Connection", "Close");
1068
1069             } catch (IOException e) {
1070
1071                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1072                 if (logger.isDebugEnabled()) {
1073                     logger.debug(Throwables.getStackTraceAsString(e));
1074                 }
1075
1076                 throw new DocumentStoreOperationException(
1077                         "Failed to open connection to document store.  Cause: " + e.getMessage(), e);
1078             }
1079
1080             StringBuilder bulkResult = new StringBuilder(128);
1081             try {
1082                 // Create an output stream to write our request to.
1083                 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
1084
1085                 if (logger.isDebugEnabled()) {
1086                     logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
1087                     logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
1088                 }
1089
1090                 // Write the resulting request string to our output stream. (this sends the request to ES?)
1091                 out.write(esOperationSet.toString());
1092                 out.close();
1093
1094                 // Open an input stream on our connection in order to read back the results.
1095                 InputStream is = conn.getInputStream();
1096                 InputStreamReader inputstreamreader = new InputStreamReader(is);
1097                 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
1098
1099                 // Read the contents of the input stream into our result string...
1100                 String esResponseString = null;
1101
1102                 while ((esResponseString = bufferedreader.readLine()) != null) {
1103                     bulkResult.append(esResponseString).append("\n");
1104                 }
1105
1106             } catch (IOException e) {
1107
1108                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1109                 if (logger.isDebugEnabled()) {
1110                     StringWriter sw = new StringWriter();
1111                     e.printStackTrace(new PrintWriter(sw));
1112                     logger.debug(sw.toString());
1113                 }
1114
1115                 throw new DocumentStoreOperationException(
1116                         "Failure interacting with document store.  Cause: " + e.getMessage(), e);
1117             }
1118
1119             if (logger.isDebugEnabled()) {
1120                 logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
1121             }
1122
1123             // ...and marshal the resulting string into a Java object.
1124             try {
1125                 opResult = marshallEsBulkResult(bulkResult.toString());
1126
1127             } catch (IOException e) {
1128
1129                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1130                 if (logger.isDebugEnabled()) {
1131                     logger.debug(Throwables.getStackTraceAsString(e));
1132                 }
1133
1134                 throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: " + e.getMessage(),
1135                         e);
1136             }
1137         }
1138
1139         // Finally, build the operation result and return it to the caller.
1140         OperationResult result = new OperationResult();
1141         result.setResultCode(207);
1142         result.setResult(buildGenericBulkResultSet(opResult, rejected));
1143
1144         // In the success case we don't want the entire result string to be
1145         // dumped into the metrics log, so concatenate it.
1146         String resultStringForMetricsLog = result.getResult();
1147         if (isSuccess(result)) {
1148             resultStringForMetricsLog =
1149                     resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1150         }
1151
1152         metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1153                 new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1154                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1155                 override);
1156
1157         return result;
1158     }
1159
1160
1161     /**
1162      * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch.
1163      *
1164      * @param request - The request to be performed.
1165      * @param sb - The string builder to append the json data to
1166      * @throws DocumentStoreOperationException
1167      */
1168     private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails)
1169             throws DocumentStoreOperationException {
1170
1171         boolean retVal = true;
1172         // What kind of operation are we performing?
1173         switch (request.getOperationType()) {
1174
1175             // Create a new document.
1176             case CREATE:
1177
1178                 // Make sure that we were supplied a document payload.
1179                 if (request.getOperation().getDocument() == null) {
1180
1181                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1182                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1183                     return false;
1184                 }
1185
1186                 // Make sure that the supplied document URL is formatted
1187                 // correctly.
1188                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1189                     fails.add(generateRejectionEntry(request.getOperationType(),
1190                             "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1191                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1192                     return false;
1193                 }
1194
1195                 // Validate that the specified index actually exists before we
1196                 // try to perform the create.
1197                 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1198
1199                     fails.add(generateRejectionEntry(request.getOperationType(),
1200                             "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1201                             request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1202                     return false;
1203                 }
1204
1205                 // If we were supplied an id for the new document, then
1206                 // include it in the bulk operation to Elastic Search
1207                 if (request.getId() == null) {
1208
1209                     sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
1210
1211                     // Otherwise, we just leave that parameter off and ElasticSearch
1212                     // will generate one for us.
1213                 } else {
1214                     sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
1215                             request.getId()));
1216                 }
1217
1218                 try {
1219                     // Append the document that we want to create.
1220                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
1221                 } catch (JsonProcessingException e) {
1222                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
1223                 }
1224
1225                 break;
1226
1227             // Update an existing document.
1228             case UPDATE:
1229
1230                 // Make sure that we were supplied a document payload.
1231                 if (request.getOperation().getDocument() == null) {
1232
1233                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1234                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1235                     return false;
1236                 }
1237
1238                 // Make sure that the supplied document URL is formatted
1239                 // correctly.
1240                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1241                     fails.add(generateRejectionEntry(request.getOperationType(),
1242                             "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1243                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1244                     return false;
1245                 }
1246
1247                 // Validate that the specified index actually exists before we
1248                 // try to perform the update.
1249                 if (!indexExists(request.getIndex())) {
1250
1251                     fails.add(generateRejectionEntry(request.getOperationType(),
1252                             "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1253                             request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1254                     return false;
1255                 }
1256
1257                 // Validate that the document we are trying to update actually
1258                 // exists before we try to perform the update.
1259                 if (!documentExists(request.getIndex(), request.getId())) {
1260
1261                     fails.add(generateRejectionEntry(request.getOperationType(),
1262                             "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1263                             request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1264                     return false;
1265                 }
1266
1267                 // It is mandatory that a version be supplied for an update operation,
1268                 // so validate that now.
1269                 if (request.getOperation().getMetaData().getEtag() == null) {
1270
1271                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1272                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1273                     return false;
1274                 }
1275
1276                 // Generate the update request...
1277                 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1278                         request.getOperation().getMetaData().getEtag()));
1279
1280                 // ...and append the document that we want to update.
1281                 try {
1282                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
1283                 } catch (JsonProcessingException e) {
1284                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
1285                 }
1286                 break;
1287
1288             // Delete an existing document.
1289             case DELETE:
1290
1291                 // Make sure that the supplied document URL is formatted
1292                 // correctly.
1293                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1294                     fails.add(generateRejectionEntry(request.getOperationType(),
1295                             "Invalid document URL: " + request.getOperation().getMetaData().getUrl(),
1296                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1297                     return false;
1298                 }
1299
1300                 // Validate that the specified index actually exists before we
1301                 // try to perform the delete.
1302                 if (!indexExists(request.getIndex())) {
1303
1304                     fails.add(generateRejectionEntry(request.getOperationType(),
1305                             "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1306                             request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1307                     return false;
1308                 }
1309
1310                 // Validate that the document we are trying to update actually
1311                 // exists before we try to perform the delete.
1312                 if (!documentExists(request.getIndex(), request.getId())) {
1313
1314                     fails.add(generateRejectionEntry(request.getOperationType(),
1315                             "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(),
1316                             request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1317                     return false;
1318                 }
1319
1320                 // It is mandatory that a version be supplied for a delete operation,
1321                 // so validate that now.
1322                 if (request.getOperation().getMetaData().getEtag() == null) {
1323
1324                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1325                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1326                     return false;
1327                 }
1328
1329                 // Generate the delete request.
1330                 sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1331                         request.getOperation().getMetaData().getEtag()));
1332                 break;
1333             default:
1334         }
1335
1336         return retVal;
1337     }
1338
1339     private boolean indexExists(String index) throws DocumentStoreOperationException {
1340         return isSuccess(checkIndexExistence(index));
1341     }
1342
1343     private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1344         return isSuccess(checkDocumentExistence(index, id));
1345     }
1346
1347     /**
1348      * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the
1349      * document store.
1350      *
1351      * @param rejectReason - A message describing why the operation was rejected.
1352      * @param anId - The identifier associated with the document being acted on.
1353      * @param statusCode - An HTTP status code.
1354      * @return - A result set item.
1355      */
1356     private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index,
1357             String anId, int statusCode, String originalUrl) {
1358
1359         ElasticSearchError err = new ElasticSearchError();
1360         err.setReason(rejectReason);
1361
1362         ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1363         op.setIndex(index);
1364         op.setId(anId);
1365         op.setStatus(statusCode);
1366         op.setError(err);
1367         op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1368
1369         ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1370
1371         switch (opType) {
1372             case CREATE:
1373                 rejectionResult.setCreate(op);
1374                 break;
1375             case UPDATE:
1376                 rejectionResult.setIndex(op);
1377                 break;
1378             case DELETE:
1379                 rejectionResult.setDelete(op);
1380                 break;
1381             default:
1382         }
1383
1384         return rejectionResult;
1385     }
1386
1387     /**
1388      * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and
1389      * marshals it into a Java object.
1390      *
1391      * @param jsonResult - The bulk operations response returned from ElasticSearch.
1392      * @return - The marshalled response.
1393      * @throws JsonParseException
1394      * @throws JsonMappingException
1395      * @throws IOException
1396      */
1397     private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException {
1398         if (jsonResult != null) {
1399             if (logger.isDebugEnabled()) {
1400                 logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", ""));
1401             }
1402
1403             ObjectMapper mapper = new ObjectMapper();
1404             mapper.setSerializationInclusion(Include.NON_EMPTY);
1405
1406             return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1407         }
1408
1409         return null;
1410     }
1411
1412     /**
1413      * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload.
1414      *
1415      * @param esResult - ElasticSearch bulk operations response.
1416      * @return - A generic result set.
1417      */
1418     private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1419             List<ElasticSearchResultItem> rejectedOps) {
1420         int totalOps = 0;
1421         int totalSuccess = 0;
1422         int totalFails = 0;
1423
1424         if (logger.isDebugEnabled()) {
1425
1426             logger.debug("ESController: Build generic result set.  ES Results: "
1427                     + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString());
1428         }
1429
1430         // Build a combined list of result items from the results returned
1431         // from ElasticSearch and the list of operations that we rejected
1432         // without sending to ElasticSearch.
1433         List<ElasticSearchResultItem> combinedResults = new ArrayList<>();
1434         if (esResult != null) {
1435             combinedResults.addAll(Arrays.asList(esResult.getItems()));
1436         }
1437         combinedResults.addAll(rejectedOps);
1438
1439         // Iterate over the individual results in the resulting result set.
1440         StringBuilder resultsBuilder = new StringBuilder();
1441         AtomicBoolean firstItem = new AtomicBoolean(true);
1442         for (ElasticSearchResultItem item : combinedResults) {
1443
1444             // Increment the operation counts.
1445             totalOps++;
1446             if (isSuccessCode(item.operationStatus().getStatus())) {
1447                 totalSuccess++;
1448             } else {
1449                 totalFails++;
1450             }
1451
1452             // Prepend a comma to our response string unless this it the
1453             // first result in the set.
1454             if (!firstItem.compareAndSet(true, false)) {
1455                 resultsBuilder.append(", ");
1456             }
1457
1458             // Append the current result as a generic json structure.
1459             resultsBuilder.append(item.toJson());
1460         }
1461
1462         return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", "
1463                 + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}";
1464     }
1465
1466
1467     /**
1468      * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
1469      *
1470      * @param indexName - The index to look for.
1471      * @return - An operation result indicating the success or failure of the check.
1472      * @throws DocumentStoreOperationException
1473      */
1474     public OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
1475
1476         // Initialize operation result with a failure codes / fault string
1477         OperationResult opResult = new OperationResult();
1478         opResult.setResultCode(Status.INTERNAL_SERVER_ERROR.getStatusCode());
1479
1480         // Grab the current time so we can use it to generate a metrics log.
1481         MdcOverride override = getStartTime(new MdcOverride());
1482
1483         String fullUrl = getFullUrl("/" + indexName, false);
1484         HttpURLConnection conn = initializeConnection(fullUrl);
1485
1486         try {
1487             conn.setRequestMethod("HEAD");
1488
1489         } catch (ProtocolException e) {
1490             shutdownConnection(conn);
1491             throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1492         }
1493
1494         logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1495
1496         int resultCode;
1497         try {
1498             resultCode = conn.getResponseCode();
1499         } catch (IOException e) {
1500             shutdownConnection(conn);
1501             throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e);
1502         }
1503         logger.debug("Response Code : " + resultCode);
1504
1505         opResult.setResultCode(resultCode);
1506
1507         // Generate a metrics log so we can track how long the operation took.
1508         metricsLogger
1509                 .info(SearchDbMsgs.CHECK_INDEX_TIME,
1510                         new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode())
1511                                 .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()),
1512                         override, indexName);
1513
1514         shutdownConnection(conn);
1515
1516         return opResult;
1517     }
1518
1519
1520     private void buildDocumentResult(DocumentOperationResult result, String index)
1521             throws DocumentStoreOperationException {
1522
1523         JSONParser parser = new JSONParser();
1524         JSONObject root;
1525         try {
1526             root = (JSONObject) parser.parse(result.getResult());
1527             if (isSuccess(result)) {
1528                 // Success response object
1529                 Document doc = new Document();
1530                 doc.setEtag(result.getResultVersion());
1531                 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1532
1533                 doc.setContent((JSONObject) root.get("_source"));
1534                 result.setDocument(doc);
1535
1536             } else {
1537                 // Error response object
1538                 JSONObject error = (JSONObject) root.get("error");
1539                 if (error != null) {
1540                     result.setError(new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1541                 }
1542
1543             }
1544         } catch (Exception e) {
1545             throw new DocumentStoreOperationException("Failed to parse Elastic Search response." + result.getResult());
1546         }
1547
1548
1549     }
1550
1551     private String buildDocumentResponseUrl(String index, String id) {
1552         return ApiUtils.buildDocumentUri(index, id);
1553     }
1554
1555     private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1556
1557         JSONParser parser = new JSONParser();
1558         JSONObject root;
1559
1560         try {
1561             root = (JSONObject) parser.parse(result.getResult());
1562             if (isSuccess(result)) {
1563                 JSONObject hits = (JSONObject) root.get("hits");
1564                 JSONArray hitArray = (JSONArray) hits.get("hits");
1565                 SearchHits searchHits = new SearchHits();
1566                 searchHits.setTotalHits(hits.get("total").toString());
1567                 ArrayList<SearchHit> searchHitArray = new ArrayList<>();
1568
1569                 for (int i = 0; i < hitArray.size(); i++) {
1570                     JSONObject hit = (JSONObject) hitArray.get(i);
1571                     SearchHit searchHit = new SearchHit();
1572                     searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1573                     Document doc = new Document();
1574                     if (hit.get("_version") != null) {
1575                         doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1576                     }
1577
1578                     doc.setUrl(
1579                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1580                     doc.setContent((JSONObject) hit.get("_source"));
1581                     searchHit.setDocument(doc);
1582                     searchHitArray.add(searchHit);
1583                 }
1584                 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1585                 result.setSearchResult(searchHits);
1586
1587                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1588                 if (aggregations != null) {
1589                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1590                     AggregationResults aggs = new AggregationResults();
1591                     aggs.setAggregations(aggResults);
1592                     result.setAggregationResult(aggs);
1593                 }
1594
1595                 // success
1596             } else {
1597                 JSONObject error = (JSONObject) root.get("error");
1598                 if (error != null) {
1599                     result.setError(new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1600                 }
1601             }
1602         } catch (Exception e) {
1603             throw new DocumentStoreOperationException("Failed to parse Elastic Search response." + result.getResult());
1604         }
1605
1606     }
1607
1608     private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1609         JSONParser parser = new JSONParser();
1610         JSONObject root;
1611         try {
1612             root = (JSONObject) parser.parse(result.getResult());
1613             if (isSuccess(result)) {
1614                 JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
1615                 JSONObject hitdata = (JSONObject) hitArray.get(0);
1616                 JSONArray optionsArray = (JSONArray) hitdata.get("options");
1617                 SuggestHits suggestHits = new SuggestHits();
1618                 suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
1619
1620                 ArrayList<SuggestHit> suggestHitArray = new ArrayList<>();
1621
1622                 for (int i = 0; i < optionsArray.size(); i++) {
1623                     JSONObject hit = (JSONObject) optionsArray.get(i);
1624
1625                     SuggestHit suggestHit = new SuggestHit();
1626                     suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
1627                     suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
1628                     Document doc = new Document();
1629                     if (hit.get("_version") != null) {
1630                         doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : "");
1631                     }
1632                     doc.setUrl(
1633                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1634
1635                     doc.setContent((JSONObject) hit.get("payload"));
1636                     suggestHit.setDocument(doc);
1637                     suggestHitArray.add(suggestHit);
1638                 }
1639                 suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
1640                 result.setSuggestResult(suggestHits);
1641
1642                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1643                 if (aggregations != null) {
1644                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1645                     AggregationResults aggs = new AggregationResults();
1646                     aggs.setAggregations(aggResults);
1647                     result.setAggregationResult(aggs);
1648                 }
1649
1650                 // success
1651             } else {
1652                 JSONObject error = (JSONObject) root.get("error");
1653                 if (error != null) {
1654                     result.setError(new ErrorResult(error.get("type").toString(), error.get("reason").toString()));
1655                 }
1656             }
1657         } catch (Exception e) {
1658             throw new DocumentStoreOperationException("Failed to parse Elastic Search response." + result.getResult());
1659         }
1660     }
1661 }