Update the dependencies to use project version
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / synchronizer / SyncController.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.synchronizer;
24
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
26
27 import java.util.Collection;
28 import java.util.LinkedHashSet;
29 import java.util.concurrent.ExecutorService;
30 import java.util.function.Supplier;
31
32 import org.onap.aai.sparky.logging.AaiUiMsgs;
33 import org.onap.aai.sparky.synchronizer.enumeration.SynchronizerState;
34 import org.onap.aai.sparky.util.NodeUtils;
35 import org.onap.aai.cl.api.Logger;
36 import org.onap.aai.cl.eelf.LoggerFactory;
37
38 /**
39  * The Class SyncController.
40  *
41  * @author davea.
42  */
43 public class SyncController {
44   private static final Logger LOG = LoggerFactory.getInstance().getLogger(SyncController.class);
45
46   /**
47    * The Enum InternalState.
48    */
49   private enum InternalState {
50     IDLE, PRE_SYNC, SYNC_OPERATION, SELECTIVE_DELETE, ABORTING_SYNC, REPAIRING_INDEX, POST_SYNC, TEST_INDEX_INTEGRITY, GENERATE_FINAL_REPORT
51   }
52
53   /**
54    * The Enum SyncActions.
55    */
56   public enum SyncActions {
57     SYNCHRONIZE, REPAIR_INDEX, INDEX_INTEGRITY_VALIDATION_COMPLETE, PRE_SYNC_COMPLETE, SYNC_COMPLETE, SYNC_ABORTED, SYNC_FAILURE, POST_SYNC_COMPLETE, PURGE_COMPLETE, REPORT_COMPLETE
58   }
59
60   private Collection<IndexSynchronizer> registeredSynchronizers;
61   private Collection<IndexValidator> registeredIndexValidators;
62   private Collection<IndexCleaner> registeredIndexCleaners;
63   private InternalState currentInternalState;
64   private ExecutorService syncControllerExecutor;
65   private ExecutorService statReporterExecutor;
66   private final String controllerName;
67
68   /**
69    * Instantiates a new sync controller.
70    *
71    * @param name the name
72    * @throws Exception the exception
73    */
74   public SyncController(String name) throws Exception {
75
76     this.controllerName = name;
77     /*
78      * Does LHS result in a non-duplicated object collection?? What happens if you double-add an
79      * object?
80      */
81
82     registeredSynchronizers = new LinkedHashSet<IndexSynchronizer>();
83     registeredIndexValidators = new LinkedHashSet<IndexValidator>();
84     registeredIndexCleaners = new LinkedHashSet<IndexCleaner>();
85
86     this.syncControllerExecutor = NodeUtils.createNamedExecutor("SyncController", 5, LOG);
87     this.statReporterExecutor = NodeUtils.createNamedExecutor("StatReporter", 1, LOG);
88
89     this.currentInternalState = InternalState.IDLE;
90   }
91
92   /**
93    * Change internal state.
94    *
95    * @param newState the new state
96    * @param causedByAction the caused by action
97    */
98   private void changeInternalState(InternalState newState, SyncActions causedByAction) {
99     LOG.info(AaiUiMsgs.SYNC_INTERNAL_STATE_CHANGED, controllerName, currentInternalState.toString(),
100         newState.toString(), causedByAction.toString());
101
102     this.currentInternalState = newState;
103
104     performStateAction();
105   }
106
107   public String getControllerName() {
108     return controllerName;
109   }
110
111   /**
112    * Perform action.
113    *
114    * @param requestedAction the requested action
115    */
116   public void performAction(SyncActions requestedAction) {
117
118     if (currentInternalState == InternalState.IDLE) {
119
120       try {
121         switch (requestedAction) {
122           case SYNCHRONIZE:
123             changeInternalState(InternalState.TEST_INDEX_INTEGRITY, requestedAction);
124             break;
125
126           default:
127             break;
128         }
129
130       } catch (Exception exc) {
131         String message = "An error occurred while performing action = " + requestedAction
132             + ". Error = " + exc.getMessage();
133         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
134       }
135     } else {
136       LOG.error(AaiUiMsgs.SYNC_NOT_VALID_STATE_DURING_REQUEST, currentInternalState.toString());
137     }
138   }
139
140   /**
141    * Perform state action.
142    */
143   private void performStateAction() {
144
145     try {
146       switch (currentInternalState) {
147
148         case TEST_INDEX_INTEGRITY:
149           performIndexIntegrityValidation();
150           break;
151
152         case PRE_SYNC:
153           performPreSyncCleanupCollection();
154           break;
155
156         case SYNC_OPERATION:
157           performSynchronization();
158           break;
159
160         case POST_SYNC:
161           performIndexSyncPostCollection();
162           changeInternalState(InternalState.SELECTIVE_DELETE, SyncActions.POST_SYNC_COMPLETE);
163           break;
164
165         case SELECTIVE_DELETE:
166           performIndexCleanup();
167           changeInternalState(InternalState.GENERATE_FINAL_REPORT, SyncActions.PURGE_COMPLETE);
168           break;
169
170         case GENERATE_FINAL_REPORT:
171
172           dumpStatReport(true);
173           clearCaches();
174           changeInternalState(InternalState.IDLE, SyncActions.REPORT_COMPLETE);
175           break;
176
177         case ABORTING_SYNC:
178           performSyncAbort();
179           break;
180
181         default:
182           break;
183       }
184     } catch (Exception exc) {
185       String message = "Caught an error which performing action. Error = " + exc.getMessage();
186       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
187     }
188   }
189
190   /**
191    * Register entity synchronizer.
192    *
193    * @param entitySynchronizer the entity synchronizer
194    */
195   public void registerEntitySynchronizer(IndexSynchronizer entitySynchronizer) {
196
197     String indexName = entitySynchronizer.getIndexName();
198
199     if (indexName != null) {
200       registeredSynchronizers.add(entitySynchronizer);
201     } else {
202       String message = "Failed to register entity synchronizer because index name is null";
203       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
204     }
205
206   }
207
208   /**
209    * Register index validator.
210    *
211    * @param indexValidator the index validator
212    */
213   public void registerIndexValidator(IndexValidator indexValidator) {
214
215     String indexName = indexValidator.getIndexName();
216
217     if (indexName != null) {
218       registeredIndexValidators.add(indexValidator);
219     } else {
220       String message = "Failed to register index validator because index name is null";
221       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
222     }
223
224   }
225
226   /**
227    * Register index cleaner.
228    *
229    * @param indexCleaner the index cleaner
230    */
231   public void registerIndexCleaner(IndexCleaner indexCleaner) {
232
233     String indexName = indexCleaner.getIndexName();
234
235     if (indexName != null) {
236       registeredIndexCleaners.add(indexCleaner);
237     } else {
238       String message = "Failed to register index cleaner because index name is null";
239       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
240     }
241   }
242
243   /*
244    * State machine should drive our flow dosync just dispatches an action and the state machine
245    * determines what is in play and what is next
246    */
247
248   /**
249    * Dump stat report.
250    *
251    * @param showFinalReport the show final report
252    */
253   private void dumpStatReport(boolean showFinalReport) {
254
255     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
256
257       String statReport = synchronizer.getStatReport(showFinalReport);
258
259       if (statReport != null) {
260         LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
261       }
262     }
263   }
264
265   /**
266    * Clear caches.
267    */
268   private void clearCaches() {
269
270     /*
271      * Any entity caches that were built as part of the sync operation should be cleared to save
272      * memory. The original intent of the caching was to provide a short-lived cache to satisfy
273      * entity requests from multiple synchronizers yet minimizing interactions with the AAI.
274      */
275
276     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
277       synchronizer.clearCache();
278     }
279   }
280
281   /**
282    * Perform pre sync cleanup collection.
283    */
284   private void performPreSyncCleanupCollection() {
285
286     /*
287      * ask the index cleaners to collect the their pre-sync object id collections
288      */
289
290     for (IndexCleaner cleaner : registeredIndexCleaners) {
291       cleaner.populatePreOperationCollection();
292     }
293
294     changeInternalState(InternalState.SYNC_OPERATION, SyncActions.PRE_SYNC_COMPLETE);
295
296   }
297
298   /**
299    * Perform index sync post collection.
300    */
301   private void performIndexSyncPostCollection() {
302
303     /*
304      * ask the entity purgers to collect the their pre-sync object id collections
305      */
306
307     for (IndexCleaner cleaner : registeredIndexCleaners) {
308       cleaner.populatePostOperationCollection();
309     }
310
311   }
312
313   /**
314    * Perform index cleanup.
315    */
316   private void performIndexCleanup() {
317
318     /*
319      * ask the entity purgers to collect the their pre-sync object id collections
320      */
321
322     for (IndexCleaner cleaner : registeredIndexCleaners) {
323       cleaner.performCleanup();
324     }
325
326   }
327
328   /**
329    * Perform sync abort.
330    */
331   private void performSyncAbort() {
332     changeInternalState(InternalState.IDLE, SyncActions.SYNC_ABORTED);
333   }
334
335   /**
336    * Perform index integrity validation.
337    */
338   private void performIndexIntegrityValidation() {
339
340     /*
341      * loop through registered index validators and test and fix, if needed
342      */
343
344     for (IndexValidator validator : registeredIndexValidators) {
345       try {
346         if (!validator.exists()) {
347           validator.createOrRepair();
348         }
349       } catch (Exception exc) {
350         String message = "Index validator caused an error = " + exc.getMessage();
351         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
352       }
353     }
354
355     changeInternalState(InternalState.PRE_SYNC, SyncActions.INDEX_INTEGRITY_VALIDATION_COMPLETE);
356
357   }
358
359   /**
360    * Shutdown.
361    */
362   public void shutdown() {
363
364     this.syncControllerExecutor.shutdown();
365     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
366
367       try {
368         synchronizer.shutdown();
369       } catch (Exception exc) {
370         LOG.error(AaiUiMsgs.ERROR_GENERIC,
371             "Synchronizer shutdown caused an error = " + exc.getMessage());
372       }
373
374     }
375     this.statReporterExecutor.shutdown();
376   }
377
378   /*
379    * Need some kind of task running that responds to a transient boolean to kill it or we just stop
380    * the executor that it is in?
381    */
382
383
384
385   /**
386    * Perform synchronization.
387    */
388   private void performSynchronization() {
389
390     /*
391      * Get all the synchronizers running in parallel
392      */
393
394     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
395       supplyAsync(new Supplier<Void>() {
396
397         @Override
398         public Void get() {
399
400           synchronizer.doSync();
401           return null;
402         }
403
404       }, this.syncControllerExecutor).whenComplete((result, error) -> {
405
406         /*
407          * We don't bother checking the result, because it will always be null as the doSync() is
408          * non-blocking.
409          */
410
411         if (error != null) {
412           LOG.error(AaiUiMsgs.ERROR_GENERIC,
413               "doSync operation failed with an error = " + error.getMessage());
414         }
415       });
416     }
417
418     boolean allDone = false;
419     long nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
420     boolean dumpPeriodicStatReport = false;
421
422     while (!allDone) {
423
424       int totalFinished = 0;
425
426       for (IndexSynchronizer synchronizer : registeredSynchronizers) {
427         if (dumpPeriodicStatReport) {
428           if (synchronizer.getState() != SynchronizerState.IDLE) {
429             String statReport = synchronizer.getStatReport(false);
430             if (statReport != null) {
431               LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
432             }
433           }
434           if (synchronizer.getState() == SynchronizerState.IDLE) {
435             totalFinished++;
436           }
437         }
438       }
439       if (System.currentTimeMillis() > nextReportTimeStampInMs) {
440         dumpPeriodicStatReport = true;
441         nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
442       } else {
443         dumpPeriodicStatReport = false;
444       }
445       allDone = (totalFinished == registeredSynchronizers.size());
446
447       try {
448         Thread.sleep(250);
449       } catch (InterruptedException exc) {
450         LOG.error(AaiUiMsgs.ERROR_GENERIC,
451             "An error occurred while waiting for sync to complete. Error = " + exc.getMessage());
452       }
453
454     }
455
456     changeInternalState(InternalState.POST_SYNC, SyncActions.SYNC_COMPLETE);
457
458   }
459
460   public SynchronizerState getState() {
461
462     switch (currentInternalState) {
463
464       case IDLE: {
465         return SynchronizerState.IDLE;
466       }
467
468       default: {
469         return SynchronizerState.PERFORMING_SYNCHRONIZATION;
470
471       }
472     }
473
474   }
475
476 }