0fc79c635acdc5698161cc8f3d2a1d6a6e0c9c15
[aai/sparky-be.git] / src / main / java / org / openecomp / sparky / synchronizer / HistoricalEntitySummarizer.java
1 /**
2  * ============LICENSE_START===================================================
3  * SPARKY (AAI UI service)
4  * ============================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=====================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25
26 package org.openecomp.sparky.synchronizer;
27
28 import static java.util.concurrent.CompletableFuture.supplyAsync;
29
30 import java.io.IOException;
31 import java.sql.Timestamp;
32 import java.text.SimpleDateFormat;
33 import java.util.Collection;
34 import java.util.EnumSet;
35 import java.util.Map;
36 import java.util.Map.Entry;
37 import java.util.Set;
38 import java.util.concurrent.ConcurrentHashMap;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import java.util.function.Supplier;
41
42 import javax.json.Json;
43
44 import org.openecomp.cl.api.Logger;
45 import org.openecomp.cl.eelf.LoggerFactory;
46 import org.openecomp.cl.mdc.MdcContext;
47 import org.openecomp.sparky.config.oxm.OxmEntityDescriptor;
48 import org.openecomp.sparky.dal.rest.HttpMethod;
49 import org.openecomp.sparky.dal.rest.OperationResult;
50 import org.openecomp.sparky.logging.AaiUiMsgs;
51 import org.openecomp.sparky.synchronizer.enumeration.OperationState;
52 import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState;
53 import org.openecomp.sparky.util.NodeUtils;
54 import org.slf4j.MDC;
55
56 import com.fasterxml.jackson.databind.JsonNode;
57 import com.fasterxml.jackson.databind.node.ArrayNode;
58
59 /**
60  * The Class HistoricalEntitySummarizer.
61  */
62 public class HistoricalEntitySummarizer extends AbstractEntitySynchronizer
63     implements IndexSynchronizer {
64
65   private static final Logger LOG = LoggerFactory.getInstance().getLogger(HistoricalEntitySummarizer.class);
66   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
67
68   private boolean allWorkEnumerated;
69   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
70   private boolean syncInProgress;
71   private Map<String, String> contextMap;
72
73   /**
74    * Instantiates a new historical entity summarizer.
75    *
76    * @param indexName the index name
77    * @throws Exception the exception
78    */
79   public HistoricalEntitySummarizer(String indexName) throws Exception {
80     super(LOG, "HES", 2, 5, 5, indexName);
81
82     this.allWorkEnumerated = false;
83     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
84     this.synchronizerName = "Historical Entity Summarizer";
85     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
86     this.syncInProgress = false;
87     this.contextMap = MDC.getCopyOfContextMap(); 
88     this.syncDurationInMs = -1;
89   }
90
91   /**
92    * Collect all the work.
93    *
94    * @return the operation state
95    */
96   private OperationState collectAllTheWork() {
97         
98     Map<String, OxmEntityDescriptor> descriptorMap =
99         oxmModelLoader.getSearchableEntityDescriptors();
100
101     if (descriptorMap.isEmpty()) {
102       LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "historical entities");
103
104       return OperationState.ERROR;
105     }
106
107     Collection<String> entityTypes = descriptorMap.keySet();
108
109     AtomicInteger asyncWoH = new AtomicInteger(0);
110
111     asyncWoH.set(entityTypes.size());
112
113     try {
114       for (String entityType : entityTypes) {
115
116         supplyAsync(new Supplier<Void>() {
117
118           @Override
119           public Void get() {
120                 MDC.setContextMap(contextMap);
121             try {
122               OperationResult typeLinksResult =
123                   aaiDataProvider.getSelfLinksByEntityType(entityType);
124               updateActiveInventoryCounters(HttpMethod.GET, entityType, typeLinksResult);
125               processEntityTypeSelfLinks(entityType, typeLinksResult);
126             } catch (Exception exc) {
127               LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc.getMessage());
128               
129             }
130
131             return null;
132           }
133
134         }, aaiExecutor).whenComplete((result, error) -> {
135
136           asyncWoH.decrementAndGet();
137
138           if (error != null) {
139             LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, error.getMessage());
140           }
141
142         });
143
144       }
145
146
147       while (asyncWoH.get() > 0) {
148
149         if (LOG.isDebugEnabled()) {
150           LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + " summarizer waiting for all the links to be processed.");
151         }
152
153         Thread.sleep(250);
154       }
155
156       esWorkOnHand.set(entityCounters.size());
157
158       // start doing the real work
159       allWorkEnumerated = true;
160
161       insertEntityTypeCounters();
162
163       if (LOG.isDebugEnabled()) {
164
165         StringBuilder sb = new StringBuilder(128);
166
167         sb.append("\n\nHistorical Entity Counters:");
168
169         for (Entry<String, AtomicInteger> entry : entityCounters.entrySet()) {
170           sb.append("\n").append(entry.getKey()).append(" = ").append(entry.getValue().get());
171         }
172
173         LOG.debug(AaiUiMsgs.DEBUG_GENERIC, sb.toString());
174
175       }
176
177     } catch (Exception exc) {
178       LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, exc.getMessage());
179
180
181       esWorkOnHand.set(0);
182       allWorkEnumerated = true;
183
184       return OperationState.ERROR;
185     }
186
187     return OperationState.OK;
188
189   }
190
191   /* (non-Javadoc)
192    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
193    */
194   @Override
195   public OperationState doSync() {
196         String txnID = NodeUtils.getRandomTxnId();
197     MdcContext.initialize(txnID, "HistoricalEntitySynchronizer", "", "Sync", "");
198         
199     if (syncInProgress) {
200       LOG.info(AaiUiMsgs.HISTORICAL_SYNC_PENDING);
201       return OperationState.PENDING;
202     }
203
204     clearCache();
205
206     syncInProgress = true;
207     this.syncStartedTimeStampInMs = System.currentTimeMillis();
208     allWorkEnumerated = false;
209
210     return collectAllTheWork();
211   }
212
213   /**
214    * Process entity type self links.
215    *
216    * @param entityType the entity type
217    * @param operationResult the operation result
218    */
219   private void processEntityTypeSelfLinks(String entityType, OperationResult operationResult) {
220
221     JsonNode rootNode = null;
222
223     final String jsonResult = operationResult.getResult();
224
225     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
226
227       try {
228         rootNode = mapper.readTree(jsonResult);
229       } catch (IOException exc) {
230         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc.getMessage());
231         return;
232       }
233
234       JsonNode resultData = rootNode.get("result-data");
235       ArrayNode resultDataArrayNode = null;
236
237       if (resultData != null && resultData.isArray()) {
238         resultDataArrayNode = (ArrayNode) resultData;
239         entityCounters.put(entityType, new AtomicInteger(resultDataArrayNode.size()));
240       }
241     }
242
243   }
244
245   /**
246    * Insert entity type counters.
247    */
248   private void insertEntityTypeCounters() {
249
250     if (esWorkOnHand.get() <= 0) {
251       return;
252     }
253
254     SimpleDateFormat dateFormat = new SimpleDateFormat(INSERTION_DATE_TIME_FORMAT);
255     Timestamp timestamp = new Timestamp(System.currentTimeMillis());
256     String currentFormattedTimeStamp = dateFormat.format(timestamp);
257
258     Set<Entry<String, AtomicInteger>> entityCounterEntries = entityCounters.entrySet();
259
260     for (Entry<String, AtomicInteger> entityCounterEntry : entityCounterEntries) {
261
262       supplyAsync(new Supplier<Void>() {
263
264         @Override
265         public Void get() {
266           MDC.setContextMap(contextMap);
267           String jsonString = Json.createObjectBuilder().add(
268               "count", entityCounterEntry.getValue().get())
269               .add("entityType", entityCounterEntry.getKey())
270               .add("timestamp", currentFormattedTimeStamp).build().toString();
271
272           String link = null;
273           try {
274             link = getElasticFullUrl("", indexName);
275             OperationResult or = esDataProvider.doPost(link, jsonString, "application/json");
276             updateElasticSearchCounters(HttpMethod.POST, entityCounterEntry.getKey(), or);
277           } catch (Exception exc) {
278             LOG.error(AaiUiMsgs.ES_STORE_FAILURE, exc.getMessage() );
279           }
280
281           return null;
282         }
283
284       }, esExecutor).whenComplete((result, error) -> {
285
286         esWorkOnHand.decrementAndGet();
287
288       });
289
290     }
291
292     while (esWorkOnHand.get() > 0) {
293
294       try {
295         Thread.sleep(500);
296       } catch (InterruptedException exc) {
297         LOG.error(AaiUiMsgs.INTERRUPTED, "historical Entities", exc.getMessage());
298       }
299     }
300
301   }
302
303   @Override
304   public SynchronizerState getState() {
305
306     if (!isSyncDone()) {
307       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
308     }
309
310     return SynchronizerState.IDLE;
311
312   }
313
314   /* (non-Javadoc)
315    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
316    */
317   @Override
318   public String getStatReport(boolean showFinalReport) {
319           syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
320           return this.getStatReport(syncDurationInMs, showFinalReport);
321   }
322
323   /* (non-Javadoc)
324    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
325    */
326   @Override
327   public void shutdown() {
328     this.shutdownExecutors();
329   }
330
331   @Override
332   protected boolean isSyncDone() {
333
334     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
335
336     if (LOG.isDebugEnabled()) {
337       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand
338           + " all work enumerated = " + allWorkEnumerated);
339     }
340
341     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
342       return false;
343     }
344
345     this.syncInProgress = false;
346
347     return true;
348   }
349
350   /* (non-Javadoc)
351    * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
352    */
353   @Override
354   public void clearCache() {
355
356     if (syncInProgress) {
357       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, "Historical Entity Summarizer in progress, request to clear cache ignored");
358       return;
359     }
360
361     super.clearCache();
362     this.resetCounters();
363     if (entityCounters != null) {
364       entityCounters.clear();
365     }
366
367     allWorkEnumerated = false;
368
369   }
370
371 }