Use Builder Pattern to create Operation Results
[aai/search-data-service.git] / src / main / java / org / onap / aai / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
23
24 import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
25 import static javax.ws.rs.core.MediaType.APPLICATION_FORM_URLENCODED;
26 import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
27
28 import com.fasterxml.jackson.annotation.JsonInclude.Include;
29 import com.fasterxml.jackson.core.JsonParseException;
30 import com.fasterxml.jackson.core.JsonProcessingException;
31 import com.fasterxml.jackson.databind.JsonMappingException;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33 import com.google.common.base.Throwables;
34 import java.io.BufferedReader;
35 import java.io.File;
36 import java.io.FileInputStream;
37 import java.io.IOException;
38 import java.io.InputStream;
39 import java.io.InputStreamReader;
40 import java.io.OutputStream;
41 import java.io.OutputStreamWriter;
42 import java.io.PrintWriter;
43 import java.io.StringWriter;
44 import java.net.HttpURLConnection;
45 import java.net.MalformedURLException;
46 import java.net.ProtocolException;
47 import java.net.URL;
48 import java.text.SimpleDateFormat;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.List;
52 import java.util.Properties;
53 import java.util.concurrent.atomic.AtomicBoolean;
54 import javax.ws.rs.core.Response.Status;
55 import javax.ws.rs.core.Response.Status.Family;
56 import org.eclipse.jetty.http.HttpStatus;
57 import org.json.simple.JSONArray;
58 import org.json.simple.JSONObject;
59 import org.json.simple.parser.JSONParser;
60 import org.json.simple.parser.ParseException;
61 import org.onap.aai.cl.api.LogFields;
62 import org.onap.aai.cl.api.LogLine;
63 import org.onap.aai.cl.api.Logger;
64 import org.onap.aai.cl.eelf.LoggerFactory;
65 import org.onap.aai.cl.mdc.MdcContext;
66 import org.onap.aai.cl.mdc.MdcOverride;
67 import org.onap.aai.sa.rest.AnalysisConfiguration;
68 import org.onap.aai.sa.rest.ApiUtils;
69 import org.onap.aai.sa.rest.BulkRequest;
70 import org.onap.aai.sa.rest.BulkRequest.OperationType;
71 import org.onap.aai.sa.rest.DocumentSchema;
72 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
73 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
74 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
75 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
76 import org.onap.aai.sa.searchdbabstraction.entity.Document;
77 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
78 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
79 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
80 import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder;
81 import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder.Type;
82 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
83 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
84 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
85 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
86 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
87 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
88 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
89 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
90 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
91 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
92
93 /**
94  * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface.
95  */
96 public class ElasticSearchHttpController implements DocumentStoreInterface {
97
98     private static ElasticSearchHttpController instance = null;
99
100     private static final Logger logger =
101             LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
102     private static final Logger metricsLogger =
103             LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
104
105     private static final String JSON_ATTR_VERSION = "_version";
106     private static final String JSON_ATTR_ERROR = "error";
107     private static final String JSON_ATTR_REASON = "reason";
108
109     private static final String DEFAULT_TYPE = "default";
110     private static final String QUERY_PARAM_VERSION = "?version=";
111
112     private static final String MSG_RESOURCE_MISSING = "Specified resource does not exist: ";
113     private static final String MSG_RESPONSE_CODE = "Response Code : ";
114     private static final String MSG_INVALID_DOCUMENT_URL = "Invalid document URL: ";
115     private static final String MSG_HTTP_PUT_FAILED = "Failed to set HTTP request method to PUT.";
116     private static final String MSG_HTTP_POST_FAILED = "Failed to set HTTP request method to POST.";
117     private static final String FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION =
118             "Failed to get the response code from the connection.";
119     private static final String FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE = "Failed to parse Elastic Search response.";
120
121     private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
122             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
123     private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
124             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
125     private static final String BULK_IMPORT_INDEX_TEMPLATE =
126             "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
127     private static final String BULK_DELETE_TEMPLATE =
128             "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
129
130     private final ElasticSearchConfig config;
131
132     protected AnalysisConfiguration analysisConfig;
133
134
135     public ElasticSearchHttpController(ElasticSearchConfig config) {
136         this.config = config;
137         analysisConfig = new AnalysisConfiguration();
138
139         try {
140             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
141             checkConnection();
142             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
143         } catch (Exception e) {
144             logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, getFullUrl("", false),
145                     e.getMessage());
146         }
147     }
148
149     public static ElasticSearchHttpController getInstance() {
150         synchronized (ElasticSearchHttpController.class) {
151             if (instance == null) {
152                 Properties properties = new Properties();
153                 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
154                 try {
155                     properties.load(new FileInputStream(file));
156                 } catch (Exception e) {
157                     logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance",
158                             e.getLocalizedMessage());
159                 }
160
161                 ElasticSearchConfig config = new ElasticSearchConfig(properties);
162                 instance = new ElasticSearchHttpController(config);
163             }
164         }
165
166         return instance;
167     }
168
169     public AnalysisConfiguration getAnalysisConfig() {
170         return analysisConfig;
171     }
172
173     @Override
174     public OperationResult createIndex(String index, DocumentSchema documentSchema) {
175         try {
176             // Submit the request to ElasticSearch to create the index using a default document type.
177             OperationResult result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
178                     DocumentSchemaUtil.generateDocumentMappings(documentSchema));
179
180             // ElasticSearch will return us a 200 code on success when we
181             // want to report a 201, so translate the result here.
182             if (result.getResultCode() == Status.OK.getStatusCode()) {
183                 result.setResultCode(Status.CREATED.getStatusCode());
184             }
185
186             if (isSuccess(result)) {
187                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
188             }
189             return result;
190         } catch (DocumentStoreOperationException | IOException e) {
191             return new OperationResultBuilder().useDefaults()
192                     .failureCause("Document store operation failure.  Cause: " + e.getMessage()).build();
193         }
194
195     }
196
197     @Override
198     public OperationResult createDynamicIndex(String index, String dynamicSchema) {
199         try {
200             OperationResult result = createTable(index, dynamicSchema);
201
202             // ElasticSearch will return us a 200 code on success when we
203             // want to report a 201, so translate the result here.
204             if (result.getResultCode() == Status.OK.getStatusCode()) {
205                 result.setResultCode(Status.CREATED.getStatusCode());
206             }
207             if (isSuccess(result)) {
208                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
209             }
210             return result;
211         } catch (DocumentStoreOperationException e) {
212             return new OperationResultBuilder().useDefaults()
213                     .failureCause("Document store operation failure.  Cause: " + e.getMessage()).build();
214         }
215     }
216
217     @Override
218     public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
219         // Grab the current time so we can use it to generate a metrics log.
220         MdcOverride override = getStartTime(new MdcOverride());
221
222         String fullUrl = getFullUrl("/" + indexName + "/", false);
223         HttpURLConnection conn = initializeConnection(fullUrl);
224
225         logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
226
227         try {
228             conn.setRequestMethod("DELETE");
229         } catch (ProtocolException e) {
230             shutdownConnection(conn);
231             throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
232         }
233
234         OperationResult opResult = handleResponse(conn);
235         logMetricsInfo(override, SearchDbMsgs.DELETE_INDEX_TIME, opResult, indexName);
236         shutdownConnection(conn);
237
238         return opResult;
239     }
240
241     private OperationResult checkConnection() throws IOException {
242         String fullUrl = getFullUrl("/_cluster/health", false);
243         URL url = null;
244         HttpURLConnection conn = null;
245
246         url = new URL(fullUrl);
247         conn = (HttpURLConnection) url.openConnection();
248         conn.setRequestMethod("GET");
249         conn.setDoOutput(true);
250         logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
251
252         int resultCode = conn.getResponseCode();
253         logger.debug("getClusterHealth() response Code : " + resultCode);
254
255         shutdownConnection(conn);
256
257         return new OperationResultBuilder().resultCode(resultCode).build();
258     }
259
260     private String getFullUrl(String resourceUrl, boolean isSecure) {
261
262         final String host = config.getIpAddress();
263         final String port = config.getHttpPort();
264
265         if (isSecure) {
266             return String.format("https://%s:%s%s", host, port, resourceUrl);
267         } else {
268             return String.format("http://%s:%s%s", host, port, resourceUrl);
269         }
270     }
271
272     private void shutdownConnection(HttpURLConnection connection) {
273         if (connection == null) {
274             return;
275         }
276
277         final String methodName = "shutdownConnection";
278         InputStream inputstream = null;
279         OutputStream outputstream = null;
280
281         try {
282             inputstream = connection.getInputStream();
283         } catch (IOException e) {
284             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
285         } finally {
286             if (inputstream != null) {
287                 try {
288                     inputstream.close();
289                 } catch (IOException e) {
290                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
291                 }
292             }
293         }
294
295         try {
296             outputstream = connection.getOutputStream();
297         } catch (IOException e) {
298             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
299         } finally {
300             if (outputstream != null) {
301                 try {
302                     outputstream.close();
303                 } catch (IOException e) {
304                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
305                 }
306             }
307         }
308
309         connection.disconnect();
310     }
311
312     // @Override
313     protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings)
314             throws DocumentStoreOperationException {
315         if (indexSettings == null) {
316             logger.debug("No settings provided.");
317         }
318
319         if (indexMappings == null) {
320             logger.debug("No mappings provided.");
321         }
322
323         // Grab the current time so we can use it to generate a metrics log.
324         MdcOverride override = getStartTime(new MdcOverride());
325
326         String fullUrl = getFullUrl("/" + indexName + "/", false);
327         HttpURLConnection conn = initializeConnection(fullUrl);
328
329         try {
330             conn.setRequestMethod("PUT");
331         } catch (ProtocolException e) {
332             shutdownConnection(conn);
333             throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
334         }
335
336         StringBuilder sb = new StringBuilder(128);
337         sb.append("{ \"settings\" : ");
338         sb.append(indexSettings);
339         sb.append(",");
340
341         sb.append("\"mappings\" : {");
342         sb.append("\"" + typeName + "\" :");
343         sb.append(indexMappings);
344         sb.append("}}");
345
346         try {
347             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
348         } catch (IOException e) {
349             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
350             throw new DocumentStoreOperationException(e.getMessage(), e);
351         }
352
353         logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
354         logger.debug("Request content: " + sb);
355
356         OperationResult opResult = handleResponse(conn);
357         shutdownConnection(conn);
358         logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, opResult, indexName);
359
360         return opResult;
361     }
362
363     /**
364      * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the
365      * index.
366      *
367      * @param indexName - The name of the index to be created
368      * @param settingsAndMappings - The actual JSON object that will define the index
369      * @return - The operation result of writing into Elasticsearch
370      * @throws DocumentStoreOperationException
371      */
372     protected OperationResult createTable(String indexName, String settingsAndMappings)
373             throws DocumentStoreOperationException {
374         // Grab the current time so we can use it to generate a metrics log.
375         MdcOverride override = getStartTime(new MdcOverride());
376
377         String fullUrl = getFullUrl("/" + indexName + "/", false);
378         HttpURLConnection conn = initializeConnection(fullUrl);
379
380         try {
381             conn.setRequestMethod("PUT");
382         } catch (ProtocolException e) {
383             shutdownConnection(conn);
384             throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
385         }
386
387         try {
388             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
389         } catch (IOException e) {
390             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
391             throw new DocumentStoreOperationException(e.getMessage());
392         }
393
394         OperationResult result = handleResponse(conn);
395         logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, result, indexName);
396
397         return result;
398     }
399
400     @Override
401     public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
402             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
403
404         if (!allowImplicitIndexCreation) {
405             // Before we do anything, make sure that the specified index actually exists in the
406             // document store - we don't want to rely on ElasticSearch to fail the document
407             // create because it could be configured to implicitly create a non-existent index,
408             // which can lead to hard-to-debug behaviour with queries down the road.
409             OperationResult indexExistsResult = checkIndexExistence(indexName);
410             if (!isSuccess(indexExistsResult)) {
411                 String resultMsg = "Document Index '" + indexName + "' does not exist.";
412                 return (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT).status(Status.NOT_FOUND)
413                         .result(resultMsg).failureCause(resultMsg).build();
414             }
415         }
416
417         if (document.getId() == null || document.getId().isEmpty()) {
418             return createDocumentWithoutId(indexName, document);
419         } else {
420             return createDocumentWithId(indexName, document);
421         }
422     }
423
424     private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
425             throws DocumentStoreOperationException {
426         // check if the document already exists
427         DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
428
429         if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
430             if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) {
431                 opResult.setFailureCause("A document with the same id already exists.");
432             } else {
433                 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
434             }
435             opResult.setResultCode(Status.CONFLICT.getStatusCode());
436             return opResult;
437         }
438
439         // Grab the current time so we can use it to generate a metrics log.
440         MdcOverride override = getStartTime(new MdcOverride());
441
442         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
443         HttpURLConnection conn = initializeConnection(fullUrl);
444
445         try {
446             conn.setRequestMethod("PUT");
447         } catch (ProtocolException e) {
448             shutdownConnection(conn);
449             throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
450         }
451
452         attachDocument(conn, document);
453
454         logger.debug("Sending 'PUT' request to: " + conn.getURL());
455
456         opResult = getOperationResult(conn);
457         buildDocumentResult(opResult, indexName);
458
459         logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, opResult, indexName);
460
461         shutdownConnection(conn);
462
463         return opResult;
464     }
465
466     private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
467             throws DocumentStoreOperationException {
468         // Grab the current time so we can use it to generate a metrics log.
469         MdcOverride override = getStartTime(new MdcOverride());
470
471         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
472         HttpURLConnection conn = initializeConnection(fullUrl);
473
474         try {
475             conn.setRequestMethod("POST");
476         } catch (ProtocolException e) {
477             shutdownConnection(conn);
478             throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
479         }
480
481         attachDocument(conn, document);
482
483         logger.debug("Sending 'POST' request to: " + conn.getURL());
484
485         DocumentOperationResult response = getOperationResult(conn);
486         buildDocumentResult(response, indexName);
487
488         logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, response, indexName);
489
490         shutdownConnection(conn);
491
492         return response;
493     }
494
495     private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
496             throws DocumentStoreOperationException {
497         conn.setRequestProperty("Connection", "Close");
498         attachContent(conn, doc.getContentInJson());
499     }
500
501     private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
502             throws DocumentStoreOperationException {
503         // Grab the current time so we can use it to generate a metrics log.
504         MdcOverride override = getStartTime(new MdcOverride());
505
506         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
507         HttpURLConnection conn = initializeConnection(fullUrl);
508
509         try {
510             conn.setRequestMethod("HEAD");
511         } catch (ProtocolException e) {
512             shutdownConnection(conn);
513             throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
514         }
515
516         logger.debug("Sending 'HEAD' request to: " + conn.getURL());
517
518         int resultCode;
519         try {
520             resultCode = conn.getResponseCode();
521         } catch (IOException e) {
522             shutdownConnection(conn);
523             throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
524         }
525
526         logger.debug(MSG_RESPONSE_CODE + resultCode);
527
528         DocumentOperationResult opResult = (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT)
529                 .useDefaults().resultCode(resultCode).build();
530
531         logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, docId);
532         shutdownConnection(conn);
533
534         return opResult;
535     }
536
537     @Override
538     public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
539             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
540
541         if (!allowImplicitIndexCreation) {
542             // Before we do anything, make sure that the specified index actually exists in the
543             // document store - we don't want to rely on ElasticSearch to fail the document
544             // create because it could be configured to implicitly create a non-existent index,
545             // which can lead to hard-to-debug behaviour with queries down the road.
546             OperationResult indexExistsResult = checkIndexExistence(indexName);
547             if (!isSuccess(indexExistsResult)) {
548                 DocumentOperationResult opResult = new DocumentOperationResult();
549                 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
550                 String resultMsg = "Document Index '" + indexName + "' does not exist.";
551                 opResult.setResult(resultMsg);
552                 opResult.setFailureCause(resultMsg);
553                 return opResult;
554             }
555         }
556
557         // Grab the current time so we can use it to generate a metrics log.
558         MdcOverride override = getStartTime(new MdcOverride());
559
560         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
561                 + document.getVersion(), false);
562         HttpURLConnection conn = initializeConnection(fullUrl);
563
564         try {
565             conn.setRequestMethod("PUT");
566         } catch (ProtocolException e) {
567             shutdownConnection(conn);
568             throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
569         }
570
571         attachDocument(conn, document);
572
573         logger.debug("Sending 'PUT' request to: " + conn.getURL());
574
575         DocumentOperationResult opResult = getOperationResult(conn);
576         buildDocumentResult(opResult, indexName);
577
578         logMetricsInfo(override, SearchDbMsgs.UPDATE_DOCUMENT_TIME, opResult, indexName, document.getId());
579
580         shutdownConnection(conn);
581
582         return opResult;
583     }
584
585     @Override
586     public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
587             throws DocumentStoreOperationException {
588         // Grab the current time so we can use it to generate a metrics log.
589         MdcOverride override = getStartTime(new MdcOverride());
590
591         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
592                 + document.getVersion(), false);
593         HttpURLConnection conn = initializeConnection(fullUrl);
594
595         try {
596             conn.setRequestMethod("DELETE");
597         } catch (ProtocolException e) {
598             shutdownConnection(conn);
599             throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
600         }
601
602         logger.debug("\nSending 'DELETE' request to " + conn.getURL());
603
604         DocumentOperationResult opResult = getOperationResult(conn);
605         buildDocumentResult(opResult, indexName);
606         // supress the etag and url in response for delete as they are not required
607         if (opResult.getDocument() != null) {
608             opResult.getDocument().setEtag(null);
609             opResult.getDocument().setUrl(null);
610         }
611
612         logMetricsInfo(override, SearchDbMsgs.DELETE_DOCUMENT_TIME, opResult, indexName, document.getId());
613
614         shutdownConnection(conn);
615
616         return opResult;
617     }
618
619     @Override
620     public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
621             throws DocumentStoreOperationException {
622         // Grab the current time so we can use it to generate a metrics log.
623         MdcOverride override = getStartTime(new MdcOverride());
624
625         String fullUrl = null;
626         if (document.getVersion() == null) {
627             fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
628         } else {
629             fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
630                     + document.getVersion(), false);
631         }
632         HttpURLConnection conn = initializeConnection(fullUrl);
633
634         logger.debug("\nSending 'GET' request to: " + conn.getURL());
635
636         DocumentOperationResult opResult = getOperationResult(conn);
637         buildDocumentResult(opResult, indexName);
638
639         logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, document.getId());
640
641         shutdownConnection(conn);
642
643         return opResult;
644     }
645
646     @Override
647     public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
648
649         String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
650
651         // Grab the current time so we can use it to generate a metrics log.
652         MdcOverride override = getStartTime(new MdcOverride());
653
654         HttpURLConnection conn = initializeConnection(fullUrl);
655
656         try {
657             conn.setRequestMethod("GET");
658         } catch (ProtocolException e) {
659             shutdownConnection(conn);
660             throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
661         }
662
663         logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
664
665         SearchOperationResult opResult = getSearchOperationResult(conn);
666         buildSearchResult(opResult, indexName);
667
668         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, queryString);
669
670         return opResult;
671     }
672
673     @Override
674     public SearchOperationResult searchWithPayload(String indexName, String query)
675             throws DocumentStoreOperationException {
676         if (logger.isDebugEnabled()) {
677             logger.debug("Querying index: " + indexName + " with query string: " + query);
678         }
679
680         String fullUrl = getFullUrl("/" + indexName + "/_search", false);
681
682         // Grab the current time so we can use it to generate a metrics log.
683         MdcOverride override = getStartTime(new MdcOverride());
684
685         HttpURLConnection conn = initializeConnection(fullUrl);
686
687         try {
688             conn.setRequestMethod("POST");
689         } catch (ProtocolException e) {
690             shutdownConnection(conn);
691             throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
692         }
693
694         attachContent(conn, query);
695
696         logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
697         logger.debug("Request body =  Elasticsearch query = " + query);
698
699         SearchOperationResult opResult = getSearchOperationResult(conn);
700         buildSearchResult(opResult, indexName);
701
702         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
703
704         shutdownConnection(conn);
705
706         return opResult;
707     }
708
709
710     @Override
711     public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
712             throws DocumentStoreOperationException {
713         if (logger.isDebugEnabled()) {
714             logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
715         }
716
717         String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
718
719         // Grab the current time so we can use it to generate a metrics log.
720         MdcOverride override = getStartTime(new MdcOverride());
721
722         HttpURLConnection conn = initializeConnection(fullUrl);
723
724         try {
725             conn.setRequestMethod("POST");
726         } catch (ProtocolException e) {
727             shutdownConnection(conn);
728             throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
729         }
730
731         attachContent(conn, query);
732
733         logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
734         logger.debug("Request body =  Elasticsearch query = " + query);
735
736         SearchOperationResult opResult = getSearchOperationResult(conn);
737         buildSuggestResult(opResult, indexName);
738
739         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
740
741         shutdownConnection(conn);
742
743         return opResult;
744     }
745
746     private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException {
747         OutputStream outputStream = null;
748         OutputStreamWriter out = null;
749
750         try {
751             outputStream = conn.getOutputStream();
752         } catch (IOException e) {
753             shutdownConnection(conn);
754             throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
755         }
756
757         out = new OutputStreamWriter(outputStream);
758
759         try {
760             out.write(content);
761             out.close();
762         } catch (IOException e) {
763             shutdownConnection(conn);
764             throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
765         }
766     }
767
768     private HttpURLConnection initializeConnection(String fullUrl) throws DocumentStoreOperationException {
769         URL url = null;
770         HttpURLConnection conn = null;
771
772         try {
773             url = new URL(fullUrl);
774         } catch (MalformedURLException e) {
775             throw new DocumentStoreOperationException("Error building a URL with " + url, e);
776         }
777
778         try {
779             conn = (HttpURLConnection) url.openConnection();
780             conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON);
781             conn.setDoOutput(true);
782         } catch (IOException e) {
783             shutdownConnection(conn);
784             throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
785         }
786
787         return conn;
788     }
789
790     private OperationResult handleResponse(HttpURLConnection conn) throws DocumentStoreOperationException {
791         return handleResponse(conn, new OperationResultBuilder().useDefaults());
792     }
793
794     private OperationResult handleResponse(HttpURLConnection conn, OperationResultBuilder rb)
795             throws DocumentStoreOperationException {
796         int resultCode;
797
798         try {
799             resultCode = conn.getResponseCode();
800         } catch (IOException e) {
801             shutdownConnection(conn);
802             throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
803         }
804
805         logger.debug(MSG_RESPONSE_CODE + resultCode);
806
807         InputStream inputStream = null;
808
809         if (!isSuccessCode(resultCode)) {
810             inputStream = conn.getErrorStream();
811         } else {
812             try {
813                 inputStream = conn.getInputStream();
814             } catch (IOException e) {
815                 shutdownConnection(conn);
816                 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
817             }
818         }
819
820         InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
821         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
822
823         StringBuilder result = new StringBuilder(128);
824         String string = null;
825
826         try {
827             while ((string = bufferedreader.readLine()) != null) {
828                 result.append(string).append("\n");
829             }
830         } catch (IOException e) {
831             shutdownConnection(conn);
832             throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
833         }
834
835         if (resultCode == Status.CONFLICT.getStatusCode()) {
836             rb.resultCode(Status.PRECONDITION_FAILED.getStatusCode());
837         } else {
838             rb.resultCode(resultCode);
839         }
840         if (logger.isDebugEnabled()) {
841             logger.debug("Raw result string from ElasticSearch = " + result.toString());
842         }
843         rb.result(result.toString());
844         rb.resultVersion(extractVersion(result.toString()));
845         return rb.build();
846     }
847
848     private String extractVersion(String result) {
849         JSONParser parser = new JSONParser();
850         String version = null;
851         try {
852             JSONObject root = (JSONObject) parser.parse(result);
853             if (root.get(JSON_ATTR_VERSION) != null) {
854                 version = root.get(JSON_ATTR_VERSION).toString();
855             }
856         } catch (ParseException e) {
857             // Not all responses from ElasticSearch include a version, so
858             // if we don't get one back, just return an empty string rather
859             // than trigger a false failure.
860             version = "";
861         }
862         return version;
863     }
864
865     /**
866      * This convenience method gets the current system time and stores it in an attribute in the supplied
867      * {@link MdcOverride} object so that it can be used later by the metrics logger.
868      *
869      * @param override - The {@link MdcOverride} object to update.
870      * @return - The supplied {@link MdcOverride} object.
871      */
872     private MdcOverride getStartTime(MdcOverride override) {
873
874         // Grab the current time...
875         long startTimeInMs = System.currentTimeMillis();
876
877         // ...and add it as an attribute to the supplied MDC Override
878         // object.
879         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
880         override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
881
882         // Return the MdcOverride object that we were passed.
883         // This looks odd, but it allows us to do stuff like:
884         //
885         // MdcOverride ov = getStartTime(new MdcOverride())
886         //
887         // which is quite handy, but also allows us to pass in an existing
888         // MdcOverride object which already has some attributes set.
889         return override;
890     }
891
892     private boolean isSuccess(OperationResult result) {
893         return isSuccessCode(result.getResultCode());
894     }
895
896     private boolean isSuccessCode(int statusCode) {
897         return Family.familyOf(statusCode).equals(Family.SUCCESSFUL);
898     }
899
900     @Override
901     public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
902         if (logger.isDebugEnabled()) {
903             StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: ");
904
905             for (BulkRequest request : requests) {
906                 dbgString.append("[").append(request).append("] ");
907             }
908
909             logger.debug(dbgString.toString());
910         }
911
912         // Grab the current time so we can use it to generate a metrics log.
913         MdcOverride override = getStartTime(new MdcOverride());
914
915         // Parse the supplied set of operations.
916         // Iterate over the list of operations which we were provided and
917         // translate them into a format that ElasticSearh understands.
918         int opCount = 0;
919         StringBuilder esOperationSet = new StringBuilder(128);
920         List<ElasticSearchResultItem> rejected = new ArrayList<>();
921         for (BulkRequest request : requests) {
922
923             // Convert the request to the syntax ElasticSearch likes.
924             if (buildEsOperation(request, esOperationSet, rejected)) {
925                 opCount++;
926             }
927         }
928
929         ElasticSearchBulkOperationResult opResult = null;
930         if (opCount > 0) {
931
932             // Open an HTTP connection to the ElasticSearch back end.
933             String fullUrl = getFullUrl("/_bulk", false);
934             URL url;
935             HttpURLConnection conn;
936             try {
937
938                 url = new URL(fullUrl);
939                 conn = (HttpURLConnection) url.openConnection();
940                 conn.setRequestMethod("PUT");
941                 conn.setDoOutput(true);
942                 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_FORM_URLENCODED);
943                 conn.setRequestProperty("Connection", "Close");
944
945             } catch (IOException e) {
946
947                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
948                 if (logger.isDebugEnabled()) {
949                     logger.debug(Throwables.getStackTraceAsString(e));
950                 }
951
952                 throw new DocumentStoreOperationException(
953                         "Failed to open connection to document store.  Cause: " + e.getMessage(), e);
954             }
955
956             StringBuilder bulkResult = new StringBuilder(128);
957             try {
958                 // Create an output stream to write our request to.
959                 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
960
961                 if (logger.isDebugEnabled()) {
962                     logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
963                     logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
964                 }
965
966                 // Write the resulting request string to our output stream. (this sends the request to ES?)
967                 out.write(esOperationSet.toString());
968                 out.close();
969
970                 // Open an input stream on our connection in order to read back the results.
971                 InputStream is = conn.getInputStream();
972                 InputStreamReader inputstreamreader = new InputStreamReader(is);
973                 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
974
975                 // Read the contents of the input stream into our result string...
976                 String esResponseString = null;
977
978                 while ((esResponseString = bufferedreader.readLine()) != null) {
979                     bulkResult.append(esResponseString).append("\n");
980                 }
981
982             } catch (IOException e) {
983
984                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
985                 if (logger.isDebugEnabled()) {
986                     StringWriter sw = new StringWriter();
987                     e.printStackTrace(new PrintWriter(sw));
988                     logger.debug(sw.toString());
989                 }
990
991                 throw new DocumentStoreOperationException(
992                         "Failure interacting with document store.  Cause: " + e.getMessage(), e);
993             }
994
995             if (logger.isDebugEnabled()) {
996                 logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
997             }
998
999             // ...and marshal the resulting string into a Java object.
1000             try {
1001                 opResult = marshallEsBulkResult(bulkResult.toString());
1002
1003             } catch (IOException e) {
1004
1005                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
1006                 if (logger.isDebugEnabled()) {
1007                     logger.debug(Throwables.getStackTraceAsString(e));
1008                 }
1009
1010                 throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: " + e.getMessage(),
1011                         e);
1012             }
1013         }
1014
1015         OperationResult result = new OperationResultBuilder() //
1016                 .resultCode(HttpStatus.MULTI_STATUS_207) //
1017                 .result(buildGenericBulkResultSet(opResult, rejected)) //
1018                 .build();
1019
1020         // In the success case we don't want the entire result string to be dumped into the metrics log, so concatenate
1021         // it.
1022         String resultStringForMetricsLog = result.getResult();
1023         if (isSuccess(result)) {
1024             resultStringForMetricsLog =
1025                     resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
1026         }
1027
1028         metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
1029                 new LogFields() //
1030                         .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
1031                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
1032                 override);
1033
1034         return result;
1035     }
1036
1037     /**
1038      * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch.
1039      *
1040      * @param request - The request to be performed.
1041      * @param sb - The string builder to append the json data to
1042      * @throws DocumentStoreOperationException
1043      */
1044     private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails)
1045             throws DocumentStoreOperationException {
1046
1047         boolean retVal = true;
1048         // What kind of operation are we performing?
1049         switch (request.getOperationType()) {
1050
1051             // Create a new document.
1052             case CREATE:
1053
1054                 // Make sure that we were supplied a document payload.
1055                 if (request.getOperation().getDocument() == null) {
1056
1057                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1058                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1059                     return false;
1060                 }
1061
1062                 // Make sure that the supplied document URL is formatted
1063                 // correctly.
1064                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1065                     fails.add(generateRejectionEntry(request.getOperationType(),
1066                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1067                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1068                     return false;
1069                 }
1070
1071                 // Validate that the specified index actually exists before we
1072                 // try to perform the create.
1073                 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1074
1075                     fails.add(generateRejectionEntry(request.getOperationType(),
1076                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1077                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1078                     return false;
1079                 }
1080
1081                 // If we were supplied an id for the new document, then
1082                 // include it in the bulk operation to Elastic Search
1083                 if (request.getId() == null) {
1084
1085                     sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
1086
1087                     // Otherwise, we just leave that parameter off and ElasticSearch
1088                     // will generate one for us.
1089                 } else {
1090                     sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
1091                             request.getId()));
1092                 }
1093
1094                 try {
1095                     // Append the document that we want to create.
1096                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
1097                 } catch (JsonProcessingException e) {
1098                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
1099                 }
1100
1101                 break;
1102
1103             // Update an existing document.
1104             case UPDATE:
1105
1106                 // Make sure that we were supplied a document payload.
1107                 if (request.getOperation().getDocument() == null) {
1108
1109                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1110                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1111                     return false;
1112                 }
1113
1114                 // Make sure that the supplied document URL is formatted
1115                 // correctly.
1116                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1117                     fails.add(generateRejectionEntry(request.getOperationType(),
1118                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1119                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1120                     return false;
1121                 }
1122
1123                 // Validate that the specified index actually exists before we
1124                 // try to perform the update.
1125                 if (!indexExists(request.getIndex())) {
1126
1127                     fails.add(generateRejectionEntry(request.getOperationType(),
1128                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1129                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1130                     return false;
1131                 }
1132
1133                 // Validate that the document we are trying to update actually
1134                 // exists before we try to perform the update.
1135                 if (!documentExists(request.getIndex(), request.getId())) {
1136
1137                     fails.add(generateRejectionEntry(request.getOperationType(),
1138                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1139                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1140                     return false;
1141                 }
1142
1143                 // It is mandatory that a version be supplied for an update operation,
1144                 // so validate that now.
1145                 if (request.getOperation().getMetaData().getEtag() == null) {
1146
1147                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1148                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1149                     return false;
1150                 }
1151
1152                 // Generate the update request...
1153                 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1154                         request.getOperation().getMetaData().getEtag()));
1155
1156                 // ...and append the document that we want to update.
1157                 try {
1158                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
1159                 } catch (JsonProcessingException e) {
1160                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
1161                 }
1162                 break;
1163
1164             // Delete an existing document.
1165             case DELETE:
1166
1167                 // Make sure that the supplied document URL is formatted
1168                 // correctly.
1169                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1170                     fails.add(generateRejectionEntry(request.getOperationType(),
1171                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1172                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1173                     return false;
1174                 }
1175
1176                 // Validate that the specified index actually exists before we
1177                 // try to perform the delete.
1178                 if (!indexExists(request.getIndex())) {
1179
1180                     fails.add(generateRejectionEntry(request.getOperationType(),
1181                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1182                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1183                     return false;
1184                 }
1185
1186                 // Validate that the document we are trying to update actually
1187                 // exists before we try to perform the delete.
1188                 if (!documentExists(request.getIndex(), request.getId())) {
1189
1190                     fails.add(generateRejectionEntry(request.getOperationType(),
1191                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1192                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1193                     return false;
1194                 }
1195
1196                 // It is mandatory that a version be supplied for a delete operation,
1197                 // so validate that now.
1198                 if (request.getOperation().getMetaData().getEtag() == null) {
1199
1200                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1201                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1202                     return false;
1203                 }
1204
1205                 // Generate the delete request.
1206                 sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1207                         request.getOperation().getMetaData().getEtag()));
1208                 break;
1209             default:
1210         }
1211
1212         return retVal;
1213     }
1214
1215     private boolean indexExists(String index) throws DocumentStoreOperationException {
1216         return isSuccess(checkIndexExistence(index));
1217     }
1218
1219     private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1220         return isSuccess(checkDocumentExistence(index, id));
1221     }
1222
1223     /**
1224      * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the
1225      * document store.
1226      *
1227      * @param rejectReason - A message describing why the operation was rejected.
1228      * @param anId - The identifier associated with the document being acted on.
1229      * @param statusCode - An HTTP status code.
1230      * @return - A result set item.
1231      */
1232     private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index,
1233             String anId, int statusCode, String originalUrl) {
1234
1235         ElasticSearchError err = new ElasticSearchError();
1236         err.setReason(rejectReason);
1237
1238         ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1239         op.setIndex(index);
1240         op.setId(anId);
1241         op.setStatus(statusCode);
1242         op.setError(err);
1243         op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1244
1245         ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1246
1247         switch (opType) {
1248             case CREATE:
1249                 rejectionResult.setCreate(op);
1250                 break;
1251             case UPDATE:
1252                 rejectionResult.setIndex(op);
1253                 break;
1254             case DELETE:
1255                 rejectionResult.setDelete(op);
1256                 break;
1257             default:
1258         }
1259
1260         return rejectionResult;
1261     }
1262
1263     /**
1264      * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and
1265      * marshals it into a Java object.
1266      *
1267      * @param jsonResult - The bulk operations response returned from ElasticSearch.
1268      * @return - The marshalled response.
1269      * @throws JsonParseException
1270      * @throws JsonMappingException
1271      * @throws IOException
1272      */
1273     private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException {
1274         if (jsonResult != null) {
1275             if (logger.isDebugEnabled()) {
1276                 logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", ""));
1277             }
1278
1279             ObjectMapper mapper = new ObjectMapper();
1280             mapper.setSerializationInclusion(Include.NON_EMPTY);
1281
1282             return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1283         }
1284
1285         return null;
1286     }
1287
1288     /**
1289      * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload.
1290      *
1291      * @param esResult - ElasticSearch bulk operations response.
1292      * @return - A generic result set.
1293      */
1294     private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1295             List<ElasticSearchResultItem> rejectedOps) {
1296         int totalOps = 0;
1297         int totalSuccess = 0;
1298         int totalFails = 0;
1299
1300         if (logger.isDebugEnabled()) {
1301
1302             logger.debug("ESController: Build generic result set.  ES Results: "
1303                     + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString());
1304         }
1305
1306         // Build a combined list of result items from the results returned
1307         // from ElasticSearch and the list of operations that we rejected
1308         // without sending to ElasticSearch.
1309         List<ElasticSearchResultItem> combinedResults = new ArrayList<>();
1310         if (esResult != null) {
1311             combinedResults.addAll(Arrays.asList(esResult.getItems()));
1312         }
1313         combinedResults.addAll(rejectedOps);
1314
1315         // Iterate over the individual results in the resulting result set.
1316         StringBuilder resultsBuilder = new StringBuilder();
1317         AtomicBoolean firstItem = new AtomicBoolean(true);
1318         for (ElasticSearchResultItem item : combinedResults) {
1319
1320             // Increment the operation counts.
1321             totalOps++;
1322             if (isSuccessCode(item.operationStatus().getStatus())) {
1323                 totalSuccess++;
1324             } else {
1325                 totalFails++;
1326             }
1327
1328             // Prepend a comma to our response string unless this it the
1329             // first result in the set.
1330             if (!firstItem.compareAndSet(true, false)) {
1331                 resultsBuilder.append(", ");
1332             }
1333
1334             // Append the current result as a generic json structure.
1335             resultsBuilder.append(item.toJson());
1336         }
1337
1338         return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", "
1339                 + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}";
1340     }
1341
1342
1343     /**
1344      * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
1345      *
1346      * @param indexName - The index to look for.
1347      * @return - An operation result indicating the success or failure of the check.
1348      * @throws DocumentStoreOperationException
1349      */
1350     public OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
1351         // Grab the current time so we can use it to generate a metrics log.
1352         MdcOverride override = getStartTime(new MdcOverride());
1353
1354         String fullUrl = getFullUrl("/" + indexName, false);
1355         HttpURLConnection conn = initializeConnection(fullUrl);
1356
1357         try {
1358             conn.setRequestMethod("HEAD");
1359
1360         } catch (ProtocolException e) {
1361             shutdownConnection(conn);
1362             throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
1363         }
1364
1365         logger.debug("Sending 'HEAD' request to: " + conn.getURL());
1366
1367         int resultCode;
1368         try {
1369             resultCode = conn.getResponseCode();
1370         } catch (IOException e) {
1371             shutdownConnection(conn);
1372             throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
1373         }
1374         logger.debug(MSG_RESPONSE_CODE + resultCode);
1375
1376         OperationResult opResult = new OperationResultBuilder().useDefaults().resultCode(resultCode).build();
1377         logMetricsInfo(override, SearchDbMsgs.CHECK_INDEX_TIME, opResult, indexName);
1378         shutdownConnection(conn);
1379
1380         return opResult;
1381     }
1382
1383     private DocumentOperationResult getOperationResult(HttpURLConnection conn) throws DocumentStoreOperationException {
1384         return (DocumentOperationResult) handleResponse(conn, new OperationResultBuilder(Type.DOCUMENT).useDefaults());
1385     }
1386
1387     private SearchOperationResult getSearchOperationResult(HttpURLConnection conn)
1388             throws DocumentStoreOperationException {
1389         return (SearchOperationResult) handleResponse(conn, new OperationResultBuilder(Type.SEARCH).useDefaults());
1390     }
1391
1392     private void buildDocumentResult(DocumentOperationResult result, String index)
1393             throws DocumentStoreOperationException {
1394
1395         JSONParser parser = new JSONParser();
1396         JSONObject root;
1397         try {
1398             root = (JSONObject) parser.parse(result.getResult());
1399             if (isSuccess(result)) {
1400                 // Success response object
1401                 Document doc = new Document();
1402                 doc.setEtag(result.getResultVersion());
1403                 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1404
1405                 doc.setContent((JSONObject) root.get("_source"));
1406                 result.setDocument(doc);
1407
1408             } else {
1409                 // Error response object
1410                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1411                 if (error != null) {
1412                     result.setError(
1413                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1414                 }
1415
1416             }
1417         } catch (Exception e) {
1418             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1419         }
1420     }
1421
1422     private String buildDocumentResponseUrl(String index, String id) {
1423         return ApiUtils.buildDocumentUri(index, id);
1424     }
1425
1426     private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1427         JSONParser parser = new JSONParser();
1428         JSONObject root;
1429
1430         try {
1431             root = (JSONObject) parser.parse(result.getResult());
1432             if (isSuccess(result)) {
1433                 JSONObject hits = (JSONObject) root.get("hits");
1434                 JSONArray hitArray = (JSONArray) hits.get("hits");
1435                 SearchHits searchHits = new SearchHits();
1436                 searchHits.setTotalHits(hits.get("total").toString());
1437                 ArrayList<SearchHit> searchHitArray = new ArrayList<>();
1438
1439                 for (int i = 0; i < hitArray.size(); i++) {
1440                     JSONObject hit = (JSONObject) hitArray.get(i);
1441                     SearchHit searchHit = new SearchHit();
1442                     searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1443                     Document doc = new Document();
1444                     if (hit.get(JSON_ATTR_VERSION) != null) {
1445                         doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1446                     }
1447
1448                     doc.setUrl(
1449                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1450                     doc.setContent((JSONObject) hit.get("_source"));
1451                     searchHit.setDocument(doc);
1452                     searchHitArray.add(searchHit);
1453                 }
1454                 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1455                 result.setSearchResult(searchHits);
1456
1457                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1458                 if (aggregations != null) {
1459                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1460                     AggregationResults aggs = new AggregationResults();
1461                     aggs.setAggregations(aggResults);
1462                     result.setAggregationResult(aggs);
1463                 }
1464
1465                 // success
1466             } else {
1467                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1468                 if (error != null) {
1469                     result.setError(
1470                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1471                 }
1472             }
1473         } catch (Exception e) {
1474             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1475         }
1476     }
1477
1478     private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1479         JSONParser parser = new JSONParser();
1480         JSONObject root;
1481         try {
1482             root = (JSONObject) parser.parse(result.getResult());
1483             if (isSuccess(result)) {
1484                 JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
1485                 JSONObject hitdata = (JSONObject) hitArray.get(0);
1486                 JSONArray optionsArray = (JSONArray) hitdata.get("options");
1487                 SuggestHits suggestHits = new SuggestHits();
1488                 suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
1489
1490                 ArrayList<SuggestHit> suggestHitArray = new ArrayList<>();
1491
1492                 for (int i = 0; i < optionsArray.size(); i++) {
1493                     JSONObject hit = (JSONObject) optionsArray.get(i);
1494
1495                     SuggestHit suggestHit = new SuggestHit();
1496                     suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
1497                     suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
1498                     Document doc = new Document();
1499                     if (hit.get(JSON_ATTR_VERSION) != null) {
1500                         doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1501                     }
1502                     doc.setUrl(
1503                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1504
1505                     doc.setContent((JSONObject) hit.get("payload"));
1506                     suggestHit.setDocument(doc);
1507                     suggestHitArray.add(suggestHit);
1508                 }
1509                 suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
1510                 result.setSuggestResult(suggestHits);
1511
1512                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1513                 if (aggregations != null) {
1514                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1515                     AggregationResults aggs = new AggregationResults();
1516                     aggs.setAggregations(aggResults);
1517                     result.setAggregationResult(aggs);
1518                 }
1519
1520                 // success
1521             } else {
1522                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1523                 if (error != null) {
1524                     result.setError(
1525                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1526                 }
1527             }
1528         } catch (Exception e) {
1529             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1530         }
1531     }
1532
1533     /**
1534      * Record the timing of the operation in the metrics log.
1535      *
1536      */
1537     private void logMetricsInfo(MdcOverride override, SearchDbMsgs message, OperationResult operationResult,
1538             String... args) {
1539         metricsLogger.info(message,
1540                 new LogFields() //
1541                         .setField(LogLine.DefinedFields.RESPONSE_CODE, operationResult.getResultCode())
1542                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, operationResult.getResult()),
1543                 override, args);
1544     }
1545 }