Increase junit coverage
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / synchronizer / SyncHelper.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.synchronizer;
24
25 import com.google.common.util.concurrent.ThreadFactoryBuilder;
26
27 import java.lang.Thread.UncaughtExceptionHandler;
28 import java.text.SimpleDateFormat;
29 import java.util.ArrayList;
30 import java.util.Calendar;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.TimeZone;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.ThreadFactory;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicLong;
39
40 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
41 import org.onap.aai.sparky.config.oxm.OxmModelLoader;
42 import org.onap.aai.sparky.dal.aai.ActiveInventoryAdapter;
43 import org.onap.aai.sparky.dal.aai.config.ActiveInventoryConfig;
44 import org.onap.aai.sparky.dal.aai.config.ActiveInventoryRestConfig;
45 import org.onap.aai.sparky.dal.cache.EntityCache;
46 import org.onap.aai.sparky.dal.cache.InMemoryEntityCache;
47 import org.onap.aai.sparky.dal.cache.PersistentEntityCache;
48 import org.onap.aai.sparky.dal.elasticsearch.ElasticSearchAdapter;
49 import org.onap.aai.sparky.dal.elasticsearch.config.ElasticSearchConfig;
50 import org.onap.aai.sparky.dal.rest.RestClientBuilder;
51 import org.onap.aai.sparky.dal.rest.RestfulDataAccessor;
52 import org.onap.aai.sparky.logging.AaiUiMsgs;
53 import org.onap.aai.sparky.synchronizer.SyncController.SyncActions;
54 import org.onap.aai.sparky.synchronizer.config.SynchronizerConfiguration;
55 import org.onap.aai.sparky.synchronizer.config.SynchronizerConstants;
56 import org.onap.aai.sparky.synchronizer.enumeration.SynchronizerState;
57 import org.onap.aai.sparky.util.ErrorUtil;
58 import org.onap.aai.sparky.viewandinspect.config.TierSupportUiConstants;
59 import org.onap.aai.cl.api.Logger;
60 import org.onap.aai.cl.eelf.LoggerFactory;
61 import org.slf4j.MDC;
62
63 /**
64  * The Class SyncHelper.
65  *
66  * @author davea.
67  */
68 public class SyncHelper {
69
70   private final Logger LOG = LoggerFactory.getInstance().getLogger(SyncHelper.class);
71   private SyncController syncController = null;
72   private SyncController entityCounterHistorySummarizer = null;
73
74   private ScheduledExecutorService oneShotExecutor = Executors.newSingleThreadScheduledExecutor();
75   private ScheduledExecutorService periodicExecutor = null;
76   private ScheduledExecutorService historicalExecutor =
77       Executors.newSingleThreadScheduledExecutor();
78
79   private SynchronizerConfiguration syncConfig;
80   private ElasticSearchConfig esConfig;
81   private OxmModelLoader oxmModelLoader;
82
83   private Boolean initialSyncRunning = false;
84   private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z");
85   private AtomicLong timeNextSync = new AtomicLong();
86   Map<String, String> contextMap;
87
88   /**
89    * The Class SyncTask.
90    */
91   private class SyncTask implements Runnable {
92
93     private boolean isInitialSync;
94
95     /**
96      * Instantiates a new sync task.
97      *
98      * @param initialSync the initial sync
99      */
100     public SyncTask(boolean initialSync) {
101       this.isInitialSync = initialSync;
102     }
103
104     /*
105      * (non-Javadoc)
106      * 
107      * @see java.lang.Runnable#run()
108      */
109     @Override
110     public void run() {
111       long opStartTime = System.currentTimeMillis();
112       MDC.setContextMap(contextMap);
113
114       LOG.info(AaiUiMsgs.SEARCH_ENGINE_SYNC_STARTED, sdf.format(opStartTime)
115           .replaceAll(SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD));
116
117       try {
118
119         if (syncController == null) {
120           LOG.error(AaiUiMsgs.SYNC_SKIPPED_SYNCCONTROLLER_NOT_INITIALIZED);
121           return;
122         }
123
124         int taskFrequencyInDays = SynchronizerConfiguration.getConfig().getSyncTaskFrequencyInDay();
125
126         /*
127          * Do nothing if the initial start-up sync hasn't finished yet, but the regular sync
128          * scheduler fired up a regular sync.
129          */
130         if (!initialSyncRunning) {
131           if (isInitialSync) {
132             initialSyncRunning = true;
133           } else {
134             // update 'timeNextSync' for periodic sync
135             timeNextSync.getAndAdd(taskFrequencyInDays * SynchronizerConstants.MILLISEC_IN_A_DAY);
136
137           }
138
139           LOG.info(AaiUiMsgs.INFO_GENERIC, "SyncTask, starting syncrhonization");
140
141           syncController.performAction(SyncActions.SYNCHRONIZE);
142
143           while (syncController.getState() == SynchronizerState.PERFORMING_SYNCHRONIZATION) {
144             Thread.sleep(1000);
145           }
146
147         } else {
148           LOG.info(AaiUiMsgs.SKIP_PERIODIC_SYNC_AS_SYNC_DIDNT_FINISH, sdf.format(opStartTime)
149               .replaceAll(SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD));
150
151           return;
152         }
153
154         long opEndTime = System.currentTimeMillis();
155
156         if (isInitialSync) {
157           /*
158            * Handle corner case when start-up sync operation overlapped with a scheduled
159            * sync-start-time. Note that the scheduled sync does nothing if 'initialSyncRunning' is
160            * TRUE. So the actual next-sync is one more sync-cycle away
161            */
162           long knownNextSyncTime = timeNextSync.get();
163           if (knownNextSyncTime != SynchronizerConstants.DELAY_NO_PERIODIC_SYNC_IN_MS
164               && opEndTime > knownNextSyncTime) {
165             timeNextSync.compareAndSet(knownNextSyncTime,
166                 knownNextSyncTime + taskFrequencyInDays * SynchronizerConstants.MILLISEC_IN_A_DAY);
167             initialSyncRunning = false;
168           }
169         }
170
171         String durationMessage =
172             String.format(syncController.getControllerName() + " synchronization took '%d' ms.",
173                 (opEndTime - opStartTime));
174
175         LOG.info(AaiUiMsgs.SYNC_DURATION, durationMessage);
176
177         // Provide log about the time for next synchronization
178         if (syncConfig.isConfigOkForPeriodicSync()
179             && timeNextSync.get() != SynchronizerConstants.DELAY_NO_PERIODIC_SYNC_IN_MS) {
180           TimeZone tz = TimeZone.getTimeZone(syncConfig.getSyncTaskStartTimeTimeZone());
181           sdf.setTimeZone(tz);
182           if (opEndTime - opStartTime > taskFrequencyInDays
183               * SynchronizerConstants.MILLISEC_IN_A_DAY) {
184             String durationWasLongerMessage = String.format(
185                 syncController.getControllerName()
186                     + " synchronization took '%d' ms which is larger than"
187                     + " synchronization interval of '%d' ms.",
188                 (opEndTime - opStartTime),
189                 taskFrequencyInDays * SynchronizerConstants.MILLISEC_IN_A_DAY);
190
191             LOG.info(AaiUiMsgs.SYNC_DURATION, durationWasLongerMessage);
192           }
193
194           LOG.info(AaiUiMsgs.SYNC_TO_BEGIN, syncController.getControllerName(),
195               sdf.format(timeNextSync).replaceAll(SynchronizerConstants.TIME_STD,
196                   SynchronizerConstants.TIME_CONFIG_STD));
197         }
198
199       } catch (Exception exc) {
200         String message = "Caught an exception while attempt to synchronize elastic search "
201             + "with an error cause = " + ErrorUtil.extractStackTraceElements(5, exc);
202         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
203       }
204
205     }
206
207   }
208
209
210   /**
211    * Gets the first sync time.
212    *
213    * @param calendar the calendar
214    * @param timeNow the time now
215    * @param taskFreqInDay the task freq in day
216    * @return the first sync time
217    */
218   public long getFirstSyncTime(Calendar calendar, long timeNow, int taskFreqInDay) {
219     if (taskFreqInDay == SynchronizerConstants.DELAY_NO_PERIODIC_SYNC_IN_MS) {
220       return SynchronizerConstants.DELAY_NO_PERIODIC_SYNC_IN_MS;
221     } else if (timeNow > calendar.getTimeInMillis()) {
222       calendar.add(Calendar.DAY_OF_MONTH, taskFreqInDay);
223     }
224     return calendar.getTimeInMillis();
225   }
226
227   /**
228    * Boot strap and configure the moving pieces of the Sync Controller.
229    */
230
231   private void initializeSyncController() {
232
233     try {
234
235       /*
236        * TODO: it would be nice to have XML IoC / dependency injection kind of thing for these
237        * pieces maybe Spring?
238        */
239
240       /*
241        * Sync Controller itself
242        */
243
244       syncController = new SyncController("entitySyncController");
245
246       /*
247        * Create common elements
248        */
249
250       ActiveInventoryAdapter aaiAdapter = new ActiveInventoryAdapter(new RestClientBuilder());
251       ActiveInventoryRestConfig aaiRestConfig =
252           ActiveInventoryConfig.getConfig().getAaiRestConfig();
253
254
255       EntityCache cache = null;
256
257       if (aaiRestConfig.isCacheEnabled()) {
258         cache = new PersistentEntityCache(aaiRestConfig.getStorageFolderOverride(),
259             aaiRestConfig.getNumCacheWorkers());
260       } else {
261         cache = new InMemoryEntityCache();
262       }
263
264       RestClientBuilder clientBuilder = new RestClientBuilder();
265
266       aaiAdapter.setCacheEnabled(true);
267       aaiAdapter.setEntityCache(cache);
268
269       clientBuilder.setUseHttps(false);
270
271       RestfulDataAccessor nonCachingRestProvider = new RestfulDataAccessor(clientBuilder);
272
273       ElasticSearchConfig esConfig = ElasticSearchConfig.getConfig();
274       ElasticSearchAdapter esAdapter = new ElasticSearchAdapter(nonCachingRestProvider, esConfig);
275
276       /*
277        * Register Index Validators
278        */
279
280       IndexIntegrityValidator entitySearchIndexValidator =
281           new IndexIntegrityValidator(nonCachingRestProvider, esConfig.getIndexName(),
282               esConfig.getType(), esConfig.getIpAddress(), esConfig.getHttpPort(),
283               esConfig.buildElasticSearchTableConfig());
284
285       syncController.registerIndexValidator(entitySearchIndexValidator);
286
287       // TODO: Insert IndexValidator for TopographicalEntityIndex
288       // we should have one, but one isn't 100% required as none of the fields are analyzed
289
290       /*
291        * Register Synchronizers
292        */
293
294       SearchableEntitySynchronizer ses = new SearchableEntitySynchronizer(esConfig.getIndexName());
295       ses.setAaiDataProvider(aaiAdapter);
296       ses.setEsDataProvider(esAdapter);
297       syncController.registerEntitySynchronizer(ses);
298
299       CrossEntityReferenceSynchronizer cers = new CrossEntityReferenceSynchronizer(
300           esConfig.getIndexName(), ActiveInventoryConfig.getConfig());
301       cers.setAaiDataProvider(aaiAdapter);
302       cers.setEsDataProvider(esAdapter);
303       syncController.registerEntitySynchronizer(cers);
304
305       if (syncConfig.isAutosuggestSynchronizationEnabled()) {
306         initAutoSuggestionSynchronizer(esConfig, aaiAdapter, esAdapter, nonCachingRestProvider);
307         initAggregationSynchronizer(esConfig, aaiAdapter, esAdapter, nonCachingRestProvider);
308       }
309
310       /*
311        * Register Cleaners
312        */
313
314       IndexCleaner searchableIndexCleaner = new ElasticSearchIndexCleaner(nonCachingRestProvider,
315           esConfig.getIndexName(), esConfig.getType(), esConfig.getIpAddress(),
316           esConfig.getHttpPort(), syncConfig.getScrollContextTimeToLiveInMinutes(),
317           syncConfig.getNumScrollContextItemsToRetrievePerRequest());
318
319       syncController.registerIndexCleaner(searchableIndexCleaner);
320
321     } catch (Exception exc) {
322       String message = "Error: failed to sync with message = " + exc.getMessage();
323       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
324     }
325
326   }
327
328   private List<String> getAutosuggestableEntitiesFromOXM() {
329     Map<String, OxmEntityDescriptor> map = oxmModelLoader.getSuggestionSearchEntityDescriptors();
330     List<String> suggestableEntities = new ArrayList<String>();
331     
332     for (String entity: map.keySet()){
333       suggestableEntities.add(entity);
334     }
335     return suggestableEntities;
336   }
337
338   /**
339    * Initialize the AutosuggestionSynchronizer and 
340    * AggregationSuggestionSynchronizer
341    * 
342    * @param esConfig
343    * @param aaiAdapter
344    * @param esAdapter
345    * @param nonCachingRestProvider
346    */
347   private void initAutoSuggestionSynchronizer(ElasticSearchConfig esConfig,
348       ActiveInventoryAdapter aaiAdapter, ElasticSearchAdapter esAdapter,
349       RestfulDataAccessor nonCachingRestProvider) {
350     LOG.info(AaiUiMsgs.INFO_GENERIC, "initAutoSuggestionSynchronizer");
351
352     // Initialize for entityautosuggestindex
353     try {
354       IndexIntegrityValidator autoSuggestionIndexValidator =
355           new IndexIntegrityValidator(nonCachingRestProvider, esConfig.getAutosuggestIndexname(),
356               esConfig.getType(), esConfig.getIpAddress(), esConfig.getHttpPort(),
357               esConfig.buildAutosuggestionTableConfig());
358
359       syncController.registerIndexValidator(autoSuggestionIndexValidator);
360
361       AutosuggestionSynchronizer suggestionSynchronizer =
362           new AutosuggestionSynchronizer(esConfig.getAutosuggestIndexname());
363       suggestionSynchronizer.setAaiDataProvider(aaiAdapter);
364       suggestionSynchronizer.setEsDataProvider(esAdapter);
365       syncController.registerEntitySynchronizer(suggestionSynchronizer);
366       
367       AggregationSuggestionSynchronizer aggregationSuggestionSynchronizer =
368           new AggregationSuggestionSynchronizer(esConfig.getAutosuggestIndexname());
369       aggregationSuggestionSynchronizer.setEsDataProvider(esAdapter);
370       syncController.registerEntitySynchronizer(aggregationSuggestionSynchronizer);
371
372       IndexCleaner autosuggestIndexCleaner = new ElasticSearchIndexCleaner(nonCachingRestProvider,
373           esConfig.getAutosuggestIndexname(), esConfig.getType(), esConfig.getIpAddress(),
374           esConfig.getHttpPort(), syncConfig.getScrollContextTimeToLiveInMinutes(),
375           syncConfig.getNumScrollContextItemsToRetrievePerRequest());
376
377       syncController.registerIndexCleaner(autosuggestIndexCleaner);
378     } catch (Exception exc) {
379       String message = "Error: failed to sync with message = " + exc.getMessage();
380       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
381     }
382   }
383   
384   /**
385    * Initialize the AggregationSynchronizer
386    * 
387    * @param esConfig
388    * @param aaiAdapter
389    * @param esAdapter
390    * @param nonCachingRestProvider
391    */
392   private void initAggregationSynchronizer(ElasticSearchConfig esConfig,
393       ActiveInventoryAdapter aaiAdapter, ElasticSearchAdapter esAdapter,
394       RestfulDataAccessor nonCachingRestProvider) {
395     LOG.info(AaiUiMsgs.INFO_GENERIC, "initAggregationSynchronizer");
396
397     List<String> aggregationEntities = getAutosuggestableEntitiesFromOXM();
398
399     // For each index: create an IndexValidator, a Synchronizer, and an IndexCleaner
400     for (String entity : aggregationEntities) {
401       try {
402         String indexName = TierSupportUiConstants.getAggregationIndexName(entity);
403
404         IndexIntegrityValidator aggregationIndexValidator = new IndexIntegrityValidator(
405             nonCachingRestProvider, indexName, esConfig.getType(), esConfig.getIpAddress(),
406             esConfig.getHttpPort(), esConfig.buildAggregationTableConfig());
407
408         syncController.registerIndexValidator(aggregationIndexValidator);
409
410         /*
411          * TODO: This per-entity-synchronizer approach will eventually result in AAI / ES overload
412          * because of the existing dedicated thread pools for ES + AAI operations within the
413          * synchronizer. If we had 50 types to sync then the thread pools within each Synchronizer
414          * would cause some heartburn as there would be hundreds of threads trying to talk to AAI.
415          * Given that we our running out of time, let's make sure we can get it functional and then
416          * we'll re-visit.
417          */
418         AggregationSynchronizer aggSynchronizer = new AggregationSynchronizer(entity, indexName);
419         aggSynchronizer.setAaiDataProvider(aaiAdapter);
420         aggSynchronizer.setEsDataProvider(esAdapter);
421         syncController.registerEntitySynchronizer(aggSynchronizer);
422
423         IndexCleaner entityDataIndexCleaner = new ElasticSearchIndexCleaner(nonCachingRestProvider,
424             indexName, esConfig.getType(), esConfig.getIpAddress(), esConfig.getHttpPort(),
425             syncConfig.getScrollContextTimeToLiveInMinutes(),
426             syncConfig.getNumScrollContextItemsToRetrievePerRequest());
427
428         syncController.registerIndexCleaner(entityDataIndexCleaner);
429
430       } catch (Exception exc) {
431         String message = "Error: failed to sync with message = " + exc.getMessage();
432         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
433       }
434     }
435   }
436
437   /**
438    * Instantiates a new sync helper.
439    *
440    * @param loader the loader
441    */
442   public SyncHelper(OxmModelLoader loader) {
443     try {
444       this.contextMap = MDC.getCopyOfContextMap();
445       this.syncConfig = SynchronizerConfiguration.getConfig();
446       this.esConfig = ElasticSearchConfig.getConfig();
447       this.oxmModelLoader = loader;
448
449       UncaughtExceptionHandler uncaughtExceptionHandler = new Thread.UncaughtExceptionHandler() {
450
451         @Override
452         public void uncaughtException(Thread thread, Throwable exc) {
453           LOG.error(AaiUiMsgs.ERROR_GENERIC, thread.getName() + ": " + exc);
454         }
455       };
456
457       ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("SyncHelper-%d")
458           .setUncaughtExceptionHandler(uncaughtExceptionHandler).build();
459
460       periodicExecutor = Executors.newScheduledThreadPool(3, namedThreadFactory);
461
462       /*
463        * We only want to initialize the synchronizer if sync has been configured to start
464        */
465       if (syncConfig.isConfigOkForStartupSync() || syncConfig.isConfigOkForPeriodicSync()) {
466         initializeSyncController();
467       }
468
469       // schedule startup synchronization
470       if (syncConfig.isConfigOkForStartupSync()) {
471
472         long taskInitialDelayInMs = syncConfig.getSyncTaskInitialDelayInMs();
473         if (taskInitialDelayInMs != SynchronizerConstants.DELAY_NO_STARTUP_SYNC_IN_MS) {
474           oneShotExecutor.schedule(new SyncTask(true), taskInitialDelayInMs, TimeUnit.MILLISECONDS);
475           LOG.info(AaiUiMsgs.INFO_GENERIC, "Search Engine startup synchronization is enabled.");
476         } else {
477           LOG.info(AaiUiMsgs.INFO_GENERIC, "Search Engine startup synchronization is disabled.");
478         }
479       }
480
481       // schedule periodic synchronization
482       if (syncConfig.isConfigOkForPeriodicSync()) {
483
484         TimeZone tz = TimeZone.getTimeZone(syncConfig.getSyncTaskStartTimeTimeZone());
485         Calendar calendar = Calendar.getInstance(tz);
486         sdf.setTimeZone(tz);
487
488         calendar.set(Calendar.HOUR_OF_DAY, syncConfig.getSyncTaskStartTimeHr());
489         calendar.set(Calendar.MINUTE, syncConfig.getSyncTaskStartTimeMin());
490         calendar.set(Calendar.SECOND, syncConfig.getSyncTaskStartTimeSec());
491
492         long timeCurrent = calendar.getTimeInMillis();
493         int taskFrequencyInDay = syncConfig.getSyncTaskFrequencyInDay();
494         timeNextSync.getAndSet(getFirstSyncTime(calendar, timeCurrent, taskFrequencyInDay));
495
496         long delayUntilFirstRegSyncInMs = 0;
497         delayUntilFirstRegSyncInMs = timeNextSync.get() - timeCurrent;
498
499         // Do all calculation in milliseconds
500         long taskFreqencyInMs = taskFrequencyInDay * SynchronizerConstants.MILLISEC_IN_A_DAY;
501
502         if (taskFreqencyInMs != SynchronizerConstants.DELAY_NO_PERIODIC_SYNC_IN_MS) {
503           periodicExecutor.scheduleAtFixedRate(new SyncTask(false), delayUntilFirstRegSyncInMs,
504               taskFreqencyInMs, TimeUnit.MILLISECONDS);
505           LOG.info(AaiUiMsgs.INFO_GENERIC, "Search Engine periodic synchronization is enabled.");
506           // case: when - startup sync is misconfigured or is disabled
507           // - give a clue to user when is the next periodic sync
508           if (!syncConfig.isConfigOkForStartupSync()
509               || syncConfig.isConfigDisabledForInitialSync()) {
510             LOG.info(AaiUiMsgs.SYNC_TO_BEGIN, syncController.getControllerName(),
511                 sdf.format(timeNextSync).replaceAll(SynchronizerConstants.TIME_STD,
512                     SynchronizerConstants.TIME_CONFIG_STD));
513           }
514         } else {
515           LOG.info(AaiUiMsgs.INFO_GENERIC, "Search Engine periodic synchronization is disabled.");
516         }
517       }
518
519     } catch (Exception exc) {
520       String message = "Caught an exception while starting up the SyncHelper. Error cause = \n"
521           + ErrorUtil.extractStackTraceElements(5, exc);
522       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
523     }
524   }
525
526
527   /**
528    * Shutdown.
529    */
530   public void shutdown() {
531
532     if (oneShotExecutor != null) {
533       oneShotExecutor.shutdown();
534     }
535
536     if (periodicExecutor != null) {
537       periodicExecutor.shutdown();
538     }
539
540     if (historicalExecutor != null) {
541       historicalExecutor.shutdown();
542     }
543
544     if (syncController != null) {
545       syncController.shutdown();
546     }
547
548     if (entityCounterHistorySummarizer != null) {
549       entityCounterHistorySummarizer.shutdown();
550     }
551
552   }
553
554   public OxmModelLoader getOxmModelLoader() {
555     return oxmModelLoader;
556   }
557
558   public void setOxmModelLoader(OxmModelLoader oxmModelLoader) {
559     this.oxmModelLoader = oxmModelLoader;
560   }
561 }