Convert Sparky to Spring-Boot
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / aggregation / sync / HistoricalEntitySummarizer.java
1 /**
2  * ============LICENSE_START===================================================
3  * SPARKY (AAI UI service)
4  * ============================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=====================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25 package org.onap.aai.sparky.aggregation.sync;
26
27 import static java.util.concurrent.CompletableFuture.supplyAsync;
28
29 import java.io.IOException;
30 import java.sql.Timestamp;
31 import java.text.SimpleDateFormat;
32 import java.util.Collection;
33 import java.util.EnumSet;
34 import java.util.Map;
35 import java.util.Map.Entry;
36 import java.util.Set;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.function.Supplier;
40
41 import javax.json.Json;
42 import javax.ws.rs.core.MediaType;
43
44 import org.onap.aai.cl.api.Logger;
45 import org.onap.aai.cl.eelf.LoggerFactory;
46 import org.onap.aai.cl.mdc.MdcContext;
47 import org.onap.aai.restclient.client.OperationResult;
48 import org.onap.aai.sparky.config.oxm.SearchableEntityLookup;
49 import org.onap.aai.sparky.config.oxm.SearchableOxmEntityDescriptor;
50 import org.onap.aai.sparky.dal.rest.HttpMethod;
51 import org.onap.aai.sparky.logging.AaiUiMsgs;
52 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
53 import org.onap.aai.sparky.sync.IndexSynchronizer;
54 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
55 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
56 import org.onap.aai.sparky.sync.enumeration.OperationState;
57 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
58 import org.onap.aai.sparky.util.NodeUtils;
59 import org.slf4j.MDC;
60
61 import com.fasterxml.jackson.databind.JsonNode;
62 import com.fasterxml.jackson.databind.node.ArrayNode;
63
64 /**
65  * The Class HistoricalEntitySummarizer.
66  */
67 public class HistoricalEntitySummarizer extends AbstractEntitySynchronizer
68     implements IndexSynchronizer {
69
70   private static final Logger LOG = LoggerFactory.getInstance().getLogger(HistoricalEntitySummarizer.class);
71   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
72
73   private boolean allWorkEnumerated;
74   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
75   private boolean syncInProgress;
76   private Map<String, String> contextMap;
77   private ElasticSearchSchemaConfig schemaConfig;
78   private SearchableEntityLookup searchableEntityLookup;
79
80   /**
81    * Instantiates a new historical entity summarizer.
82    *
83    * @param indexName the index name
84    * @throws Exception the exception
85    */
86   public HistoricalEntitySummarizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
87       int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
88       NetworkStatisticsConfig esStatConfig, SearchableEntityLookup searchableEntityLookup)
89       throws Exception {
90     super(LOG, "HES", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(), aaiStatConfig, esStatConfig);
91
92     this.schemaConfig = schemaConfig;
93     this.allWorkEnumerated = false;
94     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
95     this.synchronizerName = "Historical Entity Summarizer";
96     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
97     this.syncInProgress = false;
98     this.contextMap = MDC.getCopyOfContextMap(); 
99     this.syncDurationInMs = -1;
100     this.searchableEntityLookup = searchableEntityLookup;
101   }
102
103   /**
104    * Collect all the work.
105    *
106    * @return the operation state
107    */
108   private OperationState collectAllTheWork() {
109         
110     Map<String, SearchableOxmEntityDescriptor> descriptorMap =
111         searchableEntityLookup.getSearchableEntityDescriptors();
112
113     if (descriptorMap.isEmpty()) {
114       LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "historical entities");
115
116       return OperationState.ERROR;
117     }
118
119     Collection<String> entityTypes = descriptorMap.keySet();
120
121     AtomicInteger asyncWoH = new AtomicInteger(0);
122
123     asyncWoH.set(entityTypes.size());
124
125     try {
126       for (String entityType : entityTypes) {
127
128         supplyAsync(new Supplier<Void>() {
129
130           @Override
131           public Void get() {
132                 MDC.setContextMap(contextMap);
133             try {
134               OperationResult typeLinksResult =
135                   aaiAdapter.getSelfLinksByEntityType(entityType);
136               updateActiveInventoryCounters(HttpMethod.GET, entityType, typeLinksResult);
137               processEntityTypeSelfLinks(entityType, typeLinksResult);
138             } catch (Exception exc) {
139               LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc.getMessage());
140               
141             }
142
143             return null;
144           }
145
146         }, aaiExecutor).whenComplete((result, error) -> {
147
148           asyncWoH.decrementAndGet();
149
150           if (error != null) {
151             LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, error.getMessage());
152           }
153
154         });
155
156       }
157
158
159       while (asyncWoH.get() > 0) {
160
161         if (LOG.isDebugEnabled()) {
162           LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + " summarizer waiting for all the links to be processed.");
163         }
164
165         Thread.sleep(250);
166       }
167
168       esWorkOnHand.set(entityCounters.size());
169
170       // start doing the real work
171       allWorkEnumerated = true;
172
173       insertEntityTypeCounters();
174
175       if (LOG.isDebugEnabled()) {
176
177         StringBuilder sb = new StringBuilder(128);
178
179         sb.append("\n\nHistorical Entity Counters:");
180
181         for (Entry<String, AtomicInteger> entry : entityCounters.entrySet()) {
182           sb.append("\n").append(entry.getKey()).append(" = ").append(entry.getValue().get());
183         }
184
185         LOG.debug(AaiUiMsgs.DEBUG_GENERIC, sb.toString());
186
187       }
188
189     } catch (Exception exc) {
190       LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, exc.getMessage());
191
192
193       esWorkOnHand.set(0);
194       allWorkEnumerated = true;
195
196       return OperationState.ERROR;
197     }
198
199     return OperationState.OK;
200
201   }
202
203   /* (non-Javadoc)
204    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
205    */
206   @Override
207   public OperationState doSync() {
208     this.syncDurationInMs = -1;
209         String txnID = NodeUtils.getRandomTxnId();
210     MdcContext.initialize(txnID, "HistoricalEntitySynchronizer", "", "Sync", "");
211         
212     if (syncInProgress) {
213       LOG.info(AaiUiMsgs.HISTORICAL_SYNC_PENDING);
214       return OperationState.PENDING;
215     }
216
217     clearCache();
218
219     syncInProgress = true;
220     this.syncStartedTimeStampInMs = System.currentTimeMillis();
221     allWorkEnumerated = false;
222
223     return collectAllTheWork();
224   }
225
226   /**
227    * Process entity type self links.
228    *
229    * @param entityType the entity type
230    * @param operationResult the operation result
231    */
232   private void processEntityTypeSelfLinks(String entityType, OperationResult operationResult) {
233
234     JsonNode rootNode = null;
235
236     final String jsonResult = operationResult.getResult();
237
238     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
239
240       try {
241         rootNode = mapper.readTree(jsonResult);
242       } catch (IOException exc) {
243         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc.getMessage());
244         return;
245       }
246
247       JsonNode resultData = rootNode.get("result-data");
248       ArrayNode resultDataArrayNode = null;
249
250       if (resultData != null && resultData.isArray()) {
251         resultDataArrayNode = (ArrayNode) resultData;
252         entityCounters.put(entityType, new AtomicInteger(resultDataArrayNode.size()));
253       }
254     }
255
256   }
257
258   /**
259    * Insert entity type counters.
260    */
261   private void insertEntityTypeCounters() {
262
263     if (esWorkOnHand.get() <= 0) {
264       return;
265     }
266
267     SimpleDateFormat dateFormat = new SimpleDateFormat(INSERTION_DATE_TIME_FORMAT);
268     Timestamp timestamp = new Timestamp(System.currentTimeMillis());
269     String currentFormattedTimeStamp = dateFormat.format(timestamp);
270
271     Set<Entry<String, AtomicInteger>> entityCounterEntries = entityCounters.entrySet();
272
273     for (Entry<String, AtomicInteger> entityCounterEntry : entityCounterEntries) {
274
275       supplyAsync(new Supplier<Void>() {
276
277         @Override
278         public Void get() {
279           MDC.setContextMap(contextMap);
280           String jsonString = Json.createObjectBuilder().add(
281               "count", entityCounterEntry.getValue().get())
282               .add("entityType", entityCounterEntry.getKey())
283               .add("timestamp", currentFormattedTimeStamp).build().toString();
284
285           String link = null;
286           try {
287             link = elasticSearchAdapter.buildElasticSearchPostUrl(indexName);
288             OperationResult or = elasticSearchAdapter.doPost(link, jsonString, MediaType.APPLICATION_JSON_TYPE);
289             updateElasticSearchCounters(HttpMethod.POST, entityCounterEntry.getKey(), or);
290           } catch (Exception exc) {
291             LOG.error(AaiUiMsgs.ES_STORE_FAILURE, exc.getMessage() );
292           }
293
294           return null;
295         }
296
297       }, esExecutor).whenComplete((result, error) -> {
298
299         esWorkOnHand.decrementAndGet();
300
301       });
302
303     }
304
305     while (esWorkOnHand.get() > 0) {
306
307       try {
308         Thread.sleep(500);
309       } catch (InterruptedException exc) {
310         LOG.error(AaiUiMsgs.INTERRUPTED, "historical Entities", exc.getMessage());
311       }
312     }
313
314   }
315
316   @Override
317   public SynchronizerState getState() {
318
319     if (!isSyncDone()) {
320       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
321     }
322
323     return SynchronizerState.IDLE;
324
325   }
326
327   /* (non-Javadoc)
328    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
329    */
330   @Override
331   public String getStatReport(boolean showFinalReport) {
332     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
333     return this.getStatReport(syncDurationInMs, showFinalReport);
334   }
335
336   /* (non-Javadoc)
337    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
338    */
339   @Override
340   public void shutdown() {
341     this.shutdownExecutors();
342   }
343
344   @Override
345   protected boolean isSyncDone() {
346
347     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
348
349     if (LOG.isDebugEnabled()) {
350       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand
351           + " all work enumerated = " + allWorkEnumerated);
352     }
353
354     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
355       return false;
356     }
357
358     this.syncInProgress = false;
359
360     return true;
361   }
362
363   /* (non-Javadoc)
364    * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
365    */
366   @Override
367   public void clearCache() {
368
369     if (syncInProgress) {
370       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, "Historical Entity Summarizer in progress, request to clear cache ignored");
371       return;
372     }
373
374     super.clearCache();
375     this.resetCounters();
376     if (entityCounters != null) {
377       entityCounters.clear();
378     }
379
380     allWorkEnumerated = false;
381
382   }
383
384 }