2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 package org.onap.aai.sparky.synchronizer;
25 import com.google.common.util.concurrent.ThreadFactoryBuilder;
27 import java.lang.Thread.UncaughtExceptionHandler;
28 import java.text.SimpleDateFormat;
29 import java.util.ArrayList;
30 import java.util.Calendar;
31 import java.util.List;
33 import java.util.TimeZone;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.ThreadFactory;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicLong;
40 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
41 import org.onap.aai.sparky.config.oxm.OxmModelLoader;
42 import org.onap.aai.sparky.dal.aai.ActiveInventoryAdapter;
43 import org.onap.aai.sparky.dal.aai.config.ActiveInventoryConfig;
44 import org.onap.aai.sparky.dal.aai.config.ActiveInventoryRestConfig;
45 import org.onap.aai.sparky.dal.cache.EntityCache;
46 import org.onap.aai.sparky.dal.cache.InMemoryEntityCache;
47 import org.onap.aai.sparky.dal.cache.PersistentEntityCache;
48 import org.onap.aai.sparky.dal.elasticsearch.ElasticSearchAdapter;
49 import org.onap.aai.sparky.dal.elasticsearch.config.ElasticSearchConfig;
50 import org.onap.aai.sparky.dal.rest.RestClientBuilder;
51 import org.onap.aai.sparky.dal.rest.RestfulDataAccessor;
52 import org.onap.aai.sparky.logging.AaiUiMsgs;
53 import org.onap.aai.sparky.synchronizer.SyncController.SyncActions;
54 import org.onap.aai.sparky.synchronizer.config.SynchronizerConfiguration;
55 import org.onap.aai.sparky.synchronizer.config.SynchronizerConstants;
56 import org.onap.aai.sparky.synchronizer.enumeration.SynchronizerState;
57 import org.onap.aai.sparky.util.ErrorUtil;
58 import org.onap.aai.sparky.viewandinspect.config.TierSupportUiConstants;
59 import org.onap.aai.cl.api.Logger;
60 import org.onap.aai.cl.eelf.LoggerFactory;
64 * The Class SyncHelper.
68 public class SyncHelper {
70 private final Logger LOG = LoggerFactory.getInstance().getLogger(SyncHelper.class);
71 private SyncController syncController = null;
72 private SyncController entityCounterHistorySummarizer = null;
74 private ScheduledExecutorService oneShotExecutor = Executors.newSingleThreadScheduledExecutor();
75 private ScheduledExecutorService periodicExecutor = null;
76 private ScheduledExecutorService historicalExecutor =
77 Executors.newSingleThreadScheduledExecutor();
79 private SynchronizerConfiguration syncConfig;
80 private ElasticSearchConfig esConfig;
81 private OxmModelLoader oxmModelLoader;
83 private Boolean initialSyncRunning = false;
84 private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z");
85 private AtomicLong timeNextSync = new AtomicLong();
86 Map<String, String> contextMap;
91 private class SyncTask implements Runnable {
93 private boolean isInitialSync;
95 public boolean isInitialSync() {
99 public void setInitialSync(boolean isInitialSync) {
100 this.isInitialSync = isInitialSync;
104 * Instantiates a new sync task.
106 * @param initialSync the initial sync
108 public SyncTask(boolean initialSync) {
109 this.isInitialSync = initialSync;
115 * @see java.lang.Runnable#run()
119 long opStartTime = System.currentTimeMillis();
120 MDC.setContextMap(contextMap);
122 LOG.info(AaiUiMsgs.SEARCH_ENGINE_SYNC_STARTED, sdf.format(opStartTime)
123 .replaceAll(SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD));
127 if (syncController == null) {
128 LOG.error(AaiUiMsgs.SYNC_SKIPPED_SYNCCONTROLLER_NOT_INITIALIZED);
132 int taskFrequencyInDays = SynchronizerConfiguration.getConfig().getSyncTaskFrequencyInDay();
135 * Do nothing if the initial start-up sync hasn't finished yet, but the regular sync
136 * scheduler fired up a regular sync.
138 if (!initialSyncRunning) {
140 initialSyncRunning = true;
142 // update 'timeNextSync' for periodic sync
143 timeNextSync.getAndAdd(taskFrequencyInDays * SynchronizerConstants.MILLISEC_IN_A_DAY);
147 LOG.info(AaiUiMsgs.INFO_GENERIC, "SyncTask, starting syncrhonization");
149 syncController.performAction(SyncActions.SYNCHRONIZE);
151 while (syncController.getState() == SynchronizerState.PERFORMING_SYNCHRONIZATION) {
156 LOG.info(AaiUiMsgs.SKIP_PERIODIC_SYNC_AS_SYNC_DIDNT_FINISH, sdf.format(opStartTime)
157 .replaceAll(SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD));
162 long opEndTime = System.currentTimeMillis();
166 * Handle corner case when start-up sync operation overlapped with a scheduled
167 * sync-start-time. Note that the scheduled sync does nothing if 'initialSyncRunning' is
168 * TRUE. So the actual next-sync is one more sync-cycle away
170 long knownNextSyncTime = timeNextSync.get();
171 if (knownNextSyncTime != SynchronizerConstants.DELAY_NO_PERIODIC_SYNC_IN_MS
172 && opEndTime > knownNextSyncTime) {
173 timeNextSync.compareAndSet(knownNextSyncTime,
174 knownNextSyncTime + taskFrequencyInDays * SynchronizerConstants.MILLISEC_IN_A_DAY);
175 initialSyncRunning = false;
179 String durationMessage =
180 String.format(syncController.getControllerName() + " synchronization took '%d' ms.",
181 (opEndTime - opStartTime));
183 LOG.info(AaiUiMsgs.SYNC_DURATION, durationMessage);
185 // Provide log about the time for next synchronization
186 if (syncConfig.isConfigOkForPeriodicSync()
187 && timeNextSync.get() != SynchronizerConstants.DELAY_NO_PERIODIC_SYNC_IN_MS) {
188 TimeZone tz = TimeZone.getTimeZone(syncConfig.getSyncTaskStartTimeTimeZone());
190 if (opEndTime - opStartTime > taskFrequencyInDays
191 * SynchronizerConstants.MILLISEC_IN_A_DAY) {
192 String durationWasLongerMessage = String.format(
193 syncController.getControllerName()
194 + " synchronization took '%d' ms which is larger than"
195 + " synchronization interval of '%d' ms.",
196 (opEndTime - opStartTime),
197 taskFrequencyInDays * SynchronizerConstants.MILLISEC_IN_A_DAY);
199 LOG.info(AaiUiMsgs.SYNC_DURATION, durationWasLongerMessage);
202 LOG.info(AaiUiMsgs.SYNC_TO_BEGIN, syncController.getControllerName(),
203 sdf.format(timeNextSync).replaceAll(SynchronizerConstants.TIME_STD,
204 SynchronizerConstants.TIME_CONFIG_STD));
207 } catch (Exception exc) {
208 String message = "Caught an exception while attempt to synchronize elastic search "
209 + "with an error cause = " + ErrorUtil.extractStackTraceElements(5, exc);
210 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
219 * Gets the first sync time.
221 * @param calendar the calendar
222 * @param timeNow the time now
223 * @param taskFreqInDay the task freq in day
224 * @return the first sync time
226 public long getFirstSyncTime(Calendar calendar, long timeNow, int taskFreqInDay) {
227 if (taskFreqInDay == SynchronizerConstants.DELAY_NO_PERIODIC_SYNC_IN_MS) {
228 return SynchronizerConstants.DELAY_NO_PERIODIC_SYNC_IN_MS;
229 } else if (timeNow > calendar.getTimeInMillis()) {
230 calendar.add(Calendar.DAY_OF_MONTH, taskFreqInDay);
232 return calendar.getTimeInMillis();
236 * Boot strap and configure the moving pieces of the Sync Controller.
239 private void initializeSyncController() {
244 * TODO: it would be nice to have XML IoC / dependency injection kind of thing for these
245 * pieces maybe Spring?
249 * Sync Controller itself
252 syncController = new SyncController("entitySyncController");
255 * Create common elements
258 ActiveInventoryAdapter aaiAdapter = new ActiveInventoryAdapter(new RestClientBuilder());
259 ActiveInventoryRestConfig aaiRestConfig =
260 ActiveInventoryConfig.getConfig().getAaiRestConfig();
263 EntityCache cache = null;
265 if (aaiRestConfig.isCacheEnabled()) {
266 cache = new PersistentEntityCache(aaiRestConfig.getStorageFolderOverride(),
267 aaiRestConfig.getNumCacheWorkers());
269 cache = new InMemoryEntityCache();
272 RestClientBuilder clientBuilder = new RestClientBuilder();
274 aaiAdapter.setCacheEnabled(true);
275 aaiAdapter.setEntityCache(cache);
277 clientBuilder.setUseHttps(false);
279 RestfulDataAccessor nonCachingRestProvider = new RestfulDataAccessor(clientBuilder);
281 ElasticSearchConfig esConfig = ElasticSearchConfig.getConfig();
282 ElasticSearchAdapter esAdapter = new ElasticSearchAdapter(nonCachingRestProvider, esConfig);
285 * Register Index Validators
288 IndexIntegrityValidator entitySearchIndexValidator =
289 new IndexIntegrityValidator(nonCachingRestProvider, esConfig.getIndexName(),
290 esConfig.getType(), esConfig.getIpAddress(), esConfig.getHttpPort(),
291 esConfig.buildElasticSearchTableConfig());
293 syncController.registerIndexValidator(entitySearchIndexValidator);
295 // TODO: Insert IndexValidator for TopographicalEntityIndex
296 // we should have one, but one isn't 100% required as none of the fields are analyzed
299 * Register Synchronizers
302 SearchableEntitySynchronizer ses = new SearchableEntitySynchronizer(esConfig.getIndexName());
303 ses.setAaiDataProvider(aaiAdapter);
304 ses.setEsDataProvider(esAdapter);
305 syncController.registerEntitySynchronizer(ses);
307 CrossEntityReferenceSynchronizer cers = new CrossEntityReferenceSynchronizer(
308 esConfig.getIndexName(), ActiveInventoryConfig.getConfig());
309 cers.setAaiDataProvider(aaiAdapter);
310 cers.setEsDataProvider(esAdapter);
311 syncController.registerEntitySynchronizer(cers);
313 if (syncConfig.isAutosuggestSynchronizationEnabled()) {
314 initAutoSuggestionSynchronizer(esConfig, aaiAdapter, esAdapter, nonCachingRestProvider);
315 initAggregationSynchronizer(esConfig, aaiAdapter, esAdapter, nonCachingRestProvider);
322 IndexCleaner searchableIndexCleaner = new ElasticSearchIndexCleaner(nonCachingRestProvider,
323 esConfig.getIndexName(), esConfig.getType(), esConfig.getIpAddress(),
324 esConfig.getHttpPort(), syncConfig.getScrollContextTimeToLiveInMinutes(),
325 syncConfig.getNumScrollContextItemsToRetrievePerRequest());
327 syncController.registerIndexCleaner(searchableIndexCleaner);
329 } catch (Exception exc) {
330 String message = "Error: failed to sync with message = " + exc.getMessage();
331 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
336 private List<String> getAutosuggestableEntitiesFromOXM() {
337 Map<String, OxmEntityDescriptor> map = oxmModelLoader.getSuggestionSearchEntityDescriptors();
338 List<String> suggestableEntities = new ArrayList<String>();
340 for (String entity: map.keySet()){
341 suggestableEntities.add(entity);
343 return suggestableEntities;
347 * Initialize the AutosuggestionSynchronizer and
348 * AggregationSuggestionSynchronizer
353 * @param nonCachingRestProvider
355 private void initAutoSuggestionSynchronizer(ElasticSearchConfig esConfig,
356 ActiveInventoryAdapter aaiAdapter, ElasticSearchAdapter esAdapter,
357 RestfulDataAccessor nonCachingRestProvider) {
358 LOG.info(AaiUiMsgs.INFO_GENERIC, "initAutoSuggestionSynchronizer");
360 // Initialize for entityautosuggestindex
362 IndexIntegrityValidator autoSuggestionIndexValidator =
363 new IndexIntegrityValidator(nonCachingRestProvider, esConfig.getAutosuggestIndexname(),
364 esConfig.getType(), esConfig.getIpAddress(), esConfig.getHttpPort(),
365 esConfig.buildAutosuggestionTableConfig());
367 syncController.registerIndexValidator(autoSuggestionIndexValidator);
369 AutosuggestionSynchronizer suggestionSynchronizer =
370 new AutosuggestionSynchronizer(esConfig.getAutosuggestIndexname());
371 suggestionSynchronizer.setAaiDataProvider(aaiAdapter);
372 suggestionSynchronizer.setEsDataProvider(esAdapter);
373 syncController.registerEntitySynchronizer(suggestionSynchronizer);
375 AggregationSuggestionSynchronizer aggregationSuggestionSynchronizer =
376 new AggregationSuggestionSynchronizer(esConfig.getAutosuggestIndexname());
377 aggregationSuggestionSynchronizer.setEsDataProvider(esAdapter);
378 syncController.registerEntitySynchronizer(aggregationSuggestionSynchronizer);
380 IndexCleaner autosuggestIndexCleaner = new ElasticSearchIndexCleaner(nonCachingRestProvider,
381 esConfig.getAutosuggestIndexname(), esConfig.getType(), esConfig.getIpAddress(),
382 esConfig.getHttpPort(), syncConfig.getScrollContextTimeToLiveInMinutes(),
383 syncConfig.getNumScrollContextItemsToRetrievePerRequest());
385 syncController.registerIndexCleaner(autosuggestIndexCleaner);
386 } catch (Exception exc) {
387 String message = "Error: failed to sync with message = " + exc.getMessage();
388 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
393 * Initialize the AggregationSynchronizer
398 * @param nonCachingRestProvider
400 private void initAggregationSynchronizer(ElasticSearchConfig esConfig,
401 ActiveInventoryAdapter aaiAdapter, ElasticSearchAdapter esAdapter,
402 RestfulDataAccessor nonCachingRestProvider) {
403 LOG.info(AaiUiMsgs.INFO_GENERIC, "initAggregationSynchronizer");
405 List<String> aggregationEntities = getAutosuggestableEntitiesFromOXM();
407 // For each index: create an IndexValidator, a Synchronizer, and an IndexCleaner
408 for (String entity : aggregationEntities) {
410 String indexName = TierSupportUiConstants.getAggregationIndexName(entity);
412 IndexIntegrityValidator aggregationIndexValidator = new IndexIntegrityValidator(
413 nonCachingRestProvider, indexName, esConfig.getType(), esConfig.getIpAddress(),
414 esConfig.getHttpPort(), esConfig.buildAggregationTableConfig());
416 syncController.registerIndexValidator(aggregationIndexValidator);
419 * TODO: This per-entity-synchronizer approach will eventually result in AAI / ES overload
420 * because of the existing dedicated thread pools for ES + AAI operations within the
421 * synchronizer. If we had 50 types to sync then the thread pools within each Synchronizer
422 * would cause some heartburn as there would be hundreds of threads trying to talk to AAI.
423 * Given that we our running out of time, let's make sure we can get it functional and then
426 AggregationSynchronizer aggSynchronizer = new AggregationSynchronizer(entity, indexName);
427 aggSynchronizer.setAaiDataProvider(aaiAdapter);
428 aggSynchronizer.setEsDataProvider(esAdapter);
429 syncController.registerEntitySynchronizer(aggSynchronizer);
431 IndexCleaner entityDataIndexCleaner = new ElasticSearchIndexCleaner(nonCachingRestProvider,
432 indexName, esConfig.getType(), esConfig.getIpAddress(), esConfig.getHttpPort(),
433 syncConfig.getScrollContextTimeToLiveInMinutes(),
434 syncConfig.getNumScrollContextItemsToRetrievePerRequest());
436 syncController.registerIndexCleaner(entityDataIndexCleaner);
438 } catch (Exception exc) {
439 String message = "Error: failed to sync with message = " + exc.getMessage();
440 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
446 * Instantiates a new sync helper.
448 * @param loader the loader
450 public SyncHelper(OxmModelLoader loader) {
452 this.contextMap = MDC.getCopyOfContextMap();
453 this.syncConfig = SynchronizerConfiguration.getConfig();
454 this.esConfig = ElasticSearchConfig.getConfig();
455 this.oxmModelLoader = loader;
457 UncaughtExceptionHandler uncaughtExceptionHandler = new Thread.UncaughtExceptionHandler() {
460 public void uncaughtException(Thread thread, Throwable exc) {
461 LOG.error(AaiUiMsgs.ERROR_GENERIC, thread.getName() + ": " + exc);
465 ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("SyncHelper-%d")
466 .setUncaughtExceptionHandler(uncaughtExceptionHandler).build();
468 periodicExecutor = Executors.newScheduledThreadPool(3, namedThreadFactory);
471 * We only want to initialize the synchronizer if sync has been configured to start
473 if (syncConfig.isConfigOkForStartupSync() || syncConfig.isConfigOkForPeriodicSync()) {
474 initializeSyncController();
477 // schedule startup synchronization
478 if (syncConfig.isConfigOkForStartupSync()) {
480 long taskInitialDelayInMs = syncConfig.getSyncTaskInitialDelayInMs();
481 if (taskInitialDelayInMs != SynchronizerConstants.DELAY_NO_STARTUP_SYNC_IN_MS) {
482 oneShotExecutor.schedule(new SyncTask(true), taskInitialDelayInMs, TimeUnit.MILLISECONDS);
483 LOG.info(AaiUiMsgs.INFO_GENERIC, "Search Engine startup synchronization is enabled.");
485 LOG.info(AaiUiMsgs.INFO_GENERIC, "Search Engine startup synchronization is disabled.");
489 // schedule periodic synchronization
490 if (syncConfig.isConfigOkForPeriodicSync()) {
492 TimeZone tz = TimeZone.getTimeZone(syncConfig.getSyncTaskStartTimeTimeZone());
493 Calendar calendar = Calendar.getInstance(tz);
496 calendar.set(Calendar.HOUR_OF_DAY, syncConfig.getSyncTaskStartTimeHr());
497 calendar.set(Calendar.MINUTE, syncConfig.getSyncTaskStartTimeMin());
498 calendar.set(Calendar.SECOND, syncConfig.getSyncTaskStartTimeSec());
500 long timeCurrent = calendar.getTimeInMillis();
501 int taskFrequencyInDay = syncConfig.getSyncTaskFrequencyInDay();
502 timeNextSync.getAndSet(getFirstSyncTime(calendar, timeCurrent, taskFrequencyInDay));
504 long delayUntilFirstRegSyncInMs = 0;
505 delayUntilFirstRegSyncInMs = timeNextSync.get() - timeCurrent;
507 // Do all calculation in milliseconds
508 long taskFreqencyInMs = taskFrequencyInDay * SynchronizerConstants.MILLISEC_IN_A_DAY;
510 if (taskFreqencyInMs != SynchronizerConstants.DELAY_NO_PERIODIC_SYNC_IN_MS) {
511 periodicExecutor.scheduleAtFixedRate(new SyncTask(false), delayUntilFirstRegSyncInMs,
512 taskFreqencyInMs, TimeUnit.MILLISECONDS);
513 LOG.info(AaiUiMsgs.INFO_GENERIC, "Search Engine periodic synchronization is enabled.");
514 // case: when - startup sync is misconfigured or is disabled
515 // - give a clue to user when is the next periodic sync
516 if (!syncConfig.isConfigOkForStartupSync()
517 || syncConfig.isConfigDisabledForInitialSync()) {
518 LOG.info(AaiUiMsgs.SYNC_TO_BEGIN, syncController.getControllerName(),
519 sdf.format(timeNextSync).replaceAll(SynchronizerConstants.TIME_STD,
520 SynchronizerConstants.TIME_CONFIG_STD));
523 LOG.info(AaiUiMsgs.INFO_GENERIC, "Search Engine periodic synchronization is disabled.");
527 } catch (Exception exc) {
528 String message = "Caught an exception while starting up the SyncHelper. Error cause = \n"
529 + ErrorUtil.extractStackTraceElements(5, exc);
530 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
538 public void shutdown() {
540 if (oneShotExecutor != null) {
541 oneShotExecutor.shutdown();
544 if (periodicExecutor != null) {
545 periodicExecutor.shutdown();
548 if (historicalExecutor != null) {
549 historicalExecutor.shutdown();
552 if (syncController != null) {
553 syncController.shutdown();
556 if (entityCounterHistorySummarizer != null) {
557 entityCounterHistorySummarizer.shutdown();
562 public OxmModelLoader getOxmModelLoader() {
563 return oxmModelLoader;
566 public void setOxmModelLoader(OxmModelLoader oxmModelLoader) {
567 this.oxmModelLoader = oxmModelLoader;