Changing the license and trademark
[aai/sparky-be.git] / src / main / java / org / openecomp / sparky / synchronizer / SyncController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.openecomp.sparky.synchronizer;
24
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
26
27 import java.util.Collection;
28 import java.util.LinkedHashSet;
29 import java.util.concurrent.ExecutorService;
30 import java.util.function.Supplier;
31
32 import org.openecomp.cl.api.Logger;
33 import org.openecomp.cl.eelf.LoggerFactory;
34 import org.openecomp.sparky.logging.AaiUiMsgs;
35 import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState;
36 import org.openecomp.sparky.util.NodeUtils;
37
38 /**
39  * The Class SyncController.
40  *
41  * @author davea.
42  */
43 public class SyncController {
44   private static final Logger LOG = LoggerFactory.getInstance().getLogger(SyncController.class);
45
46   /**
47    * The Enum InternalState.
48    */
49   private enum InternalState {
50     IDLE, PRE_SYNC, SYNC_OPERATION, SELECTIVE_DELETE, ABORTING_SYNC, REPAIRING_INDEX, POST_SYNC,
51     TEST_INDEX_INTEGRITY, GENERATE_FINAL_REPORT
52   }
53
54   /**
55    * The Enum SyncActions.
56    */
57   public enum SyncActions {
58     SYNCHRONIZE, REPAIR_INDEX, INDEX_INTEGRITY_VALIDATION_COMPLETE, PRE_SYNC_COMPLETE,
59     SYNC_COMPLETE, SYNC_ABORTED, SYNC_FAILURE, POST_SYNC_COMPLETE, PURGE_COMPLETE, REPORT_COMPLETE
60   }
61
62   private Collection<IndexSynchronizer> registeredSynchronizers;
63   private Collection<IndexValidator> registeredIndexValidators;
64   private Collection<IndexCleaner> registeredIndexCleaners;
65   private InternalState currentInternalState;
66   private ExecutorService syncControllerExecutor;
67   private ExecutorService statReporterExecutor;
68   private final String controllerName;
69
70   /**
71    * Instantiates a new sync controller.
72    *
73    * @param name the name
74    * @throws Exception the exception
75    */
76   public SyncController(String name) throws Exception {
77
78     this.controllerName = name;
79     /*
80      * Does LHS result in a non-duplicated object collection?? What happens if you double-add an
81      * object?
82      */
83
84     registeredSynchronizers = new LinkedHashSet<IndexSynchronizer>();
85     registeredIndexValidators = new LinkedHashSet<IndexValidator>();
86     registeredIndexCleaners = new LinkedHashSet<IndexCleaner>();
87
88     this.syncControllerExecutor = NodeUtils.createNamedExecutor("SyncController", 5, LOG);
89     this.statReporterExecutor = NodeUtils.createNamedExecutor("StatReporter", 1, LOG);
90
91     this.currentInternalState = InternalState.IDLE;
92   }
93   
94   /**
95    * Change internal state.
96    *
97    * @param newState the new state
98    * @param causedByAction the caused by action
99    */
100   private void changeInternalState(InternalState newState, SyncActions causedByAction) {
101     LOG.info(AaiUiMsgs.SYNC_INTERNAL_STATE_CHANGED, controllerName,
102         currentInternalState.toString(), newState.toString(), causedByAction.toString());
103
104     this.currentInternalState = newState;
105
106     performStateAction();
107   }
108
109   public String getControllerName() {
110     return controllerName;
111   }
112
113   /**
114    * Perform action.
115    *
116    * @param requestedAction the requested action
117    */
118   public void performAction(SyncActions requestedAction) {
119
120     if (currentInternalState == InternalState.IDLE) {
121
122       try {
123         switch (requestedAction) {
124           case SYNCHRONIZE:
125             changeInternalState(InternalState.TEST_INDEX_INTEGRITY, requestedAction);
126             break;
127
128           default:
129             break;
130         }
131
132       } catch (Exception exc) {
133         String message = "An error occurred while performing action = " + requestedAction
134             + ". Error = " + exc.getMessage();
135         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
136       }
137     } else {
138       LOG.error(AaiUiMsgs.SYNC_NOT_VALID_STATE_DURING_REQUEST, currentInternalState.toString());
139     }
140   }
141
142   /**
143    * Perform state action.
144    */
145   private void performStateAction() {
146
147     try {
148       switch (currentInternalState) {
149
150         case TEST_INDEX_INTEGRITY:
151           performIndexIntegrityValidation();
152           break;
153
154         case PRE_SYNC:
155           performPreSyncCleanupCollection();
156           break;
157
158         case SYNC_OPERATION:
159           performSynchronization();
160           break;
161
162         case POST_SYNC:
163           performIndexSyncPostCollection();
164           changeInternalState(InternalState.SELECTIVE_DELETE, SyncActions.POST_SYNC_COMPLETE);
165           break;
166
167         case SELECTIVE_DELETE:
168           performIndexCleanup();
169           changeInternalState(InternalState.GENERATE_FINAL_REPORT, SyncActions.PURGE_COMPLETE);
170           break;
171
172         case GENERATE_FINAL_REPORT:
173
174           dumpStatReport(true);
175           clearCaches();
176           changeInternalState(InternalState.IDLE, SyncActions.REPORT_COMPLETE);
177           break;
178
179         case ABORTING_SYNC:
180           performSyncAbort();
181           break;
182
183         default:
184           break;
185       }
186     } catch (Exception exc) {
187       String message = "Caught an error which performing action. Error = " + exc.getMessage();
188       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
189     }
190   }
191
192   /**
193    * Register entity synchronizer.
194    *
195    * @param entitySynchronizer the entity synchronizer
196    */
197   public void registerEntitySynchronizer(IndexSynchronizer entitySynchronizer) {
198
199     String indexName = entitySynchronizer.getIndexName();
200
201     if (indexName != null) {
202       registeredSynchronizers.add(entitySynchronizer);
203     } else {
204       String message = "Failed to register entity synchronizer because index name is null";
205       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
206     }
207
208   }
209
210   /**
211    * Register index validator.
212    *
213    * @param indexValidator the index validator
214    */
215   public void registerIndexValidator(IndexValidator indexValidator) {
216
217     String indexName = indexValidator.getIndexName();
218
219     if (indexName != null) {
220       registeredIndexValidators.add(indexValidator);
221     } else {
222       String message = "Failed to register index validator because index name is null";
223       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
224     }
225
226   }
227
228   /**
229    * Register index cleaner.
230    *
231    * @param indexCleaner the index cleaner
232    */
233   public void registerIndexCleaner(IndexCleaner indexCleaner) {
234
235     String indexName = indexCleaner.getIndexName();
236
237     if (indexName != null) {
238       registeredIndexCleaners.add(indexCleaner);
239     } else {
240       String message = "Failed to register index cleaner because index name is null";
241       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
242     }
243   }
244
245   /*
246    * State machine should drive our flow dosync just dispatches an action and the state machine
247    * determines what is in play and what is next
248    */
249
250   /**
251    * Dump stat report.
252    *
253    * @param showFinalReport the show final report
254    */
255   private void dumpStatReport(boolean showFinalReport) {
256
257     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
258
259       String statReport = synchronizer.getStatReport(showFinalReport);
260
261       if (statReport != null) {
262         LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
263       }
264     }
265   }
266
267   /**
268    * Clear caches.
269    */
270   private void clearCaches() {
271
272     /*
273      * Any entity caches that were built as part of the sync operation should be cleared to save
274      * memory. The original intent of the caching was to provide a short-lived cache to satisfy
275      * entity requests from multiple synchronizers yet minimizing interactions with the AAI.
276      */
277
278     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
279       synchronizer.clearCache();
280     }
281   }
282
283   /**
284    * Perform pre sync cleanup collection.
285    */
286   private void performPreSyncCleanupCollection() {
287
288     /*
289      * ask the index cleaners to collect the their pre-sync object id collections
290      */
291
292     for (IndexCleaner cleaner : registeredIndexCleaners) {
293       cleaner.populatePreOperationCollection();
294     }
295
296     changeInternalState(InternalState.SYNC_OPERATION, SyncActions.PRE_SYNC_COMPLETE);
297
298   }
299
300   /**
301    * Perform index sync post collection.
302    */
303   private void performIndexSyncPostCollection() {
304
305     /*
306      * ask the entity purgers to collect the their pre-sync object id collections
307      */
308
309     for (IndexCleaner cleaner : registeredIndexCleaners) {
310       cleaner.populatePostOperationCollection();
311     }
312
313   }
314
315   /**
316    * Perform index cleanup.
317    */
318   private void performIndexCleanup() {
319
320     /*
321      * ask the entity purgers to collect the their pre-sync object id collections
322      */
323
324     for (IndexCleaner cleaner : registeredIndexCleaners) {
325       cleaner.performCleanup();
326     }
327
328   }
329
330   /**
331    * Perform sync abort.
332    */
333   private void performSyncAbort() {
334     changeInternalState(InternalState.IDLE, SyncActions.SYNC_ABORTED);
335   }
336
337   /**
338    * Perform index integrity validation.
339    */
340   private void performIndexIntegrityValidation() {
341
342     /*
343      * loop through registered index validators and test and fix, if needed
344      */
345
346     for (IndexValidator validator : registeredIndexValidators) {
347       try {
348         if (!validator.exists()) {
349           validator.createOrRepair();
350         }
351       } catch (Exception exc) {
352         String message = "Index validator caused an error = " + exc.getMessage();
353         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
354       }
355     }
356
357     changeInternalState(InternalState.PRE_SYNC, SyncActions.INDEX_INTEGRITY_VALIDATION_COMPLETE);
358
359   }
360
361   /**
362    * Shutdown.
363    */
364   public void shutdown() {
365
366     this.syncControllerExecutor.shutdown();
367     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
368
369       try {
370         synchronizer.shutdown();
371       } catch (Exception exc) {
372         LOG.error(AaiUiMsgs.ERROR_GENERIC,
373             "Synchronizer shutdown caused an error = " + exc.getMessage());
374       }
375
376     }
377     this.statReporterExecutor.shutdown();
378   }
379
380   /*
381    * Need some kind of task running that responds to a transient boolean to kill it or we just stop
382    * the executor that it is in?
383    */
384
385
386
387   /**
388    * Perform synchronization.
389    */
390   private void performSynchronization() {
391
392     /*
393      * Get all the synchronizers running in parallel
394      */
395
396     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
397       supplyAsync(new Supplier<Void>() {
398
399         @Override
400         public Void get() {
401
402           synchronizer.doSync();
403           return null;
404         }
405
406       }, this.syncControllerExecutor).whenComplete((result, error) -> {
407
408         /*
409          * We don't bother checking the result, because it will always be null as the doSync() is
410          * non-blocking.
411          */
412
413         if (error != null) {
414           LOG.error(AaiUiMsgs.ERROR_GENERIC,
415               "doSync operation failed with an error = " + error.getMessage());
416         }
417       });
418     }
419
420     boolean allDone = false;
421     long nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
422     boolean dumpPeriodicStatReport = false;
423
424     while (!allDone) {
425
426       int totalFinished = 0;
427
428       for (IndexSynchronizer synchronizer : registeredSynchronizers) {
429           if (dumpPeriodicStatReport) {
430               if (synchronizer.getState() != SynchronizerState.IDLE) {
431                 String statReport = synchronizer.getStatReport(false);
432                   if (statReport != null) {
433                             LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
434                           }
435               }
436                 if (synchronizer.getState() == SynchronizerState.IDLE) {
437                   totalFinished++;
438                 }
439           }
440       }
441       if ( System.currentTimeMillis() > nextReportTimeStampInMs) {
442           dumpPeriodicStatReport = true;
443           nextReportTimeStampInMs = System.currentTimeMillis() + 30000L; 
444         } else {
445           dumpPeriodicStatReport = false;
446         }
447       allDone = (totalFinished == registeredSynchronizers.size());
448
449       try {
450         Thread.sleep(250);
451       } catch (InterruptedException exc) {
452         LOG.error(AaiUiMsgs.ERROR_GENERIC,
453             "An error occurred while waiting for sync to complete. Error = " + exc.getMessage());
454       }
455
456     }
457
458     changeInternalState(InternalState.POST_SYNC, SyncActions.SYNC_COMPLETE);
459
460   }
461
462   public SynchronizerState getState() {
463
464     switch (currentInternalState) {
465
466       case IDLE: {
467         return SynchronizerState.IDLE;
468       }
469
470       default: {
471         return SynchronizerState.PERFORMING_SYNCHRONIZATION;
472
473       }
474     }
475
476   }
477
478 }