5734c4077dce9057c855d4397487abada15abea8
[aai/search-data-service.git] / src / main / java / org / onap / aai / sa / searchdbabstraction / elasticsearch / dao / ElasticSearchHttpController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao;
23
24 import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
25 import static javax.ws.rs.core.MediaType.APPLICATION_FORM_URLENCODED;
26 import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
27
28 import com.fasterxml.jackson.annotation.JsonInclude.Include;
29 import com.fasterxml.jackson.core.JsonParseException;
30 import com.fasterxml.jackson.core.JsonProcessingException;
31 import com.fasterxml.jackson.databind.JsonMappingException;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33 import com.google.common.base.Throwables;
34 import java.io.BufferedReader;
35 import java.io.File;
36 import java.io.FileInputStream;
37 import java.io.IOException;
38 import java.io.InputStream;
39 import java.io.InputStreamReader;
40 import java.io.OutputStream;
41 import java.io.OutputStreamWriter;
42 import java.io.PrintWriter;
43 import java.io.StringWriter;
44 import java.net.HttpURLConnection;
45 import java.net.MalformedURLException;
46 import java.net.ProtocolException;
47 import java.net.URL;
48 import java.text.SimpleDateFormat;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.List;
52 import java.util.Properties;
53 import java.util.concurrent.atomic.AtomicBoolean;
54 import javax.ws.rs.core.Response.Status;
55 import javax.ws.rs.core.Response.Status.Family;
56 import org.eclipse.jetty.http.HttpStatus;
57 import org.json.simple.JSONArray;
58 import org.json.simple.JSONObject;
59 import org.json.simple.parser.JSONParser;
60 import org.json.simple.parser.ParseException;
61 import org.onap.aai.cl.api.LogFields;
62 import org.onap.aai.cl.api.LogLine;
63 import org.onap.aai.cl.api.Logger;
64 import org.onap.aai.cl.eelf.LoggerFactory;
65 import org.onap.aai.cl.mdc.MdcContext;
66 import org.onap.aai.cl.mdc.MdcOverride;
67 import org.onap.aai.sa.rest.AnalysisConfiguration;
68 import org.onap.aai.sa.rest.ApiUtils;
69 import org.onap.aai.sa.rest.BulkRequest;
70 import org.onap.aai.sa.rest.BulkRequest.OperationType;
71 import org.onap.aai.sa.rest.DocumentSchema;
72 import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig;
73 import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException;
74 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult;
75 import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults;
76 import org.onap.aai.sa.searchdbabstraction.entity.Document;
77 import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult;
78 import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult;
79 import org.onap.aai.sa.searchdbabstraction.entity.OperationResult;
80 import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder;
81 import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder.Type;
82 import org.onap.aai.sa.searchdbabstraction.entity.SearchHit;
83 import org.onap.aai.sa.searchdbabstraction.entity.SearchHits;
84 import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult;
85 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit;
86 import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits;
87 import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs;
88 import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil;
89 import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil;
90 import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator;
91 import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants;
92
93 /**
94  * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface.
95  */
96 public class ElasticSearchHttpController implements DocumentStoreInterface {
97
98     private static ElasticSearchHttpController instance = null;
99
100     private static final Logger logger =
101             LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName());
102     private static final Logger metricsLogger =
103             LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName());
104
105     private static final String JSON_ATTR_VERSION = "_version";
106     private static final String JSON_ATTR_ERROR = "error";
107     private static final String JSON_ATTR_REASON = "reason";
108
109     private static final String DEFAULT_TYPE = "default";
110     private static final String QUERY_PARAM_VERSION = "?version=";
111
112     private static final String MSG_RESOURCE_MISSING = "Specified resource does not exist: ";
113     private static final String MSG_RESPONSE_CODE = "Response Code : ";
114     private static final String MSG_INVALID_DOCUMENT_URL = "Invalid document URL: ";
115     private static final String MSG_HTTP_PUT_FAILED = "Failed to set HTTP request method to PUT.";
116     private static final String MSG_HTTP_POST_FAILED = "Failed to set HTTP request method to POST.";
117     private static final String FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION =
118             "Failed to get the response code from the connection.";
119     private static final String FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE = "Failed to parse Elastic Search response.";
120
121     private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE =
122             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n";
123     private static final String BULK_CREATE_WITH_INDEX_TEMPLATE =
124             "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n";
125     private static final String BULK_IMPORT_INDEX_TEMPLATE =
126             "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n";
127     private static final String BULK_DELETE_TEMPLATE =
128             "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n";
129
130     private final ElasticSearchConfig config;
131
132     protected AnalysisConfiguration analysisConfig;
133
134
135     public ElasticSearchHttpController(ElasticSearchConfig config) {
136         this.config = config;
137         analysisConfig = new AnalysisConfiguration();
138
139         try {
140             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false));
141             checkConnection();
142             logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false));
143         } catch (Exception e) {
144             logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, getFullUrl("", false),
145                     e.getMessage());
146         }
147     }
148
149     public static ElasticSearchHttpController getInstance() {
150         synchronized (ElasticSearchHttpController.class) {
151             if (instance == null) {
152                 Properties properties = new Properties();
153                 File file = new File(SearchDbConstants.ES_CONFIG_FILE);
154                 try {
155                     properties.load(new FileInputStream(file));
156                 } catch (Exception e) {
157                     logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance",
158                             e.getLocalizedMessage());
159                 }
160
161                 ElasticSearchConfig config = new ElasticSearchConfig(properties);
162                 instance = new ElasticSearchHttpController(config);
163             }
164         }
165
166         return instance;
167     }
168
169     public AnalysisConfiguration getAnalysisConfig() {
170         return analysisConfig;
171     }
172
173     @Override
174     public OperationResult createIndex(String index, DocumentSchema documentSchema) {
175         try {
176             // Submit the request to ElasticSearch to create the index using a default document type.
177             OperationResult result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(),
178                     DocumentSchemaUtil.generateDocumentMappings(documentSchema));
179
180             // ElasticSearch will return us a 200 code on success when we
181             // want to report a 201, so translate the result here.
182             if (result.getResultCode() == Status.OK.getStatusCode()) {
183                 result.setResultCode(Status.CREATED.getStatusCode());
184             }
185
186             if (isSuccess(result)) {
187                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
188             }
189             return result;
190         } catch (DocumentStoreOperationException | IOException e) {
191             return new OperationResultBuilder().useDefaults()
192                     .failureCause("Document store operation failure.  Cause: " + e.getMessage()).build();
193         }
194
195     }
196
197     @Override
198     public OperationResult createDynamicIndex(String index, String dynamicSchema) {
199         try {
200             OperationResult result = createTable(index, dynamicSchema);
201
202             // ElasticSearch will return us a 200 code on success when we
203             // want to report a 201, so translate the result here.
204             if (result.getResultCode() == Status.OK.getStatusCode()) {
205                 result.setResultCode(Status.CREATED.getStatusCode());
206             }
207             if (isSuccess(result)) {
208                 result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}");
209             }
210             return result;
211         } catch (DocumentStoreOperationException e) {
212             return new OperationResultBuilder().useDefaults()
213                     .failureCause("Document store operation failure.  Cause: " + e.getMessage()).build();
214         }
215     }
216
217     @Override
218     public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException {
219         // Grab the current time so we can use it to generate a metrics log.
220         MdcOverride override = getStartTime(new MdcOverride());
221
222         String fullUrl = getFullUrl("/" + indexName + "/", false);
223         HttpURLConnection conn = initializeConnection(fullUrl);
224
225         logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL());
226
227         try {
228             conn.setRequestMethod("DELETE");
229         } catch (ProtocolException e) {
230             shutdownConnection(conn);
231             throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
232         }
233
234         OperationResult opResult = handleResponse(conn);
235         logMetricsInfo(override, SearchDbMsgs.DELETE_INDEX_TIME, opResult, indexName);
236         shutdownConnection(conn);
237
238         return opResult;
239     }
240
241     // @Override
242     protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings)
243             throws DocumentStoreOperationException {
244         if (indexSettings == null) {
245             logger.debug("No settings provided.");
246         }
247
248         if (indexMappings == null) {
249             logger.debug("No mappings provided.");
250         }
251
252         // Grab the current time so we can use it to generate a metrics log.
253         MdcOverride override = getStartTime(new MdcOverride());
254
255         String fullUrl = getFullUrl("/" + indexName + "/", false);
256         HttpURLConnection conn = initializeConnection(fullUrl);
257
258         try {
259             conn.setRequestMethod("PUT");
260         } catch (ProtocolException e) {
261             shutdownConnection(conn);
262             throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
263         }
264
265         StringBuilder sb = new StringBuilder(128);
266         sb.append("{ \"settings\" : ");
267         sb.append(indexSettings);
268         sb.append(",");
269
270         sb.append("\"mappings\" : {");
271         sb.append("\"" + typeName + "\" :");
272         sb.append(indexMappings);
273         sb.append("}}");
274
275         try {
276             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString()));
277         } catch (IOException e) {
278             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
279             throw new DocumentStoreOperationException(e.getMessage(), e);
280         }
281
282         logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL());
283         logger.debug("Request content: " + sb);
284
285         OperationResult opResult = handleResponse(conn);
286         shutdownConnection(conn);
287         logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, opResult, indexName);
288
289         return opResult;
290     }
291
292     /**
293      * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the
294      * index.
295      *
296      * @param indexName - The name of the index to be created
297      * @param settingsAndMappings - The actual JSON object that will define the index
298      * @return - The operation result of writing into Elasticsearch
299      * @throws DocumentStoreOperationException
300      */
301     protected OperationResult createTable(String indexName, String settingsAndMappings)
302             throws DocumentStoreOperationException {
303         // Grab the current time so we can use it to generate a metrics log.
304         MdcOverride override = getStartTime(new MdcOverride());
305
306         String fullUrl = getFullUrl("/" + indexName + "/", false);
307         HttpURLConnection conn = initializeConnection(fullUrl);
308
309         try {
310             conn.setRequestMethod("PUT");
311         } catch (ProtocolException e) {
312             shutdownConnection(conn);
313             throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
314         }
315
316         try {
317             attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings));
318         } catch (IOException e) {
319             logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e);
320             throw new DocumentStoreOperationException(e.getMessage());
321         }
322
323         OperationResult result = handleResponse(conn);
324         logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, result, indexName);
325
326         return result;
327     }
328
329     @Override
330     public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document,
331             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
332
333         if (!allowImplicitIndexCreation) {
334             // Before we do anything, make sure that the specified index actually exists in the
335             // document store - we don't want to rely on ElasticSearch to fail the document
336             // create because it could be configured to implicitly create a non-existent index,
337             // which can lead to hard-to-debug behaviour with queries down the road.
338             OperationResult indexExistsResult = checkIndexExistence(indexName);
339             if (!isSuccess(indexExistsResult)) {
340                 String resultMsg = "Document Index '" + indexName + "' does not exist.";
341                 return (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT).status(Status.NOT_FOUND)
342                         .result(resultMsg).failureCause(resultMsg).build();
343             }
344         }
345
346         if (document.getId() == null || document.getId().isEmpty()) {
347             return createDocumentWithoutId(indexName, document);
348         } else {
349             return createDocumentWithId(indexName, document);
350         }
351     }
352
353     @Override
354     public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document,
355             boolean allowImplicitIndexCreation) throws DocumentStoreOperationException {
356
357         if (!allowImplicitIndexCreation) {
358             // Before we do anything, make sure that the specified index actually exists in the
359             // document store - we don't want to rely on ElasticSearch to fail the document
360             // create because it could be configured to implicitly create a non-existent index,
361             // which can lead to hard-to-debug behaviour with queries down the road.
362             OperationResult indexExistsResult = checkIndexExistence(indexName);
363             if (!isSuccess(indexExistsResult)) {
364                 DocumentOperationResult opResult = new DocumentOperationResult();
365                 opResult.setResultCode(Status.NOT_FOUND.getStatusCode());
366                 String resultMsg = "Document Index '" + indexName + "' does not exist.";
367                 opResult.setResult(resultMsg);
368                 opResult.setFailureCause(resultMsg);
369                 return opResult;
370             }
371         }
372
373         // Grab the current time so we can use it to generate a metrics log.
374         MdcOverride override = getStartTime(new MdcOverride());
375
376         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
377                 + document.getVersion(), false);
378         HttpURLConnection conn = initializeConnection(fullUrl);
379
380         try {
381             conn.setRequestMethod("PUT");
382         } catch (ProtocolException e) {
383             shutdownConnection(conn);
384             throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
385         }
386
387         attachDocument(conn, document);
388
389         logger.debug("Sending 'PUT' request to: " + conn.getURL());
390
391         DocumentOperationResult opResult = getOperationResult(conn);
392         buildDocumentResult(opResult, indexName);
393
394         logMetricsInfo(override, SearchDbMsgs.UPDATE_DOCUMENT_TIME, opResult, indexName, document.getId());
395
396         shutdownConnection(conn);
397
398         return opResult;
399     }
400
401     @Override
402     public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document)
403             throws DocumentStoreOperationException {
404         // Grab the current time so we can use it to generate a metrics log.
405         MdcOverride override = getStartTime(new MdcOverride());
406
407         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
408                 + document.getVersion(), false);
409         HttpURLConnection conn = initializeConnection(fullUrl);
410
411         try {
412             conn.setRequestMethod("DELETE");
413         } catch (ProtocolException e) {
414             shutdownConnection(conn);
415             throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e);
416         }
417
418         logger.debug("\nSending 'DELETE' request to " + conn.getURL());
419
420         DocumentOperationResult opResult = getOperationResult(conn);
421         buildDocumentResult(opResult, indexName);
422         // supress the etag and url in response for delete as they are not required
423         if (opResult.getDocument() != null) {
424             opResult.getDocument().setEtag(null);
425             opResult.getDocument().setUrl(null);
426         }
427
428         logMetricsInfo(override, SearchDbMsgs.DELETE_DOCUMENT_TIME, opResult, indexName, document.getId());
429
430         shutdownConnection(conn);
431
432         return opResult;
433     }
434
435     @Override
436     public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document)
437             throws DocumentStoreOperationException {
438         // Grab the current time so we can use it to generate a metrics log.
439         MdcOverride override = getStartTime(new MdcOverride());
440
441         String fullUrl = null;
442         if (document.getVersion() == null) {
443             fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
444         } else {
445             fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + QUERY_PARAM_VERSION
446                     + document.getVersion(), false);
447         }
448         HttpURLConnection conn = initializeConnection(fullUrl);
449
450         logger.debug("\nSending 'GET' request to: " + conn.getURL());
451
452         DocumentOperationResult opResult = getOperationResult(conn);
453         buildDocumentResult(opResult, indexName);
454
455         logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, document.getId());
456
457         shutdownConnection(conn);
458
459         return opResult;
460     }
461
462     @Override
463     public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException {
464
465         String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false);
466
467         // Grab the current time so we can use it to generate a metrics log.
468         MdcOverride override = getStartTime(new MdcOverride());
469
470         HttpURLConnection conn = initializeConnection(fullUrl);
471
472         try {
473             conn.setRequestMethod("GET");
474         } catch (ProtocolException e) {
475             shutdownConnection(conn);
476             throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e);
477         }
478
479         logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL());
480
481         SearchOperationResult opResult = getSearchOperationResult(conn);
482         buildSearchResult(opResult, indexName);
483
484         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, queryString);
485
486         return opResult;
487     }
488
489     @Override
490     public SearchOperationResult searchWithPayload(String indexName, String query)
491             throws DocumentStoreOperationException {
492         if (logger.isDebugEnabled()) {
493             logger.debug("Querying index: " + indexName + " with query string: " + query);
494         }
495
496         String fullUrl = getFullUrl("/" + indexName + "/_search", false);
497
498         // Grab the current time so we can use it to generate a metrics log.
499         MdcOverride override = getStartTime(new MdcOverride());
500
501         HttpURLConnection conn = initializeConnection(fullUrl);
502
503         try {
504             conn.setRequestMethod("POST");
505         } catch (ProtocolException e) {
506             shutdownConnection(conn);
507             throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
508         }
509
510         attachContent(conn, query);
511
512         logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
513         logger.debug("Request body =  Elasticsearch query = " + query);
514
515         SearchOperationResult opResult = getSearchOperationResult(conn);
516         buildSearchResult(opResult, indexName);
517
518         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
519
520         shutdownConnection(conn);
521
522         return opResult;
523     }
524
525
526     @Override
527     public SearchOperationResult suggestionQueryWithPayload(String indexName, String query)
528             throws DocumentStoreOperationException {
529         if (logger.isDebugEnabled()) {
530             logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query);
531         }
532
533         String fullUrl = getFullUrl("/" + indexName + "/_suggest", false);
534
535         // Grab the current time so we can use it to generate a metrics log.
536         MdcOverride override = getStartTime(new MdcOverride());
537
538         HttpURLConnection conn = initializeConnection(fullUrl);
539
540         try {
541             conn.setRequestMethod("POST");
542         } catch (ProtocolException e) {
543             shutdownConnection(conn);
544             throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
545         }
546
547         attachContent(conn, query);
548
549         logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL());
550         logger.debug("Request body =  Elasticsearch query = " + query);
551
552         SearchOperationResult opResult = getSearchOperationResult(conn);
553         buildSuggestResult(opResult, indexName);
554
555         logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query);
556
557         shutdownConnection(conn);
558
559         return opResult;
560     }
561
562     @Override
563     public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException {
564         if (logger.isDebugEnabled()) {
565             StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: ");
566
567             for (BulkRequest request : requests) {
568                 dbgString.append("[").append(request).append("] ");
569             }
570
571             logger.debug(dbgString.toString());
572         }
573
574         // Grab the current time so we can use it to generate a metrics log.
575         MdcOverride override = getStartTime(new MdcOverride());
576
577         // Parse the supplied set of operations.
578         // Iterate over the list of operations which we were provided and
579         // translate them into a format that ElasticSearh understands.
580         int opCount = 0;
581         StringBuilder esOperationSet = new StringBuilder(128);
582         List<ElasticSearchResultItem> rejected = new ArrayList<>();
583         for (BulkRequest request : requests) {
584
585             // Convert the request to the syntax ElasticSearch likes.
586             if (buildEsOperation(request, esOperationSet, rejected)) {
587                 opCount++;
588             }
589         }
590
591         ElasticSearchBulkOperationResult opResult = null;
592         if (opCount > 0) {
593
594             // Open an HTTP connection to the ElasticSearch back end.
595             String fullUrl = getFullUrl("/_bulk", false);
596             URL url;
597             HttpURLConnection conn;
598             try {
599
600                 url = new URL(fullUrl);
601                 conn = (HttpURLConnection) url.openConnection();
602                 conn.setRequestMethod("PUT");
603                 conn.setDoOutput(true);
604                 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_FORM_URLENCODED);
605                 conn.setRequestProperty("Connection", "Close");
606
607             } catch (IOException e) {
608
609                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
610                 if (logger.isDebugEnabled()) {
611                     logger.debug(Throwables.getStackTraceAsString(e));
612                 }
613
614                 throw new DocumentStoreOperationException(
615                         "Failed to open connection to document store.  Cause: " + e.getMessage(), e);
616             }
617
618             StringBuilder bulkResult = new StringBuilder(128);
619             try {
620                 // Create an output stream to write our request to.
621                 OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream());
622
623                 if (logger.isDebugEnabled()) {
624                     logger.debug("ESController: Sending 'BULK' request to " + conn.getURL());
625                     logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n"));
626                 }
627
628                 // Write the resulting request string to our output stream. (this sends the request to ES?)
629                 out.write(esOperationSet.toString());
630                 out.close();
631
632                 // Open an input stream on our connection in order to read back the results.
633                 InputStream is = conn.getInputStream();
634                 InputStreamReader inputstreamreader = new InputStreamReader(is);
635                 BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
636
637                 // Read the contents of the input stream into our result string...
638                 String esResponseString = null;
639
640                 while ((esResponseString = bufferedreader.readLine()) != null) {
641                     bulkResult.append(esResponseString).append("\n");
642                 }
643
644             } catch (IOException e) {
645
646                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
647                 if (logger.isDebugEnabled()) {
648                     StringWriter sw = new StringWriter();
649                     e.printStackTrace(new PrintWriter(sw));
650                     logger.debug(sw.toString());
651                 }
652
653                 throw new DocumentStoreOperationException(
654                         "Failure interacting with document store.  Cause: " + e.getMessage(), e);
655             }
656
657             if (logger.isDebugEnabled()) {
658                 logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString());
659             }
660
661             // ...and marshal the resulting string into a Java object.
662             try {
663                 opResult = marshallEsBulkResult(bulkResult.toString());
664
665             } catch (IOException e) {
666                 logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage());
667                 if (logger.isDebugEnabled()) {
668                     logger.debug(Throwables.getStackTraceAsString(e));
669                 }
670
671                 throw new DocumentStoreOperationException("Failed to marshal response body.  Cause: " + e.getMessage(),
672                         e);
673             }
674         }
675
676         OperationResult result = new OperationResultBuilder() //
677                 .resultCode(HttpStatus.MULTI_STATUS_207) //
678                 .result(buildGenericBulkResultSet(opResult, rejected)) //
679                 .build();
680
681         // In the success case we don't want the entire result string to be dumped into the metrics log, so concatenate
682         // it.
683         String resultStringForMetricsLog = result.getResult();
684         if (isSuccess(result)) {
685             resultStringForMetricsLog =
686                     resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "...";
687         }
688
689         metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME,
690                 new LogFields() //
691                         .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode())
692                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog),
693                 override);
694
695         return result;
696     }
697
698
699     /**
700      * This method queryies ElasticSearch to determine if the supplied index is present in the document store.
701      *
702      * @param indexName - The index to look for.
703      * @return - An operation result indicating the success or failure of the check.
704      * @throws DocumentStoreOperationException
705      */
706     private OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException {
707         // Grab the current time so we can use it to generate a metrics log.
708         MdcOverride override = getStartTime(new MdcOverride());
709
710         String fullUrl = getFullUrl("/" + indexName, false);
711         HttpURLConnection conn = initializeConnection(fullUrl);
712
713         try {
714             conn.setRequestMethod("HEAD");
715
716         } catch (ProtocolException e) {
717             shutdownConnection(conn);
718             throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
719         }
720
721         logger.debug("Sending 'HEAD' request to: " + conn.getURL());
722
723         int resultCode;
724         try {
725             resultCode = conn.getResponseCode();
726         } catch (IOException e) {
727             shutdownConnection(conn);
728             throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
729         }
730         logger.debug(MSG_RESPONSE_CODE + resultCode);
731
732         OperationResult opResult = new OperationResultBuilder().useDefaults().resultCode(resultCode).build();
733         logMetricsInfo(override, SearchDbMsgs.CHECK_INDEX_TIME, opResult, indexName);
734         shutdownConnection(conn);
735
736         return opResult;
737     }
738
739     private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document)
740             throws DocumentStoreOperationException {
741         // check if the document already exists
742         DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId());
743
744         if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) {
745             if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) {
746                 opResult.setFailureCause("A document with the same id already exists.");
747             } else {
748                 opResult.setFailureCause("Failed to verify a document with the specified id does not already exist.");
749             }
750             opResult.setResultCode(Status.CONFLICT.getStatusCode());
751             return opResult;
752         }
753
754         // Grab the current time so we can use it to generate a metrics log.
755         MdcOverride override = getStartTime(new MdcOverride());
756
757         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false);
758         HttpURLConnection conn = initializeConnection(fullUrl);
759
760         try {
761             conn.setRequestMethod("PUT");
762         } catch (ProtocolException e) {
763             shutdownConnection(conn);
764             throw new DocumentStoreOperationException(MSG_HTTP_PUT_FAILED, e);
765         }
766
767         attachDocument(conn, document);
768
769         logger.debug("Sending 'PUT' request to: " + conn.getURL());
770
771         opResult = getOperationResult(conn);
772         buildDocumentResult(opResult, indexName);
773
774         logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, opResult, indexName);
775
776         shutdownConnection(conn);
777
778         return opResult;
779     }
780
781     private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document)
782             throws DocumentStoreOperationException {
783         // Grab the current time so we can use it to generate a metrics log.
784         MdcOverride override = getStartTime(new MdcOverride());
785
786         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false);
787         HttpURLConnection conn = initializeConnection(fullUrl);
788
789         try {
790             conn.setRequestMethod("POST");
791         } catch (ProtocolException e) {
792             shutdownConnection(conn);
793             throw new DocumentStoreOperationException(MSG_HTTP_POST_FAILED, e);
794         }
795
796         attachDocument(conn, document);
797
798         logger.debug("Sending 'POST' request to: " + conn.getURL());
799
800         DocumentOperationResult response = getOperationResult(conn);
801         buildDocumentResult(response, indexName);
802
803         logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, response, indexName);
804
805         shutdownConnection(conn);
806
807         return response;
808     }
809
810     private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc)
811             throws DocumentStoreOperationException {
812         conn.setRequestProperty("Connection", "Close");
813         attachContent(conn, doc.getContentInJson());
814     }
815
816     private DocumentOperationResult checkDocumentExistence(String indexName, String docId)
817             throws DocumentStoreOperationException {
818         // Grab the current time so we can use it to generate a metrics log.
819         MdcOverride override = getStartTime(new MdcOverride());
820
821         String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false);
822         HttpURLConnection conn = initializeConnection(fullUrl);
823
824         try {
825             conn.setRequestMethod("HEAD");
826         } catch (ProtocolException e) {
827             shutdownConnection(conn);
828             throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e);
829         }
830
831         logger.debug("Sending 'HEAD' request to: " + conn.getURL());
832
833         int resultCode;
834         try {
835             resultCode = conn.getResponseCode();
836         } catch (IOException e) {
837             shutdownConnection(conn);
838             throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
839         }
840
841         logger.debug(MSG_RESPONSE_CODE + resultCode);
842
843         DocumentOperationResult opResult = (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT)
844                 .useDefaults().resultCode(resultCode).build();
845
846         logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, docId);
847         shutdownConnection(conn);
848
849         return opResult;
850     }
851
852     private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException {
853         OutputStream outputStream = null;
854         OutputStreamWriter out = null;
855
856         try {
857             outputStream = conn.getOutputStream();
858         } catch (IOException e) {
859             shutdownConnection(conn);
860             throw new DocumentStoreOperationException("Failed to get connection output stream.", e);
861         }
862
863         out = new OutputStreamWriter(outputStream);
864
865         try {
866             out.write(content);
867             out.close();
868         } catch (IOException e) {
869             shutdownConnection(conn);
870             throw new DocumentStoreOperationException("Failed to write to the output stream.", e);
871         }
872     }
873
874     private HttpURLConnection initializeConnection(String fullUrl) throws DocumentStoreOperationException {
875         URL url = null;
876         HttpURLConnection conn = null;
877
878         try {
879             url = new URL(fullUrl);
880         } catch (MalformedURLException e) {
881             throw new DocumentStoreOperationException("Error building a URL with " + url, e);
882         }
883
884         try {
885             conn = (HttpURLConnection) url.openConnection();
886             conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON);
887             conn.setDoOutput(true);
888         } catch (IOException e) {
889             shutdownConnection(conn);
890             throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e);
891         }
892
893         return conn;
894     }
895
896     private OperationResult handleResponse(HttpURLConnection conn) throws DocumentStoreOperationException {
897         return handleResponse(conn, new OperationResultBuilder().useDefaults());
898     }
899
900     private OperationResult handleResponse(HttpURLConnection conn, OperationResultBuilder rb)
901             throws DocumentStoreOperationException {
902         int resultCode;
903
904         try {
905             resultCode = conn.getResponseCode();
906         } catch (IOException e) {
907             shutdownConnection(conn);
908             throw new DocumentStoreOperationException(FAILED_TO_GET_THE_RESPONSE_CODE_FROM_THE_CONNECTION, e);
909         }
910
911         logger.debug(MSG_RESPONSE_CODE + resultCode);
912
913         InputStream inputStream = null;
914
915         if (!isSuccessCode(resultCode)) {
916             inputStream = conn.getErrorStream();
917         } else {
918             try {
919                 inputStream = conn.getInputStream();
920             } catch (IOException e) {
921                 shutdownConnection(conn);
922                 throw new DocumentStoreOperationException("Failed to get the response input stream.", e);
923             }
924         }
925
926         InputStreamReader inputstreamreader = new InputStreamReader(inputStream);
927         BufferedReader bufferedreader = new BufferedReader(inputstreamreader);
928
929         StringBuilder result = new StringBuilder(128);
930         String string = null;
931
932         try {
933             while ((string = bufferedreader.readLine()) != null) {
934                 result.append(string).append("\n");
935             }
936         } catch (IOException e) {
937             shutdownConnection(conn);
938             throw new DocumentStoreOperationException("Failed getting the response body payload.", e);
939         }
940
941         if (resultCode == Status.CONFLICT.getStatusCode()) {
942             rb.resultCode(Status.PRECONDITION_FAILED.getStatusCode());
943         } else {
944             rb.resultCode(resultCode);
945         }
946         if (logger.isDebugEnabled()) {
947             logger.debug("Raw result string from ElasticSearch = " + result.toString());
948         }
949         rb.result(result.toString());
950         rb.resultVersion(extractVersion(result.toString()));
951         return rb.build();
952     }
953
954     private String extractVersion(String result) {
955         JSONParser parser = new JSONParser();
956         String version = null;
957         try {
958             JSONObject root = (JSONObject) parser.parse(result);
959             if (root.get(JSON_ATTR_VERSION) != null) {
960                 version = root.get(JSON_ATTR_VERSION).toString();
961             }
962         } catch (ParseException e) {
963             // Not all responses from ElasticSearch include a version, so
964             // if we don't get one back, just return an empty string rather
965             // than trigger a false failure.
966             version = "";
967         }
968         return version;
969     }
970
971     /**
972      * This convenience method gets the current system time and stores it in an attribute in the supplied
973      * {@link MdcOverride} object so that it can be used later by the metrics logger.
974      *
975      * @param override - The {@link MdcOverride} object to update.
976      * @return - The supplied {@link MdcOverride} object.
977      */
978     private MdcOverride getStartTime(MdcOverride override) {
979
980         // Grab the current time...
981         long startTimeInMs = System.currentTimeMillis();
982
983         // ...and add it as an attribute to the supplied MDC Override
984         // object.
985         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
986         override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
987
988         // Return the MdcOverride object that we were passed.
989         // This looks odd, but it allows us to do stuff like:
990         //
991         // MdcOverride ov = getStartTime(new MdcOverride())
992         //
993         // which is quite handy, but also allows us to pass in an existing
994         // MdcOverride object which already has some attributes set.
995         return override;
996     }
997
998     private boolean isSuccess(OperationResult result) {
999         return isSuccessCode(result.getResultCode());
1000     }
1001
1002     private boolean isSuccessCode(int statusCode) {
1003         return Family.familyOf(statusCode).equals(Family.SUCCESSFUL);
1004     }
1005
1006     private OperationResult checkConnection() throws IOException {
1007         String fullUrl = getFullUrl("/_cluster/health", false);
1008         URL url = null;
1009         HttpURLConnection conn = null;
1010
1011         url = new URL(fullUrl);
1012         conn = (HttpURLConnection) url.openConnection();
1013         conn.setRequestMethod("GET");
1014         conn.setDoOutput(true);
1015         logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url);
1016
1017         int resultCode = conn.getResponseCode();
1018         logger.debug("getClusterHealth() response Code : " + resultCode);
1019
1020         shutdownConnection(conn);
1021
1022         return new OperationResultBuilder().resultCode(resultCode).build();
1023     }
1024
1025     private String getFullUrl(String resourceUrl, boolean isSecure) {
1026
1027         final String host = config.getIpAddress();
1028         final String port = config.getHttpPort();
1029
1030         if (isSecure) {
1031             return String.format("https://%s:%s%s", host, port, resourceUrl);
1032         } else {
1033             return String.format("http://%s:%s%s", host, port, resourceUrl);
1034         }
1035     }
1036
1037     private void shutdownConnection(HttpURLConnection connection) {
1038         if (connection == null) {
1039             return;
1040         }
1041
1042         final String methodName = "shutdownConnection";
1043         InputStream inputstream = null;
1044         OutputStream outputstream = null;
1045
1046         try {
1047             inputstream = connection.getInputStream();
1048         } catch (IOException e) {
1049             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
1050         } finally {
1051             if (inputstream != null) {
1052                 try {
1053                     inputstream.close();
1054                 } catch (IOException e) {
1055                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
1056                 }
1057             }
1058         }
1059
1060         try {
1061             outputstream = connection.getOutputStream();
1062         } catch (IOException e) {
1063             logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
1064         } finally {
1065             if (outputstream != null) {
1066                 try {
1067                     outputstream.close();
1068                 } catch (IOException e) {
1069                     logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage());
1070                 }
1071             }
1072         }
1073
1074         connection.disconnect();
1075     }
1076
1077     /**
1078      * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch.
1079      *
1080      * @param request - The request to be performed.
1081      * @param sb - The string builder to append the json data to
1082      * @throws DocumentStoreOperationException
1083      */
1084     private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails)
1085             throws DocumentStoreOperationException {
1086
1087         boolean retVal = true;
1088         // What kind of operation are we performing?
1089         switch (request.getOperationType()) {
1090
1091             // Create a new document.
1092             case CREATE:
1093
1094                 // Make sure that we were supplied a document payload.
1095                 if (request.getOperation().getDocument() == null) {
1096
1097                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1098                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1099                     return false;
1100                 }
1101
1102                 // Make sure that the supplied document URL is formatted
1103                 // correctly.
1104                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) {
1105                     fails.add(generateRejectionEntry(request.getOperationType(),
1106                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1107                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1108                     return false;
1109                 }
1110
1111                 // Validate that the specified index actually exists before we
1112                 // try to perform the create.
1113                 if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) {
1114
1115                     fails.add(generateRejectionEntry(request.getOperationType(),
1116                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1117                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1118                     return false;
1119                 }
1120
1121                 // If we were supplied an id for the new document, then
1122                 // include it in the bulk operation to Elastic Search
1123                 if (request.getId() == null) {
1124
1125                     sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE));
1126
1127                     // Otherwise, we just leave that parameter off and ElasticSearch
1128                     // will generate one for us.
1129                 } else {
1130                     sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE,
1131                             request.getId()));
1132                 }
1133
1134                 try {
1135                     // Append the document that we want to create.
1136                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
1137                 } catch (JsonProcessingException e) {
1138                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
1139                 }
1140
1141                 break;
1142
1143             // Update an existing document.
1144             case UPDATE:
1145
1146                 // Make sure that we were supplied a document payload.
1147                 if (request.getOperation().getDocument() == null) {
1148
1149                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload",
1150                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1151                     return false;
1152                 }
1153
1154                 // Make sure that the supplied document URL is formatted
1155                 // correctly.
1156                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1157                     fails.add(generateRejectionEntry(request.getOperationType(),
1158                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1159                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1160                     return false;
1161                 }
1162
1163                 // Validate that the specified index actually exists before we
1164                 // try to perform the update.
1165                 if (!indexExists(request.getIndex())) {
1166
1167                     fails.add(generateRejectionEntry(request.getOperationType(),
1168                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1169                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1170                     return false;
1171                 }
1172
1173                 // Validate that the document we are trying to update actually
1174                 // exists before we try to perform the update.
1175                 if (!documentExists(request.getIndex(), request.getId())) {
1176
1177                     fails.add(generateRejectionEntry(request.getOperationType(),
1178                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1179                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1180                     return false;
1181                 }
1182
1183                 // It is mandatory that a version be supplied for an update operation,
1184                 // so validate that now.
1185                 if (request.getOperation().getMetaData().getEtag() == null) {
1186
1187                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1188                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1189                     return false;
1190                 }
1191
1192                 // Generate the update request...
1193                 sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1194                         request.getOperation().getMetaData().getEtag()));
1195
1196                 // ...and append the document that we want to update.
1197                 try {
1198                     sb.append(request.getOperation().getDocument().toJson()).append("\n");
1199                 } catch (JsonProcessingException e) {
1200                     throw new DocumentStoreOperationException("Failure parsing document to json", e);
1201                 }
1202                 break;
1203
1204             // Delete an existing document.
1205             case DELETE:
1206
1207                 // Make sure that the supplied document URL is formatted
1208                 // correctly.
1209                 if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) {
1210                     fails.add(generateRejectionEntry(request.getOperationType(),
1211                             MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(),
1212                             request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl()));
1213                     return false;
1214                 }
1215
1216                 // Validate that the specified index actually exists before we
1217                 // try to perform the delete.
1218                 if (!indexExists(request.getIndex())) {
1219
1220                     fails.add(generateRejectionEntry(request.getOperationType(),
1221                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1222                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1223                     return false;
1224                 }
1225
1226                 // Validate that the document we are trying to update actually
1227                 // exists before we try to perform the delete.
1228                 if (!documentExists(request.getIndex(), request.getId())) {
1229
1230                     fails.add(generateRejectionEntry(request.getOperationType(),
1231                             MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(),
1232                             request.getId(), 404, request.getOperation().getMetaData().getUrl()));
1233                     return false;
1234                 }
1235
1236                 // It is mandatory that a version be supplied for a delete operation,
1237                 // so validate that now.
1238                 if (request.getOperation().getMetaData().getEtag() == null) {
1239
1240                     fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field",
1241                             request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl()));
1242                     return false;
1243                 }
1244
1245                 // Generate the delete request.
1246                 sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(),
1247                         request.getOperation().getMetaData().getEtag()));
1248                 break;
1249             default:
1250         }
1251
1252         return retVal;
1253     }
1254
1255     private boolean indexExists(String index) throws DocumentStoreOperationException {
1256         return isSuccess(checkIndexExistence(index));
1257     }
1258
1259     private boolean documentExists(String index, String id) throws DocumentStoreOperationException {
1260         return isSuccess(checkDocumentExistence(index, id));
1261     }
1262
1263     /**
1264      * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the
1265      * document store.
1266      *
1267      * @param rejectReason - A message describing why the operation was rejected.
1268      * @param anId - The identifier associated with the document being acted on.
1269      * @param statusCode - An HTTP status code.
1270      * @return - A result set item.
1271      */
1272     private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index,
1273             String anId, int statusCode, String originalUrl) {
1274
1275         ElasticSearchError err = new ElasticSearchError();
1276         err.setReason(rejectReason);
1277
1278         ElasticSearchOperationStatus op = new ElasticSearchOperationStatus();
1279         op.setIndex(index);
1280         op.setId(anId);
1281         op.setStatus(statusCode);
1282         op.setError(err);
1283         op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl);
1284
1285         ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem();
1286
1287         switch (opType) {
1288             case CREATE:
1289                 rejectionResult.setCreate(op);
1290                 break;
1291             case UPDATE:
1292                 rejectionResult.setIndex(op);
1293                 break;
1294             case DELETE:
1295                 rejectionResult.setDelete(op);
1296                 break;
1297             default:
1298         }
1299
1300         return rejectionResult;
1301     }
1302
1303     /**
1304      * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and
1305      * marshals it into a Java object.
1306      *
1307      * @param jsonResult - The bulk operations response returned from ElasticSearch.
1308      * @return - The marshalled response.
1309      * @throws JsonParseException
1310      * @throws JsonMappingException
1311      * @throws IOException
1312      */
1313     private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException {
1314         if (jsonResult != null) {
1315             if (logger.isDebugEnabled()) {
1316                 logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", ""));
1317             }
1318
1319             ObjectMapper mapper = new ObjectMapper();
1320             mapper.setSerializationInclusion(Include.NON_EMPTY);
1321
1322             return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class);
1323         }
1324
1325         return null;
1326     }
1327
1328     /**
1329      * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload.
1330      *
1331      * @param esResult - ElasticSearch bulk operations response.
1332      * @return - A generic result set.
1333      */
1334     private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult,
1335             List<ElasticSearchResultItem> rejectedOps) {
1336         int totalOps = 0;
1337         int totalSuccess = 0;
1338         int totalFails = 0;
1339
1340         if (logger.isDebugEnabled()) {
1341
1342             logger.debug("ESController: Build generic result set.  ES Results: "
1343                     + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString());
1344         }
1345
1346         // Build a combined list of result items from the results returned
1347         // from ElasticSearch and the list of operations that we rejected
1348         // without sending to ElasticSearch.
1349         List<ElasticSearchResultItem> combinedResults = new ArrayList<>();
1350         if (esResult != null) {
1351             combinedResults.addAll(Arrays.asList(esResult.getItems()));
1352         }
1353         combinedResults.addAll(rejectedOps);
1354
1355         // Iterate over the individual results in the resulting result set.
1356         StringBuilder resultsBuilder = new StringBuilder();
1357         AtomicBoolean firstItem = new AtomicBoolean(true);
1358         for (ElasticSearchResultItem item : combinedResults) {
1359
1360             // Increment the operation counts.
1361             totalOps++;
1362             if (isSuccessCode(item.operationStatus().getStatus())) {
1363                 totalSuccess++;
1364             } else {
1365                 totalFails++;
1366             }
1367
1368             // Prepend a comma to our response string unless this it the
1369             // first result in the set.
1370             if (!firstItem.compareAndSet(true, false)) {
1371                 resultsBuilder.append(", ");
1372             }
1373
1374             // Append the current result as a generic json structure.
1375             resultsBuilder.append(item.toJson());
1376         }
1377
1378         return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", "
1379                 + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}";
1380     }
1381
1382     private DocumentOperationResult getOperationResult(HttpURLConnection conn) throws DocumentStoreOperationException {
1383         return (DocumentOperationResult) handleResponse(conn, new OperationResultBuilder(Type.DOCUMENT).useDefaults());
1384     }
1385
1386     private SearchOperationResult getSearchOperationResult(HttpURLConnection conn)
1387             throws DocumentStoreOperationException {
1388         return (SearchOperationResult) handleResponse(conn, new OperationResultBuilder(Type.SEARCH).useDefaults());
1389     }
1390
1391     private void buildDocumentResult(DocumentOperationResult result, String index)
1392             throws DocumentStoreOperationException {
1393
1394         JSONParser parser = new JSONParser();
1395         JSONObject root;
1396         try {
1397             root = (JSONObject) parser.parse(result.getResult());
1398             if (isSuccess(result)) {
1399                 // Success response object
1400                 Document doc = new Document();
1401                 doc.setEtag(result.getResultVersion());
1402                 doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString()));
1403
1404                 doc.setContent((JSONObject) root.get("_source"));
1405                 result.setDocument(doc);
1406
1407             } else {
1408                 // Error response object
1409                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1410                 if (error != null) {
1411                     result.setError(
1412                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1413                 }
1414
1415             }
1416         } catch (Exception e) {
1417             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1418         }
1419     }
1420
1421     private String buildDocumentResponseUrl(String index, String id) {
1422         return ApiUtils.buildDocumentUri(index, id);
1423     }
1424
1425     private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1426         JSONParser parser = new JSONParser();
1427         JSONObject root;
1428
1429         try {
1430             root = (JSONObject) parser.parse(result.getResult());
1431             if (isSuccess(result)) {
1432                 JSONObject hits = (JSONObject) root.get("hits");
1433                 JSONArray hitArray = (JSONArray) hits.get("hits");
1434                 SearchHits searchHits = new SearchHits();
1435                 searchHits.setTotalHits(hits.get("total").toString());
1436                 ArrayList<SearchHit> searchHitArray = new ArrayList<>();
1437
1438                 for (int i = 0; i < hitArray.size(); i++) {
1439                     JSONObject hit = (JSONObject) hitArray.get(i);
1440                     SearchHit searchHit = new SearchHit();
1441                     searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : "");
1442                     Document doc = new Document();
1443                     if (hit.get(JSON_ATTR_VERSION) != null) {
1444                         doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1445                     }
1446
1447                     doc.setUrl(
1448                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1449                     doc.setContent((JSONObject) hit.get("_source"));
1450                     searchHit.setDocument(doc);
1451                     searchHitArray.add(searchHit);
1452                 }
1453                 searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()]));
1454                 result.setSearchResult(searchHits);
1455
1456                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1457                 if (aggregations != null) {
1458                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1459                     AggregationResults aggs = new AggregationResults();
1460                     aggs.setAggregations(aggResults);
1461                     result.setAggregationResult(aggs);
1462                 }
1463
1464                 // success
1465             } else {
1466                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1467                 if (error != null) {
1468                     result.setError(
1469                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1470                 }
1471             }
1472         } catch (Exception e) {
1473             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1474         }
1475     }
1476
1477     private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException {
1478         JSONParser parser = new JSONParser();
1479         JSONObject root;
1480         try {
1481             root = (JSONObject) parser.parse(result.getResult());
1482             if (isSuccess(result)) {
1483                 JSONArray hitArray = (JSONArray) root.get("suggest-vnf");
1484                 JSONObject hitdata = (JSONObject) hitArray.get(0);
1485                 JSONArray optionsArray = (JSONArray) hitdata.get("options");
1486                 SuggestHits suggestHits = new SuggestHits();
1487                 suggestHits.setTotalHits(String.valueOf(optionsArray.size()));
1488
1489                 ArrayList<SuggestHit> suggestHitArray = new ArrayList<>();
1490
1491                 for (int i = 0; i < optionsArray.size(); i++) {
1492                     JSONObject hit = (JSONObject) optionsArray.get(i);
1493
1494                     SuggestHit suggestHit = new SuggestHit();
1495                     suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : "");
1496                     suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : "");
1497                     Document doc = new Document();
1498                     if (hit.get(JSON_ATTR_VERSION) != null) {
1499                         doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : "");
1500                     }
1501                     doc.setUrl(
1502                             buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : ""));
1503
1504                     doc.setContent((JSONObject) hit.get("payload"));
1505                     suggestHit.setDocument(doc);
1506                     suggestHitArray.add(suggestHit);
1507                 }
1508                 suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()]));
1509                 result.setSuggestResult(suggestHits);
1510
1511                 JSONObject aggregations = (JSONObject) root.get("aggregations");
1512                 if (aggregations != null) {
1513                     AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations);
1514                     AggregationResults aggs = new AggregationResults();
1515                     aggs.setAggregations(aggResults);
1516                     result.setAggregationResult(aggs);
1517                 }
1518
1519                 // success
1520             } else {
1521                 JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR);
1522                 if (error != null) {
1523                     result.setError(
1524                             new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString()));
1525                 }
1526             }
1527         } catch (Exception e) {
1528             throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult());
1529         }
1530     }
1531
1532     /**
1533      * Record the timing of the operation in the metrics log.
1534      *
1535      */
1536     private void logMetricsInfo(MdcOverride override, SearchDbMsgs message, OperationResult operationResult,
1537             String... args) {
1538         metricsLogger.info(message,
1539                 new LogFields() //
1540                         .setField(LogLine.DefinedFields.RESPONSE_CODE, operationResult.getResultCode())
1541                         .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, operationResult.getResult()),
1542                 override, args);
1543     }
1544 }