Initial commit for AAI-UI(sparky-backend)
[aai/sparky-be.git] / src / main / java / org / openecomp / sparky / synchronizer / HistoricalEntitySummarizer.java
1 /**
2  * ============LICENSE_START===================================================
3  * SPARKY (AAI UI service)
4  * ============================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=====================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25
26 package org.openecomp.sparky.synchronizer;
27
28 import static java.util.concurrent.CompletableFuture.supplyAsync;
29
30 import java.io.IOException;
31 import java.net.InetAddress;
32 import java.net.UnknownHostException;
33 import java.sql.Timestamp;
34 import java.text.SimpleDateFormat;
35 import java.util.Collection;
36 import java.util.EnumSet;
37 import java.util.Map;
38 import java.util.Map.Entry;
39 import java.util.Set;
40 import java.util.concurrent.ConcurrentHashMap;
41 import java.util.concurrent.atomic.AtomicInteger;
42 import java.util.function.Supplier;
43
44 import javax.json.Json;
45
46 import org.openecomp.cl.api.Logger;
47 import org.openecomp.cl.eelf.LoggerFactory;
48 import org.openecomp.sparky.config.oxm.OxmEntityDescriptor;
49 import org.openecomp.sparky.dal.rest.HttpMethod;
50 import org.openecomp.sparky.dal.rest.OperationResult;
51 import org.openecomp.sparky.logging.AaiUiMsgs;
52 import org.openecomp.sparky.synchronizer.enumeration.OperationState;
53 import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState;
54 import org.openecomp.sparky.util.NodeUtils;
55 import org.slf4j.MDC;
56
57 import org.openecomp.cl.mdc.MdcContext;
58
59 import org.openecomp.cl.mdc.MdcContext;
60 import com.fasterxml.jackson.databind.JsonNode;
61 import com.fasterxml.jackson.databind.node.ArrayNode;
62
63 /**
64  * The Class HistoricalEntitySummarizer.
65  */
66 public class HistoricalEntitySummarizer extends AbstractEntitySynchronizer
67     implements IndexSynchronizer {
68
69   private static final Logger LOG = LoggerFactory.getInstance().getLogger(HistoricalEntitySummarizer.class);
70   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
71
72   private boolean allWorkEnumerated;
73   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
74   private boolean syncInProgress;
75   private Map<String, String> contextMap;
76
77   /**
78    * Instantiates a new historical entity summarizer.
79    *
80    * @param indexName the index name
81    * @throws Exception the exception
82    */
83   public HistoricalEntitySummarizer(String indexName) throws Exception {
84     super(LOG, "HES", 2, 5, 5, indexName);
85
86     this.allWorkEnumerated = false;
87     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
88     this.synchronizerName = "Historical Entity Summarizer";
89     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
90     this.syncInProgress = false;
91     this.contextMap = MDC.getCopyOfContextMap(); 
92   }
93
94   /**
95    * Collect all the work.
96    *
97    * @return the operation state
98    */
99   private OperationState collectAllTheWork() {
100         
101     Map<String, OxmEntityDescriptor> descriptorMap =
102         oxmModelLoader.getSearchableEntityDescriptors();
103
104     if (descriptorMap.isEmpty()) {
105       LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "historical entities");
106
107       return OperationState.ERROR;
108     }
109
110     Collection<String> entityTypes = descriptorMap.keySet();
111
112     AtomicInteger asyncWoH = new AtomicInteger(0);
113
114     asyncWoH.set(entityTypes.size());
115
116     try {
117       for (String entityType : entityTypes) {
118
119         supplyAsync(new Supplier<Void>() {
120
121           @Override
122           public Void get() {
123                 MDC.setContextMap(contextMap);
124             try {
125               OperationResult typeLinksResult =
126                   aaiDataProvider.getSelfLinksByEntityType(entityType);
127               updateActiveInventoryCounters(HttpMethod.GET, entityType, typeLinksResult);
128               processEntityTypeSelfLinks(entityType, typeLinksResult);
129             } catch (Exception exc) {
130               LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc.getMessage());
131               
132             }
133
134             return null;
135           }
136
137         }, aaiExecutor).whenComplete((result, error) -> {
138
139           asyncWoH.decrementAndGet();
140
141           if (error != null) {
142             LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, error.getMessage());
143           }
144
145         });
146
147       }
148
149
150       while (asyncWoH.get() > 0) {
151
152         if (LOG.isDebugEnabled()) {
153           LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + " summarizer waiting for all the links to be processed.");
154         }
155
156         Thread.sleep(250);
157       }
158
159       esWorkOnHand.set(entityCounters.size());
160
161       // start doing the real work
162       allWorkEnumerated = true;
163
164       insertEntityTypeCounters();
165
166       if (LOG.isDebugEnabled()) {
167
168         StringBuilder sb = new StringBuilder(128);
169
170         sb.append("\n\nHistorical Entity Counters:");
171
172         for (Entry<String, AtomicInteger> entry : entityCounters.entrySet()) {
173           sb.append("\n").append(entry.getKey()).append(" = ").append(entry.getValue().get());
174         }
175
176         LOG.debug(AaiUiMsgs.DEBUG_GENERIC, sb.toString());
177
178       }
179
180     } catch (Exception exc) {
181       LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, exc.getMessage());
182
183
184       esWorkOnHand.set(0);
185       allWorkEnumerated = true;
186
187       return OperationState.ERROR;
188     }
189
190     return OperationState.OK;
191
192   }
193
194   /* (non-Javadoc)
195    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
196    */
197   @Override
198   public OperationState doSync() {
199         String txnID = NodeUtils.getRandomTxnId();
200     MdcContext.initialize(txnID, "HistoricalEntitySynchronizer", "", "Sync", "");
201         
202     if (syncInProgress) {
203       LOG.info(AaiUiMsgs.HISTORICAL_SYNC_PENDING);
204       return OperationState.PENDING;
205     }
206
207     clearCache();
208
209     syncInProgress = true;
210     this.syncStartedTimeStampInMs = System.currentTimeMillis();
211     allWorkEnumerated = false;
212
213     return collectAllTheWork();
214   }
215
216   /**
217    * Process entity type self links.
218    *
219    * @param entityType the entity type
220    * @param operationResult the operation result
221    */
222   private void processEntityTypeSelfLinks(String entityType, OperationResult operationResult) {
223
224     JsonNode rootNode = null;
225
226     final String jsonResult = operationResult.getResult();
227
228     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
229
230       try {
231         rootNode = mapper.readTree(jsonResult);
232       } catch (IOException exc) {
233         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc.getMessage());
234         return;
235       }
236
237       JsonNode resultData = rootNode.get("result-data");
238       ArrayNode resultDataArrayNode = null;
239
240       if (resultData != null && resultData.isArray()) {
241         resultDataArrayNode = (ArrayNode) resultData;
242         entityCounters.put(entityType, new AtomicInteger(resultDataArrayNode.size()));
243       }
244     }
245
246   }
247
248   /**
249    * Insert entity type counters.
250    */
251   private void insertEntityTypeCounters() {
252
253     if (esWorkOnHand.get() <= 0) {
254       return;
255     }
256
257     SimpleDateFormat dateFormat = new SimpleDateFormat(INSERTION_DATE_TIME_FORMAT);
258     Timestamp timestamp = new Timestamp(System.currentTimeMillis());
259     String currentFormattedTimeStamp = dateFormat.format(timestamp);
260
261     Set<Entry<String, AtomicInteger>> entityCounterEntries = entityCounters.entrySet();
262
263     for (Entry<String, AtomicInteger> entityCounterEntry : entityCounterEntries) {
264
265       supplyAsync(new Supplier<Void>() {
266
267         @Override
268         public Void get() {
269           MDC.setContextMap(contextMap);
270           String jsonString = Json.createObjectBuilder().add(
271               "count", entityCounterEntry.getValue().get())
272               .add("entityType", entityCounterEntry.getKey())
273               .add("timestamp", currentFormattedTimeStamp).build().toString();
274
275           String link = null;
276           try {
277             link = getElasticFullUrl("", indexName);
278             OperationResult or = esDataProvider.doPost(link, jsonString, "application/json");
279             updateElasticSearchCounters(HttpMethod.POST, entityCounterEntry.getKey(), or);
280           } catch (Exception exc) {
281             LOG.error(AaiUiMsgs.ES_STORE_FAILURE, exc.getMessage() );
282           }
283
284           return null;
285         }
286
287       }, esExecutor).whenComplete((result, error) -> {
288
289         esWorkOnHand.decrementAndGet();
290
291       });
292
293     }
294
295     while (esWorkOnHand.get() > 0) {
296
297       try {
298         Thread.sleep(500);
299       } catch (InterruptedException exc) {
300         LOG.error(AaiUiMsgs.INTERRUPTED, "historical Entities", exc.getMessage());
301       }
302     }
303
304   }
305
306   @Override
307   public SynchronizerState getState() {
308
309     if (!isSyncDone()) {
310       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
311     }
312
313     return SynchronizerState.IDLE;
314
315   }
316
317   /* (non-Javadoc)
318    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
319    */
320   @Override
321   public String getStatReport(boolean showFinalReport) {
322     return getStatReport(System.currentTimeMillis() - this.syncStartedTimeStampInMs,
323         showFinalReport);
324   }
325
326   /* (non-Javadoc)
327    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
328    */
329   @Override
330   public void shutdown() {
331     this.shutdownExecutors();
332   }
333
334   @Override
335   protected boolean isSyncDone() {
336
337     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
338
339     if (LOG.isDebugEnabled()) {
340       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand
341           + " all work enumerated = " + allWorkEnumerated);
342     }
343
344     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
345       return false;
346     }
347
348     this.syncInProgress = false;
349
350     return true;
351   }
352
353   /* (non-Javadoc)
354    * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
355    */
356   @Override
357   public void clearCache() {
358
359     if (syncInProgress) {
360       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, "Historical Entity Summarizer in progress, request to clear cache ignored");
361       return;
362     }
363
364     super.clearCache();
365     this.resetCounters();
366     if (entityCounters != null) {
367       entityCounters.clear();
368     }
369
370     allWorkEnumerated = false;
371
372   }
373
374 }