Update license and poms
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / sync / SyncControllerImpl.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sparky.sync;
22
23 import static java.util.concurrent.CompletableFuture.supplyAsync;
24
25 import java.util.Calendar;
26 import java.util.Collection;
27 import java.util.Date;
28 import java.util.LinkedHashSet;
29 import java.util.TimeZone;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Semaphore;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.function.Supplier;
34
35 import org.onap.aai.cl.api.Logger;
36 import org.onap.aai.cl.eelf.LoggerFactory;
37 import org.onap.aai.sparky.logging.AaiUiMsgs;
38 import org.onap.aai.sparky.sync.config.SyncControllerConfig;
39 import org.onap.aai.sparky.sync.enumeration.OperationState;
40 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
41 import org.onap.aai.sparky.util.NodeUtils;
42 import org.springframework.beans.factory.annotation.Autowired;
43 import org.springframework.stereotype.Component;
44
45 /**
46  * The Class SyncController.
47  *
48  * @author davea.
49  */
50 public class SyncControllerImpl implements SyncController {
51   private static final Logger LOG = LoggerFactory.getInstance().getLogger(SyncControllerImpl.class);
52
53   /**
54    * The Enum InternalState.
55    */
56   private enum InternalState {
57     IDLE, PRE_SYNC, SYNC_OPERATION, SELECTIVE_DELETE, ABORTING_SYNC, REPAIRING_INDEX, POST_SYNC,
58     TEST_INDEX_INTEGRITY, GENERATE_FINAL_REPORT
59   }
60
61   /**
62    * The Enum SyncActions.
63    */
64   public enum SyncActions {
65     SYNCHRONIZE, REPAIR_INDEX, INDEX_INTEGRITY_VALIDATION_COMPLETE, PRE_SYNC_COMPLETE,
66     SYNC_COMPLETE, SYNC_ABORTED, SYNC_FAILURE, POST_SYNC_COMPLETE, PURGE_COMPLETE, REPORT_COMPLETE
67   }
68
69   private Collection<IndexSynchronizer> registeredSynchronizers;
70   private Collection<IndexValidator> registeredIndexValidators;
71   private Collection<IndexCleaner> registeredIndexCleaners;
72   private InternalState currentInternalState;
73   private ExecutorService syncControllerExecutor;
74   private ExecutorService statReporterExecutor;
75   
76   private long delayInMs;
77   private long syncFrequencyInMs;
78   private Date syncStartTime;
79   
80   private Date lastExecutionDate;
81   private AtomicInteger runCount; 
82   private Semaphore performingActionGate;
83   private Calendar creationTime;
84   
85   private String syncStartTimeWithTimeZone;
86   private String controllerName;
87   
88   protected SyncControllerConfig syncControllerConfig;
89   
90   
91
92
93   /**
94    * Instantiates a new sync controller.
95    *
96    * @param name the name
97    * @throws Exception the exception
98    */
99   public SyncControllerImpl(SyncControllerConfig syncControllerConfig) throws Exception {
100     this(syncControllerConfig,null);
101   }
102   
103   public SyncControllerImpl(SyncControllerConfig syncControllerConfig, String targetEntityType)
104       throws Exception {
105
106     this.syncControllerConfig = syncControllerConfig;
107
108     this.delayInMs = 0L;
109     this.syncFrequencyInMs = 86400000L;
110     this.syncStartTime = null;
111     this.lastExecutionDate = null;
112     this.runCount = new AtomicInteger(0);
113     this.performingActionGate = new Semaphore(1);
114     registeredSynchronizers = new LinkedHashSet<IndexSynchronizer>();
115     registeredIndexValidators = new LinkedHashSet<IndexValidator>();
116     registeredIndexCleaners = new LinkedHashSet<IndexCleaner>();
117
118     String controllerName = syncControllerConfig.getControllerName();
119
120     if (targetEntityType != null) {
121       controllerName += " (" + targetEntityType + ")";
122     }
123     
124     this.controllerName = controllerName;
125
126     this.syncControllerExecutor = NodeUtils.createNamedExecutor("SyncController-" + controllerName,
127         syncControllerConfig.getNumSyncControllerWorkers(), LOG);
128     this.statReporterExecutor =
129         NodeUtils.createNamedExecutor("StatReporter-" + controllerName, 1, LOG);
130
131     this.currentInternalState = InternalState.IDLE;
132
133     this.creationTime =
134         Calendar.getInstance(TimeZone.getTimeZone(syncControllerConfig.getTimeZoneOfSyncStartTimeStamp()));
135
136   }
137
138   
139  
140
141   
142   
143   /**
144    * Change internal state.
145    *
146    * @param newState the new state
147    * @param causedByAction the caused by action
148    */
149   private void changeInternalState(InternalState newState, SyncActions causedByAction) {
150     LOG.info(AaiUiMsgs.SYNC_INTERNAL_STATE_CHANGED, controllerName,
151         currentInternalState.toString(), newState.toString(), causedByAction.toString());
152
153     this.currentInternalState = newState;
154
155     performStateAction();
156   }
157   
158   
159   
160   /* (non-Javadoc)
161    * @see org.openecomp.sparky.synchronizer.SyncController2#getDelayInMs()
162    */
163   @Override
164   public long getDelayInMs() {
165     return delayInMs;
166   }
167
168   /* (non-Javadoc)
169    * @see org.openecomp.sparky.synchronizer.SyncController2#setDelayInMs(long)
170    */
171   @Override
172   public void setDelayInMs(long delayInMs) {
173     this.delayInMs = delayInMs;
174   }
175
176   /* (non-Javadoc)
177    * @see org.openecomp.sparky.synchronizer.SyncController2#getSyncFrequencyInMs()
178    */
179   @Override
180   public long getSyncFrequencyInMs() {
181     return syncFrequencyInMs;
182   }
183
184   /* (non-Javadoc)
185    * @see org.openecomp.sparky.synchronizer.SyncController2#setSyncFrequencyInMs(long)
186    */
187   @Override
188   public void setSyncFrequencyInMs(long syncFrequencyInMs) {
189     this.syncFrequencyInMs = syncFrequencyInMs;
190   }
191
192   /* (non-Javadoc)
193    * @see org.openecomp.sparky.synchronizer.SyncController2#getSyncStartTime()
194    */
195   @Override
196   public Date getSyncStartTime() {
197     return syncStartTime;
198   }
199
200   /* (non-Javadoc)
201    * @see org.openecomp.sparky.synchronizer.SyncController2#setSyncStartTime(java.util.Date)
202    */
203   @Override
204   public void setSyncStartTime(Date syncStartTime) {
205     this.syncStartTime = syncStartTime;
206   }
207
208   /* (non-Javadoc)
209    * @see org.openecomp.sparky.synchronizer.SyncController2#getLastExecutionDate()
210    */
211   @Override
212   public Date getLastExecutionDate() {
213     return lastExecutionDate;
214   }
215
216   /* (non-Javadoc)
217    * @see org.openecomp.sparky.synchronizer.SyncController2#setLastExecutionDate(java.util.Date)
218    */
219   @Override
220   public void setLastExecutionDate(Date lastExecutionDate) {
221     this.lastExecutionDate = lastExecutionDate;
222   }
223   
224   @Override
225   public String getControllerName() {
226     return controllerName;
227   }
228   
229  
230   
231
232   @Override
233   public OperationState performAction(SyncActions requestedAction) {
234
235     if (currentInternalState == InternalState.IDLE) {
236       
237       try {
238         
239         /*
240          * non-blocking semaphore acquire used to guarantee only 1 execution of the synchronization
241          * at a time.
242          */
243         
244         switch (requestedAction) {
245           case SYNCHRONIZE:
246
247             if (performingActionGate.tryAcquire()) {
248               try {
249
250                 long opStartTime = System.currentTimeMillis();
251
252                 LOG.info(AaiUiMsgs.INFO_GENERIC,
253                     getControllerName() + " started synchronization at "
254                         + SynchronizerConstants.SIMPLE_DATE_FORMAT.format(opStartTime).replaceAll(
255                             SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD));
256
257                 runCount.incrementAndGet();
258
259                 changeInternalState(InternalState.TEST_INDEX_INTEGRITY, requestedAction);
260
261                 long opEndTime = System.currentTimeMillis();
262
263                 long opTime = (opEndTime - opStartTime);
264                 
265                 String durationMessage =
266                     String.format(getControllerName() + " synchronization took '%d' ms.", opTime);
267
268                 LOG.info(AaiUiMsgs.SYNC_DURATION, durationMessage);
269                 
270                 if (syncControllerConfig.isPeriodicSyncEnabled()) {
271
272                   LOG.info(AaiUiMsgs.INFO_GENERIC,
273                       getControllerName() + " next sync to begin at " + getNextSyncTime());
274
275                   TimeZone tz = TimeZone.getTimeZone(syncControllerConfig.getTimeZoneOfSyncStartTimeStamp());
276
277                   if (opTime > this.getSyncFrequencyInMs()) {
278
279                     String durationWasLongerMessage = String.format(
280                         getControllerName() + " synchronization took '%d' ms which is larger than"
281                             + " synchronization interval of '%d' ms.",
282                         opTime, this.getSyncFrequencyInMs());
283
284                     LOG.info(AaiUiMsgs.SYNC_DURATION, durationWasLongerMessage);
285                   }
286                 }
287
288               } catch (Exception syncException) {
289                 String message = "An error occurred while performing action = " + requestedAction
290                     + ". Error = " + syncException.getMessage();
291                 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
292               } finally {
293                 performingActionGate.release();
294               }
295             } else {
296               return OperationState.IGNORED_SYNC_NOT_IDLE;
297             }
298
299             break;
300
301           default:
302             break;
303         }
304         
305         return OperationState.OK;
306
307       } catch (Exception exc) {
308         String message = "An error occurred while performing action = " + requestedAction
309             + ". Error = " + exc.getMessage();
310         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
311         return OperationState.ERROR;
312       } finally {
313         
314       }
315     } else {
316       LOG.error(AaiUiMsgs.SYNC_NOT_VALID_STATE_DURING_REQUEST, currentInternalState.toString());
317       return OperationState.IGNORED_SYNC_NOT_IDLE;
318     }
319   }
320
321   /**
322    * Perform state action.
323    */
324   private void performStateAction() {
325
326     try {
327       switch (currentInternalState) {
328
329         case TEST_INDEX_INTEGRITY:
330           performIndexIntegrityValidation();
331           break;
332
333         case PRE_SYNC:
334           performPreSyncCleanupCollection();
335           break;
336
337         case SYNC_OPERATION:
338           performSynchronization();
339           break;
340
341         case POST_SYNC:
342           performIndexSyncPostCollection();
343           changeInternalState(InternalState.SELECTIVE_DELETE, SyncActions.POST_SYNC_COMPLETE);
344           break;
345
346         case SELECTIVE_DELETE:
347           performIndexCleanup();
348           changeInternalState(InternalState.GENERATE_FINAL_REPORT, SyncActions.PURGE_COMPLETE);
349           break;
350
351         case GENERATE_FINAL_REPORT:
352
353           dumpStatReport(true);
354           clearCaches();
355           changeInternalState(InternalState.IDLE, SyncActions.REPORT_COMPLETE);
356           break;
357
358         case ABORTING_SYNC:
359           performSyncAbort();
360           break;
361
362         default:
363           break;
364       }
365     } catch (Exception exc) {
366       /*
367        * Perhaps we should abort the sync on an exception
368        */
369       String message = "Caught an error which performing action. Error = " + exc.getMessage();
370       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
371     }
372   }
373
374   @Override
375   public void registerEntitySynchronizer(IndexSynchronizer entitySynchronizer) {
376
377     String indexName = entitySynchronizer.getIndexName();
378
379     if (indexName != null) {
380       registeredSynchronizers.add(entitySynchronizer);
381     } else {
382       String message = "Failed to register entity synchronizer because index name is null";
383       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
384     }
385
386   }
387
388   @Override
389   public void registerIndexValidator(IndexValidator indexValidator) {
390
391     String indexName = indexValidator.getIndexName();
392
393     if (indexName != null) {
394       registeredIndexValidators.add(indexValidator);
395     } else {
396       String message = "Failed to register index validator because index name is null";
397       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
398     }
399
400   }
401
402   @Override
403   public void registerIndexCleaner(IndexCleaner indexCleaner) {
404
405     String indexName = indexCleaner.getIndexName();
406
407     if (indexName != null) {
408       registeredIndexCleaners.add(indexCleaner);
409     } else {
410       String message = "Failed to register index cleaner because index name is null";
411       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
412     }
413   }
414
415   /*
416    * State machine should drive our flow dosync just dispatches an action and the state machine
417    * determines what is in play and what is next
418    */
419
420   /**
421    * Dump stat report.
422    *
423    * @param showFinalReport the show final report
424    */
425   private void dumpStatReport(boolean showFinalReport) {
426
427     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
428
429       String statReport = synchronizer.getStatReport(showFinalReport);
430
431       if (statReport != null) {
432         LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
433       }
434     }
435   }
436
437   /**
438    * Clear caches.
439    */
440   private void clearCaches() {
441
442     /*
443      * Any entity caches that were built as part of the sync operation should be cleared to save
444      * memory. The original intent of the caching was to provide a short-lived cache to satisfy
445      * entity requests from multiple synchronizers yet minimizing interactions with the AAI.
446      */
447
448     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
449       synchronizer.clearCache();
450     }
451   }
452
453   /**
454    * Perform pre sync cleanup collection.
455    */
456   private void performPreSyncCleanupCollection() {
457
458     /*
459      * ask the index cleaners to collect the their pre-sync object id collections
460      */
461
462     for (IndexCleaner cleaner : registeredIndexCleaners) {
463       cleaner.populatePreOperationCollection();
464     }
465
466     changeInternalState(InternalState.SYNC_OPERATION, SyncActions.PRE_SYNC_COMPLETE);
467
468   }
469
470   /**
471    * Perform index sync post collection.
472    */
473   private void performIndexSyncPostCollection() {
474
475     /*
476      * ask the entity purgers to collect the their pre-sync object id collections
477      */
478
479     for (IndexCleaner cleaner : registeredIndexCleaners) {
480       cleaner.populatePostOperationCollection();
481     }
482
483   }
484
485   /**
486    * Perform index cleanup.
487    */
488   private void performIndexCleanup() {
489
490     /*
491      * ask the entity purgers to collect the their pre-sync object id collections
492      */
493
494     for (IndexCleaner cleaner : registeredIndexCleaners) {
495       cleaner.performCleanup();
496     }
497
498   }
499
500   /**
501    * Perform sync abort.
502    */
503   private void performSyncAbort() {
504     changeInternalState(InternalState.IDLE, SyncActions.SYNC_ABORTED);
505   }
506
507   /**
508    * Perform index integrity validation.
509    */
510   private void performIndexIntegrityValidation() {
511
512     /*
513      * loop through registered index validators and test and fix, if needed
514      */
515
516     for (IndexValidator validator : registeredIndexValidators) {
517       try {
518         if (!validator.exists()) {
519           validator.createOrRepair();
520         }
521       } catch (Exception exc) {
522         String message = "Index validator caused an error = " + exc.getMessage();
523         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
524       }
525     }
526
527     changeInternalState(InternalState.PRE_SYNC, SyncActions.INDEX_INTEGRITY_VALIDATION_COMPLETE);
528
529   }
530
531   /* (non-Javadoc)
532    * @see org.openecomp.sparky.synchronizer.SyncControllerInterface#shutdown()
533    */
534   @Override
535   public void shutdown() {
536
537     this.syncControllerExecutor.shutdown();
538     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
539
540       try {
541         synchronizer.shutdown();
542       } catch (Exception exc) {
543         LOG.error(AaiUiMsgs.ERROR_GENERIC,
544             "Synchronizer shutdown caused an error = " + exc.getMessage());
545       }
546
547     }
548     this.statReporterExecutor.shutdown();
549   }
550
551   /*
552    * Need some kind of task running that responds to a transient boolean to kill it or we just stop
553    * the executor that it is in?
554    */
555
556
557
558   /**
559    * Perform synchronization.
560    */
561   private void performSynchronization() {
562
563     /*
564      * Get all the synchronizers running in parallel
565      */
566
567     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
568       supplyAsync(new Supplier<Void>() {
569
570         @Override
571         public Void get() {
572
573           synchronizer.doSync();
574           return null;
575         }
576
577       }, this.syncControllerExecutor).whenComplete((result, error) -> {
578
579         /*
580          * We don't bother checking the result, because it will always be null as the doSync() is
581          * non-blocking.
582          */
583
584         if (error != null) {
585           LOG.error(AaiUiMsgs.ERROR_GENERIC,
586               "doSync operation failed with an error = " + error.getMessage());
587         }
588       });
589     }
590
591     boolean allDone = false;
592     long nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
593     boolean dumpPeriodicStatReport = false;
594
595     while (!allDone) {
596       int totalFinished = 0;
597
598       for (IndexSynchronizer synchronizer : registeredSynchronizers) {
599         if (dumpPeriodicStatReport) {
600           if (synchronizer.getState() == SynchronizerState.PERFORMING_SYNCHRONIZATION) {
601             String statReport = synchronizer.getStatReport(false);
602
603             if (statReport != null) {
604               LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
605             }
606           }
607         }
608
609         if (synchronizer.getState() == SynchronizerState.IDLE
610             || synchronizer.getState() == SynchronizerState.ABORTED) {
611           totalFinished++;
612         }
613       }
614
615       if ( System.currentTimeMillis() > nextReportTimeStampInMs) {
616         dumpPeriodicStatReport = true;
617         nextReportTimeStampInMs = System.currentTimeMillis() + 30000L; 
618       } else {
619         dumpPeriodicStatReport = false;
620       }
621
622       allDone = (totalFinished == registeredSynchronizers.size());
623
624       try {
625         Thread.sleep(250);
626       } catch (InterruptedException exc) {
627         LOG.error(AaiUiMsgs.ERROR_GENERIC,
628             "An error occurred while waiting for sync to complete. Error = " + exc.getMessage());
629       }
630
631     }
632
633     changeInternalState(InternalState.POST_SYNC, SyncActions.SYNC_COMPLETE);
634
635   }
636
637   /* (non-Javadoc)
638    * @see org.openecomp.sparky.synchronizer.SyncControllerInterface#getState()
639    */
640   @Override
641   public SynchronizerState getState() {
642
643     switch (currentInternalState) {
644
645       case IDLE: {
646         return SynchronizerState.IDLE;
647       }
648
649       default: {
650         return SynchronizerState.PERFORMING_SYNCHRONIZATION;
651
652       }
653     }
654
655   }
656
657   @Override
658   public Calendar getCreationTime() {
659     return creationTime;
660   }
661
662   @Override
663   public String getNextSyncTime() {
664     // TODO Auto-generated method stub
665     return null;
666   }
667
668   @Override
669   public boolean isPeriodicSyncEnabled() {
670     return syncControllerConfig.isPeriodicSyncEnabled();
671   }
672
673   @Override
674   public boolean isRunOnceSyncEnabled() {
675     return syncControllerConfig.isRunOnceSyncEnabled();
676   }
677   
678 }