002bc5829ce48694682980f288c0961c32e5d591
[aai/sparky-be.git] / src / main / java / org / openecomp / sparky / synchronizer / SyncHelper.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.openecomp.sparky.synchronizer;
24
25 import com.google.common.util.concurrent.ThreadFactoryBuilder;
26
27 import java.lang.Thread.UncaughtExceptionHandler;
28 import java.text.SimpleDateFormat;
29 import java.util.ArrayList;
30 import java.util.Calendar;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.TimeZone;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.ThreadFactory;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicLong;
39
40 import org.openecomp.cl.api.Logger;
41 import org.openecomp.cl.eelf.LoggerFactory;
42 import org.openecomp.sparky.config.oxm.OxmEntityDescriptor;
43 import org.openecomp.sparky.config.oxm.OxmModelLoader;
44 import org.openecomp.sparky.dal.aai.ActiveInventoryAdapter;
45 import org.openecomp.sparky.dal.aai.config.ActiveInventoryConfig;
46 import org.openecomp.sparky.dal.aai.config.ActiveInventoryRestConfig;
47 import org.openecomp.sparky.dal.cache.EntityCache;
48 import org.openecomp.sparky.dal.cache.InMemoryEntityCache;
49 import org.openecomp.sparky.dal.cache.PersistentEntityCache;
50 import org.openecomp.sparky.dal.elasticsearch.ElasticSearchAdapter;
51 import org.openecomp.sparky.dal.elasticsearch.config.ElasticSearchConfig;
52 import org.openecomp.sparky.dal.rest.RestClientBuilder;
53 import org.openecomp.sparky.dal.rest.RestfulDataAccessor;
54 import org.openecomp.sparky.logging.AaiUiMsgs;
55 import org.openecomp.sparky.synchronizer.SyncController.SyncActions;
56 import org.openecomp.sparky.synchronizer.config.SynchronizerConfiguration;
57 import org.openecomp.sparky.synchronizer.config.SynchronizerConstants;
58 import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState;
59 import org.openecomp.sparky.util.ErrorUtil;
60 import org.openecomp.sparky.viewandinspect.config.TierSupportUiConstants;
61 import org.slf4j.MDC;
62
63 /**
64  * The Class SyncHelper.
65  *
66  * @author davea.
67  */
68 public class SyncHelper {
69
70   private final Logger LOG = LoggerFactory.getInstance().getLogger(SyncHelper.class);
71   private SyncController syncController = null;
72   private SyncController entityCounterHistorySummarizer = null;
73
74   private ScheduledExecutorService oneShotExecutor = Executors.newSingleThreadScheduledExecutor();
75   private ScheduledExecutorService periodicExecutor = null;
76   private ScheduledExecutorService historicalExecutor =
77       Executors.newSingleThreadScheduledExecutor();
78
79   private SynchronizerConfiguration syncConfig;
80   private ElasticSearchConfig esConfig;
81   private OxmModelLoader oxmModelLoader;
82
83   private Boolean initialSyncRunning = false;
84   private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z");
85   private AtomicLong timeNextSync = new AtomicLong();
86   Map<String, String> contextMap;
87
88   /**
89    * The Class SyncTask.
90    */
91   private class SyncTask implements Runnable {
92
93     private boolean isInitialSync;
94
95     /**
96      * Instantiates a new sync task.
97      *
98      * @param initialSync the initial sync
99      */
100     public SyncTask(boolean initialSync) {
101       this.isInitialSync = initialSync;
102     }
103
104     /*
105      * (non-Javadoc)
106      * 
107      * @see java.lang.Runnable#run()
108      */
109     @Override
110     public void run() {
111       long opStartTime = System.currentTimeMillis();
112       MDC.setContextMap(contextMap);
113
114       LOG.info(AaiUiMsgs.SEARCH_ENGINE_SYNC_STARTED, sdf.format(opStartTime)
115           .replaceAll(SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD));
116
117       try {
118
119         if (syncController == null) {
120           LOG.error(AaiUiMsgs.SYNC_SKIPPED_SYNCCONTROLLER_NOT_INITIALIZED);
121           return;
122         }
123
124         int taskFrequencyInDays = SynchronizerConfiguration.getConfig().getSyncTaskFrequencyInDay();
125
126         /*
127          * Do nothing if the initial start-up sync hasn't finished yet, but the regular sync
128          * scheduler fired up a regular sync.
129          */
130         if (!initialSyncRunning) {
131           if (isInitialSync) {
132             initialSyncRunning = true;
133           } else {
134             // update 'timeNextSync' for periodic sync
135             timeNextSync.getAndAdd(taskFrequencyInDays * SynchronizerConstants.MILLISEC_IN_A_DAY);
136
137           }
138
139           LOG.info(AaiUiMsgs.INFO_GENERIC, "SyncTask, starting syncrhonization");
140
141           syncController.performAction(SyncActions.SYNCHRONIZE);
142
143           while (syncController.getState() == SynchronizerState.PERFORMING_SYNCHRONIZATION) {
144             Thread.sleep(1000);
145           }
146
147         } else {
148           LOG.info(AaiUiMsgs.SKIP_PERIODIC_SYNC_AS_SYNC_DIDNT_FINISH, sdf.format(opStartTime)
149               .replaceAll(SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD));
150
151           return;
152         }
153
154         long opEndTime = System.currentTimeMillis();
155
156         if (isInitialSync) {
157           /*
158            * Handle corner case when start-up sync operation overlapped with a scheduled
159            * sync-start-time. Note that the scheduled sync does nothing if 'initialSyncRunning' is
160            * TRUE. So the actual next-sync is one more sync-cycle away
161            */
162           long knownNextSyncTime = timeNextSync.get();
163           if (knownNextSyncTime != SynchronizerConstants.DELAY_NO_PERIODIC_SYNC_IN_MS
164               && opEndTime > knownNextSyncTime) {
165             timeNextSync.compareAndSet(knownNextSyncTime,
166                 knownNextSyncTime + taskFrequencyInDays * SynchronizerConstants.MILLISEC_IN_A_DAY);
167             initialSyncRunning = false;
168           }
169         }
170
171         String durationMessage =
172             String.format(syncController.getControllerName() + " synchronization took '%d' ms.",
173                 (opEndTime - opStartTime));
174
175         LOG.info(AaiUiMsgs.SYNC_DURATION, durationMessage);
176
177         // Provide log about the time for next synchronization
178         if (syncConfig.isConfigOkForPeriodicSync()
179             && timeNextSync.get() != SynchronizerConstants.DELAY_NO_PERIODIC_SYNC_IN_MS) {
180           TimeZone tz = TimeZone.getTimeZone(syncConfig.getSyncTaskStartTimeTimeZone());
181           sdf.setTimeZone(tz);
182           if (opEndTime - opStartTime > taskFrequencyInDays
183               * SynchronizerConstants.MILLISEC_IN_A_DAY) {
184             String durationWasLongerMessage = String.format(
185                 syncController.getControllerName()
186                     + " synchronization took '%d' ms which is larger than"
187                     + " synchronization interval of '%d' ms.",
188                 (opEndTime - opStartTime),
189                 taskFrequencyInDays * SynchronizerConstants.MILLISEC_IN_A_DAY);
190
191             LOG.info(AaiUiMsgs.SYNC_DURATION, durationWasLongerMessage);
192           }
193
194           LOG.info(AaiUiMsgs.SYNC_TO_BEGIN, syncController.getControllerName(),
195               sdf.format(timeNextSync).replaceAll(SynchronizerConstants.TIME_STD,
196                   SynchronizerConstants.TIME_CONFIG_STD));
197         }
198
199       } catch (Exception exc) {
200         String message = "Caught an exception while attempt to synchronize elastic search "
201             + "with an error cause = " + ErrorUtil.extractStackTraceElements(5, exc);
202         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
203       }
204
205     }
206
207   }
208
209   /**
210    * The Class HistoricalEntityCountSummaryTask.
211    */
212   private class HistoricalEntityCountSummaryTask implements Runnable {
213
214     /**
215      * Instantiates a new historical entity count summary task.
216      */
217     public HistoricalEntityCountSummaryTask() {}
218
219     /*
220      * (non-Javadoc)
221      * 
222      * @see java.lang.Runnable#run()
223      */
224     @Override
225     public void run() {
226
227       long opStartTime = System.currentTimeMillis();
228       MDC.setContextMap(contextMap);
229       LOG.info(AaiUiMsgs.HISTORICAL_ENTITY_COUNT_SUMMARIZER_STARTING, sdf.format(opStartTime)
230           .replaceAll(SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD));
231
232       try {
233         if (entityCounterHistorySummarizer == null) {
234           LOG.error(AaiUiMsgs.HISTORICAL_ENTITY_COUNT_SUMMARIZER_NOT_STARTED);
235           return;
236         }
237
238         LOG.info(AaiUiMsgs.INFO_GENERIC,
239             "EntityCounterHistorySummarizer, starting syncrhonization");
240
241         entityCounterHistorySummarizer.performAction(SyncActions.SYNCHRONIZE);
242
243         while (entityCounterHistorySummarizer
244             .getState() == SynchronizerState.PERFORMING_SYNCHRONIZATION) {
245           Thread.sleep(1000);
246         }
247
248         long opEndTime = System.currentTimeMillis();
249
250         LOG.info(AaiUiMsgs.HISTORICAL_SYNC_DURATION,
251             entityCounterHistorySummarizer.getControllerName(),
252             String.valueOf(opEndTime - opStartTime));
253
254         long taskFrequencyInMs =
255             syncConfig.getHistoricalEntitySummarizedFrequencyInMinutes() * 60 * 1000;
256
257         if (syncConfig.isHistoricalEntitySummarizerEnabled()) {
258           String time = sdf.format(System.currentTimeMillis() + taskFrequencyInMs)
259               .replaceAll(SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD);
260
261           LOG.info(AaiUiMsgs.HISTORICAL_SYNC_TO_BEGIN, time);
262         }
263
264
265       } catch (Exception exc) {
266         String message = "Caught an exception while attempting to populate entity country "
267             + "history elasticsearch table with an error cause = "
268             + ErrorUtil.extractStackTraceElements(5, exc);
269         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
270       }
271
272     }
273
274   }
275
276   /**
277    * Gets the first sync time.
278    *
279    * @param calendar the calendar
280    * @param timeNow the time now
281    * @param taskFreqInDay the task freq in day
282    * @return the first sync time
283    */
284   public long getFirstSyncTime(Calendar calendar, long timeNow, int taskFreqInDay) {
285     if (taskFreqInDay == SynchronizerConstants.DELAY_NO_PERIODIC_SYNC_IN_MS) {
286       return SynchronizerConstants.DELAY_NO_PERIODIC_SYNC_IN_MS;
287     } else if (timeNow > calendar.getTimeInMillis()) {
288       calendar.add(Calendar.DAY_OF_MONTH, taskFreqInDay);
289     }
290     return calendar.getTimeInMillis();
291   }
292
293   /**
294    * Boot strap and configure the moving pieces of the Sync Controller.
295    */
296
297   private void initializeSyncController() {
298
299     try {
300
301       /*
302        * TODO: it would be nice to have XML IoC / dependency injection kind of thing for these
303        * pieces maybe Spring?
304        */
305
306       /*
307        * Sync Controller itself
308        */
309
310       syncController = new SyncController("entitySyncController");
311
312       /*
313        * Create common elements
314        */
315
316       ActiveInventoryAdapter aaiAdapter = new ActiveInventoryAdapter(new RestClientBuilder());
317       ActiveInventoryRestConfig aaiRestConfig =
318           ActiveInventoryConfig.getConfig().getAaiRestConfig();
319
320
321       EntityCache cache = null;
322
323       if (aaiRestConfig.isCacheEnabled()) {
324         cache = new PersistentEntityCache(aaiRestConfig.getStorageFolderOverride(),
325             aaiRestConfig.getNumCacheWorkers());
326       } else {
327         cache = new InMemoryEntityCache();
328       }
329
330       RestClientBuilder clientBuilder = new RestClientBuilder();
331
332       aaiAdapter.setCacheEnabled(true);
333       aaiAdapter.setEntityCache(cache);
334
335       clientBuilder.setUseHttps(false);
336
337       RestfulDataAccessor nonCachingRestProvider = new RestfulDataAccessor(clientBuilder);
338
339       ElasticSearchConfig esConfig = ElasticSearchConfig.getConfig();
340       ElasticSearchAdapter esAdapter = new ElasticSearchAdapter(nonCachingRestProvider, esConfig);
341
342       /*
343        * Register Index Validators
344        */
345
346       IndexIntegrityValidator entitySearchIndexValidator =
347           new IndexIntegrityValidator(nonCachingRestProvider, esConfig.getIndexName(),
348               esConfig.getType(), esConfig.getIpAddress(), esConfig.getHttpPort(),
349               esConfig.buildElasticSearchTableConfig());
350
351       syncController.registerIndexValidator(entitySearchIndexValidator);
352
353       // TODO: Insert IndexValidator for TopographicalEntityIndex
354       // we should have one, but one isn't 100% required as none of the fields are analyzed
355
356       /*
357        * Register Synchronizers
358        */
359
360       SearchableEntitySynchronizer ses = new SearchableEntitySynchronizer(esConfig.getIndexName());
361       ses.setAaiDataProvider(aaiAdapter);
362       ses.setEsDataProvider(esAdapter);
363       syncController.registerEntitySynchronizer(ses);
364
365       CrossEntityReferenceSynchronizer cers = new CrossEntityReferenceSynchronizer(
366           esConfig.getIndexName(), ActiveInventoryConfig.getConfig());
367       cers.setAaiDataProvider(aaiAdapter);
368       cers.setEsDataProvider(esAdapter);
369       syncController.registerEntitySynchronizer(cers);
370
371       GeoSynchronizer geo = new GeoSynchronizer(esConfig.getTopographicalSearchIndex());
372       geo.setAaiDataProvider(aaiAdapter);
373       geo.setEsDataProvider(esAdapter);
374       syncController.registerEntitySynchronizer(geo);
375
376       if (syncConfig.isAutosuggestSynchronizationEnabled()) {
377         initAutoSuggestionSynchronizer(esConfig, aaiAdapter, esAdapter, nonCachingRestProvider);
378         initAggregationSynchronizer(esConfig, aaiAdapter, esAdapter, nonCachingRestProvider);
379       }
380
381       /*
382        * Register Cleaners
383        */
384
385       IndexCleaner searchableIndexCleaner = new ElasticSearchIndexCleaner(nonCachingRestProvider,
386           esConfig.getIndexName(), esConfig.getType(), esConfig.getIpAddress(),
387           esConfig.getHttpPort(), syncConfig.getScrollContextTimeToLiveInMinutes(),
388           syncConfig.getNumScrollContextItemsToRetrievePerRequest());
389
390       syncController.registerIndexCleaner(searchableIndexCleaner);
391
392       IndexCleaner geoIndexCleaner = new ElasticSearchIndexCleaner(nonCachingRestProvider,
393           esConfig.getTopographicalSearchIndex(), esConfig.getType(), esConfig.getIpAddress(),
394           esConfig.getHttpPort(), syncConfig.getScrollContextTimeToLiveInMinutes(),
395           syncConfig.getNumScrollContextItemsToRetrievePerRequest());
396
397       syncController.registerIndexCleaner(geoIndexCleaner);
398
399
400     } catch (Exception exc) {
401       String message = "Error: failed to sync with message = " + exc.getMessage();
402       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
403     }
404
405   }
406
407   /**
408    * Inits the entity counter history summarizer.
409    */
410   private void initEntityCounterHistorySummarizer() {
411
412     LOG.info(AaiUiMsgs.INFO_GENERIC, "initEntityCounterHistorySummarizer");
413
414     try {
415       entityCounterHistorySummarizer = new SyncController("entityCounterHistorySummarizer");
416
417       ActiveInventoryAdapter aaiAdapter = new ActiveInventoryAdapter(new RestClientBuilder());
418       aaiAdapter.setCacheEnabled(false);
419
420       RestClientBuilder clientBuilder = new RestClientBuilder();
421       clientBuilder.setUseHttps(false);
422
423       RestfulDataAccessor nonCachingRestProvider = new RestfulDataAccessor(clientBuilder);
424       ElasticSearchConfig esConfig = ElasticSearchConfig.getConfig();
425       ElasticSearchAdapter esAdapter = new ElasticSearchAdapter(nonCachingRestProvider, esConfig);
426
427       IndexIntegrityValidator entityCounterHistoryValidator =
428           new IndexIntegrityValidator(nonCachingRestProvider, esConfig.getEntityCountHistoryIndex(),
429               esConfig.getType(), esConfig.getIpAddress(), esConfig.getHttpPort(),
430               esConfig.buildElasticSearchEntityCountHistoryTableConfig());
431
432       entityCounterHistorySummarizer.registerIndexValidator(entityCounterHistoryValidator);
433
434       HistoricalEntitySummarizer historicalSummarizer =
435           new HistoricalEntitySummarizer(esConfig.getEntityCountHistoryIndex());
436       historicalSummarizer.setAaiDataProvider(aaiAdapter);
437       historicalSummarizer.setEsDataProvider(esAdapter);
438       entityCounterHistorySummarizer.registerEntitySynchronizer(historicalSummarizer);
439
440     } catch (Exception exc) {
441       String message = "Error: failed to sync with message = " + exc.getMessage();
442       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
443     }
444   }
445
446   private List<String> getAutosuggestableEntitiesFromOXM() {
447     Map<String, OxmEntityDescriptor> map = oxmModelLoader.getSuggestionSearchEntityDescriptors();
448     List<String> suggestableEntities = new ArrayList<String>();
449     
450     for (String entity: map.keySet()){
451       suggestableEntities.add(entity);
452     }
453     return suggestableEntities;
454   }
455
456   /**
457    * Initialize the AutosuggestionSynchronizer and 
458    * AggregationSuggestionSynchronizer
459    * 
460    * @param esConfig
461    * @param aaiAdapter
462    * @param esAdapter
463    * @param nonCachingRestProvider
464    */
465   private void initAutoSuggestionSynchronizer(ElasticSearchConfig esConfig,
466       ActiveInventoryAdapter aaiAdapter, ElasticSearchAdapter esAdapter,
467       RestfulDataAccessor nonCachingRestProvider) {
468     LOG.info(AaiUiMsgs.INFO_GENERIC, "initAutoSuggestionSynchronizer");
469
470     // Initialize for entityautosuggestindex
471     try {
472       IndexIntegrityValidator autoSuggestionIndexValidator =
473           new IndexIntegrityValidator(nonCachingRestProvider, esConfig.getAutosuggestIndexname(),
474               esConfig.getType(), esConfig.getIpAddress(), esConfig.getHttpPort(),
475               esConfig.buildAutosuggestionTableConfig());
476
477       syncController.registerIndexValidator(autoSuggestionIndexValidator);
478
479       AutosuggestionSynchronizer suggestionSynchronizer =
480           new AutosuggestionSynchronizer(esConfig.getAutosuggestIndexname());
481       suggestionSynchronizer.setAaiDataProvider(aaiAdapter);
482       suggestionSynchronizer.setEsDataProvider(esAdapter);
483       syncController.registerEntitySynchronizer(suggestionSynchronizer);
484       
485       AggregationSuggestionSynchronizer aggregationSuggestionSynchronizer =
486           new AggregationSuggestionSynchronizer(esConfig.getAutosuggestIndexname());
487       aggregationSuggestionSynchronizer.setEsDataProvider(esAdapter);
488       syncController.registerEntitySynchronizer(aggregationSuggestionSynchronizer);
489
490       IndexCleaner autosuggestIndexCleaner = new ElasticSearchIndexCleaner(nonCachingRestProvider,
491           esConfig.getAutosuggestIndexname(), esConfig.getType(), esConfig.getIpAddress(),
492           esConfig.getHttpPort(), syncConfig.getScrollContextTimeToLiveInMinutes(),
493           syncConfig.getNumScrollContextItemsToRetrievePerRequest());
494
495       syncController.registerIndexCleaner(autosuggestIndexCleaner);
496     } catch (Exception exc) {
497       String message = "Error: failed to sync with message = " + exc.getMessage();
498       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
499     }
500   }
501   
502   /**
503    * Initialize the AggregationSynchronizer
504    * 
505    * @param esConfig
506    * @param aaiAdapter
507    * @param esAdapter
508    * @param nonCachingRestProvider
509    */
510   private void initAggregationSynchronizer(ElasticSearchConfig esConfig,
511       ActiveInventoryAdapter aaiAdapter, ElasticSearchAdapter esAdapter,
512       RestfulDataAccessor nonCachingRestProvider) {
513     LOG.info(AaiUiMsgs.INFO_GENERIC, "initAggregationSynchronizer");
514
515     List<String> aggregationEntities = getAutosuggestableEntitiesFromOXM();
516
517     // For each index: create an IndexValidator, a Synchronizer, and an IndexCleaner
518     for (String entity : aggregationEntities) {
519       try {
520         String indexName = TierSupportUiConstants.getAggregationIndexName(entity);
521
522         IndexIntegrityValidator aggregationIndexValidator = new IndexIntegrityValidator(
523             nonCachingRestProvider, indexName, esConfig.getType(), esConfig.getIpAddress(),
524             esConfig.getHttpPort(), esConfig.buildAggregationTableConfig());
525
526         syncController.registerIndexValidator(aggregationIndexValidator);
527
528         /*
529          * TODO: This per-entity-synchronizer approach will eventually result in AAI / ES overload
530          * because of the existing dedicated thread pools for ES + AAI operations within the
531          * synchronizer. If we had 50 types to sync then the thread pools within each Synchronizer
532          * would cause some heartburn as there would be hundreds of threads trying to talk to AAI.
533          * Given that we our running out of time, let's make sure we can get it functional and then
534          * we'll re-visit.
535          */
536         AggregationSynchronizer aggSynchronizer = new AggregationSynchronizer(entity, indexName);
537         aggSynchronizer.setAaiDataProvider(aaiAdapter);
538         aggSynchronizer.setEsDataProvider(esAdapter);
539         syncController.registerEntitySynchronizer(aggSynchronizer);
540
541         IndexCleaner entityDataIndexCleaner = new ElasticSearchIndexCleaner(nonCachingRestProvider,
542             indexName, esConfig.getType(), esConfig.getIpAddress(), esConfig.getHttpPort(),
543             syncConfig.getScrollContextTimeToLiveInMinutes(),
544             syncConfig.getNumScrollContextItemsToRetrievePerRequest());
545
546         syncController.registerIndexCleaner(entityDataIndexCleaner);
547
548       } catch (Exception exc) {
549         String message = "Error: failed to sync with message = " + exc.getMessage();
550         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
551       }
552     }
553   }
554
555   /**
556    * Instantiates a new sync helper.
557    *
558    * @param loader the loader
559    */
560   public SyncHelper(OxmModelLoader loader) {
561     try {
562       this.contextMap = MDC.getCopyOfContextMap();
563       this.syncConfig = SynchronizerConfiguration.getConfig();
564       this.esConfig = ElasticSearchConfig.getConfig();
565       this.oxmModelLoader = loader;
566
567       UncaughtExceptionHandler uncaughtExceptionHandler = new Thread.UncaughtExceptionHandler() {
568
569         @Override
570         public void uncaughtException(Thread thread, Throwable exc) {
571           LOG.error(AaiUiMsgs.ERROR_GENERIC, thread.getName() + ": " + exc);
572         }
573       };
574
575       ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("SyncHelper-%d")
576           .setUncaughtExceptionHandler(uncaughtExceptionHandler).build();
577
578       periodicExecutor = Executors.newScheduledThreadPool(3, namedThreadFactory);
579
580       /*
581        * We only want to initialize the synchronizer if sync has been configured to start
582        */
583       if (syncConfig.isConfigOkForStartupSync() || syncConfig.isConfigOkForPeriodicSync()) {
584         initializeSyncController();
585       }
586       
587       if (syncConfig.isHistoricalEntitySummarizerEnabled()) {
588         initEntityCounterHistorySummarizer(); 
589       } else { 
590         LOG.info(AaiUiMsgs.INFO_GENERIC, "history summarizer disabled"); 
591       }
592        
593
594       // schedule startup synchronization
595       if (syncConfig.isConfigOkForStartupSync()) {
596
597         long taskInitialDelayInMs = syncConfig.getSyncTaskInitialDelayInMs();
598         if (taskInitialDelayInMs != SynchronizerConstants.DELAY_NO_STARTUP_SYNC_IN_MS) {
599           oneShotExecutor.schedule(new SyncTask(true), taskInitialDelayInMs, TimeUnit.MILLISECONDS);
600           LOG.info(AaiUiMsgs.INFO_GENERIC, "Search Engine startup synchronization is enabled.");
601         } else {
602           LOG.info(AaiUiMsgs.INFO_GENERIC, "Search Engine startup synchronization is disabled.");
603         }
604       }
605
606       // schedule periodic synchronization
607       if (syncConfig.isConfigOkForPeriodicSync()) {
608
609         TimeZone tz = TimeZone.getTimeZone(syncConfig.getSyncTaskStartTimeTimeZone());
610         Calendar calendar = Calendar.getInstance(tz);
611         sdf.setTimeZone(tz);
612
613         calendar.set(Calendar.HOUR_OF_DAY, syncConfig.getSyncTaskStartTimeHr());
614         calendar.set(Calendar.MINUTE, syncConfig.getSyncTaskStartTimeMin());
615         calendar.set(Calendar.SECOND, syncConfig.getSyncTaskStartTimeSec());
616
617         long timeCurrent = calendar.getTimeInMillis();
618         int taskFrequencyInDay = syncConfig.getSyncTaskFrequencyInDay();
619         timeNextSync.getAndSet(getFirstSyncTime(calendar, timeCurrent, taskFrequencyInDay));
620
621         long delayUntilFirstRegSyncInMs = 0;
622         delayUntilFirstRegSyncInMs = timeNextSync.get() - timeCurrent;
623
624         // Do all calculation in milliseconds
625         long taskFreqencyInMs = taskFrequencyInDay * SynchronizerConstants.MILLISEC_IN_A_DAY;
626
627         if (taskFreqencyInMs != SynchronizerConstants.DELAY_NO_PERIODIC_SYNC_IN_MS) {
628           periodicExecutor.scheduleAtFixedRate(new SyncTask(false), delayUntilFirstRegSyncInMs,
629               taskFreqencyInMs, TimeUnit.MILLISECONDS);
630           LOG.info(AaiUiMsgs.INFO_GENERIC, "Search Engine periodic synchronization is enabled.");
631           // case: when - startup sync is misconfigured or is disabled
632           // - give a clue to user when is the next periodic sync
633           if (!syncConfig.isConfigOkForStartupSync()
634               || syncConfig.isConfigDisabledForInitialSync()) {
635             LOG.info(AaiUiMsgs.SYNC_TO_BEGIN, syncController.getControllerName(),
636                 sdf.format(timeNextSync).replaceAll(SynchronizerConstants.TIME_STD,
637                     SynchronizerConstants.TIME_CONFIG_STD));
638           }
639         } else {
640           LOG.info(AaiUiMsgs.INFO_GENERIC, "Search Engine periodic synchronization is disabled.");
641         }
642       }
643
644       // schedule periodic synchronization
645       if (syncConfig.isHistoricalEntitySummarizerEnabled()) {
646         scheduleHistoricalCounterSyncTask();
647       }
648
649     } catch (Exception exc) {
650       String message = "Caught an exception while starting up the SyncHelper. Error cause = \n"
651           + ErrorUtil.extractStackTraceElements(5, exc);
652       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
653     }
654   }
655
656   /**
657    * Schedule historical counter sync task.
658    */
659   private void scheduleHistoricalCounterSyncTask() {
660     long taskFrequencyInMs =
661         syncConfig.getHistoricalEntitySummarizedFrequencyInMinutes() * 60 * 1000;
662     historicalExecutor.scheduleWithFixedDelay(new HistoricalEntityCountSummaryTask(), 0,
663         taskFrequencyInMs, TimeUnit.MILLISECONDS);
664     LOG.info(AaiUiMsgs.INFO_GENERIC,
665         "Historical Entity Count Summarizer synchronization is enabled.");
666   }
667
668   /**
669    * Shutdown.
670    */
671   public void shutdown() {
672
673     if (oneShotExecutor != null) {
674       oneShotExecutor.shutdown();
675     }
676
677     if (periodicExecutor != null) {
678       periodicExecutor.shutdown();
679     }
680
681     if (historicalExecutor != null) {
682       historicalExecutor.shutdown();
683     }
684
685     if (syncController != null) {
686       syncController.shutdown();
687     }
688
689     if (entityCounterHistorySummarizer != null) {
690       entityCounterHistorySummarizer.shutdown();
691     }
692
693   }
694
695   public OxmModelLoader getOxmModelLoader() {
696     return oxmModelLoader;
697   }
698
699   public void setOxmModelLoader(OxmModelLoader oxmModelLoader) {
700     this.oxmModelLoader = oxmModelLoader;
701   }
702 }