Add collaboration feature
[sdc.git] / openecomp-be / tools / zusammen-tools / src / main / java / org / openecomp / core / tools / Commands / HealAll.java
1 package org.openecomp.core.tools.Commands;
2
3 import org.apache.commons.collections.CollectionUtils;
4 import org.openecomp.core.tools.concurrent.ItemHealingTask;
5 import org.openecomp.core.tools.loaders.VersionInfoCassandraLoader;
6 import org.openecomp.sdc.healing.api.HealingManager;
7 import org.openecomp.sdc.healing.factory.HealingManagerFactory;
8 import org.openecomp.sdc.vendorsoftwareproduct.VendorSoftwareProductConstants;
9 import org.openecomp.sdc.vendorsoftwareproduct.VendorSoftwareProductManager;
10 import org.openecomp.sdc.vendorsoftwareproduct.VspManagerFactory;
11 import org.openecomp.sdc.versioning.VersioningManager;
12 import org.openecomp.sdc.versioning.VersioningManagerFactory;
13 import org.openecomp.sdc.versioning.dao.types.Version;
14 import org.openecomp.sdc.versioning.dao.types.VersionInfoEntity;
15
16 import java.io.BufferedWriter;
17 import java.io.FileWriter;
18 import java.io.IOException;
19 import java.time.Duration;
20 import java.time.Instant;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.List;
24 import java.util.Objects;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.Future;
28 import java.util.stream.Stream;
29
30 /**
31  * Created by ayalaben on 11/6/2017
32  */
33 public class HealAll {
34
35   private static final String HEALING_USER = "healing_user";
36   private static final int defaulThreadNumber = 100;
37   private static BufferedWriter log;
38   private static List<ItemHealingTask> tasks = new ArrayList<>();
39   private static VendorSoftwareProductManager vspManager = VspManagerFactory
40       .getInstance().createInterface();
41   private static HealingManager healingManager = HealingManagerFactory.getInstance()
42       .createInterface();
43   private static VersioningManager versioningManager = VersioningManagerFactory.getInstance()
44       .createInterface();
45
46   static {
47     try {
48       log =
49           new BufferedWriter(new FileWriter("healing.log", true));
50     } catch (IOException e) {
51       if (log != null) {
52         try {
53           log.close();
54         } catch (IOException e1) {
55           throw new RuntimeException("can't initial healing log file: " + e1.getMessage());
56         }
57       }
58       throw new RuntimeException("can't initial healing log file: " + e.getMessage());
59     }
60   }
61
62   public static void healAll(String threadNumber) {
63
64     writeToLog("----starting healing------");
65     Instant startTime = Instant.now();
66
67     int numberOfThreads = Objects.nonNull(threadNumber) ? Integer.valueOf(threadNumber) :
68         defaulThreadNumber;
69     ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
70
71     filterByEntityType(VersionInfoCassandraLoader.list(),
72         VendorSoftwareProductConstants.VENDOR_SOFTWARE_PRODUCT_VERSIONABLE_TYPE).forEach
73         (HealAll::addTaskToTasks);
74
75     executeAllTasks(executor);
76
77     writeToLog("----finished healing------");
78     Instant endTime = Instant.now();
79     writeToLog("Total runtime was: " + Duration.between(startTime, endTime));
80
81     try {
82       if (log != null) {
83         log.close();
84       }
85     } catch (IOException e) {
86       writeToLog("Error:" + e.getMessage());
87     }
88
89     System.exit(1);
90   }
91
92   private static void executeAllTasks(ExecutorService executor) {
93     List<Future<String>> futureTasks;
94     try {
95       futureTasks = executor.invokeAll(tasks);
96       futureTasks.forEach(future -> {
97         try {
98           log.write(future.get());
99           log.newLine();
100         } catch (Exception e) {
101           writeToLog(e.getMessage());
102         }
103       });
104     } catch (InterruptedException e) {
105       writeToLog("migration tasks failed with message: " + e.getMessage());
106       throw new RuntimeException(e);
107     }
108
109     boolean isThreadOpen = true;
110     while (isThreadOpen) {
111       isThreadOpen = futureTasks.stream().anyMatch(future -> !future.isDone());
112     }
113   }
114
115   private static Version resolveVersion(VersionInfoEntity versionInfoEntity) {
116     if (Objects.nonNull(versionInfoEntity.getCandidate())) {
117       return versionInfoEntity.getCandidate().getVersion();
118     } else if (!CollectionUtils.isEmpty(versionInfoEntity.getViewableVersions())) {
119       return versionInfoEntity.getViewableVersions().stream().max(Version::compateTo).get();
120     }
121     return versionInfoEntity.getActiveVersion();
122   }
123
124   private static void writeToLog(String message) {
125     try {
126       log.write(message);
127       log.newLine();
128     } catch (IOException e) {
129       throw new RuntimeException("unable to write to healing all log file.");
130     }
131   }
132
133   private static Stream<VersionInfoEntity> filterByEntityType(
134       Collection<VersionInfoEntity> versionInfoEntities, String entityType) {
135     return versionInfoEntities.stream().filter(versionInfoEntity -> versionInfoEntity
136         .getEntityType().equals(entityType));
137   }
138
139   private static void addTaskToTasks(VersionInfoEntity versionInfoEntity) {
140     tasks.add(new ItemHealingTask(versionInfoEntity.getEntityId(), resolveVersion
141         (versionInfoEntity).toString(),
142         vspManager, healingManager));
143   }
144
145 }