Initial commit for AAI-UI(sparky-backend)
[aai/sparky-be.git] / src / main / java / org / openecomp / sparky / synchronizer / SyncController.java
1 /**
2  * ============LICENSE_START===================================================
3  * SPARKY (AAI UI service)
4  * ============================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=====================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25
26 package org.openecomp.sparky.synchronizer;
27
28 import static java.util.concurrent.CompletableFuture.supplyAsync;
29
30 import java.util.Collection;
31 import java.util.LinkedHashSet;
32 import java.util.concurrent.ExecutorService;
33 import java.util.function.Supplier;
34
35 import org.openecomp.cl.api.Logger;
36 import org.openecomp.cl.eelf.LoggerFactory;
37 import org.openecomp.sparky.logging.AaiUiMsgs;
38 import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState;
39 import org.openecomp.sparky.util.NodeUtils;
40
41 /**
42  * The Class SyncController.
43  *
44  * @author davea.
45  */
46 public class SyncController {
47   private static final Logger LOG = LoggerFactory.getInstance().getLogger(SyncController.class);
48
49   /**
50    * The Enum InternalState.
51    */
52   private enum InternalState {
53     IDLE, PRE_SYNC, SYNC_OPERATION, SELECTIVE_DELETE, ABORTING_SYNC, REPAIRING_INDEX, POST_SYNC,
54     TEST_INDEX_INTEGRITY, GENERATE_FINAL_REPORT
55   }
56
57   /**
58    * The Enum SyncActions.
59    */
60   public enum SyncActions {
61     SYNCHRONIZE, REPAIR_INDEX, INDEX_INTEGRITY_VALIDATION_COMPLETE, PRE_SYNC_COMPLETE,
62     SYNC_COMPLETE, SYNC_ABORTED, SYNC_FAILURE, POST_SYNC_COMPLETE, PURGE_COMPLETE, REPORT_COMPLETE
63   }
64
65   private Collection<IndexSynchronizer> registeredSynchronizers;
66   private Collection<IndexValidator> registeredIndexValidators;
67   private Collection<IndexCleaner> registeredIndexCleaners;
68   private InternalState currentInternalState;
69   private ExecutorService syncControllerExecutor;
70   private ExecutorService statReporterExecutor;
71   private final String controllerName;
72
73   /**
74    * Instantiates a new sync controller.
75    *
76    * @param name the name
77    * @throws Exception the exception
78    */
79   public SyncController(String name) throws Exception {
80
81     this.controllerName = name;
82     /*
83      * Does LHS result in a non-duplicated object collection?? What happens if you double-add an
84      * object?
85      */
86
87     registeredSynchronizers = new LinkedHashSet<IndexSynchronizer>();
88     registeredIndexValidators = new LinkedHashSet<IndexValidator>();
89     registeredIndexCleaners = new LinkedHashSet<IndexCleaner>();
90
91     this.syncControllerExecutor = NodeUtils.createNamedExecutor("SyncController", 5, LOG);
92     this.statReporterExecutor = NodeUtils.createNamedExecutor("StatReporter", 1, LOG);
93
94     this.currentInternalState = InternalState.IDLE;
95   }
96   
97   /**
98    * Change internal state.
99    *
100    * @param newState the new state
101    * @param causedByAction the caused by action
102    */
103   private void changeInternalState(InternalState newState, SyncActions causedByAction) {
104     LOG.info(AaiUiMsgs.SYNC_INTERNAL_STATE_CHANGED, controllerName,
105         currentInternalState.toString(), newState.toString(), causedByAction.toString());
106
107     this.currentInternalState = newState;
108
109     performStateAction();
110   }
111
112   public String getControllerName() {
113     return controllerName;
114   }
115
116   /**
117    * Perform action.
118    *
119    * @param requestedAction the requested action
120    */
121   public void performAction(SyncActions requestedAction) {
122
123     if (currentInternalState == InternalState.IDLE) {
124
125       try {
126         switch (requestedAction) {
127           case SYNCHRONIZE:
128             changeInternalState(InternalState.TEST_INDEX_INTEGRITY, requestedAction);
129             break;
130
131           default:
132             break;
133         }
134
135       } catch (Exception exc) {
136         String message = "An error occurred while performing action = " + requestedAction
137             + ". Error = " + exc.getMessage();
138         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
139       }
140     } else {
141       LOG.error(AaiUiMsgs.SYNC_NOT_VALID_STATE_DURING_REQUEST, currentInternalState.toString());
142     }
143   }
144
145   /**
146    * Perform state action.
147    */
148   private void performStateAction() {
149
150     try {
151       switch (currentInternalState) {
152
153         case TEST_INDEX_INTEGRITY:
154           performIndexIntegrityValidation();
155           break;
156
157         case PRE_SYNC:
158           performPreSyncCleanupCollection();
159           break;
160
161         case SYNC_OPERATION:
162           performSynchronization();
163           break;
164
165         case POST_SYNC:
166           performIndexSyncPostCollection();
167           changeInternalState(InternalState.SELECTIVE_DELETE, SyncActions.POST_SYNC_COMPLETE);
168           break;
169
170         case SELECTIVE_DELETE:
171           performIndexCleanup();
172           changeInternalState(InternalState.GENERATE_FINAL_REPORT, SyncActions.PURGE_COMPLETE);
173           break;
174
175         case GENERATE_FINAL_REPORT:
176
177           dumpStatReport(true);
178           clearCaches();
179           changeInternalState(InternalState.IDLE, SyncActions.REPORT_COMPLETE);
180           break;
181
182         case ABORTING_SYNC:
183           performSyncAbort();
184           break;
185
186         default:
187           break;
188       }
189     } catch (Exception exc) {
190       String message = "Caught an error which performing action. Error = " + exc.getMessage();
191       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
192     }
193   }
194
195   /**
196    * Register entity synchronizer.
197    *
198    * @param entitySynchronizer the entity synchronizer
199    */
200   public void registerEntitySynchronizer(IndexSynchronizer entitySynchronizer) {
201
202     String indexName = entitySynchronizer.getIndexName();
203
204     if (indexName != null) {
205       registeredSynchronizers.add(entitySynchronizer);
206     } else {
207       String message = "Failed to register entity synchronizer because index name is null";
208       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
209     }
210
211   }
212
213   /**
214    * Register index validator.
215    *
216    * @param indexValidator the index validator
217    */
218   public void registerIndexValidator(IndexValidator indexValidator) {
219
220     String indexName = indexValidator.getIndexName();
221
222     if (indexName != null) {
223       registeredIndexValidators.add(indexValidator);
224     } else {
225       String message = "Failed to register index validator because index name is null";
226       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
227     }
228
229   }
230
231   /**
232    * Register index cleaner.
233    *
234    * @param indexCleaner the index cleaner
235    */
236   public void registerIndexCleaner(IndexCleaner indexCleaner) {
237
238     String indexName = indexCleaner.getIndexName();
239
240     if (indexName != null) {
241       registeredIndexCleaners.add(indexCleaner);
242     } else {
243       String message = "Failed to register index cleaner because index name is null";
244       LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
245     }
246   }
247
248   /*
249    * State machine should drive our flow dosync just dispatches an action and the state machine
250    * determines what is in play and what is next
251    */
252
253   /**
254    * Dump stat report.
255    *
256    * @param showFinalReport the show final report
257    */
258   private void dumpStatReport(boolean showFinalReport) {
259
260     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
261
262       String statReport = synchronizer.getStatReport(showFinalReport);
263
264       if (statReport != null) {
265         LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
266       }
267     }
268   }
269
270   /**
271    * Clear caches.
272    */
273   private void clearCaches() {
274
275     /*
276      * Any entity caches that were built as part of the sync operation should be cleared to save
277      * memory. The original intent of the caching was to provide a short-lived cache to satisfy
278      * entity requests from multiple synchronizers yet minimizing interactions with the AAI.
279      */
280
281     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
282       synchronizer.clearCache();
283     }
284   }
285
286   /**
287    * Perform pre sync cleanup collection.
288    */
289   private void performPreSyncCleanupCollection() {
290
291     /*
292      * ask the index cleaners to collect the their pre-sync object id collections
293      */
294
295     for (IndexCleaner cleaner : registeredIndexCleaners) {
296       cleaner.populatePreOperationCollection();
297     }
298
299     changeInternalState(InternalState.SYNC_OPERATION, SyncActions.PRE_SYNC_COMPLETE);
300
301   }
302
303   /**
304    * Perform index sync post collection.
305    */
306   private void performIndexSyncPostCollection() {
307
308     /*
309      * ask the entity purgers to collect the their pre-sync object id collections
310      */
311
312     for (IndexCleaner cleaner : registeredIndexCleaners) {
313       cleaner.populatePostOperationCollection();
314     }
315
316   }
317
318   /**
319    * Perform index cleanup.
320    */
321   private void performIndexCleanup() {
322
323     /*
324      * ask the entity purgers to collect the their pre-sync object id collections
325      */
326
327     for (IndexCleaner cleaner : registeredIndexCleaners) {
328       cleaner.performCleanup();
329     }
330
331   }
332
333   /**
334    * Perform sync abort.
335    */
336   private void performSyncAbort() {
337     changeInternalState(InternalState.IDLE, SyncActions.SYNC_ABORTED);
338   }
339
340   /**
341    * Perform index integrity validation.
342    */
343   private void performIndexIntegrityValidation() {
344
345     /*
346      * loop through registered index validators and test and fix, if needed
347      */
348
349     for (IndexValidator validator : registeredIndexValidators) {
350       try {
351         if (!validator.exists()) {
352           validator.createOrRepair();
353         }
354       } catch (Exception exc) {
355         String message = "Index validator caused an error = " + exc.getMessage();
356         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
357       }
358     }
359
360     changeInternalState(InternalState.PRE_SYNC, SyncActions.INDEX_INTEGRITY_VALIDATION_COMPLETE);
361
362   }
363
364   /**
365    * Shutdown.
366    */
367   public void shutdown() {
368
369     this.syncControllerExecutor.shutdown();
370     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
371
372       try {
373         synchronizer.shutdown();
374       } catch (Exception exc) {
375         LOG.error(AaiUiMsgs.ERROR_GENERIC,
376             "Synchronizer shutdown caused an error = " + exc.getMessage());
377       }
378
379     }
380     this.statReporterExecutor.shutdown();
381   }
382
383   /*
384    * Need some kind of task running that responds to a transient boolean to kill it or we just stop
385    * the executor that it is in?
386    */
387
388
389
390   /**
391    * Perform synchronization.
392    */
393   private void performSynchronization() {
394
395     /*
396      * Get all the synchronizers running in parallel
397      */
398
399     for (IndexSynchronizer synchronizer : registeredSynchronizers) {
400       supplyAsync(new Supplier<Void>() {
401
402         @Override
403         public Void get() {
404
405           synchronizer.doSync();
406           return null;
407         }
408
409       }, this.syncControllerExecutor).whenComplete((result, error) -> {
410
411         /*
412          * We don't bother checking the result, because it will always be null as the doSync() is
413          * non-blocking.
414          */
415
416         if (error != null) {
417           LOG.error(AaiUiMsgs.ERROR_GENERIC,
418               "doSync operation failed with an error = " + error.getMessage());
419         }
420       });
421     }
422
423     boolean allDone = false;
424     long nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
425
426     while (!allDone) {
427
428       // allDone = false;
429
430       int totalFinished = 0;
431
432       for (IndexSynchronizer synchronizer : registeredSynchronizers) {
433         if (System.currentTimeMillis() > nextReportTimeStampInMs) {
434
435           nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;
436
437           String statReport = synchronizer.getStatReport(false);
438
439           if (statReport != null) {
440             LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
441           }
442         }
443
444         if (synchronizer.getState() == SynchronizerState.IDLE) {
445           totalFinished++;
446         }
447       }
448
449       allDone = (totalFinished == registeredSynchronizers.size());
450
451       try {
452         Thread.sleep(250);
453       } catch (InterruptedException exc) {
454         LOG.error(AaiUiMsgs.ERROR_GENERIC,
455             "An error occurred while waiting for sync to complete. Error = " + exc.getMessage());
456       }
457
458     }
459
460     changeInternalState(InternalState.POST_SYNC, SyncActions.SYNC_COMPLETE);
461
462   }
463
464   public SynchronizerState getState() {
465
466     switch (currentInternalState) {
467
468       case IDLE: {
469         return SynchronizerState.IDLE;
470       }
471
472       default: {
473         return SynchronizerState.PERFORMING_SYNCHRONIZATION;
474
475       }
476     }
477
478   }
479
480 }