Update the dependencies to use project version
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / synchronizer / ElasticSearchIndexCleaner.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.synchronizer;
24
25 import com.fasterxml.jackson.core.JsonProcessingException;
26 import com.fasterxml.jackson.databind.JsonNode;
27 import com.fasterxml.jackson.databind.ObjectMapper;
28 import com.fasterxml.jackson.databind.node.ArrayNode;
29 import com.fasterxml.jackson.databind.node.ObjectNode;
30
31 import java.io.IOException;
32 import java.util.ArrayList;
33 import java.util.Collection;
34 import java.util.Iterator;
35 import java.util.List;
36
37 import org.onap.aai.sparky.dal.rest.OperationResult;
38 import org.onap.aai.sparky.dal.rest.RestDataProvider;
39 import org.onap.aai.sparky.logging.AaiUiMsgs;
40 import org.onap.aai.sparky.synchronizer.entity.ObjectIdCollection;
41 import org.onap.aai.sparky.synchronizer.entity.SearchableEntity;
42 import org.onap.aai.sparky.synchronizer.enumeration.OperationState;
43 import org.onap.aai.cl.api.Logger;
44 import org.onap.aai.cl.eelf.LoggerFactory;
45
46 /**
47  * The Class ElasticSearchIndexCleaner.
48  */
49 public class ElasticSearchIndexCleaner implements IndexCleaner {
50
51   private static final Logger LOG =
52       LoggerFactory.getInstance().getLogger(ElasticSearchIndexCleaner.class);
53
54   private static final String BULK_OP_LINE_TEMPLATE = "%s\n";
55   private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
56
57   private ObjectIdCollection before;
58   private ObjectIdCollection after;
59
60   private String host;
61   private String port;
62
63   private String indexName;
64   private String indexType;
65   private int scrollContextTimeToLiveInMinutes;
66   private int numItemsToGetBulkRequest;
67
68   private RestDataProvider restDataProvider;
69   private ObjectMapper mapper;
70
71   /**
72    * Instantiates a new elastic search index cleaner.
73    *
74    * @param restDataProvider the rest data provider
75    * @param indexName the index name
76    * @param indexType the index type
77    * @param host the host
78    * @param port the port
79    * @param scrollContextTimeToLiveInMinutes the scroll context time to live in minutes
80    * @param numItemsToGetBulkRequest the num items to get bulk request
81    */
82   protected ElasticSearchIndexCleaner(RestDataProvider restDataProvider, String indexName,
83       String indexType, String host, String port, int scrollContextTimeToLiveInMinutes,
84       int numItemsToGetBulkRequest) {
85     this.restDataProvider = restDataProvider;
86     this.before = null;
87     this.after = null;
88     this.indexName = indexName;
89     this.indexType = indexType;
90     this.mapper = new ObjectMapper();
91     this.host = host;
92     this.port = port;
93     this.scrollContextTimeToLiveInMinutes = scrollContextTimeToLiveInMinutes;
94     this.numItemsToGetBulkRequest = numItemsToGetBulkRequest;
95   }
96
97   /*
98    * (non-Javadoc)
99    * 
100    * @see org.onap.aai.sparky.synchronizer.IndexCleaner#populatePreOperationCollection()
101    */
102   @Override
103   public OperationState populatePreOperationCollection() {
104
105     try {
106       before = retrieveAllDocumentIdentifiers();
107       return OperationState.OK;
108     } catch (Exception exc) {
109       LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, indexName, exc.getMessage());
110       return OperationState.ERROR;
111     }
112
113   }
114
115   /*
116    * (non-Javadoc)
117    * 
118    * @see org.onap.aai.sparky.synchronizer.IndexCleaner#populatePostOperationCollection()
119    */
120   @Override
121   public OperationState populatePostOperationCollection() {
122     try {
123       after = retrieveAllDocumentIdentifiers();
124       return OperationState.OK;
125     } catch (Exception exc) {
126       LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, indexName, exc.getMessage());
127       return OperationState.ERROR;
128     }
129   }
130
131   /*
132    * (non-Javadoc)
133    * 
134    * @see org.onap.aai.sparky.synchronizer.IndexCleaner#performCleanup()
135    */
136   @Override
137   public OperationState performCleanup() {
138     // TODO Auto-generated method stub
139     LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP, indexName);
140
141     int sizeBefore = before.getSize();
142     int sizeAfter = after.getSize();
143
144     LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP_SIZE, String.valueOf(sizeBefore),
145         String.valueOf(sizeAfter));
146
147     /*
148      * If the processedImportIds size <= 0, then something has failed in the sync operation and we
149      * shouldn't do the selective delete right now.
150      */
151
152     if (sizeAfter > 0) {
153
154       Collection<String> presyncIds = before.getImportedObjectIdsAsValues();
155       presyncIds.removeAll(after.getImportedObjectIdsAsValues());
156
157       try {
158         LOG.info(AaiUiMsgs.ES_SYNC_SELECTIVE_DELETE, indexName, indexType,
159             String.valueOf(presyncIds.size()));
160
161         ObjectIdCollection bulkIds = new ObjectIdCollection();
162
163         Iterator<String> it = presyncIds.iterator();
164         int numItemsInBulkRequest = 0;
165         int numItemsRemainingToBeDeleted = presyncIds.size();
166
167         while (it.hasNext()) {
168
169           bulkIds.addObjectId(it.next());
170           numItemsInBulkRequest++;
171
172           if (numItemsInBulkRequest >= this.numItemsToGetBulkRequest) {
173             LOG.info(AaiUiMsgs.ES_BULK_DELETE, indexName, String.valueOf(bulkIds.getSize()));
174             OperationResult bulkDeleteResult = bulkDelete(bulkIds.getImportedObjectIdsAsValues());
175             // pegCountersForElasticBulkDelete(bulkDeleteResult);
176             numItemsRemainingToBeDeleted -= numItemsInBulkRequest;
177             numItemsInBulkRequest = 0;
178             bulkIds.clear();
179           }
180         }
181
182         if (numItemsRemainingToBeDeleted > 0) {
183           LOG.info(AaiUiMsgs.ES_BULK_DELETE, indexName, String.valueOf(bulkIds.getSize()));
184           OperationResult bulkDeleteResult = bulkDelete(bulkIds.getImportedObjectIdsAsValues());
185           // pegCountersForElasticBulkDelete(bulkDeleteResult);
186         }
187
188
189       } catch (Exception exc) {
190         LOG.error(AaiUiMsgs.ES_BULK_DELETE_ERROR, indexName, exc.getLocalizedMessage());
191
192       }
193     }
194
195     return OperationState.OK;
196   }
197
198   @Override
199   public String getIndexName() {
200     return indexName;
201   }
202
203   public void setIndexName(String indexName) {
204     this.indexName = indexName;
205   }
206
207   /**
208    * Builds the initial scroll request payload.
209    *
210    * @param numItemsToGetPerRequest the num items to get per request
211    * @param fieldList the field list
212    * @return the string
213    * @throws JsonProcessingException the json processing exception
214    */
215   protected String buildInitialScrollRequestPayload(int numItemsToGetPerRequest,
216       List<String> fieldList) throws JsonProcessingException {
217
218     ObjectNode rootNode = mapper.createObjectNode();
219     rootNode.put("size", numItemsToGetPerRequest);
220
221     ArrayNode fields = mapper.createArrayNode();
222
223     for (String f : fieldList) {
224       fields.add(f);
225     }
226
227     rootNode.set("fields", fields);
228
229     ObjectNode queryNode = mapper.createObjectNode();
230     queryNode.set("match_all", mapper.createObjectNode());
231
232     rootNode.set("query", queryNode);
233
234     return mapper.writeValueAsString(rootNode);
235
236   }
237
238   /**
239    * Builds the subsequent scroll context request payload.
240    *
241    * @param scrollId the scroll id
242    * @param contextTimeToLiveInMinutes the context time to live in minutes
243    * @return the string
244    * @throws JsonProcessingException the json processing exception
245    */
246   protected String buildSubsequentScrollContextRequestPayload(String scrollId,
247       int contextTimeToLiveInMinutes) throws JsonProcessingException {
248
249     ObjectNode rootNode = mapper.createObjectNode();
250
251     rootNode.put("scroll", contextTimeToLiveInMinutes + "m");
252     rootNode.put("scroll_id", scrollId);
253
254     return mapper.writeValueAsString(rootNode);
255
256   }
257
258   /**
259    * Parses the elastic search result.
260    *
261    * @param jsonResult the json result
262    * @return the json node
263    * @throws JsonProcessingException the json processing exception
264    * @throws IOException Signals that an I/O exception has occurred.
265    */
266   protected JsonNode parseElasticSearchResult(String jsonResult)
267       throws JsonProcessingException, IOException {
268     ObjectMapper mapper = new ObjectMapper();
269     return mapper.readTree(jsonResult);
270   }
271
272   /**
273    * Lookup index doc.
274    *
275    * @param ids the ids
276    * @param docs the docs
277    * @return the array list
278    */
279   protected ArrayList<SearchableEntity> lookupIndexDoc(ArrayList<String> ids,
280       List<SearchableEntity> docs) {
281     ArrayList<SearchableEntity> objs = new ArrayList<SearchableEntity>();
282
283     if (ids != null && docs != null) {
284       for (SearchableEntity d : docs) {
285         if (ids.contains(d.getId())) {
286           objs.add(d);
287         }
288       }
289     }
290
291     return objs;
292   }
293
294   /**
295    * Builds the delete data object.
296    *
297    * @param index the index
298    * @param type the type
299    * @param id the id
300    * @return the object node
301    */
302   protected ObjectNode buildDeleteDataObject(String index, String type, String id) {
303
304     ObjectNode indexDocProperties = mapper.createObjectNode();
305
306     indexDocProperties.put("_index", index);
307     indexDocProperties.put("_type", type);
308     indexDocProperties.put("_id", id);
309
310     ObjectNode rootNode = mapper.createObjectNode();
311     rootNode.set("delete", indexDocProperties);
312
313     return rootNode;
314   }
315
316   /**
317    * This method might appear to be a little strange, and is simply an optimization to take an
318    * elipsed JsonNode key path and retrieve the node at the end of the path, if it exists.
319    *
320    * @param startNode the start node
321    * @param fieldPath the field path
322    * @return the node path
323    */
324   protected JsonNode getNodePath(JsonNode startNode, String... fieldPath) {
325
326     JsonNode jsonNode = null;
327
328     for (String field : fieldPath) {
329       if (jsonNode == null) {
330         jsonNode = startNode.get(field);
331       } else {
332         jsonNode = jsonNode.get(field);
333       }
334
335       /*
336        * This is our safety net in case any intermediate path returns a null
337        */
338
339       if (jsonNode == null) {
340         return null;
341       }
342
343     }
344
345     return jsonNode;
346   }
347
348   /**
349    * Gets the full url.
350    *
351    * @param resourceUrl the resource url
352    * @return the full url
353    */
354   private String getFullUrl(String resourceUrl) {
355     return String.format("http://%s:%s%s", host, port, resourceUrl);
356   }
357
358   /**
359    * Retrieve all document identifiers.
360    *
361    * @return the object id collection
362    * @throws IOException Signals that an I/O exception has occurred.
363    */
364   public ObjectIdCollection retrieveAllDocumentIdentifiers() throws IOException {
365
366     ObjectIdCollection currentDocumentIds = new ObjectIdCollection();
367
368     long opStartTimeInMs = System.currentTimeMillis();
369
370     List<String> fields = new ArrayList<String>();
371     fields.add("_id");
372     // fields.add("entityType");
373
374     String scrollRequestPayload =
375         buildInitialScrollRequestPayload(this.numItemsToGetBulkRequest, fields);
376
377     final String fullUrlStr = getFullUrl("/" + indexName + "/" + indexType + "/_search?scroll="
378         + this.scrollContextTimeToLiveInMinutes + "m");
379
380     OperationResult result =
381         restDataProvider.doPost(fullUrlStr, scrollRequestPayload, "application/json");
382
383     if (result.wasSuccessful()) {
384
385       JsonNode rootNode = parseElasticSearchResult(result.getResult());
386
387       /*
388        * Check the result for success / failure, and enumerate all the index ids that resulted in
389        * success, and ignore the ones that failed or log them so we have a record of the failure.
390        */
391       int totalRecordsAvailable = 0;
392       String scrollId = null;
393       int numRecordsFetched = 0;
394
395       if (rootNode != null) {
396
397         scrollId = getFieldValue(rootNode, "_scroll_id");
398         final String tookStr = getFieldValue(rootNode, "took");
399         int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
400         boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
401
402         if (timedOut) {
403           LOG.error(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "all document Identifiers",
404               String.valueOf(tookInMs));
405         } else {
406           LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "all document Identifiers",
407               String.valueOf(tookInMs));
408         }
409
410         JsonNode hitsNode = rootNode.get("hits");
411         totalRecordsAvailable = Integer.parseInt(hitsNode.get("total").asText());
412
413         LOG.info(AaiUiMsgs.COLLECT_TOTAL, "all document Identifiers",
414             String.valueOf(totalRecordsAvailable));
415
416         /*
417          * Collect all object ids
418          */
419
420         ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");
421
422         Iterator<JsonNode> nodeIterator = hitsArray.iterator();
423
424         String key = null;
425         String value = null;
426         JsonNode jsonNode = null;
427
428         while (nodeIterator.hasNext()) {
429
430           jsonNode = nodeIterator.next();
431
432           key = getFieldValue(jsonNode, "_id");
433
434           if (key != null) {
435             currentDocumentIds.addObjectId(key);
436           }
437
438           /*
439            * if (key != null) {
440            * 
441            * JsonNode fieldsNode = jNode.get("fields");
442            * 
443            * if (fieldsNode != null) {
444            * 
445            * JsonNode entityTypeNode = fieldsNode.get("entityType");
446            * 
447            * if (entityTypeNode != null) { ArrayNode aNode = (ArrayNode) entityTypeNode;
448            * 
449            * if (aNode.size() > 0) { value = aNode.get(0).asText(); objAndtTypesMap.put(key, value);
450            * numRecordsFetched++; } } } }
451            */
452
453         }
454
455         int totalRecordsRemainingToFetch = (totalRecordsAvailable - numRecordsFetched);
456
457         int numRequiredAdditionalFetches =
458             (totalRecordsRemainingToFetch / this.numItemsToGetBulkRequest);
459
460         /*
461          * Do an additional fetch for the remaining items (if needed)
462          */
463
464         if (totalRecordsRemainingToFetch % numItemsToGetBulkRequest != 0) {
465           numRequiredAdditionalFetches += 1;
466         }
467
468         if (LOG.isDebugEnabled()) {
469           LOG.debug(AaiUiMsgs.SYNC_NUMBER_REQ_FETCHES,
470               String.valueOf(numRequiredAdditionalFetches));
471         }
472
473
474         for (int x = 0; x < numRequiredAdditionalFetches; x++) {
475
476           if (collectItemsFromScrollContext(scrollId, currentDocumentIds) != OperationState.OK) {
477             // abort the whole thing because now we can't reliably cleanup the orphans.
478             throw new IOException(
479                 "Failed to collect pre-sync doc collection from index.  Aborting operation");
480           }
481           if (LOG.isDebugEnabled()) {
482             LOG.debug(AaiUiMsgs.SYNC_NUMBER_TOTAL_FETCHES,
483                 String.valueOf(currentDocumentIds.getSize()),
484                 String.valueOf(totalRecordsAvailable));
485           }
486
487         }
488
489       }
490
491     } else {
492       // scroll context get failed, nothing else to do
493       LOG.error(AaiUiMsgs.ERROR_GENERIC, result.toString());
494     }
495
496     LOG.info(AaiUiMsgs.COLLECT_TOTAL_TIME, "all document Identifiers",
497         String.valueOf((System.currentTimeMillis() - opStartTimeInMs)));
498
499     return currentDocumentIds;
500
501   }
502
503   /**
504    * Collect items from scroll context.
505    *
506    * @param scrollId the scroll id
507    * @param objectIds the object ids
508    * @return the operation state
509    * @throws IOException Signals that an I/O exception has occurred.
510    */
511   private OperationState collectItemsFromScrollContext(String scrollId,
512       ObjectIdCollection objectIds) throws IOException {
513
514     // ObjectIdCollection documentIdCollection = new ObjectIdCollection();
515
516     String requestPayload =
517         buildSubsequentScrollContextRequestPayload(scrollId, scrollContextTimeToLiveInMinutes);
518
519     final String fullUrlStr = getFullUrl("/_search/scroll");
520
521     OperationResult opResult =
522         restDataProvider.doPost(fullUrlStr, requestPayload, "application/json");
523
524     if (opResult.getResultCode() >= 300) {
525       LOG.warn(AaiUiMsgs.ES_SCROLL_CONTEXT_ERROR, opResult.getResult());
526       return OperationState.ERROR;
527     }
528
529     JsonNode rootNode = parseElasticSearchResult(opResult.getResult());
530
531     /*
532      * Check the result for success / failure, and enumerate all the index ids that resulted in
533      * success, and ignore the ones that failed or log them so we have a record of the failure.
534      */
535
536     if (rootNode != null) {
537       boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
538       final String tookStr = getFieldValue(rootNode, "took");
539       int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
540
541       JsonNode hitsNode = rootNode.get("hits");
542
543       if (timedOut) {
544         LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "Scroll Context", String.valueOf(tookInMs));
545       } else {
546         LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "Scroll Context", String.valueOf(tookInMs));
547       }
548
549       /*
550        * Collect all object ids
551        */
552
553       ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");
554       String key = null;
555       String value = null;
556       JsonNode jsonNode = null;
557
558       Iterator<JsonNode> nodeIterator = hitsArray.iterator();
559
560       while (nodeIterator.hasNext()) {
561
562         jsonNode = nodeIterator.next();
563
564         key = getFieldValue(jsonNode, "_id");
565
566         if (key != null) {
567           objectIds.addObjectId(key);
568
569           /*
570            * JsonNode fieldsNode = jNode.get("fields");
571            * 
572            * if (fieldsNode != null) {
573            * 
574            * JsonNode entityTypeNode = fieldsNode.get("entityType");
575            * 
576            * if (entityTypeNode != null) { ArrayNode aNode = (ArrayNode) entityTypeNode;
577            * 
578            * if (aNode.size() > 0) { value = aNode.get(0).asText(); objectIdsAndTypes.put(key,
579            * value); } } } }
580            */
581
582         }
583
584       }
585     } else {
586       // scroll context get failed, nothing else to do
587       LOG.error(AaiUiMsgs.ERROR_GENERIC, opResult.toString());
588     }
589
590     return OperationState.OK;
591   }
592
593   /**
594    * Gets the field value.
595    *
596    * @param node the node
597    * @param fieldName the field name
598    * @return the field value
599    */
600   protected String getFieldValue(JsonNode node, String fieldName) {
601
602     JsonNode field = node.get(fieldName);
603
604     if (field != null) {
605       return field.asText();
606     }
607
608     return null;
609
610   }
611
612   /**
613    * Bulk delete.
614    *
615    * @param docIds the doc ids
616    * @return the operation result
617    * @throws IOException Signals that an I/O exception has occurred.
618    */
619   public OperationResult bulkDelete(Collection<String> docIds) throws IOException {
620
621     if (docIds == null || docIds.size() == 0) {
622       LOG.info(AaiUiMsgs.ES_BULK_DELETE_SKIP);
623       return new OperationResult(500,
624           "Skipping bulkDelete(); operation because docs to delete list is empty");
625     }
626
627     LOG.info(AaiUiMsgs.ES_BULK_DELETE_START, String.valueOf(docIds.size()));
628
629     StringBuilder sb = new StringBuilder(128);
630
631     for (String id : docIds) {
632       sb.append(
633           String.format(BULK_OP_LINE_TEMPLATE, buildDeleteDataObject(indexName, indexType, id)));
634     }
635
636     sb.append("\n");
637
638     final String fullUrlStr = getFullUrl("/_bulk");
639
640     return restDataProvider.doPost(fullUrlStr, sb.toString(), "application/x-www-form-urlencoded");
641
642   }
643
644   /**
645    * @return the before
646    */
647   public ObjectIdCollection getBefore() {
648     return before;
649   }
650
651   /**
652    * @param before the before to set
653    */
654   public void setBefore(ObjectIdCollection before) {
655     this.before = before;
656   }
657
658   /**
659    * @return the after
660    */
661   public ObjectIdCollection getAfter() {
662     return after;
663   }
664
665   /**
666    * @param after the after to set
667    */
668   public void setAfter(ObjectIdCollection after) {
669     this.after = after;
670   }
671
672   /**
673    * @return the host
674    */
675   public String getHost() {
676     return host;
677   }
678
679   /**
680    * @param host the host to set
681    */
682   public void setHost(String host) {
683     this.host = host;
684   }
685
686   /**
687    * @return the port
688    */
689   public String getPort() {
690     return port;
691   }
692
693   /**
694    * @param port the port to set
695    */
696   public void setPort(String port) {
697     this.port = port;
698   }
699
700   /**
701    * @return the indexType
702    */
703   public String getIndexType() {
704     return indexType;
705   }
706
707   /**
708    * @param indexType the indexType to set
709    */
710   public void setIndexType(String indexType) {
711     this.indexType = indexType;
712   }
713
714   /**
715    * @return the scrollContextTimeToLiveInMinutes
716    */
717   public int getScrollContextTimeToLiveInMinutes() {
718     return scrollContextTimeToLiveInMinutes;
719   }
720
721   /**
722    * @param scrollContextTimeToLiveInMinutes the scrollContextTimeToLiveInMinutes to set
723    */
724   public void setScrollContextTimeToLiveInMinutes(int scrollContextTimeToLiveInMinutes) {
725     this.scrollContextTimeToLiveInMinutes = scrollContextTimeToLiveInMinutes;
726   }
727
728   /**
729    * @return the numItemsToGetBulkRequest
730    */
731   public int getNumItemsToGetBulkRequest() {
732     return numItemsToGetBulkRequest;
733   }
734
735   /**
736    * @param numItemsToGetBulkRequest the numItemsToGetBulkRequest to set
737    */
738   public void setNumItemsToGetBulkRequest(int numItemsToGetBulkRequest) {
739     this.numItemsToGetBulkRequest = numItemsToGetBulkRequest;
740   }
741
742   /**
743    * @return the restDataProvider
744    */
745   public RestDataProvider getRestDataProvider() {
746     return restDataProvider;
747   }
748
749   /**
750    * @param restDataProvider the restDataProvider to set
751    */
752   public void setRestDataProvider(RestDataProvider restDataProvider) {
753     this.restDataProvider = restDataProvider;
754   }
755
756   /**
757    * @return the mapper
758    */
759   public ObjectMapper getMapper() {
760     return mapper;
761   }
762
763   /**
764    * @param mapper the mapper to set
765    */
766   public void setMapper(ObjectMapper mapper) {
767     this.mapper = mapper;
768   }
769
770   /**
771    * @return the log
772    */
773   public static Logger getLog() {
774     return LOG;
775   }
776
777   /**
778    * @return the bulkOpLineTemplate
779    */
780   public static String getBulkOpLineTemplate() {
781     return BULK_OP_LINE_TEMPLATE;
782   }
783
784   /**
785    * @return the timestampFormat
786    */
787   public static String getTimestampFormat() {
788     return TIMESTAMP_FORMAT;
789   }
790
791   /*
792   
793   */
794
795 }