7c810742a38aac2ff249e192642dc956adf520f9
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / sync / SyncControllerImpl.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sparky.sync;
22
23 import static java.util.concurrent.CompletableFuture.supplyAsync;
24
25 import java.text.SimpleDateFormat;
26 import java.util.Calendar;
27 import java.util.Collection;
28 import java.util.Date;
29 import java.util.LinkedHashSet;
30 import java.util.TimeZone;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Semaphore;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.function.Supplier;
35
36 import org.onap.aai.cl.api.Logger;
37 import org.onap.aai.cl.eelf.LoggerFactory;
38 import org.onap.aai.sparky.logging.AaiUiMsgs;
39 import org.onap.aai.sparky.sync.config.SyncControllerConfig;
40 import org.onap.aai.sparky.sync.enumeration.OperationState;
41 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
42 import org.onap.aai.sparky.util.NodeUtils;
43 import org.springframework.beans.factory.annotation.Autowired;
44 import org.springframework.stereotype.Component;
45
46 /**
47  * The Class SyncController.
48  *
49  * @author davea.
50  */
51 public class SyncControllerImpl implements SyncController {
52   private static final Logger LOG = LoggerFactory.getInstance().getLogger(SyncControllerImpl.class);
53
54   /**
55    * The Enum InternalState.
56    */
57   private enum InternalState {
58     IDLE, PRE_SYNC, SYNC_OPERATION, SELECTIVE_DELETE, ABORTING_SYNC, REPAIRING_INDEX, POST_SYNC,
59     TEST_INDEX_INTEGRITY, GENERATE_FINAL_REPORT
60   }
61
62   private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z");
63
64   /**
65    * The Enum SyncActions.
66    */
67   public enum SyncActions {
68     SYNCHRONIZE, REPAIR_INDEX, INDEX_INTEGRITY_VALIDATION_COMPLETE, PRE_SYNC_COMPLETE,
69     SYNC_COMPLETE, SYNC_ABORTED, SYNC_FAILURE, POST_SYNC_COMPLETE, PURGE_COMPLETE, REPORT_COMPLETE
70   }
71
72   private Collection<IndexSynchronizer> registeredSynchronizers;
73   private Collection<IndexValidator> registeredIndexValidators;
74   private Collection<IndexCleaner> registeredIndexCleaners;
75   private InternalState currentInternalState;
76   private ExecutorService syncControllerExecutor;
77   private ExecutorService statReporterExecutor;
78   
79   private long delayInMs;
80   private long syncFrequencyInMs;
81   private Date syncStartTime;
82   
83   private Date lastExecutionDate;
84   private AtomicInteger runCount; 
85   private Semaphore performingActionGate;
86   private Calendar creationTime;
87   
88   private String syncStartTimeWithTimeZone;
89   private String controllerName;
90   
91   protected SyncControllerConfig syncControllerConfig;
92   
93   
94
95
96   /**
97    * Instantiates a new sync controller.
98    *
99    * @throws Exception the exception
100    */
101   public SyncControllerImpl(SyncControllerConfig syncControllerConfig) throws Exception {
102     this(syncControllerConfig,null);
103   }
104   
105   public SyncControllerImpl(SyncControllerConfig syncControllerConfig, String targetEntityType)
106       throws Exception {
107
108     this.syncControllerConfig = syncControllerConfig;
109
110     this.delayInMs = 0L;
111     this.syncFrequencyInMs = 86400000L;
112     this.syncStartTime = null;
113     this.lastExecutionDate = null;
114     this.runCount = new AtomicInteger(0);
115     this.performingActionGate = new Semaphore(1);
116     registeredSynchronizers = new LinkedHashSet<IndexSynchronizer>();
117     registeredIndexValidators = new LinkedHashSet<IndexValidator>();
118     registeredIndexCleaners = new LinkedHashSet<IndexCleaner>();
119
120     String controllerName = syncControllerConfig.getControllerName();
121
122     if (targetEntityType != null) {
123       controllerName += " (" + targetEntityType + ")";
124     }
125     
126     this.controllerName = controllerName;
127
128     this.syncControllerExecutor = NodeUtils.createNamedExecutor("SyncController-" + controllerName,
129         syncControllerConfig.getNumSyncControllerWorkers(), LOG);
130     this.statReporterExecutor =
131         NodeUtils.createNamedExecutor("StatReporter-" + controllerName, 1, LOG);
132
133     this.currentInternalState = InternalState.IDLE;
134
135     this.creationTime =
136         Calendar.getInstance(TimeZone.getTimeZone(syncControllerConfig.getTimeZoneOfSyncStartTimeStamp()));
137
138   }
139
140   
141  
142
143   
144   
145   /**
146    * Change internal state.
147    *
148    * @param newState the new state
149    * @param causedByAction the caused by action
150    */
151   private void changeInternalState(InternalState newState, SyncActions causedByAction) {
152     LOG.info(AaiUiMsgs.SYNC_INTERNAL_STATE_CHANGED, controllerName,
153         currentInternalState.toString(), newState.toString(), causedByAction.toString());
154
155     this.currentInternalState = newState;
156
157     performStateAction();
158   }
159   
160   
161   
162   /* (non-Javadoc)
163    * @see org.openecomp.sparky.synchronizer.SyncController2#getDelayInMs()
164    */
165   @Override
166   public long getDelayInMs() {
167     return delayInMs;
168   }
169
170   /* (non-Javadoc)
171    * @see org.openecomp.sparky.synchronizer.SyncController2#setDelayInMs(long)
172    */
173   @Override
174   public void setDelayInMs(long delayInMs) {
175     this.delayInMs = delayInMs;
176   }
177
178   /* (non-Javadoc)
179    * @see org.openecomp.sparky.synchronizer.SyncController2#getSyncFrequencyInMs()
180    */
181   @Override
182   public long getSyncFrequencyInMs() {
183     return syncFrequencyInMs;
184   }
185
186   /* (non-Javadoc)
187    * @see org.openecomp.sparky.synchronizer.SyncController2#setSyncFrequencyInMs(long)
188    */
189   @Override
190   public void setSyncFrequencyInMs(long syncFrequencyInMs) {
191     this.syncFrequencyInMs = syncFrequencyInMs;
192   }
193
194   /* (non-Javadoc)
195    * @see org.openecomp.sparky.synchronizer.SyncController2#getSyncStartTime()
196    */
197   @Override
198   public Date getSyncStartTime() {
199     return syncStartTime;
200   }
201
202   /* (non-Javadoc)
203    * @see org.openecomp.sparky.synchronizer.SyncController2#setSyncStartTime(java.util.Date)
204    */
205   @Override
206   public void setSyncStartTime(Date syncStartTime) {
207     this.syncStartTime = syncStartTime;
208   }
209
210   /* (non-Javadoc)
211    * @see org.openecomp.sparky.synchronizer.SyncController2#getLastExecutionDate()
212    */
213   @Override
214   public Date getLastExecutionDate() {
215     return lastExecutionDate;
216   }
217
218   /* (non-Javadoc)
219    * @see org.openecomp.sparky.synchronizer.SyncController2#setLastExecutionDate(java.util.Date)
220    */
221   @Override
222   public void setLastExecutionDate(Date lastExecutionDate) {
223     this.lastExecutionDate = lastExecutionDate;
224   }
225   
226   @Override
227   public String getControllerName() {
228     return controllerName;
229   }
230   
231  
232   
233
234   @Override
235   public OperationState performAction(SyncActions requestedAction) {
236
237     if (currentInternalState == InternalState.IDLE) {
238       
239       try {
240         
241         /*
242          * non-blocking semaphore acquire used to guarantee only 1 execution of the synchronization
243          * at a time.
244          */
245         
246         switch (requestedAction) {
247           case SYNCHRONIZE:
248
249             if (performingActionGate.tryAcquire()) {
250               try {
251
252                 long opStartTime = System.currentTimeMillis();
253
254                 LOG.info(AaiUiMsgs.INFO_GENERIC,
255                     getControllerName() + " started synchronization at "
256                         + this.simpleDateFormat.format(opStartTime).replaceAll(
257                             SynchronizerConstants.TIME_STD, SynchronizerConstants.TIME_CONFIG_STD));
258
259                 runCount.incrementAndGet();
260
261                 changeInternalState(InternalState.TEST_INDEX_INTEGRITY, requestedAction);
262
263                 long opEndTime = System.currentTimeMillis();
264
265                 long opTime = (opEndTime - opStartTime);
266                 
267                 String durationMessage =
268                     String.format(getControllerName() + " synchronization took '%d' ms.", opTime);
269
270                 LOG.info(AaiUiMsgs.SYNC_DURATION, durationMessage);
271                 
272                 if (syncControllerConfig.isPeriodicSyncEnabled()) {
273
274                   LOG.info(AaiUiMsgs.INFO_GENERIC,
275                       getControllerName() + " next sync to begin at " + getNextSyncTime());
276
277                   TimeZone tz = TimeZone.getTimeZone(syncControllerConfig.getTimeZoneOfSyncStartTimeStamp());
278
279                   if (opTime > this.getSyncFrequencyInMs()) {
280
281                     String durationWasLongerMessage = String.format(
282                         getControllerName() + " synchronization took '%d' ms which is larger than"
283                             + " synchronization interval of '%d' ms.",
284                         opTime, this.getSyncFrequencyInMs());
285
286                     LOG.info(AaiUiMsgs.SYNC_DURATION, durationWasLongerMessage);
287                   }
288                 }
289
290               } catch (Exception syncException) {
291                 String message = "An error occurred while performing action = " + requestedAction
292                     + ". Error = " + syncException.getMessage();
293                 LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
294               } finally {
295                 performingActionGate.release();
296               }
297             } else {
298               return OperationState.IGNORED_SYNC_NOT_IDLE;
299             }
300
301             break;
302
303           default:
304             break;
305         }
306         
307         return OperationState.OK;
308
309       } catch (Exception exc) {
310         String message = "An error occurred while performing action = " + requestedAction
311             + ". Error = " + exc.getMessage();
312         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
313         return OperationState.ERROR;
314       } finally {
315         
316       }
317     } else {
318       LOG.error(AaiUiMsgs.SYNC_NOT_VALID_STATE_DURING_REQUEST, currentInternalState.toString());
319       return OperationState.IGNORED_SYNC_NOT_IDLE;
320     }
321   }
322
323   /**
324    * Perform state action.
325    */
326   private void performStateAction() {
327
328     try {
329       switch (currentInternalState) {
330
331         case TEST_INDEX_INTEGRITY:
332           performIndexIntegrityValidation();
333           break;
334
335         case PRE_SYNC:
336           performPreSyncCleanupCollection();
337           break;
338
339         case SYNC_OPERATION:
340           performSynchronization();
341           break;
342
343         case POST_SYNC:
344           performIndexSyncPostCollection();
345           changeInternalState(InternalState.SELECTIVE_DELETE, SyncActions.POST_SYNC_COMPLETE);
346           break;
347
348         case SELECTIVE_DELETE:
349           performIndexCleanup();
350           changeInternalState(InternalState.GENERATE_FINAL_REPORT, SyncActions.PURGE_COMPLETE);
351           break;
352
353         case GENERATE_FINAL_REPORT:
354
355           dumpStatReport(true);
356           clearCaches();
357           changeInternalState(InternalState.IDLE, SyncActions.REPORT_COMPLETE);
358           break;
359
360         case ABORTING_SYNC:
361           performSyncAbort();
362           break;
363
364         default:
365           break;
366       }
367     } catch (Exception exc) {
368       /*
369        * Perhaps we should abort the sync on an exception
370        */
371       String message = "Caught an error which performing action. Error = " + exc.getMessage();
372       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
373     }
374   }
375
376   @Override
377   public void registerEntitySynchronizer(IndexSynchronizer entitySynchronizer) {
378
379     String indexName = entitySynchronizer.getIndexName();
380
381     if (indexName != null) {
382       registeredSynchronizers.add(entitySynchronizer);
383     } else {
384       String message = "Failed to register entity synchronizer because index name is null";
385       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
386     }
387
388   }
389
390   @Override
391   public void registerIndexValidator(IndexValidator indexValidator) {
392
393     String indexName = indexValidator.getIndexName();
394
395     if (indexName != null) {
396       registeredIndexValidators.add(indexValidator);
397     } else {
398       String message = "Failed to register index validator because index name is null";
399       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
400     }
401
402   }
403
404   @Override
405   public void registerIndexCleaner(IndexCleaner indexCleaner) {
406
407     String indexName = indexCleaner.getIndexName();
408
409     if (indexName != null) {
410       registeredIndexCleaners.add(indexCleaner);
411     } else {
412       String message = "Failed to register index cleaner because index name is null";
413       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
414     }
415   }
416
417   /*
418    * State machine should drive our flow dosync just dispatches an action and the state machine
419    * determines what is in play and what is next
420    */
421
422   /**
423    * Dump stat report.
424    *
425    * @param showFinalReport the show final report
426    */
427   private void dumpStatReport(boolean showFinalReport) {
428
429     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
430
431       String statReport = synchronizer.getStatReport(showFinalReport);
432
433       if (statReport != null) {
434         LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
435       }
436     }
437   }
438
439   /**
440    * Clear caches.
441    */
442   private void clearCaches() {
443
444     /*
445      * Any entity caches that were built as part of the sync operation should be cleared to save
446      * memory. The original intent of the caching was to provide a short-lived cache to satisfy
447      * entity requests from multiple synchronizers yet minimizing interactions with the AAI.
448      */
449
450     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
451       synchronizer.clearCache();
452     }
453   }
454
455   /**
456    * Perform pre sync cleanup collection.
457    */
458   private void performPreSyncCleanupCollection() {
459
460     /*
461      * ask the index cleaners to collect the their pre-sync object id collections
462      */
463
464     for (IndexCleaner cleaner : registeredIndexCleaners) {
465       cleaner.populatePreOperationCollection();
466     }
467
468     changeInternalState(InternalState.SYNC_OPERATION, SyncActions.PRE_SYNC_COMPLETE);
469
470   }
471
472   /**
473    * Perform index sync post collection.
474    */
475   private void performIndexSyncPostCollection() {
476
477     /*
478      * ask the entity purgers to collect the their pre-sync object id collections
479      */
480
481     for (IndexCleaner cleaner : registeredIndexCleaners) {
482       cleaner.populatePostOperationCollection();
483     }
484
485   }
486
487   /**
488    * Perform index cleanup.
489    */
490   private void performIndexCleanup() {
491
492     /*
493      * ask the entity purgers to collect the their pre-sync object id collections
494      */
495
496     for (IndexCleaner cleaner : registeredIndexCleaners) {
497       cleaner.performCleanup();
498     }
499
500   }
501
502   /**
503    * Perform sync abort.
504    */
505   private void performSyncAbort() {
506     changeInternalState(InternalState.IDLE, SyncActions.SYNC_ABORTED);
507   }
508
509   /**
510    * Perform index integrity validation.
511    */
512   private void performIndexIntegrityValidation() {
513
514     /*
515      * loop through registered index validators and test and fix, if needed
516      */
517
518     for (IndexValidator validator : registeredIndexValidators) {
519       try {
520         if (!validator.exists()) {
521           validator.createOrRepair();
522         }
523       } catch (Exception exc) {
524         String message = "Index validator caused an error = " + exc.getMessage();
525         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
526       }
527     }
528
529     changeInternalState(InternalState.PRE_SYNC, SyncActions.INDEX_INTEGRITY_VALIDATION_COMPLETE);
530
531   }
532
533   /* (non-Javadoc)
534    * @see org.openecomp.sparky.synchronizer.SyncControllerInterface#shutdown()
535    */
536   @Override
537   public void shutdown() {
538
539     this.syncControllerExecutor.shutdown();
540     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
541
542       try {
543         synchronizer.shutdown();
544       } catch (Exception exc) {
545         LOG.error(AaiUiMsgs.ERROR_GENERIC,
546             "Synchronizer shutdown caused an error = " + exc.getMessage());
547       }
548
549     }
550     this.statReporterExecutor.shutdown();
551   }
552
553   /*
554    * Need some kind of task running that responds to a transient boolean to kill it or we just stop
555    * the executor that it is in?
556    */
557
558
559
560   /**
561    * Perform synchronization.
562    */
563   private void performSynchronization() {
564
565     /*
566      * Get all the synchronizers running in parallel
567      */
568
569     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
570       supplyAsync(new Supplier<Void>() {
571
572         @Override
573         public Void get() {
574
575           synchronizer.doSync();
576           return null;
577         }
578
579       }, this.syncControllerExecutor).whenComplete((result, error) -> {
580
581         /*
582          * We don't bother checking the result, because it will always be null as the doSync() is
583          * non-blocking.
584          */
585
586         if (error != null) {
587           LOG.error(AaiUiMsgs.ERROR_GENERIC,
588               "doSync operation failed with an error = " + error.getMessage());
589         }
590       });
591     }
592
593     boolean allDone = false;
594     long nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
595     boolean dumpPeriodicStatReport = false;
596
597     while (!allDone) {
598       int totalFinished = 0;
599
600       for (IndexSynchronizer synchronizer : registeredSynchronizers) {
601         if (dumpPeriodicStatReport) {
602           if (synchronizer.getState() == SynchronizerState.PERFORMING_SYNCHRONIZATION) {
603             String statReport = synchronizer.getStatReport(false);
604
605             if (statReport != null) {
606               LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
607             }
608           }
609         }
610
611         if (synchronizer.getState() == SynchronizerState.IDLE
612             || synchronizer.getState() == SynchronizerState.ABORTED) {
613           totalFinished++;
614         }
615       }
616
617       if ( System.currentTimeMillis() > nextReportTimeStampInMs) {
618         dumpPeriodicStatReport = true;
619         nextReportTimeStampInMs = System.currentTimeMillis() + 30000L; 
620       } else {
621         dumpPeriodicStatReport = false;
622       }
623
624       allDone = (totalFinished == registeredSynchronizers.size());
625
626       try {
627         Thread.sleep(250);
628       } catch (InterruptedException exc) {
629         LOG.error(AaiUiMsgs.ERROR_GENERIC,
630             "An error occurred while waiting for sync to complete. Error = " + exc.getMessage());
631         Thread.currentThread().interrupt();
632       }
633
634     }
635
636     changeInternalState(InternalState.POST_SYNC, SyncActions.SYNC_COMPLETE);
637
638   }
639
640   /* (non-Javadoc)
641    * @see org.openecomp.sparky.synchronizer.SyncControllerInterface#getState()
642    */
643   @Override
644   public SynchronizerState getState() {
645
646     switch (currentInternalState) {
647
648       case IDLE: {
649         return SynchronizerState.IDLE;
650       }
651
652       default: {
653         return SynchronizerState.PERFORMING_SYNCHRONIZATION;
654
655       }
656     }
657
658   }
659
660   @Override
661   public Calendar getCreationTime() {
662     return creationTime;
663   }
664
665   @Override
666   public String getNextSyncTime() {
667     // TODO Auto-generated method stub
668     return null;
669   }
670
671   @Override
672   public boolean isPeriodicSyncEnabled() {
673     return syncControllerConfig.isPeriodicSyncEnabled();
674   }
675
676   @Override
677   public boolean isRunOnceSyncEnabled() {
678     return syncControllerConfig.isRunOnceSyncEnabled();
679   }
680   
681 }