7c9828baa5fd00fa9071e4113773ba62e0e61646
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / aggregation / sync / HistoricalEntitySummarizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.aggregation.sync;
24
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
26
27 import java.io.IOException;
28 import java.sql.Timestamp;
29 import java.text.SimpleDateFormat;
30 import java.util.Collection;
31 import java.util.EnumSet;
32 import java.util.Map;
33 import java.util.Map.Entry;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.atomic.AtomicInteger;
37 import java.util.function.Supplier;
38
39 import javax.json.Json;
40 import javax.ws.rs.core.MediaType;
41
42 import org.onap.aai.cl.api.Logger;
43 import org.onap.aai.cl.eelf.LoggerFactory;
44 import org.onap.aai.cl.mdc.MdcContext;
45 import org.onap.aai.restclient.client.OperationResult;
46 import org.onap.aai.sparky.config.oxm.SearchableEntityLookup;
47 import org.onap.aai.sparky.config.oxm.SearchableOxmEntityDescriptor;
48 import org.onap.aai.sparky.dal.rest.HttpMethod;
49 import org.onap.aai.sparky.logging.AaiUiMsgs;
50 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
51 import org.onap.aai.sparky.sync.IndexSynchronizer;
52 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
53 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
54 import org.onap.aai.sparky.sync.enumeration.OperationState;
55 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
56 import org.onap.aai.sparky.util.NodeUtils;
57 import org.slf4j.MDC;
58
59 import com.fasterxml.jackson.databind.JsonNode;
60 import com.fasterxml.jackson.databind.node.ArrayNode;
61
62 /**
63  * The Class HistoricalEntitySummarizer.
64  */
65 public class HistoricalEntitySummarizer extends AbstractEntitySynchronizer
66     implements IndexSynchronizer {
67
68   private static final Logger LOG = LoggerFactory.getInstance().getLogger(HistoricalEntitySummarizer.class);
69   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
70
71   private boolean allWorkEnumerated;
72   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
73   private boolean syncInProgress;
74   private Map<String, String> contextMap;
75   private ElasticSearchSchemaConfig schemaConfig;
76   private SearchableEntityLookup searchableEntityLookup;
77
78   /**
79    * Instantiates a new historical entity summarizer.
80    *
81    * @param indexName the index name
82    * @throws Exception the exception
83    */
84   public HistoricalEntitySummarizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
85       int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
86       NetworkStatisticsConfig esStatConfig, SearchableEntityLookup searchableEntityLookup)
87       throws Exception {
88     super(LOG, "HES", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(), aaiStatConfig, esStatConfig);
89
90     this.schemaConfig = schemaConfig;
91     this.allWorkEnumerated = false;
92     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
93     this.synchronizerName = "Historical Entity Summarizer";
94     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
95     this.syncInProgress = false;
96     this.contextMap = MDC.getCopyOfContextMap(); 
97     this.syncDurationInMs = -1;
98     this.searchableEntityLookup = searchableEntityLookup;
99   }
100
101   /**
102    * Collect all the work.
103    *
104    * @return the operation state
105    */
106   private OperationState collectAllTheWork() {
107         
108     Map<String, SearchableOxmEntityDescriptor> descriptorMap =
109         searchableEntityLookup.getSearchableEntityDescriptors();
110
111     if (descriptorMap.isEmpty()) {
112       LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "historical entities");
113
114       return OperationState.ERROR;
115     }
116
117     Collection<String> entityTypes = descriptorMap.keySet();
118
119     AtomicInteger asyncWoH = new AtomicInteger(0);
120
121     asyncWoH.set(entityTypes.size());
122
123     try {
124       for (String entityType : entityTypes) {
125
126         supplyAsync(new Supplier<Void>() {
127
128           @Override
129           public Void get() {
130                 MDC.setContextMap(contextMap);
131             try {
132               OperationResult typeLinksResult =
133                   aaiAdapter.getSelfLinksByEntityType(entityType);
134               updateActiveInventoryCounters(HttpMethod.GET, entityType, typeLinksResult);
135               processEntityTypeSelfLinks(entityType, typeLinksResult);
136             } catch (Exception exc) {
137               LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc.getMessage());
138               
139             }
140
141             return null;
142           }
143
144         }, aaiExecutor).whenComplete((result, error) -> {
145
146           asyncWoH.decrementAndGet();
147
148           if (error != null) {
149             LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, error.getMessage());
150           }
151
152         });
153
154       }
155
156
157       while (asyncWoH.get() > 0) {
158
159         if (LOG.isDebugEnabled()) {
160           LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + " summarizer waiting for all the links to be processed.");
161         }
162
163         Thread.sleep(250);
164       }
165
166       esWorkOnHand.set(entityCounters.size());
167
168       // start doing the real work
169       allWorkEnumerated = true;
170
171       insertEntityTypeCounters();
172
173       if (LOG.isDebugEnabled()) {
174
175         StringBuilder sb = new StringBuilder(128);
176
177         sb.append("\n\nHistorical Entity Counters:");
178
179         for (Entry<String, AtomicInteger> entry : entityCounters.entrySet()) {
180           sb.append("\n").append(entry.getKey()).append(" = ").append(entry.getValue().get());
181         }
182
183         LOG.debug(AaiUiMsgs.DEBUG_GENERIC, sb.toString());
184
185       }
186
187     } catch (Exception exc) {
188       LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, exc.getMessage());
189
190
191       esWorkOnHand.set(0);
192       allWorkEnumerated = true;
193
194       return OperationState.ERROR;
195     }
196
197     return OperationState.OK;
198
199   }
200
201   /* (non-Javadoc)
202    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
203    */
204   @Override
205   public OperationState doSync() {
206     this.syncDurationInMs = -1;
207         String txnID = NodeUtils.getRandomTxnId();
208     MdcContext.initialize(txnID, "HistoricalEntitySynchronizer", "", "Sync", "");
209         
210     if (syncInProgress) {
211       LOG.info(AaiUiMsgs.HISTORICAL_SYNC_PENDING);
212       return OperationState.PENDING;
213     }
214
215     clearCache();
216
217     syncInProgress = true;
218     this.syncStartedTimeStampInMs = System.currentTimeMillis();
219     allWorkEnumerated = false;
220
221     return collectAllTheWork();
222   }
223
224   /**
225    * Process entity type self links.
226    *
227    * @param entityType the entity type
228    * @param operationResult the operation result
229    */
230   private void processEntityTypeSelfLinks(String entityType, OperationResult operationResult) {
231
232     JsonNode rootNode = null;
233
234     final String jsonResult = operationResult.getResult();
235
236     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
237
238       try {
239         rootNode = mapper.readTree(jsonResult);
240       } catch (IOException exc) {
241         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc.getMessage());
242         return;
243       }
244
245       JsonNode resultData = rootNode.get("result-data");
246       ArrayNode resultDataArrayNode = null;
247
248       if (resultData != null && resultData.isArray()) {
249         resultDataArrayNode = (ArrayNode) resultData;
250         entityCounters.put(entityType, new AtomicInteger(resultDataArrayNode.size()));
251       }
252     }
253
254   }
255
256   /**
257    * Insert entity type counters.
258    */
259   private void insertEntityTypeCounters() {
260
261     if (esWorkOnHand.get() <= 0) {
262       return;
263     }
264
265     SimpleDateFormat dateFormat = new SimpleDateFormat(INSERTION_DATE_TIME_FORMAT);
266     Timestamp timestamp = new Timestamp(System.currentTimeMillis());
267     String currentFormattedTimeStamp = dateFormat.format(timestamp);
268
269     Set<Entry<String, AtomicInteger>> entityCounterEntries = entityCounters.entrySet();
270
271     for (Entry<String, AtomicInteger> entityCounterEntry : entityCounterEntries) {
272
273       supplyAsync(new Supplier<Void>() {
274
275         @Override
276         public Void get() {
277           MDC.setContextMap(contextMap);
278           String jsonString = Json.createObjectBuilder().add(
279               "count", entityCounterEntry.getValue().get())
280               .add("entityType", entityCounterEntry.getKey())
281               .add("timestamp", currentFormattedTimeStamp).build().toString();
282
283           String link = null;
284           try {
285             link = elasticSearchAdapter.buildElasticSearchPostUrl(indexName);
286             OperationResult or = elasticSearchAdapter.doPost(link, jsonString, MediaType.APPLICATION_JSON_TYPE);
287             updateElasticSearchCounters(HttpMethod.POST, entityCounterEntry.getKey(), or);
288           } catch (Exception exc) {
289             LOG.error(AaiUiMsgs.ES_STORE_FAILURE, exc.getMessage() );
290           }
291
292           return null;
293         }
294
295       }, esExecutor).whenComplete((result, error) -> {
296
297         esWorkOnHand.decrementAndGet();
298
299       });
300
301     }
302
303     while (esWorkOnHand.get() > 0) {
304
305       try {
306         Thread.sleep(500);
307       } catch (InterruptedException exc) {
308         LOG.error(AaiUiMsgs.INTERRUPTED, "historical Entities", exc.getMessage());
309       }
310     }
311
312   }
313
314   @Override
315   public SynchronizerState getState() {
316
317     if (!isSyncDone()) {
318       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
319     }
320
321     return SynchronizerState.IDLE;
322
323   }
324
325   /* (non-Javadoc)
326    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
327    */
328   @Override
329   public String getStatReport(boolean showFinalReport) {
330     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
331     return this.getStatReport(syncDurationInMs, showFinalReport);
332   }
333
334   /* (non-Javadoc)
335    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
336    */
337   @Override
338   public void shutdown() {
339     this.shutdownExecutors();
340   }
341
342   @Override
343   protected boolean isSyncDone() {
344
345     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
346
347     if (LOG.isDebugEnabled()) {
348       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand
349           + " all work enumerated = " + allWorkEnumerated);
350     }
351
352     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
353       return false;
354     }
355
356     this.syncInProgress = false;
357
358     return true;
359   }
360
361   /* (non-Javadoc)
362    * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
363    */
364   @Override
365   public void clearCache() {
366
367     if (syncInProgress) {
368       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, "Historical Entity Summarizer in progress, request to clear cache ignored");
369       return;
370     }
371
372     super.clearCache();
373     this.resetCounters();
374     if (entityCounters != null) {
375       entityCounters.clear();
376     }
377
378     allWorkEnumerated = false;
379
380   }
381
382 }