Initial commit for AAI-UI(sparky-backend)
[aai/sparky-be.git] / src / main / java / org / openecomp / sparky / synchronizer / SyncHelper.java
1 /**
2  * ============LICENSE_START===================================================
3  * SPARKY (AAI UI service)
4  * ============================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=====================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25
26 package org.openecomp.sparky.synchronizer;
27
28 import com.google.common.util.concurrent.ThreadFactoryBuilder;
29
30 import java.lang.Thread.UncaughtExceptionHandler;
31 import java.text.SimpleDateFormat;
32 import java.util.ArrayList;
33 import java.util.Calendar;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.TimeZone;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.ScheduledExecutorService;
39 import java.util.concurrent.ThreadFactory;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicLong;
42
43 import org.openecomp.cl.api.Logger;
44 import org.openecomp.cl.eelf.LoggerFactory;
45 import org.openecomp.sparky.config.oxm.OxmEntityDescriptor;
46 import org.openecomp.sparky.config.oxm.OxmModelLoader;
47 import org.openecomp.sparky.dal.aai.ActiveInventoryAdapter;
48 import org.openecomp.sparky.dal.aai.config.ActiveInventoryConfig;
49 import org.openecomp.sparky.dal.aai.config.ActiveInventoryRestConfig;
50 import org.openecomp.sparky.dal.cache.EntityCache;
51 import org.openecomp.sparky.dal.cache.InMemoryEntityCache;
52 import org.openecomp.sparky.dal.cache.PersistentEntityCache;
53 import org.openecomp.sparky.dal.elasticsearch.ElasticSearchAdapter;
54 import org.openecomp.sparky.dal.elasticsearch.config.ElasticSearchConfig;
55 import org.openecomp.sparky.dal.rest.RestClientBuilder;
56 import org.openecomp.sparky.dal.rest.RestfulDataAccessor;
57 import org.openecomp.sparky.logging.AaiUiMsgs;
58 import org.openecomp.sparky.synchronizer.SyncController.SyncActions;
59 import org.openecomp.sparky.synchronizer.config.SynchronizerConfiguration;
60 import org.openecomp.sparky.synchronizer.config.SynchronizerConstants;
61 import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState;
62 import org.openecomp.sparky.util.ErrorUtil;
63 import org.openecomp.sparky.viewandinspect.config.TierSupportUiConstants;
64 import org.slf4j.MDC;
65
66 /**
67  * The Class SyncHelper.
68  *
69  * @author davea.
70  */
71 public class SyncHelper {
72
73   private final Logger LOG = LoggerFactory.getInstance().getLogger(SyncHelper.class);
74   private SyncController syncController = null;
75   private SyncController entityCounterHistorySummarizer = null;
76
77   private ScheduledExecutorService oneShotExecutor = Executors.newSingleThreadScheduledExecutor();
78   private ScheduledExecutorService periodicExecutor = null;
79   private ScheduledExecutorService historicalExecutor =
80       Executors.newSingleThreadScheduledExecutor();
81
82   private SynchronizerConfiguration syncConfig;
83   private ElasticSearchConfig esConfig;
84   private OxmModelLoader oxmModelLoader;
85
86   private Boolean initialSyncRunning = false;
87   private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z");
88   private AtomicLong timeNextSync = new AtomicLong();
89   Map<String, String> contextMap;
90
91   /**
92    * The Class SyncTask.
93    */
94   private class SyncTask implements Runnable {
95
96     private boolean isInitialSync;
97
98     /**
99      * Instantiates a new sync task.
100      *
101      * @param initialSync the initial sync
102      */
103     public SyncTask(boolean initialSync) {
104       this.isInitialSync = initialSync;
105     }
106
107     /*
108      * (non-Javadoc)
109      * 
110      * @see java.lang.Runnable#run()
111      */
112     @Override
113     public void run() {
114       long opStartTime = System.currentTimeMillis();
115       MDC.setContextMap(contextMap);
116
117       LOG.info(AaiUiMsgs.SEARCH_ENGINE_SYNC_STARTED, sdf.format(opStartTime)
118           .replaceAll(SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD));
119
120       try {
121
122         if (syncController == null) {
123           LOG.error(AaiUiMsgs.SYNC_SKIPPED_SYNCCONTROLLER_NOT_INITIALIZED);
124           return;
125         }
126
127         int taskFrequencyInDays = SynchronizerConfiguration.getConfig().getSyncTaskFrequencyInDay();
128
129         /*
130          * Do nothing if the initial start-up sync hasn't finished yet, but the regular sync
131          * scheduler fired up a regular sync.
132          */
133         if (!initialSyncRunning) {
134           if (isInitialSync) {
135             initialSyncRunning = true;
136           } else {
137             // update 'timeNextSync' for periodic sync
138             timeNextSync.getAndAdd(taskFrequencyInDays * SynchronizerConstants.MILLISEC_IN_A_DAY);
139
140           }
141
142           LOG.info(AaiUiMsgs.INFO_GENERIC, "SyncTask, starting syncrhonization");
143
144           syncController.performAction(SyncActions.SYNCHRONIZE);
145
146           while (syncController.getState() == SynchronizerState.PERFORMING_SYNCHRONIZATION) {
147             Thread.sleep(1000);
148           }
149
150         } else {
151           LOG.info(AaiUiMsgs.SKIP_PERIODIC_SYNC_AS_SYNC_DIDNT_FINISH, sdf.format(opStartTime)
152               .replaceAll(SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD));
153
154           return;
155         }
156
157         long opEndTime = System.currentTimeMillis();
158
159         if (isInitialSync) {
160           /*
161            * Handle corner case when start-up sync operation overlapped with a scheduled
162            * sync-start-time. Note that the scheduled sync does nothing if 'initialSyncRunning' is
163            * TRUE. So the actual next-sync is one more sync-cycle away
164            */
165           long knownNextSyncTime = timeNextSync.get();
166           if (knownNextSyncTime != SynchronizerConstants.DELAY_NO_PERIODIC_SYNC_IN_MS
167               && opEndTime > knownNextSyncTime) {
168             timeNextSync.compareAndSet(knownNextSyncTime,
169                 knownNextSyncTime + taskFrequencyInDays * SynchronizerConstants.MILLISEC_IN_A_DAY);
170             initialSyncRunning = false;
171           }
172         }
173
174         String durationMessage =
175             String.format(syncController.getControllerName() + " synchronization took '%d' ms.",
176                 (opEndTime - opStartTime));
177
178         LOG.info(AaiUiMsgs.SYNC_DURATION, durationMessage);
179
180         // Provide log about the time for next synchronization
181         if (syncConfig.isConfigOkForPeriodicSync()
182             && timeNextSync.get() != SynchronizerConstants.DELAY_NO_PERIODIC_SYNC_IN_MS) {
183           TimeZone tz = TimeZone.getTimeZone(syncConfig.getSyncTaskStartTimeTimeZone());
184           sdf.setTimeZone(tz);
185           if (opEndTime - opStartTime > taskFrequencyInDays
186               * SynchronizerConstants.MILLISEC_IN_A_DAY) {
187             String durationWasLongerMessage = String.format(
188                 syncController.getControllerName()
189                     + " synchronization took '%d' ms which is larger than"
190                     + " synchronization interval of '%d' ms.",
191                 (opEndTime - opStartTime),
192                 taskFrequencyInDays * SynchronizerConstants.MILLISEC_IN_A_DAY);
193
194             LOG.info(AaiUiMsgs.SYNC_DURATION, durationWasLongerMessage);
195           }
196
197           LOG.info(AaiUiMsgs.SYNC_TO_BEGIN, syncController.getControllerName(),
198               sdf.format(timeNextSync).replaceAll(SynchronizerConstants.TIME_STD,
199                   SynchronizerConstants.TIME_CONFIG_STD));
200         }
201
202       } catch (Exception exc) {
203         String message = "Caught an exception while attempt to synchronize elastic search "
204             + "with an error cause = " + ErrorUtil.extractStackTraceElements(5, exc);
205         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
206       }
207
208     }
209
210   }
211
212   /**
213    * The Class HistoricalEntityCountSummaryTask.
214    */
215   private class HistoricalEntityCountSummaryTask implements Runnable {
216
217     /**
218      * Instantiates a new historical entity count summary task.
219      */
220     public HistoricalEntityCountSummaryTask() {}
221
222     /*
223      * (non-Javadoc)
224      * 
225      * @see java.lang.Runnable#run()
226      */
227     @Override
228     public void run() {
229
230       long opStartTime = System.currentTimeMillis();
231       MDC.setContextMap(contextMap);
232       LOG.info(AaiUiMsgs.HISTORICAL_ENTITY_COUNT_SUMMARIZER_STARTING, sdf.format(opStartTime)
233           .replaceAll(SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD));
234
235       try {
236         if (entityCounterHistorySummarizer == null) {
237           LOG.error(AaiUiMsgs.HISTORICAL_ENTITY_COUNT_SUMMARIZER_NOT_STARTED);
238           return;
239         }
240
241         LOG.info(AaiUiMsgs.INFO_GENERIC,
242             "EntityCounterHistorySummarizer, starting syncrhonization");
243
244         entityCounterHistorySummarizer.performAction(SyncActions.SYNCHRONIZE);
245
246         while (entityCounterHistorySummarizer
247             .getState() == SynchronizerState.PERFORMING_SYNCHRONIZATION) {
248           Thread.sleep(1000);
249         }
250
251         long opEndTime = System.currentTimeMillis();
252
253         LOG.info(AaiUiMsgs.HISTORICAL_SYNC_DURATION,
254             entityCounterHistorySummarizer.getControllerName(),
255             String.valueOf(opEndTime - opStartTime));
256
257         long taskFrequencyInMs =
258             syncConfig.getHistoricalEntitySummarizedFrequencyInMinutes() * 60 * 1000;
259
260         if (syncConfig.isHistoricalEntitySummarizerEnabled()) {
261           String time = sdf.format(System.currentTimeMillis() + taskFrequencyInMs)
262               .replaceAll(SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD);
263
264           LOG.info(AaiUiMsgs.HISTORICAL_SYNC_TO_BEGIN, time);
265         }
266
267
268       } catch (Exception exc) {
269         String message = "Caught an exception while attempting to populate entity country "
270             + "history elasticsearch table with an error cause = "
271             + ErrorUtil.extractStackTraceElements(5, exc);
272         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
273       }
274
275     }
276
277   }
278
279   /**
280    * Gets the first sync time.
281    *
282    * @param calendar the calendar
283    * @param timeNow the time now
284    * @param taskFreqInDay the task freq in day
285    * @return the first sync time
286    */
287   public long getFirstSyncTime(Calendar calendar, long timeNow, int taskFreqInDay) {
288     if (taskFreqInDay == SynchronizerConstants.DELAY_NO_PERIODIC_SYNC_IN_MS) {
289       return SynchronizerConstants.DELAY_NO_PERIODIC_SYNC_IN_MS;
290     } else if (timeNow > calendar.getTimeInMillis()) {
291       calendar.add(Calendar.DAY_OF_MONTH, taskFreqInDay);
292     }
293     return calendar.getTimeInMillis();
294   }
295
296   /**
297    * Boot strap and configure the moving pieces of the Sync Controller.
298    */
299
300   private void initializeSyncController() {
301
302     try {
303
304       /*
305        * TODO: it would be nice to have XML IoC / dependency injection kind of thing for these
306        * pieces maybe Spring?
307        */
308
309       /*
310        * Sync Controller itself
311        */
312
313       syncController = new SyncController("entitySyncController");
314
315       /*
316        * Create common elements
317        */
318
319       ActiveInventoryAdapter aaiAdapter = new ActiveInventoryAdapter(new RestClientBuilder());
320       ActiveInventoryRestConfig aaiRestConfig =
321           ActiveInventoryConfig.getConfig().getAaiRestConfig();
322
323
324       EntityCache cache = null;
325
326       if (aaiRestConfig.isCacheEnabled()) {
327         cache = new PersistentEntityCache(aaiRestConfig.getStorageFolderOverride(),
328             aaiRestConfig.getNumCacheWorkers());
329       } else {
330         cache = new InMemoryEntityCache();
331       }
332
333       RestClientBuilder clientBuilder = new RestClientBuilder();
334
335       aaiAdapter.setCacheEnabled(true);
336       aaiAdapter.setEntityCache(cache);
337
338       clientBuilder.setUseHttps(false);
339
340       RestfulDataAccessor nonCachingRestProvider = new RestfulDataAccessor(clientBuilder);
341
342       ElasticSearchConfig esConfig = ElasticSearchConfig.getConfig();
343       ElasticSearchAdapter esAdapter = new ElasticSearchAdapter(nonCachingRestProvider, esConfig);
344
345       /*
346        * Register Index Validators
347        */
348
349       IndexIntegrityValidator entitySearchIndexValidator =
350           new IndexIntegrityValidator(nonCachingRestProvider, esConfig.getIndexName(),
351               esConfig.getType(), esConfig.getIpAddress(), esConfig.getHttpPort(),
352               esConfig.buildElasticSearchTableConfig());
353
354       syncController.registerIndexValidator(entitySearchIndexValidator);
355
356       // TODO: Insert IndexValidator for TopographicalEntityIndex
357       // we should have one, but one isn't 100% required as none of the fields are analyzed
358
359       /*
360        * Register Synchronizers
361        */
362
363       SearchableEntitySynchronizer ses = new SearchableEntitySynchronizer(esConfig.getIndexName());
364       ses.setAaiDataProvider(aaiAdapter);
365       ses.setEsDataProvider(esAdapter);
366       syncController.registerEntitySynchronizer(ses);
367
368       CrossEntityReferenceSynchronizer cers = new CrossEntityReferenceSynchronizer(
369           esConfig.getIndexName(), ActiveInventoryConfig.getConfig());
370       cers.setAaiDataProvider(aaiAdapter);
371       cers.setEsDataProvider(esAdapter);
372       syncController.registerEntitySynchronizer(cers);
373
374       GeoSynchronizer geo = new GeoSynchronizer(esConfig.getTopographicalSearchIndex());
375       geo.setAaiDataProvider(aaiAdapter);
376       geo.setEsDataProvider(esAdapter);
377       syncController.registerEntitySynchronizer(geo);
378
379       if (syncConfig.isAutosuggestSynchronizationEnabled()) {
380         initAutoSuggestionSynchronizer(esConfig, aaiAdapter, esAdapter, nonCachingRestProvider);
381         initAggregationSynchronizer(esConfig, aaiAdapter, esAdapter, nonCachingRestProvider);
382       }
383
384       /*
385        * Register Cleaners
386        */
387
388       IndexCleaner searchableIndexCleaner = new ElasticSearchIndexCleaner(nonCachingRestProvider,
389           esConfig.getIndexName(), esConfig.getType(), esConfig.getIpAddress(),
390           esConfig.getHttpPort(), syncConfig.getScrollContextTimeToLiveInMinutes(),
391           syncConfig.getNumScrollContextItemsToRetrievePerRequest());
392
393       syncController.registerIndexCleaner(searchableIndexCleaner);
394
395       IndexCleaner geoIndexCleaner = new ElasticSearchIndexCleaner(nonCachingRestProvider,
396           esConfig.getTopographicalSearchIndex(), esConfig.getType(), esConfig.getIpAddress(),
397           esConfig.getHttpPort(), syncConfig.getScrollContextTimeToLiveInMinutes(),
398           syncConfig.getNumScrollContextItemsToRetrievePerRequest());
399
400       syncController.registerIndexCleaner(geoIndexCleaner);
401
402
403     } catch (Exception exc) {
404       String message = "Error: failed to sync with message = " + exc.getMessage();
405       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
406     }
407
408   }
409
410   /**
411    * Inits the entity counter history summarizer.
412    */
413   private void initEntityCounterHistorySummarizer() {
414
415     LOG.info(AaiUiMsgs.INFO_GENERIC, "initEntityCounterHistorySummarizer");
416
417     try {
418       entityCounterHistorySummarizer = new SyncController("entityCounterHistorySummarizer");
419
420       ActiveInventoryAdapter aaiAdapter = new ActiveInventoryAdapter(new RestClientBuilder());
421       aaiAdapter.setCacheEnabled(false);
422
423       RestClientBuilder clientBuilder = new RestClientBuilder();
424       clientBuilder.setUseHttps(false);
425
426       RestfulDataAccessor nonCachingRestProvider = new RestfulDataAccessor(clientBuilder);
427       ElasticSearchConfig esConfig = ElasticSearchConfig.getConfig();
428       ElasticSearchAdapter esAdapter = new ElasticSearchAdapter(nonCachingRestProvider, esConfig);
429
430       IndexIntegrityValidator entityCounterHistoryValidator =
431           new IndexIntegrityValidator(nonCachingRestProvider, esConfig.getEntityCountHistoryIndex(),
432               esConfig.getType(), esConfig.getIpAddress(), esConfig.getHttpPort(),
433               esConfig.buildElasticSearchEntityCountHistoryTableConfig());
434
435       entityCounterHistorySummarizer.registerIndexValidator(entityCounterHistoryValidator);
436
437       HistoricalEntitySummarizer historicalSummarizer =
438           new HistoricalEntitySummarizer(esConfig.getEntityCountHistoryIndex());
439       historicalSummarizer.setAaiDataProvider(aaiAdapter);
440       historicalSummarizer.setEsDataProvider(esAdapter);
441       //entityCounterHistorySummarizer.registerEntitySynchronizer(historicalSummarizer);
442
443     } catch (Exception exc) {
444       String message = "Error: failed to sync with message = " + exc.getMessage();
445       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
446     }
447   }
448
449   private List<String> getAutosuggestableEntitiesFromOXM() {
450     Map<String, OxmEntityDescriptor> map = oxmModelLoader.getSuggestionSearchEntityDescriptors();
451     List<String> suggestableEntities = new ArrayList<String>();
452     
453     for (String entity: map.keySet()){
454       suggestableEntities.add(entity);
455     }
456     return suggestableEntities;
457   }
458
459   /**
460    * Initialize the AutosuggestionSynchronizer and 
461    * AggregationSuggestionSynchronizer
462    * 
463    * @param esConfig
464    * @param aaiAdapter
465    * @param esAdapter
466    * @param nonCachingRestProvider
467    */
468   private void initAutoSuggestionSynchronizer(ElasticSearchConfig esConfig,
469       ActiveInventoryAdapter aaiAdapter, ElasticSearchAdapter esAdapter,
470       RestfulDataAccessor nonCachingRestProvider) {
471     LOG.info(AaiUiMsgs.INFO_GENERIC, "initAutoSuggestionSynchronizer");
472
473     // Initialize for entityautosuggestindex
474     try {
475       IndexIntegrityValidator autoSuggestionIndexValidator =
476           new IndexIntegrityValidator(nonCachingRestProvider, esConfig.getAutosuggestIndexname(),
477               esConfig.getType(), esConfig.getIpAddress(), esConfig.getHttpPort(),
478               esConfig.buildAutosuggestionTableConfig());
479
480       syncController.registerIndexValidator(autoSuggestionIndexValidator);
481
482       AutosuggestionSynchronizer suggestionSynchronizer =
483           new AutosuggestionSynchronizer(esConfig.getAutosuggestIndexname());
484       suggestionSynchronizer.setAaiDataProvider(aaiAdapter);
485       suggestionSynchronizer.setEsDataProvider(esAdapter);
486       syncController.registerEntitySynchronizer(suggestionSynchronizer);
487       
488       AggregationSuggestionSynchronizer aggregationSuggestionSynchronizer =
489           new AggregationSuggestionSynchronizer(esConfig.getAutosuggestIndexname());
490       aggregationSuggestionSynchronizer.setEsDataProvider(esAdapter);
491       syncController.registerEntitySynchronizer(aggregationSuggestionSynchronizer);
492
493       IndexCleaner autosuggestIndexCleaner = new ElasticSearchIndexCleaner(nonCachingRestProvider,
494           esConfig.getAutosuggestIndexname(), esConfig.getType(), esConfig.getIpAddress(),
495           esConfig.getHttpPort(), syncConfig.getScrollContextTimeToLiveInMinutes(),
496           syncConfig.getNumScrollContextItemsToRetrievePerRequest());
497
498       syncController.registerIndexCleaner(autosuggestIndexCleaner);
499     } catch (Exception exc) {
500       String message = "Error: failed to sync with message = " + exc.getMessage();
501       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
502     }
503   }
504   
505   /**
506    * Initialize the AggregationSynchronizer
507    * 
508    * @param esConfig
509    * @param aaiAdapter
510    * @param esAdapter
511    * @param nonCachingRestProvider
512    */
513   private void initAggregationSynchronizer(ElasticSearchConfig esConfig,
514       ActiveInventoryAdapter aaiAdapter, ElasticSearchAdapter esAdapter,
515       RestfulDataAccessor nonCachingRestProvider) {
516     LOG.info(AaiUiMsgs.INFO_GENERIC, "initAggregationSynchronizer");
517
518     List<String> aggregationEntities = getAutosuggestableEntitiesFromOXM();
519
520     // For each index: create an IndexValidator, a Synchronizer, and an IndexCleaner
521     for (String entity : aggregationEntities) {
522       try {
523         String indexName = TierSupportUiConstants.getAggregationIndexName(entity);
524
525         IndexIntegrityValidator aggregationIndexValidator = new IndexIntegrityValidator(
526             nonCachingRestProvider, indexName, esConfig.getType(), esConfig.getIpAddress(),
527             esConfig.getHttpPort(), esConfig.buildAggregationTableConfig());
528
529         syncController.registerIndexValidator(aggregationIndexValidator);
530
531         /*
532          * TODO: This per-entity-synchronizer approach will eventually result in AAI / ES overload
533          * because of the existing dedicated thread pools for ES + AAI operations within the
534          * synchronizer. If we had 50 types to sync then the thread pools within each Synchronizer
535          * would cause some heartburn as there would be hundreds of threads trying to talk to AAI.
536          * Given that we our running out of time, let's make sure we can get it functional and then
537          * we'll re-visit.
538          */
539         AggregationSynchronizer aggSynchronizer = new AggregationSynchronizer(entity, indexName);
540         aggSynchronizer.setAaiDataProvider(aaiAdapter);
541         aggSynchronizer.setEsDataProvider(esAdapter);
542         syncController.registerEntitySynchronizer(aggSynchronizer);
543
544         IndexCleaner entityDataIndexCleaner = new ElasticSearchIndexCleaner(nonCachingRestProvider,
545             indexName, esConfig.getType(), esConfig.getIpAddress(), esConfig.getHttpPort(),
546             syncConfig.getScrollContextTimeToLiveInMinutes(),
547             syncConfig.getNumScrollContextItemsToRetrievePerRequest());
548
549         syncController.registerIndexCleaner(entityDataIndexCleaner);
550
551       } catch (Exception exc) {
552         String message = "Error: failed to sync with message = " + exc.getMessage();
553         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
554       }
555     }
556   }
557
558   /**
559    * Instantiates a new sync helper.
560    *
561    * @param loader the loader
562    */
563   public SyncHelper(OxmModelLoader loader) {
564     try {
565       this.contextMap = MDC.getCopyOfContextMap();
566       this.syncConfig = SynchronizerConfiguration.getConfig();
567       this.esConfig = ElasticSearchConfig.getConfig();
568       this.oxmModelLoader = loader;
569
570       UncaughtExceptionHandler uncaughtExceptionHandler = new Thread.UncaughtExceptionHandler() {
571
572         @Override
573         public void uncaughtException(Thread thread, Throwable exc) {
574           LOG.error(AaiUiMsgs.ERROR_GENERIC, thread.getName() + ": " + exc);
575         }
576       };
577
578       ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("SyncHelper-%d")
579           .setUncaughtExceptionHandler(uncaughtExceptionHandler).build();
580
581       periodicExecutor = Executors.newScheduledThreadPool(3, namedThreadFactory);
582
583       /*
584        * We only want to initialize the synchronizer if sync has been configured to start
585        */
586       if (syncConfig.isConfigOkForStartupSync() || syncConfig.isConfigOkForPeriodicSync()) {
587         initializeSyncController();
588       }
589       
590       if (syncConfig.isHistoricalEntitySummarizerEnabled()) {
591         initEntityCounterHistorySummarizer(); 
592       } else { 
593         LOG.info(AaiUiMsgs.INFO_GENERIC, "history summarizer disabled"); 
594       }
595        
596
597       // schedule startup synchronization
598       if (syncConfig.isConfigOkForStartupSync()) {
599
600         long taskInitialDelayInMs = syncConfig.getSyncTaskInitialDelayInMs();
601         if (taskInitialDelayInMs != SynchronizerConstants.DELAY_NO_STARTUP_SYNC_IN_MS) {
602           oneShotExecutor.schedule(new SyncTask(true), taskInitialDelayInMs, TimeUnit.MILLISECONDS);
603           LOG.info(AaiUiMsgs.INFO_GENERIC, "Search Engine startup synchronization is enabled.");
604         } else {
605           LOG.info(AaiUiMsgs.INFO_GENERIC, "Search Engine startup synchronization is disabled.");
606         }
607       }
608
609       // schedule periodic synchronization
610       if (syncConfig.isConfigOkForPeriodicSync()) {
611
612         TimeZone tz = TimeZone.getTimeZone(syncConfig.getSyncTaskStartTimeTimeZone());
613         Calendar calendar = Calendar.getInstance(tz);
614         sdf.setTimeZone(tz);
615
616         calendar.set(Calendar.HOUR_OF_DAY, syncConfig.getSyncTaskStartTimeHr());
617         calendar.set(Calendar.MINUTE, syncConfig.getSyncTaskStartTimeMin());
618         calendar.set(Calendar.SECOND, syncConfig.getSyncTaskStartTimeSec());
619
620         long timeCurrent = calendar.getTimeInMillis();
621         int taskFrequencyInDay = syncConfig.getSyncTaskFrequencyInDay();
622         timeNextSync.getAndSet(getFirstSyncTime(calendar, timeCurrent, taskFrequencyInDay));
623
624         long delayUntilFirstRegSyncInMs = 0;
625         delayUntilFirstRegSyncInMs = timeNextSync.get() - timeCurrent;
626
627         // Do all calculation in milliseconds
628         long taskFreqencyInMs = taskFrequencyInDay * SynchronizerConstants.MILLISEC_IN_A_DAY;
629
630         if (taskFreqencyInMs != SynchronizerConstants.DELAY_NO_PERIODIC_SYNC_IN_MS) {
631           periodicExecutor.scheduleAtFixedRate(new SyncTask(false), delayUntilFirstRegSyncInMs,
632               taskFreqencyInMs, TimeUnit.MILLISECONDS);
633           LOG.info(AaiUiMsgs.INFO_GENERIC, "Search Engine periodic synchronization is enabled.");
634           // case: when - startup sync is misconfigured or is disabled
635           // - give a clue to user when is the next periodic sync
636           if (!syncConfig.isConfigOkForStartupSync()
637               || syncConfig.isConfigDisabledForInitialSync()) {
638             LOG.info(AaiUiMsgs.SYNC_TO_BEGIN, syncController.getControllerName(),
639                 sdf.format(timeNextSync).replaceAll(SynchronizerConstants.TIME_STD,
640                     SynchronizerConstants.TIME_CONFIG_STD));
641           }
642         } else {
643           LOG.info(AaiUiMsgs.INFO_GENERIC, "Search Engine periodic synchronization is disabled.");
644         }
645       }
646
647       // schedule periodic synchronization
648       if (syncConfig.isHistoricalEntitySummarizerEnabled()) {
649         scheduleHistoricalCounterSyncTask();
650       }
651
652     } catch (Exception exc) {
653       String message = "Caught an exception while starting up the SyncHelper. Error cause = \n"
654           + ErrorUtil.extractStackTraceElements(5, exc);
655       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
656     }
657   }
658
659   /**
660    * Schedule historical counter sync task.
661    */
662   private void scheduleHistoricalCounterSyncTask() {
663     long taskFrequencyInMs =
664         syncConfig.getHistoricalEntitySummarizedFrequencyInMinutes() * 60 * 1000;
665     historicalExecutor.scheduleWithFixedDelay(new HistoricalEntityCountSummaryTask(), 0,
666         taskFrequencyInMs, TimeUnit.MILLISECONDS);
667     LOG.info(AaiUiMsgs.INFO_GENERIC,
668         "Historical Entity Count Summarizer synchronization is enabled.");
669   }
670
671   /**
672    * Shutdown.
673    */
674   public void shutdown() {
675
676     if (oneShotExecutor != null) {
677       oneShotExecutor.shutdown();
678     }
679
680     if (periodicExecutor != null) {
681       periodicExecutor.shutdown();
682     }
683
684     if (historicalExecutor != null) {
685       historicalExecutor.shutdown();
686     }
687
688     if (syncController != null) {
689       syncController.shutdown();
690     }
691
692     if (entityCounterHistorySummarizer != null) {
693       entityCounterHistorySummarizer.shutdown();
694     }
695
696   }
697
698   public OxmModelLoader getOxmModelLoader() {
699     return oxmModelLoader;
700   }
701
702   public void setOxmModelLoader(OxmModelLoader oxmModelLoader) {
703     this.oxmModelLoader = oxmModelLoader;
704   }
705 }