Adding interfaces in documentation
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / sync / SyncControllerImpl.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sparky.sync;
22
23 import static java.util.concurrent.CompletableFuture.supplyAsync;
24
25 import java.util.Calendar;
26 import java.util.Collection;
27 import java.util.Date;
28 import java.util.LinkedHashSet;
29 import java.util.TimeZone;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Semaphore;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.function.Supplier;
34
35 import org.onap.aai.cl.api.Logger;
36 import org.onap.aai.cl.eelf.LoggerFactory;
37 import org.onap.aai.sparky.logging.AaiUiMsgs;
38 import org.onap.aai.sparky.sync.config.SyncControllerConfig;
39 import org.onap.aai.sparky.sync.enumeration.OperationState;
40 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
41 import org.onap.aai.sparky.util.NodeUtils;
42
43 /**
44  * The Class SyncController.
45  *
46  * @author davea.
47  */
48 public class SyncControllerImpl implements SyncController {
49   private static final Logger LOG = LoggerFactory.getInstance().getLogger(SyncControllerImpl.class);
50
51   /**
52    * The Enum InternalState.
53    */
54   private enum InternalState {
55     IDLE, PRE_SYNC, SYNC_OPERATION, SELECTIVE_DELETE, ABORTING_SYNC, REPAIRING_INDEX, POST_SYNC,
56     TEST_INDEX_INTEGRITY, GENERATE_FINAL_REPORT
57   }
58
59   /**
60    * The Enum SyncActions.
61    */
62   public enum SyncActions {
63     SYNCHRONIZE, REPAIR_INDEX, INDEX_INTEGRITY_VALIDATION_COMPLETE, PRE_SYNC_COMPLETE,
64     SYNC_COMPLETE, SYNC_ABORTED, SYNC_FAILURE, POST_SYNC_COMPLETE, PURGE_COMPLETE, REPORT_COMPLETE
65   }
66
67   private Collection<IndexSynchronizer> registeredSynchronizers;
68   private Collection<IndexValidator> registeredIndexValidators;
69   private Collection<IndexCleaner> registeredIndexCleaners;
70   private InternalState currentInternalState;
71   private ExecutorService syncControllerExecutor;
72   private ExecutorService statReporterExecutor;
73   
74   private long delayInMs;
75   private long syncFrequencyInMs;
76   private Date syncStartTime;
77   
78   private Date lastExecutionDate;
79   private AtomicInteger runCount; 
80   private Semaphore performingActionGate;
81   private Calendar creationTime;
82   
83   private String syncStartTimeWithTimeZone;
84   private String controllerName;
85   
86   protected SyncControllerConfig syncControllerConfig;
87   
88   
89
90
91   /**
92    * Instantiates a new sync controller.
93    *
94    * @param name the name
95    * @throws Exception the exception
96    */
97   public SyncControllerImpl(SyncControllerConfig syncControllerConfig) throws Exception {
98     this(syncControllerConfig,null);
99   }
100   
101   public SyncControllerImpl(SyncControllerConfig syncControllerConfig, String targetEntityType)
102       throws Exception {
103
104     this.syncControllerConfig = syncControllerConfig;
105
106     this.delayInMs = 0L;
107     this.syncFrequencyInMs = 86400000L;
108     this.syncStartTime = null;
109     this.lastExecutionDate = null;
110     this.runCount = new AtomicInteger(0);
111     this.performingActionGate = new Semaphore(1);
112     registeredSynchronizers = new LinkedHashSet<IndexSynchronizer>();
113     registeredIndexValidators = new LinkedHashSet<IndexValidator>();
114     registeredIndexCleaners = new LinkedHashSet<IndexCleaner>();
115
116     String controllerName = syncControllerConfig.getControllerName();
117
118     if (targetEntityType != null) {
119       controllerName += " (" + targetEntityType + ")";
120     }
121     
122     this.controllerName = controllerName;
123
124     this.syncControllerExecutor = NodeUtils.createNamedExecutor("SyncController-" + controllerName,
125         syncControllerConfig.getNumSyncControllerWorkers(), LOG);
126     this.statReporterExecutor =
127         NodeUtils.createNamedExecutor("StatReporter-" + controllerName, 1, LOG);
128
129     this.currentInternalState = InternalState.IDLE;
130
131     this.creationTime =
132         Calendar.getInstance(TimeZone.getTimeZone(syncControllerConfig.getTimeZoneOfSyncStartTimeStamp()));
133
134   }
135
136   
137  
138
139   
140   
141   /**
142    * Change internal state.
143    *
144    * @param newState the new state
145    * @param causedByAction the caused by action
146    */
147   private void changeInternalState(InternalState newState, SyncActions causedByAction) {
148     LOG.info(AaiUiMsgs.SYNC_INTERNAL_STATE_CHANGED, controllerName,
149         currentInternalState.toString(), newState.toString(), causedByAction.toString());
150
151     this.currentInternalState = newState;
152
153     performStateAction();
154   }
155   
156   
157   
158   /* (non-Javadoc)
159    * @see org.openecomp.sparky.synchronizer.SyncController2#getDelayInMs()
160    */
161   @Override
162   public long getDelayInMs() {
163     return delayInMs;
164   }
165
166   /* (non-Javadoc)
167    * @see org.openecomp.sparky.synchronizer.SyncController2#setDelayInMs(long)
168    */
169   @Override
170   public void setDelayInMs(long delayInMs) {
171     this.delayInMs = delayInMs;
172   }
173
174   /* (non-Javadoc)
175    * @see org.openecomp.sparky.synchronizer.SyncController2#getSyncFrequencyInMs()
176    */
177   @Override
178   public long getSyncFrequencyInMs() {
179     return syncFrequencyInMs;
180   }
181
182   /* (non-Javadoc)
183    * @see org.openecomp.sparky.synchronizer.SyncController2#setSyncFrequencyInMs(long)
184    */
185   @Override
186   public void setSyncFrequencyInMs(long syncFrequencyInMs) {
187     this.syncFrequencyInMs = syncFrequencyInMs;
188   }
189
190   /* (non-Javadoc)
191    * @see org.openecomp.sparky.synchronizer.SyncController2#getSyncStartTime()
192    */
193   @Override
194   public Date getSyncStartTime() {
195     return syncStartTime;
196   }
197
198   /* (non-Javadoc)
199    * @see org.openecomp.sparky.synchronizer.SyncController2#setSyncStartTime(java.util.Date)
200    */
201   @Override
202   public void setSyncStartTime(Date syncStartTime) {
203     this.syncStartTime = syncStartTime;
204   }
205
206   /* (non-Javadoc)
207    * @see org.openecomp.sparky.synchronizer.SyncController2#getLastExecutionDate()
208    */
209   @Override
210   public Date getLastExecutionDate() {
211     return lastExecutionDate;
212   }
213
214   /* (non-Javadoc)
215    * @see org.openecomp.sparky.synchronizer.SyncController2#setLastExecutionDate(java.util.Date)
216    */
217   @Override
218   public void setLastExecutionDate(Date lastExecutionDate) {
219     this.lastExecutionDate = lastExecutionDate;
220   }
221   
222   @Override
223   public String getControllerName() {
224     return controllerName;
225   }
226   
227  
228   
229
230   @Override
231   public OperationState performAction(SyncActions requestedAction) {
232
233     if (currentInternalState == InternalState.IDLE) {
234       
235       try {
236         
237         /*
238          * non-blocking semaphore acquire used to guarantee only 1 execution of the synchronization
239          * at a time.
240          */
241         
242         switch (requestedAction) {
243           case SYNCHRONIZE:
244
245             if (performingActionGate.tryAcquire()) {
246               try {
247
248                 long opStartTime = System.currentTimeMillis();
249
250                 LOG.info(AaiUiMsgs.INFO_GENERIC,
251                     getControllerName() + " started synchronization at "
252                         + SynchronizerConstants.SIMPLE_DATE_FORMAT.format(opStartTime).replaceAll(
253                             SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD));
254
255                 runCount.incrementAndGet();
256
257                 changeInternalState(InternalState.TEST_INDEX_INTEGRITY, requestedAction);
258
259                 long opEndTime = System.currentTimeMillis();
260
261                 long opTime = (opEndTime - opStartTime);
262                 
263                 String durationMessage =
264                     String.format(getControllerName() + " synchronization took '%d' ms.", opTime);
265
266                 LOG.info(AaiUiMsgs.SYNC_DURATION, durationMessage);
267                 
268                 if (syncControllerConfig.isPeriodicSyncEnabled()) {
269
270                   LOG.info(AaiUiMsgs.INFO_GENERIC,
271                       getControllerName() + " next sync to begin at " + getNextSyncTime());
272
273                   TimeZone tz = TimeZone.getTimeZone(syncControllerConfig.getTimeZoneOfSyncStartTimeStamp());
274
275                   if (opTime > this.getSyncFrequencyInMs()) {
276
277                     String durationWasLongerMessage = String.format(
278                         getControllerName() + " synchronization took '%d' ms which is larger than"
279                             + " synchronization interval of '%d' ms.",
280                         opTime, this.getSyncFrequencyInMs());
281
282                     LOG.info(AaiUiMsgs.SYNC_DURATION, durationWasLongerMessage);
283                   }
284                 }
285
286               } catch (Exception syncException) {
287                 String message = "An error occurred while performing action = " + requestedAction
288                     + ". Error = " + syncException.getMessage();
289                 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
290               } finally {
291                 performingActionGate.release();
292               }
293             } else {
294               return OperationState.IGNORED_SYNC_NOT_IDLE;
295             }
296
297             break;
298
299           default:
300             break;
301         }
302         
303         return OperationState.OK;
304
305       } catch (Exception exc) {
306         String message = "An error occurred while performing action = " + requestedAction
307             + ". Error = " + exc.getMessage();
308         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
309         return OperationState.ERROR;
310       } finally {
311         
312       }
313     } else {
314       LOG.error(AaiUiMsgs.SYNC_NOT_VALID_STATE_DURING_REQUEST, currentInternalState.toString());
315       return OperationState.IGNORED_SYNC_NOT_IDLE;
316     }
317   }
318
319   /**
320    * Perform state action.
321    */
322   private void performStateAction() {
323
324     try {
325       switch (currentInternalState) {
326
327         case TEST_INDEX_INTEGRITY:
328           performIndexIntegrityValidation();
329           break;
330
331         case PRE_SYNC:
332           performPreSyncCleanupCollection();
333           break;
334
335         case SYNC_OPERATION:
336           performSynchronization();
337           break;
338
339         case POST_SYNC:
340           performIndexSyncPostCollection();
341           changeInternalState(InternalState.SELECTIVE_DELETE, SyncActions.POST_SYNC_COMPLETE);
342           break;
343
344         case SELECTIVE_DELETE:
345           performIndexCleanup();
346           changeInternalState(InternalState.GENERATE_FINAL_REPORT, SyncActions.PURGE_COMPLETE);
347           break;
348
349         case GENERATE_FINAL_REPORT:
350
351           dumpStatReport(true);
352           clearCaches();
353           changeInternalState(InternalState.IDLE, SyncActions.REPORT_COMPLETE);
354           break;
355
356         case ABORTING_SYNC:
357           performSyncAbort();
358           break;
359
360         default:
361           break;
362       }
363     } catch (Exception exc) {
364       /*
365        * Perhaps we should abort the sync on an exception
366        */
367       String message = "Caught an error which performing action. Error = " + exc.getMessage();
368       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
369     }
370   }
371
372   @Override
373   public void registerEntitySynchronizer(IndexSynchronizer entitySynchronizer) {
374
375     String indexName = entitySynchronizer.getIndexName();
376
377     if (indexName != null) {
378       registeredSynchronizers.add(entitySynchronizer);
379     } else {
380       String message = "Failed to register entity synchronizer because index name is null";
381       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
382     }
383
384   }
385
386   @Override
387   public void registerIndexValidator(IndexValidator indexValidator) {
388
389     String indexName = indexValidator.getIndexName();
390
391     if (indexName != null) {
392       registeredIndexValidators.add(indexValidator);
393     } else {
394       String message = "Failed to register index validator because index name is null";
395       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
396     }
397
398   }
399
400   @Override
401   public void registerIndexCleaner(IndexCleaner indexCleaner) {
402
403     String indexName = indexCleaner.getIndexName();
404
405     if (indexName != null) {
406       registeredIndexCleaners.add(indexCleaner);
407     } else {
408       String message = "Failed to register index cleaner because index name is null";
409       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
410     }
411   }
412
413   /*
414    * State machine should drive our flow dosync just dispatches an action and the state machine
415    * determines what is in play and what is next
416    */
417
418   /**
419    * Dump stat report.
420    *
421    * @param showFinalReport the show final report
422    */
423   private void dumpStatReport(boolean showFinalReport) {
424
425     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
426
427       String statReport = synchronizer.getStatReport(showFinalReport);
428
429       if (statReport != null) {
430         LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
431       }
432     }
433   }
434
435   /**
436    * Clear caches.
437    */
438   private void clearCaches() {
439
440     /*
441      * Any entity caches that were built as part of the sync operation should be cleared to save
442      * memory. The original intent of the caching was to provide a short-lived cache to satisfy
443      * entity requests from multiple synchronizers yet minimizing interactions with the AAI.
444      */
445
446     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
447       synchronizer.clearCache();
448     }
449   }
450
451   /**
452    * Perform pre sync cleanup collection.
453    */
454   private void performPreSyncCleanupCollection() {
455
456     /*
457      * ask the index cleaners to collect the their pre-sync object id collections
458      */
459
460     for (IndexCleaner cleaner : registeredIndexCleaners) {
461       cleaner.populatePreOperationCollection();
462     }
463
464     changeInternalState(InternalState.SYNC_OPERATION, SyncActions.PRE_SYNC_COMPLETE);
465
466   }
467
468   /**
469    * Perform index sync post collection.
470    */
471   private void performIndexSyncPostCollection() {
472
473     /*
474      * ask the entity purgers to collect the their pre-sync object id collections
475      */
476
477     for (IndexCleaner cleaner : registeredIndexCleaners) {
478       cleaner.populatePostOperationCollection();
479     }
480
481   }
482
483   /**
484    * Perform index cleanup.
485    */
486   private void performIndexCleanup() {
487
488     /*
489      * ask the entity purgers to collect the their pre-sync object id collections
490      */
491
492     for (IndexCleaner cleaner : registeredIndexCleaners) {
493       cleaner.performCleanup();
494     }
495
496   }
497
498   /**
499    * Perform sync abort.
500    */
501   private void performSyncAbort() {
502     changeInternalState(InternalState.IDLE, SyncActions.SYNC_ABORTED);
503   }
504
505   /**
506    * Perform index integrity validation.
507    */
508   private void performIndexIntegrityValidation() {
509
510     /*
511      * loop through registered index validators and test and fix, if needed
512      */
513
514     for (IndexValidator validator : registeredIndexValidators) {
515       try {
516         if (!validator.exists()) {
517           validator.createOrRepair();
518         }
519       } catch (Exception exc) {
520         String message = "Index validator caused an error = " + exc.getMessage();
521         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
522       }
523     }
524
525     changeInternalState(InternalState.PRE_SYNC, SyncActions.INDEX_INTEGRITY_VALIDATION_COMPLETE);
526
527   }
528
529   /* (non-Javadoc)
530    * @see org.openecomp.sparky.synchronizer.SyncControllerInterface#shutdown()
531    */
532   @Override
533   public void shutdown() {
534
535     this.syncControllerExecutor.shutdown();
536     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
537
538       try {
539         synchronizer.shutdown();
540       } catch (Exception exc) {
541         LOG.error(AaiUiMsgs.ERROR_GENERIC,
542             "Synchronizer shutdown caused an error = " + exc.getMessage());
543       }
544
545     }
546     this.statReporterExecutor.shutdown();
547   }
548
549   /*
550    * Need some kind of task running that responds to a transient boolean to kill it or we just stop
551    * the executor that it is in?
552    */
553
554
555
556   /**
557    * Perform synchronization.
558    */
559   private void performSynchronization() {
560
561     /*
562      * Get all the synchronizers running in parallel
563      */
564
565     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
566       supplyAsync(new Supplier<Void>() {
567
568         @Override
569         public Void get() {
570
571           synchronizer.doSync();
572           return null;
573         }
574
575       }, this.syncControllerExecutor).whenComplete((result, error) -> {
576
577         /*
578          * We don't bother checking the result, because it will always be null as the doSync() is
579          * non-blocking.
580          */
581
582         if (error != null) {
583           LOG.error(AaiUiMsgs.ERROR_GENERIC,
584               "doSync operation failed with an error = " + error.getMessage());
585         }
586       });
587     }
588
589     boolean allDone = false;
590     long nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
591     boolean dumpPeriodicStatReport = false;
592
593     while (!allDone) {
594       int totalFinished = 0;
595
596       for (IndexSynchronizer synchronizer : registeredSynchronizers) {
597         if (dumpPeriodicStatReport) {
598           if (synchronizer.getState() == SynchronizerState.PERFORMING_SYNCHRONIZATION) {
599             String statReport = synchronizer.getStatReport(false);
600
601             if (statReport != null) {
602               LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
603             }
604           }
605         }
606
607         if (synchronizer.getState() == SynchronizerState.IDLE
608             || synchronizer.getState() == SynchronizerState.ABORTED) {
609           totalFinished++;
610         }
611       }
612
613       if ( System.currentTimeMillis() > nextReportTimeStampInMs) {
614         dumpPeriodicStatReport = true;
615         nextReportTimeStampInMs = System.currentTimeMillis() + 30000L; 
616       } else {
617         dumpPeriodicStatReport = false;
618       }
619
620       allDone = (totalFinished == registeredSynchronizers.size());
621
622       try {
623         Thread.sleep(250);
624       } catch (InterruptedException exc) {
625         LOG.error(AaiUiMsgs.ERROR_GENERIC,
626             "An error occurred while waiting for sync to complete. Error = " + exc.getMessage());
627       }
628
629     }
630
631     changeInternalState(InternalState.POST_SYNC, SyncActions.SYNC_COMPLETE);
632
633   }
634
635   /* (non-Javadoc)
636    * @see org.openecomp.sparky.synchronizer.SyncControllerInterface#getState()
637    */
638   @Override
639   public SynchronizerState getState() {
640
641     switch (currentInternalState) {
642
643       case IDLE: {
644         return SynchronizerState.IDLE;
645       }
646
647       default: {
648         return SynchronizerState.PERFORMING_SYNCHRONIZATION;
649
650       }
651     }
652
653   }
654
655   @Override
656   public Calendar getCreationTime() {
657     return creationTime;
658   }
659
660   @Override
661   public String getNextSyncTime() {
662     // TODO Auto-generated method stub
663     return null;
664   }
665
666   @Override
667   public boolean isPeriodicSyncEnabled() {
668     return syncControllerConfig.isPeriodicSyncEnabled();
669   }
670
671   @Override
672   public boolean isRunOnceSyncEnabled() {
673     return syncControllerConfig.isRunOnceSyncEnabled();
674   }
675   
676 }