Adding interfaces in documentation
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / sync / SyncControllerImpl.java
1 /**
2  * ============LICENSE_START===================================================
3  * SPARKY (AAI UI service)
4  * ============================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=====================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25 package org.onap.aai.sparky.sync;
26
27 import static java.util.concurrent.CompletableFuture.supplyAsync;
28
29 import java.util.Calendar;
30 import java.util.Collection;
31 import java.util.Date;
32 import java.util.LinkedHashSet;
33 import java.util.TimeZone;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Semaphore;
36 import java.util.concurrent.atomic.AtomicInteger;
37 import java.util.function.Supplier;
38
39 import org.onap.aai.cl.api.Logger;
40 import org.onap.aai.cl.eelf.LoggerFactory;
41 import org.onap.aai.sparky.logging.AaiUiMsgs;
42 import org.onap.aai.sparky.sync.config.SyncControllerConfig;
43 import org.onap.aai.sparky.sync.enumeration.OperationState;
44 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
45 import org.onap.aai.sparky.util.NodeUtils;
46 import org.springframework.beans.factory.annotation.Autowired;
47 import org.springframework.stereotype.Component;
48
49 /**
50  * The Class SyncController.
51  *
52  * @author davea.
53  */
54 public class SyncControllerImpl implements SyncController {
55   private static final Logger LOG = LoggerFactory.getInstance().getLogger(SyncControllerImpl.class);
56
57   /**
58    * The Enum InternalState.
59    */
60   private enum InternalState {
61     IDLE, PRE_SYNC, SYNC_OPERATION, SELECTIVE_DELETE, ABORTING_SYNC, REPAIRING_INDEX, POST_SYNC,
62     TEST_INDEX_INTEGRITY, GENERATE_FINAL_REPORT
63   }
64
65   /**
66    * The Enum SyncActions.
67    */
68   public enum SyncActions {
69     SYNCHRONIZE, REPAIR_INDEX, INDEX_INTEGRITY_VALIDATION_COMPLETE, PRE_SYNC_COMPLETE,
70     SYNC_COMPLETE, SYNC_ABORTED, SYNC_FAILURE, POST_SYNC_COMPLETE, PURGE_COMPLETE, REPORT_COMPLETE
71   }
72
73   private Collection<IndexSynchronizer> registeredSynchronizers;
74   private Collection<IndexValidator> registeredIndexValidators;
75   private Collection<IndexCleaner> registeredIndexCleaners;
76   private InternalState currentInternalState;
77   private ExecutorService syncControllerExecutor;
78   private ExecutorService statReporterExecutor;
79   
80   private long delayInMs;
81   private long syncFrequencyInMs;
82   private Date syncStartTime;
83   
84   private Date lastExecutionDate;
85   private AtomicInteger runCount; 
86   private Semaphore performingActionGate;
87   private Calendar creationTime;
88   
89   private String syncStartTimeWithTimeZone;
90   private String controllerName;
91   
92   protected SyncControllerConfig syncControllerConfig;
93   
94   
95
96
97   /**
98    * Instantiates a new sync controller.
99    *
100    * @param name the name
101    * @throws Exception the exception
102    */
103   public SyncControllerImpl(SyncControllerConfig syncControllerConfig) throws Exception {
104     this(syncControllerConfig,null);
105   }
106   
107   public SyncControllerImpl(SyncControllerConfig syncControllerConfig, String targetEntityType)
108       throws Exception {
109
110     this.syncControllerConfig = syncControllerConfig;
111
112     this.delayInMs = 0L;
113     this.syncFrequencyInMs = 86400000L;
114     this.syncStartTime = null;
115     this.lastExecutionDate = null;
116     this.runCount = new AtomicInteger(0);
117     this.performingActionGate = new Semaphore(1);
118     registeredSynchronizers = new LinkedHashSet<IndexSynchronizer>();
119     registeredIndexValidators = new LinkedHashSet<IndexValidator>();
120     registeredIndexCleaners = new LinkedHashSet<IndexCleaner>();
121
122     String controllerName = syncControllerConfig.getControllerName();
123
124     if (targetEntityType != null) {
125       controllerName += " (" + targetEntityType + ")";
126     }
127     
128     this.controllerName = controllerName;
129
130     this.syncControllerExecutor = NodeUtils.createNamedExecutor("SyncController-" + controllerName,
131         syncControllerConfig.getNumSyncControllerWorkers(), LOG);
132     this.statReporterExecutor =
133         NodeUtils.createNamedExecutor("StatReporter-" + controllerName, 1, LOG);
134
135     this.currentInternalState = InternalState.IDLE;
136
137     this.creationTime =
138         Calendar.getInstance(TimeZone.getTimeZone(syncControllerConfig.getTimeZoneOfSyncStartTimeStamp()));
139
140   }
141
142   
143  
144
145   
146   
147   /**
148    * Change internal state.
149    *
150    * @param newState the new state
151    * @param causedByAction the caused by action
152    */
153   private void changeInternalState(InternalState newState, SyncActions causedByAction) {
154     LOG.info(AaiUiMsgs.SYNC_INTERNAL_STATE_CHANGED, controllerName,
155         currentInternalState.toString(), newState.toString(), causedByAction.toString());
156
157     this.currentInternalState = newState;
158
159     performStateAction();
160   }
161   
162   
163   
164   /* (non-Javadoc)
165    * @see org.openecomp.sparky.synchronizer.SyncController2#getDelayInMs()
166    */
167   @Override
168   public long getDelayInMs() {
169     return delayInMs;
170   }
171
172   /* (non-Javadoc)
173    * @see org.openecomp.sparky.synchronizer.SyncController2#setDelayInMs(long)
174    */
175   @Override
176   public void setDelayInMs(long delayInMs) {
177     this.delayInMs = delayInMs;
178   }
179
180   /* (non-Javadoc)
181    * @see org.openecomp.sparky.synchronizer.SyncController2#getSyncFrequencyInMs()
182    */
183   @Override
184   public long getSyncFrequencyInMs() {
185     return syncFrequencyInMs;
186   }
187
188   /* (non-Javadoc)
189    * @see org.openecomp.sparky.synchronizer.SyncController2#setSyncFrequencyInMs(long)
190    */
191   @Override
192   public void setSyncFrequencyInMs(long syncFrequencyInMs) {
193     this.syncFrequencyInMs = syncFrequencyInMs;
194   }
195
196   /* (non-Javadoc)
197    * @see org.openecomp.sparky.synchronizer.SyncController2#getSyncStartTime()
198    */
199   @Override
200   public Date getSyncStartTime() {
201     return syncStartTime;
202   }
203
204   /* (non-Javadoc)
205    * @see org.openecomp.sparky.synchronizer.SyncController2#setSyncStartTime(java.util.Date)
206    */
207   @Override
208   public void setSyncStartTime(Date syncStartTime) {
209     this.syncStartTime = syncStartTime;
210   }
211
212   /* (non-Javadoc)
213    * @see org.openecomp.sparky.synchronizer.SyncController2#getLastExecutionDate()
214    */
215   @Override
216   public Date getLastExecutionDate() {
217     return lastExecutionDate;
218   }
219
220   /* (non-Javadoc)
221    * @see org.openecomp.sparky.synchronizer.SyncController2#setLastExecutionDate(java.util.Date)
222    */
223   @Override
224   public void setLastExecutionDate(Date lastExecutionDate) {
225     this.lastExecutionDate = lastExecutionDate;
226   }
227   
228   @Override
229   public String getControllerName() {
230     return controllerName;
231   }
232   
233  
234   
235
236   @Override
237   public OperationState performAction(SyncActions requestedAction) {
238
239     if (currentInternalState == InternalState.IDLE) {
240       
241       try {
242         
243         /*
244          * non-blocking semaphore acquire used to guarantee only 1 execution of the synchronization
245          * at a time.
246          */
247         
248         switch (requestedAction) {
249           case SYNCHRONIZE:
250
251             if (performingActionGate.tryAcquire()) {
252               try {
253
254                 long opStartTime = System.currentTimeMillis();
255
256                 LOG.info(AaiUiMsgs.INFO_GENERIC,
257                     getControllerName() + " started synchronization at "
258                         + SynchronizerConstants.SIMPLE_DATE_FORMAT.format(opStartTime).replaceAll(
259                             SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD));
260
261                 runCount.incrementAndGet();
262
263                 changeInternalState(InternalState.TEST_INDEX_INTEGRITY, requestedAction);
264
265                 long opEndTime = System.currentTimeMillis();
266
267                 long opTime = (opEndTime - opStartTime);
268                 
269                 String durationMessage =
270                     String.format(getControllerName() + " synchronization took '%d' ms.", opTime);
271
272                 LOG.info(AaiUiMsgs.SYNC_DURATION, durationMessage);
273                 
274                 if (syncControllerConfig.isPeriodicSyncEnabled()) {
275
276                   LOG.info(AaiUiMsgs.INFO_GENERIC,
277                       getControllerName() + " next sync to begin at " + getNextSyncTime());
278
279                   TimeZone tz = TimeZone.getTimeZone(syncControllerConfig.getTimeZoneOfSyncStartTimeStamp());
280
281                   if (opTime > this.getSyncFrequencyInMs()) {
282
283                     String durationWasLongerMessage = String.format(
284                         getControllerName() + " synchronization took '%d' ms which is larger than"
285                             + " synchronization interval of '%d' ms.",
286                         opTime, this.getSyncFrequencyInMs());
287
288                     LOG.info(AaiUiMsgs.SYNC_DURATION, durationWasLongerMessage);
289                   }
290                 }
291
292               } catch (Exception syncException) {
293                 String message = "An error occurred while performing action = " + requestedAction
294                     + ". Error = " + syncException.getMessage();
295                 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
296               } finally {
297                 performingActionGate.release();
298               }
299             } else {
300               return OperationState.IGNORED_SYNC_NOT_IDLE;
301             }
302
303             break;
304
305           default:
306             break;
307         }
308         
309         return OperationState.OK;
310
311       } catch (Exception exc) {
312         String message = "An error occurred while performing action = " + requestedAction
313             + ". Error = " + exc.getMessage();
314         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
315         return OperationState.ERROR;
316       } finally {
317         
318       }
319     } else {
320       LOG.error(AaiUiMsgs.SYNC_NOT_VALID_STATE_DURING_REQUEST, currentInternalState.toString());
321       return OperationState.IGNORED_SYNC_NOT_IDLE;
322     }
323   }
324
325   /**
326    * Perform state action.
327    */
328   private void performStateAction() {
329
330     try {
331       switch (currentInternalState) {
332
333         case TEST_INDEX_INTEGRITY:
334           performIndexIntegrityValidation();
335           break;
336
337         case PRE_SYNC:
338           performPreSyncCleanupCollection();
339           break;
340
341         case SYNC_OPERATION:
342           performSynchronization();
343           break;
344
345         case POST_SYNC:
346           performIndexSyncPostCollection();
347           changeInternalState(InternalState.SELECTIVE_DELETE, SyncActions.POST_SYNC_COMPLETE);
348           break;
349
350         case SELECTIVE_DELETE:
351           performIndexCleanup();
352           changeInternalState(InternalState.GENERATE_FINAL_REPORT, SyncActions.PURGE_COMPLETE);
353           break;
354
355         case GENERATE_FINAL_REPORT:
356
357           dumpStatReport(true);
358           clearCaches();
359           changeInternalState(InternalState.IDLE, SyncActions.REPORT_COMPLETE);
360           break;
361
362         case ABORTING_SYNC:
363           performSyncAbort();
364           break;
365
366         default:
367           break;
368       }
369     } catch (Exception exc) {
370       /*
371        * Perhaps we should abort the sync on an exception
372        */
373       String message = "Caught an error which performing action. Error = " + exc.getMessage();
374       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
375     }
376   }
377
378   @Override
379   public void registerEntitySynchronizer(IndexSynchronizer entitySynchronizer) {
380
381     String indexName = entitySynchronizer.getIndexName();
382
383     if (indexName != null) {
384       registeredSynchronizers.add(entitySynchronizer);
385     } else {
386       String message = "Failed to register entity synchronizer because index name is null";
387       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
388     }
389
390   }
391
392   @Override
393   public void registerIndexValidator(IndexValidator indexValidator) {
394
395     String indexName = indexValidator.getIndexName();
396
397     if (indexName != null) {
398       registeredIndexValidators.add(indexValidator);
399     } else {
400       String message = "Failed to register index validator because index name is null";
401       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
402     }
403
404   }
405
406   @Override
407   public void registerIndexCleaner(IndexCleaner indexCleaner) {
408
409     String indexName = indexCleaner.getIndexName();
410
411     if (indexName != null) {
412       registeredIndexCleaners.add(indexCleaner);
413     } else {
414       String message = "Failed to register index cleaner because index name is null";
415       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
416     }
417   }
418
419   /*
420    * State machine should drive our flow dosync just dispatches an action and the state machine
421    * determines what is in play and what is next
422    */
423
424   /**
425    * Dump stat report.
426    *
427    * @param showFinalReport the show final report
428    */
429   private void dumpStatReport(boolean showFinalReport) {
430
431     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
432
433       String statReport = synchronizer.getStatReport(showFinalReport);
434
435       if (statReport != null) {
436         LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
437       }
438     }
439   }
440
441   /**
442    * Clear caches.
443    */
444   private void clearCaches() {
445
446     /*
447      * Any entity caches that were built as part of the sync operation should be cleared to save
448      * memory. The original intent of the caching was to provide a short-lived cache to satisfy
449      * entity requests from multiple synchronizers yet minimizing interactions with the AAI.
450      */
451
452     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
453       synchronizer.clearCache();
454     }
455   }
456
457   /**
458    * Perform pre sync cleanup collection.
459    */
460   private void performPreSyncCleanupCollection() {
461
462     /*
463      * ask the index cleaners to collect the their pre-sync object id collections
464      */
465
466     for (IndexCleaner cleaner : registeredIndexCleaners) {
467       cleaner.populatePreOperationCollection();
468     }
469
470     changeInternalState(InternalState.SYNC_OPERATION, SyncActions.PRE_SYNC_COMPLETE);
471
472   }
473
474   /**
475    * Perform index sync post collection.
476    */
477   private void performIndexSyncPostCollection() {
478
479     /*
480      * ask the entity purgers to collect the their pre-sync object id collections
481      */
482
483     for (IndexCleaner cleaner : registeredIndexCleaners) {
484       cleaner.populatePostOperationCollection();
485     }
486
487   }
488
489   /**
490    * Perform index cleanup.
491    */
492   private void performIndexCleanup() {
493
494     /*
495      * ask the entity purgers to collect the their pre-sync object id collections
496      */
497
498     for (IndexCleaner cleaner : registeredIndexCleaners) {
499       cleaner.performCleanup();
500     }
501
502   }
503
504   /**
505    * Perform sync abort.
506    */
507   private void performSyncAbort() {
508     changeInternalState(InternalState.IDLE, SyncActions.SYNC_ABORTED);
509   }
510
511   /**
512    * Perform index integrity validation.
513    */
514   private void performIndexIntegrityValidation() {
515
516     /*
517      * loop through registered index validators and test and fix, if needed
518      */
519
520     for (IndexValidator validator : registeredIndexValidators) {
521       try {
522         if (!validator.exists()) {
523           validator.createOrRepair();
524         }
525       } catch (Exception exc) {
526         String message = "Index validator caused an error = " + exc.getMessage();
527         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
528       }
529     }
530
531     changeInternalState(InternalState.PRE_SYNC, SyncActions.INDEX_INTEGRITY_VALIDATION_COMPLETE);
532
533   }
534
535   /* (non-Javadoc)
536    * @see org.openecomp.sparky.synchronizer.SyncControllerInterface#shutdown()
537    */
538   @Override
539   public void shutdown() {
540
541     this.syncControllerExecutor.shutdown();
542     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
543
544       try {
545         synchronizer.shutdown();
546       } catch (Exception exc) {
547         LOG.error(AaiUiMsgs.ERROR_GENERIC,
548             "Synchronizer shutdown caused an error = " + exc.getMessage());
549       }
550
551     }
552     this.statReporterExecutor.shutdown();
553   }
554
555   /*
556    * Need some kind of task running that responds to a transient boolean to kill it or we just stop
557    * the executor that it is in?
558    */
559
560
561
562   /**
563    * Perform synchronization.
564    */
565   private void performSynchronization() {
566
567     /*
568      * Get all the synchronizers running in parallel
569      */
570
571     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
572       supplyAsync(new Supplier<Void>() {
573
574         @Override
575         public Void get() {
576
577           synchronizer.doSync();
578           return null;
579         }
580
581       }, this.syncControllerExecutor).whenComplete((result, error) -> {
582
583         /*
584          * We don't bother checking the result, because it will always be null as the doSync() is
585          * non-blocking.
586          */
587
588         if (error != null) {
589           LOG.error(AaiUiMsgs.ERROR_GENERIC,
590               "doSync operation failed with an error = " + error.getMessage());
591         }
592       });
593     }
594
595     boolean allDone = false;
596     long nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
597     boolean dumpPeriodicStatReport = false;
598
599     while (!allDone) {
600       int totalFinished = 0;
601
602       for (IndexSynchronizer synchronizer : registeredSynchronizers) {
603         if (dumpPeriodicStatReport) {
604           if (synchronizer.getState() == SynchronizerState.PERFORMING_SYNCHRONIZATION) {
605             String statReport = synchronizer.getStatReport(false);
606
607             if (statReport != null) {
608               LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
609             }
610           }
611         }
612
613         if (synchronizer.getState() == SynchronizerState.IDLE
614             || synchronizer.getState() == SynchronizerState.ABORTED) {
615           totalFinished++;
616         }
617       }
618
619       if ( System.currentTimeMillis() > nextReportTimeStampInMs) {
620         dumpPeriodicStatReport = true;
621         nextReportTimeStampInMs = System.currentTimeMillis() + 30000L; 
622       } else {
623         dumpPeriodicStatReport = false;
624       }
625
626       allDone = (totalFinished == registeredSynchronizers.size());
627
628       try {
629         Thread.sleep(250);
630       } catch (InterruptedException exc) {
631         LOG.error(AaiUiMsgs.ERROR_GENERIC,
632             "An error occurred while waiting for sync to complete. Error = " + exc.getMessage());
633       }
634
635     }
636
637     changeInternalState(InternalState.POST_SYNC, SyncActions.SYNC_COMPLETE);
638
639   }
640
641   /* (non-Javadoc)
642    * @see org.openecomp.sparky.synchronizer.SyncControllerInterface#getState()
643    */
644   @Override
645   public SynchronizerState getState() {
646
647     switch (currentInternalState) {
648
649       case IDLE: {
650         return SynchronizerState.IDLE;
651       }
652
653       default: {
654         return SynchronizerState.PERFORMING_SYNCHRONIZATION;
655
656       }
657     }
658
659   }
660
661   @Override
662   public Calendar getCreationTime() {
663     return creationTime;
664   }
665
666   @Override
667   public String getNextSyncTime() {
668     // TODO Auto-generated method stub
669     return null;
670   }
671
672   @Override
673   public boolean isPeriodicSyncEnabled() {
674     return syncControllerConfig.isPeriodicSyncEnabled();
675   }
676
677   @Override
678   public boolean isRunOnceSyncEnabled() {
679     return syncControllerConfig.isRunOnceSyncEnabled();
680   }
681   
682 }