Adding UI extensibility
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / sync / SyncControllerImpl.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.sync;
24
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
26
27 import java.util.Calendar;
28 import java.util.Collection;
29 import java.util.Date;
30 import java.util.LinkedHashSet;
31 import java.util.TimeZone;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Semaphore;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.function.Supplier;
36
37 import org.onap.aai.cl.api.Logger;
38 import org.onap.aai.cl.eelf.LoggerFactory;
39 import org.onap.aai.sparky.logging.AaiUiMsgs;
40 import org.onap.aai.sparky.sync.config.SyncControllerConfig;
41 import org.onap.aai.sparky.sync.enumeration.OperationState;
42 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
43 import org.onap.aai.sparky.util.NodeUtils;
44
45 /**
46  * The Class SyncController.
47  *
48  * @author davea.
49  */
50 public class SyncControllerImpl implements SyncController {
51   private static final Logger LOG = LoggerFactory.getInstance().getLogger(SyncControllerImpl.class);
52
53   /**
54    * The Enum InternalState.
55    */
56   private enum InternalState {
57     IDLE, PRE_SYNC, SYNC_OPERATION, SELECTIVE_DELETE, ABORTING_SYNC, REPAIRING_INDEX, POST_SYNC, TEST_INDEX_INTEGRITY, GENERATE_FINAL_REPORT
58   }
59
60   /**
61    * The Enum SyncActions.
62    */
63   public enum SyncActions {
64     SYNCHRONIZE, REPAIR_INDEX, INDEX_INTEGRITY_VALIDATION_COMPLETE, PRE_SYNC_COMPLETE, SYNC_COMPLETE, SYNC_ABORTED, SYNC_FAILURE, POST_SYNC_COMPLETE, PURGE_COMPLETE, REPORT_COMPLETE
65   }
66
67   private Collection<IndexSynchronizer> registeredSynchronizers;
68   private Collection<IndexValidator> registeredIndexValidators;
69   private Collection<IndexCleaner> registeredIndexCleaners;
70   private InternalState currentInternalState;
71   private ExecutorService syncControllerExecutor;
72   private ExecutorService statReporterExecutor;
73
74   private long delayInMs;
75   private long syncFrequencyInMs;
76   private Date syncStartTime;
77
78   private Date lastExecutionDate;
79   private AtomicInteger runCount;
80   private Semaphore performingActionGate;
81   private Calendar creationTime;
82
83   private String syncStartTimeWithTimeZone;
84   private String controllerName;
85
86   protected SyncControllerConfig syncControllerConfig;
87
88
89
90   /**
91    * Instantiates a new sync controller.
92    *
93    * @param name the name
94    * @throws Exception the exception
95    */
96   public SyncControllerImpl(SyncControllerConfig syncControllerConfig) throws Exception {
97     this(syncControllerConfig, null);
98   }
99
100   public SyncControllerImpl(SyncControllerConfig syncControllerConfig, String targetEntityType)
101       throws Exception {
102
103     this.syncControllerConfig = syncControllerConfig;
104
105     this.delayInMs = 0L;
106     this.syncFrequencyInMs = 86400000L;
107     this.syncStartTime = null;
108     this.lastExecutionDate = null;
109     this.runCount = new AtomicInteger(0);
110     this.performingActionGate = new Semaphore(1);
111     registeredSynchronizers = new LinkedHashSet<IndexSynchronizer>();
112     registeredIndexValidators = new LinkedHashSet<IndexValidator>();
113     registeredIndexCleaners = new LinkedHashSet<IndexCleaner>();
114
115     String controllerName = syncControllerConfig.getControllerName();
116
117     if (targetEntityType != null) {
118       controllerName += " (" + targetEntityType + ")";
119     }
120
121     this.controllerName = controllerName;
122
123     this.syncControllerExecutor = NodeUtils.createNamedExecutor("SyncController-" + controllerName,
124         syncControllerConfig.getNumSyncControllerWorkers(), LOG);
125     this.statReporterExecutor =
126         NodeUtils.createNamedExecutor("StatReporter-" + controllerName, 1, LOG);
127
128     this.currentInternalState = InternalState.IDLE;
129
130     this.creationTime = Calendar
131         .getInstance(TimeZone.getTimeZone(syncControllerConfig.getTimeZoneOfSyncStartTimeStamp()));
132
133   }
134
135
136
137   /**
138    * Change internal state.
139    *
140    * @param newState the new state
141    * @param causedByAction the caused by action
142    */
143   private void changeInternalState(InternalState newState, SyncActions causedByAction) {
144     LOG.info(AaiUiMsgs.SYNC_INTERNAL_STATE_CHANGED, controllerName, currentInternalState.toString(),
145         newState.toString(), causedByAction.toString());
146
147     this.currentInternalState = newState;
148
149     performStateAction();
150   }
151
152
153
154   /*
155    * (non-Javadoc)
156    * 
157    * @see org.openecomp.sparky.synchronizer.SyncController2#getDelayInMs()
158    */
159   @Override
160   public long getDelayInMs() {
161     return delayInMs;
162   }
163
164   /*
165    * (non-Javadoc)
166    * 
167    * @see org.openecomp.sparky.synchronizer.SyncController2#setDelayInMs(long)
168    */
169   @Override
170   public void setDelayInMs(long delayInMs) {
171     this.delayInMs = delayInMs;
172   }
173
174   /*
175    * (non-Javadoc)
176    * 
177    * @see org.openecomp.sparky.synchronizer.SyncController2#getSyncFrequencyInMs()
178    */
179   @Override
180   public long getSyncFrequencyInMs() {
181     return syncFrequencyInMs;
182   }
183
184   /*
185    * (non-Javadoc)
186    * 
187    * @see org.openecomp.sparky.synchronizer.SyncController2#setSyncFrequencyInMs(long)
188    */
189   @Override
190   public void setSyncFrequencyInMs(long syncFrequencyInMs) {
191     this.syncFrequencyInMs = syncFrequencyInMs;
192   }
193
194   /*
195    * (non-Javadoc)
196    * 
197    * @see org.openecomp.sparky.synchronizer.SyncController2#getSyncStartTime()
198    */
199   @Override
200   public Date getSyncStartTime() {
201     return syncStartTime;
202   }
203
204   /*
205    * (non-Javadoc)
206    * 
207    * @see org.openecomp.sparky.synchronizer.SyncController2#setSyncStartTime(java.util.Date)
208    */
209   @Override
210   public void setSyncStartTime(Date syncStartTime) {
211     this.syncStartTime = syncStartTime;
212   }
213
214   /*
215    * (non-Javadoc)
216    * 
217    * @see org.openecomp.sparky.synchronizer.SyncController2#getLastExecutionDate()
218    */
219   @Override
220   public Date getLastExecutionDate() {
221     return lastExecutionDate;
222   }
223
224   /*
225    * (non-Javadoc)
226    * 
227    * @see org.openecomp.sparky.synchronizer.SyncController2#setLastExecutionDate(java.util.Date)
228    */
229   @Override
230   public void setLastExecutionDate(Date lastExecutionDate) {
231     this.lastExecutionDate = lastExecutionDate;
232   }
233
234   @Override
235   public String getControllerName() {
236     return controllerName;
237   }
238
239
240
241   @Override
242   public OperationState performAction(SyncActions requestedAction) {
243
244     if (currentInternalState == InternalState.IDLE) {
245
246       try {
247
248         /*
249          * non-blocking semaphore acquire used to guarantee only 1 execution of the synchronization
250          * at a time.
251          */
252
253         switch (requestedAction) {
254           case SYNCHRONIZE:
255
256             if (performingActionGate.tryAcquire()) {
257               try {
258
259                 long opStartTime = System.currentTimeMillis();
260
261                 LOG.info(AaiUiMsgs.INFO_GENERIC,
262                     getControllerName() + " started synchronization at "
263                         + SynchronizerConstants.SIMPLE_DATE_FORMAT.format(opStartTime).replaceAll(
264                             SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD));
265
266                 runCount.incrementAndGet();
267
268                 changeInternalState(InternalState.TEST_INDEX_INTEGRITY, requestedAction);
269
270                 long opEndTime = System.currentTimeMillis();
271
272                 long opTime = (opEndTime - opStartTime);
273
274                 String durationMessage =
275                     String.format(getControllerName() + " synchronization took '%d' ms.", opTime);
276
277                 LOG.info(AaiUiMsgs.SYNC_DURATION, durationMessage);
278
279                 if (syncControllerConfig.isPeriodicSyncEnabled()) {
280
281                   LOG.info(AaiUiMsgs.INFO_GENERIC,
282                       getControllerName() + " next sync to begin at " + getNextSyncTime());
283
284                   TimeZone tz =
285                       TimeZone.getTimeZone(syncControllerConfig.getTimeZoneOfSyncStartTimeStamp());
286
287                   if (opTime > this.getSyncFrequencyInMs()) {
288
289                     String durationWasLongerMessage = String.format(
290                         getControllerName() + " synchronization took '%d' ms which is larger than"
291                             + " synchronization interval of '%d' ms.",
292                         opTime, this.getSyncFrequencyInMs());
293
294                     LOG.info(AaiUiMsgs.SYNC_DURATION, durationWasLongerMessage);
295                   }
296                 }
297
298               } catch (Exception syncException) {
299                 String message = "An error occurred while performing action = " + requestedAction
300                     + ". Error = " + syncException.getMessage();
301                 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
302               } finally {
303                 performingActionGate.release();
304               }
305             } else {
306               return OperationState.IGNORED_SYNC_NOT_IDLE;
307             }
308
309             break;
310
311           default:
312             break;
313         }
314
315         return OperationState.OK;
316
317       } catch (Exception exc) {
318         String message = "An error occurred while performing action = " + requestedAction
319             + ". Error = " + exc.getMessage();
320         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
321         return OperationState.ERROR;
322       } finally {
323
324       }
325     } else {
326       LOG.error(AaiUiMsgs.SYNC_NOT_VALID_STATE_DURING_REQUEST, currentInternalState.toString());
327       return OperationState.IGNORED_SYNC_NOT_IDLE;
328     }
329   }
330
331   /**
332    * Perform state action.
333    */
334   private void performStateAction() {
335
336     try {
337       switch (currentInternalState) {
338
339         case TEST_INDEX_INTEGRITY:
340           performIndexIntegrityValidation();
341           break;
342
343         case PRE_SYNC:
344           performPreSyncCleanupCollection();
345           break;
346
347         case SYNC_OPERATION:
348           performSynchronization();
349           break;
350
351         case POST_SYNC:
352           performIndexSyncPostCollection();
353           changeInternalState(InternalState.SELECTIVE_DELETE, SyncActions.POST_SYNC_COMPLETE);
354           break;
355
356         case SELECTIVE_DELETE:
357           performIndexCleanup();
358           changeInternalState(InternalState.GENERATE_FINAL_REPORT, SyncActions.PURGE_COMPLETE);
359           break;
360
361         case GENERATE_FINAL_REPORT:
362
363           dumpStatReport(true);
364           clearCaches();
365           changeInternalState(InternalState.IDLE, SyncActions.REPORT_COMPLETE);
366           break;
367
368         case ABORTING_SYNC:
369           performSyncAbort();
370           break;
371
372         default:
373           break;
374       }
375     } catch (Exception exc) {
376       /*
377        * Perhaps we should abort the sync on an exception
378        */
379       String message = "Caught an error which performing action. Error = " + exc.getMessage();
380       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
381     }
382   }
383
384   @Override
385   public void registerEntitySynchronizer(IndexSynchronizer entitySynchronizer) {
386
387     String indexName = entitySynchronizer.getIndexName();
388
389     if (indexName != null) {
390       registeredSynchronizers.add(entitySynchronizer);
391     } else {
392       String message = "Failed to register entity synchronizer because index name is null";
393       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
394     }
395
396   }
397
398   @Override
399   public void registerIndexValidator(IndexValidator indexValidator) {
400
401     String indexName = indexValidator.getIndexName();
402
403     if (indexName != null) {
404       registeredIndexValidators.add(indexValidator);
405     } else {
406       String message = "Failed to register index validator because index name is null";
407       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
408     }
409
410   }
411
412   @Override
413   public void registerIndexCleaner(IndexCleaner indexCleaner) {
414
415     String indexName = indexCleaner.getIndexName();
416
417     if (indexName != null) {
418       registeredIndexCleaners.add(indexCleaner);
419     } else {
420       String message = "Failed to register index cleaner because index name is null";
421       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
422     }
423   }
424
425   /*
426    * State machine should drive our flow dosync just dispatches an action and the state machine
427    * determines what is in play and what is next
428    */
429
430   /**
431    * Dump stat report.
432    *
433    * @param showFinalReport the show final report
434    */
435   private void dumpStatReport(boolean showFinalReport) {
436
437     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
438
439       String statReport = synchronizer.getStatReport(showFinalReport);
440
441       if (statReport != null) {
442         LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
443       }
444     }
445   }
446
447   /**
448    * Clear caches.
449    */
450   private void clearCaches() {
451
452     /*
453      * Any entity caches that were built as part of the sync operation should be cleared to save
454      * memory. The original intent of the caching was to provide a short-lived cache to satisfy
455      * entity requests from multiple synchronizers yet minimizing interactions with the AAI.
456      */
457
458     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
459       synchronizer.clearCache();
460     }
461   }
462
463   /**
464    * Perform pre sync cleanup collection.
465    */
466   private void performPreSyncCleanupCollection() {
467
468     /*
469      * ask the index cleaners to collect the their pre-sync object id collections
470      */
471
472     for (IndexCleaner cleaner : registeredIndexCleaners) {
473       cleaner.populatePreOperationCollection();
474     }
475
476     changeInternalState(InternalState.SYNC_OPERATION, SyncActions.PRE_SYNC_COMPLETE);
477
478   }
479
480   /**
481    * Perform index sync post collection.
482    */
483   private void performIndexSyncPostCollection() {
484
485     /*
486      * ask the entity purgers to collect the their pre-sync object id collections
487      */
488
489     for (IndexCleaner cleaner : registeredIndexCleaners) {
490       cleaner.populatePostOperationCollection();
491     }
492
493   }
494
495   /**
496    * Perform index cleanup.
497    */
498   private void performIndexCleanup() {
499
500     /*
501      * ask the entity purgers to collect the their pre-sync object id collections
502      */
503
504     for (IndexCleaner cleaner : registeredIndexCleaners) {
505       cleaner.performCleanup();
506     }
507
508   }
509
510   /**
511    * Perform sync abort.
512    */
513   private void performSyncAbort() {
514     changeInternalState(InternalState.IDLE, SyncActions.SYNC_ABORTED);
515   }
516
517   /**
518    * Perform index integrity validation.
519    */
520   private void performIndexIntegrityValidation() {
521
522     /*
523      * loop through registered index validators and test and fix, if needed
524      */
525
526     for (IndexValidator validator : registeredIndexValidators) {
527       try {
528         if (!validator.exists()) {
529           validator.createOrRepair();
530         }
531       } catch (Exception exc) {
532         String message = "Index validator caused an error = " + exc.getMessage();
533         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
534       }
535     }
536
537     changeInternalState(InternalState.PRE_SYNC, SyncActions.INDEX_INTEGRITY_VALIDATION_COMPLETE);
538
539   }
540
541   /*
542    * (non-Javadoc)
543    * 
544    * @see org.openecomp.sparky.synchronizer.SyncControllerInterface#shutdown()
545    */
546   @Override
547   public void shutdown() {
548
549     this.syncControllerExecutor.shutdown();
550     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
551
552       try {
553         synchronizer.shutdown();
554       } catch (Exception exc) {
555         LOG.error(AaiUiMsgs.ERROR_GENERIC,
556             "Synchronizer shutdown caused an error = " + exc.getMessage());
557       }
558
559     }
560     this.statReporterExecutor.shutdown();
561   }
562
563   /*
564    * Need some kind of task running that responds to a transient boolean to kill it or we just stop
565    * the executor that it is in?
566    */
567
568
569
570   /**
571    * Perform synchronization.
572    */
573   private void performSynchronization() {
574
575     /*
576      * Get all the synchronizers running in parallel
577      */
578
579     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
580       supplyAsync(new Supplier<Void>() {
581
582         @Override
583         public Void get() {
584
585           synchronizer.doSync();
586           return null;
587         }
588
589       }, this.syncControllerExecutor).whenComplete((result, error) -> {
590
591         /*
592          * We don't bother checking the result, because it will always be null as the doSync() is
593          * non-blocking.
594          */
595
596         if (error != null) {
597           LOG.error(AaiUiMsgs.ERROR_GENERIC,
598               "doSync operation failed with an error = " + error.getMessage());
599         }
600       });
601     }
602
603     boolean allDone = false;
604     long nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
605     boolean dumpPeriodicStatReport = false;
606
607     while (!allDone) {
608       int totalFinished = 0;
609
610       for (IndexSynchronizer synchronizer : registeredSynchronizers) {
611         if (dumpPeriodicStatReport) {
612           if (synchronizer.getState() == SynchronizerState.PERFORMING_SYNCHRONIZATION) {
613             String statReport = synchronizer.getStatReport(false);
614
615             if (statReport != null) {
616               LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
617             }
618           }
619         }
620
621         if (synchronizer.getState() == SynchronizerState.IDLE
622             || synchronizer.getState() == SynchronizerState.ABORTED) {
623           totalFinished++;
624         }
625       }
626
627       if (System.currentTimeMillis() > nextReportTimeStampInMs) {
628         dumpPeriodicStatReport = true;
629         nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
630       } else {
631         dumpPeriodicStatReport = false;
632       }
633
634       allDone = (totalFinished == registeredSynchronizers.size());
635
636       try {
637         Thread.sleep(250);
638       } catch (InterruptedException exc) {
639         LOG.error(AaiUiMsgs.ERROR_GENERIC,
640             "An error occurred while waiting for sync to complete. Error = " + exc.getMessage());
641       }
642
643     }
644
645     changeInternalState(InternalState.POST_SYNC, SyncActions.SYNC_COMPLETE);
646
647   }
648
649   /*
650    * (non-Javadoc)
651    * 
652    * @see org.openecomp.sparky.synchronizer.SyncControllerInterface#getState()
653    */
654   @Override
655   public SynchronizerState getState() {
656
657     switch (currentInternalState) {
658
659       case IDLE: {
660         return SynchronizerState.IDLE;
661       }
662
663       default: {
664         return SynchronizerState.PERFORMING_SYNCHRONIZATION;
665
666       }
667     }
668
669   }
670
671   @Override
672   public Calendar getCreationTime() {
673     return creationTime;
674   }
675
676   @Override
677   public String getNextSyncTime() {
678     // TODO Auto-generated method stub
679     return null;
680   }
681
682   @Override
683   public boolean isPeriodicSyncEnabled() {
684     return syncControllerConfig.isPeriodicSyncEnabled();
685   }
686
687   @Override
688   public boolean isRunOnceSyncEnabled() {
689     return syncControllerConfig.isRunOnceSyncEnabled();
690   }
691
692 }