Update license and poms
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / aggregation / sync / HistoricalEntitySummarizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sparky.aggregation.sync;
22
23 import static java.util.concurrent.CompletableFuture.supplyAsync;
24
25 import java.io.IOException;
26 import java.sql.Timestamp;
27 import java.text.SimpleDateFormat;
28 import java.util.Collection;
29 import java.util.EnumSet;
30 import java.util.Map;
31 import java.util.Map.Entry;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.function.Supplier;
36
37 import javax.json.Json;
38 import javax.ws.rs.core.MediaType;
39
40 import org.onap.aai.cl.api.Logger;
41 import org.onap.aai.cl.eelf.LoggerFactory;
42 import org.onap.aai.cl.mdc.MdcContext;
43 import org.onap.aai.restclient.client.OperationResult;
44 import org.onap.aai.sparky.config.oxm.SearchableEntityLookup;
45 import org.onap.aai.sparky.config.oxm.SearchableOxmEntityDescriptor;
46 import org.onap.aai.sparky.dal.rest.HttpMethod;
47 import org.onap.aai.sparky.logging.AaiUiMsgs;
48 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
49 import org.onap.aai.sparky.sync.IndexSynchronizer;
50 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
51 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
52 import org.onap.aai.sparky.sync.enumeration.OperationState;
53 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
54 import org.onap.aai.sparky.util.NodeUtils;
55 import org.slf4j.MDC;
56
57 import com.fasterxml.jackson.databind.JsonNode;
58 import com.fasterxml.jackson.databind.node.ArrayNode;
59
60 /**
61  * The Class HistoricalEntitySummarizer.
62  */
63 public class HistoricalEntitySummarizer extends AbstractEntitySynchronizer
64     implements IndexSynchronizer {
65
66   private static final Logger LOG = LoggerFactory.getInstance().getLogger(HistoricalEntitySummarizer.class);
67   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
68
69   private boolean allWorkEnumerated;
70   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
71   private boolean syncInProgress;
72   private Map<String, String> contextMap;
73   private ElasticSearchSchemaConfig schemaConfig;
74   private SearchableEntityLookup searchableEntityLookup;
75
76   /**
77    * Instantiates a new historical entity summarizer.
78    *
79    * @param indexName the index name
80    * @throws Exception the exception
81    */
82   public HistoricalEntitySummarizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
83       int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
84       NetworkStatisticsConfig esStatConfig, SearchableEntityLookup searchableEntityLookup)
85       throws Exception {
86     super(LOG, "HES", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(), aaiStatConfig, esStatConfig);
87
88     this.schemaConfig = schemaConfig;
89     this.allWorkEnumerated = false;
90     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
91     this.synchronizerName = "Historical Entity Summarizer";
92     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
93     this.syncInProgress = false;
94     this.contextMap = MDC.getCopyOfContextMap(); 
95     this.syncDurationInMs = -1;
96     this.searchableEntityLookup = searchableEntityLookup;
97   }
98
99   /**
100    * Collect all the work.
101    *
102    * @return the operation state
103    */
104   private OperationState collectAllTheWork() {
105         
106     Map<String, SearchableOxmEntityDescriptor> descriptorMap =
107         searchableEntityLookup.getSearchableEntityDescriptors();
108
109     if (descriptorMap.isEmpty()) {
110       LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "historical entities");
111
112       return OperationState.ERROR;
113     }
114
115     Collection<String> entityTypes = descriptorMap.keySet();
116
117     AtomicInteger asyncWoH = new AtomicInteger(0);
118
119     asyncWoH.set(entityTypes.size());
120
121     try {
122       for (String entityType : entityTypes) {
123
124         supplyAsync(new Supplier<Void>() {
125
126           @Override
127           public Void get() {
128                 MDC.setContextMap(contextMap);
129             try {
130               OperationResult typeLinksResult =
131                   aaiAdapter.getSelfLinksByEntityType(entityType);
132               updateActiveInventoryCounters(HttpMethod.GET, entityType, typeLinksResult);
133               processEntityTypeSelfLinks(entityType, typeLinksResult);
134             } catch (Exception exc) {
135               LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc.getMessage());
136               
137             }
138
139             return null;
140           }
141
142         }, aaiExecutor).whenComplete((result, error) -> {
143
144           asyncWoH.decrementAndGet();
145
146           if (error != null) {
147             LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, error.getMessage());
148           }
149
150         });
151
152       }
153
154
155       while (asyncWoH.get() > 0) {
156
157         if (LOG.isDebugEnabled()) {
158           LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + " summarizer waiting for all the links to be processed.");
159         }
160
161         Thread.sleep(250);
162       }
163
164       esWorkOnHand.set(entityCounters.size());
165
166       // start doing the real work
167       allWorkEnumerated = true;
168
169       insertEntityTypeCounters();
170
171       if (LOG.isDebugEnabled()) {
172
173         StringBuilder sb = new StringBuilder(128);
174
175         sb.append("\n\nHistorical Entity Counters:");
176
177         for (Entry<String, AtomicInteger> entry : entityCounters.entrySet()) {
178           sb.append("\n").append(entry.getKey()).append(" = ").append(entry.getValue().get());
179         }
180
181         LOG.debug(AaiUiMsgs.DEBUG_GENERIC, sb.toString());
182
183       }
184
185     } catch (Exception exc) {
186       LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, exc.getMessage());
187
188
189       esWorkOnHand.set(0);
190       allWorkEnumerated = true;
191
192       return OperationState.ERROR;
193     }
194
195     return OperationState.OK;
196
197   }
198
199   /* (non-Javadoc)
200    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
201    */
202   @Override
203   public OperationState doSync() {
204     this.syncDurationInMs = -1;
205         String txnID = NodeUtils.getRandomTxnId();
206     MdcContext.initialize(txnID, "HistoricalEntitySynchronizer", "", "Sync", "");
207         
208     if (syncInProgress) {
209       LOG.info(AaiUiMsgs.HISTORICAL_SYNC_PENDING);
210       return OperationState.PENDING;
211     }
212
213     clearCache();
214
215     syncInProgress = true;
216     this.syncStartedTimeStampInMs = System.currentTimeMillis();
217     allWorkEnumerated = false;
218
219     return collectAllTheWork();
220   }
221
222   /**
223    * Process entity type self links.
224    *
225    * @param entityType the entity type
226    * @param operationResult the operation result
227    */
228   private void processEntityTypeSelfLinks(String entityType, OperationResult operationResult) {
229
230     JsonNode rootNode = null;
231
232     final String jsonResult = operationResult.getResult();
233
234     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
235
236       try {
237         rootNode = mapper.readTree(jsonResult);
238       } catch (IOException exc) {
239         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc.getMessage());
240         return;
241       }
242
243       JsonNode resultData = rootNode.get("result-data");
244       ArrayNode resultDataArrayNode = null;
245
246       if (resultData != null && resultData.isArray()) {
247         resultDataArrayNode = (ArrayNode) resultData;
248         entityCounters.put(entityType, new AtomicInteger(resultDataArrayNode.size()));
249       }
250     }
251
252   }
253
254   /**
255    * Insert entity type counters.
256    */
257   private void insertEntityTypeCounters() {
258
259     if (esWorkOnHand.get() <= 0) {
260       return;
261     }
262
263     SimpleDateFormat dateFormat = new SimpleDateFormat(INSERTION_DATE_TIME_FORMAT);
264     Timestamp timestamp = new Timestamp(System.currentTimeMillis());
265     String currentFormattedTimeStamp = dateFormat.format(timestamp);
266
267     Set<Entry<String, AtomicInteger>> entityCounterEntries = entityCounters.entrySet();
268
269     for (Entry<String, AtomicInteger> entityCounterEntry : entityCounterEntries) {
270
271       supplyAsync(new Supplier<Void>() {
272
273         @Override
274         public Void get() {
275           MDC.setContextMap(contextMap);
276           String jsonString = Json.createObjectBuilder().add(
277               "count", entityCounterEntry.getValue().get())
278               .add("entityType", entityCounterEntry.getKey())
279               .add("timestamp", currentFormattedTimeStamp).build().toString();
280
281           String link = null;
282           try {
283             link = elasticSearchAdapter.buildElasticSearchPostUrl(indexName);
284             OperationResult or = elasticSearchAdapter.doPost(link, jsonString, MediaType.APPLICATION_JSON_TYPE);
285             updateElasticSearchCounters(HttpMethod.POST, entityCounterEntry.getKey(), or);
286           } catch (Exception exc) {
287             LOG.error(AaiUiMsgs.ES_STORE_FAILURE, exc.getMessage() );
288           }
289
290           return null;
291         }
292
293       }, esExecutor).whenComplete((result, error) -> {
294
295         esWorkOnHand.decrementAndGet();
296
297       });
298
299     }
300
301     while (esWorkOnHand.get() > 0) {
302
303       try {
304         Thread.sleep(500);
305       } catch (InterruptedException exc) {
306         LOG.error(AaiUiMsgs.INTERRUPTED, "historical Entities", exc.getMessage());
307       }
308     }
309
310   }
311
312   @Override
313   public SynchronizerState getState() {
314
315     if (!isSyncDone()) {
316       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
317     }
318
319     return SynchronizerState.IDLE;
320
321   }
322
323   /* (non-Javadoc)
324    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
325    */
326   @Override
327   public String getStatReport(boolean showFinalReport) {
328     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
329     return this.getStatReport(syncDurationInMs, showFinalReport);
330   }
331
332   /* (non-Javadoc)
333    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
334    */
335   @Override
336   public void shutdown() {
337     this.shutdownExecutors();
338   }
339
340   @Override
341   protected boolean isSyncDone() {
342
343     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
344
345     if (LOG.isDebugEnabled()) {
346       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand
347           + " all work enumerated = " + allWorkEnumerated);
348     }
349
350     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
351       return false;
352     }
353
354     this.syncInProgress = false;
355
356     return true;
357   }
358
359   /* (non-Javadoc)
360    * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
361    */
362   @Override
363   public void clearCache() {
364
365     if (syncInProgress) {
366       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, "Historical Entity Summarizer in progress, request to clear cache ignored");
367       return;
368     }
369
370     super.clearCache();
371     this.resetCounters();
372     if (entityCounters != null) {
373       entityCounters.clear();
374     }
375
376     allWorkEnumerated = false;
377
378   }
379
380 }