Merge "Remove duplicated code"
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / aggregation / sync / HistoricalEntitySummarizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sparky.aggregation.sync;
22
23 import static java.util.concurrent.CompletableFuture.supplyAsync;
24
25 import java.io.IOException;
26 import java.sql.Timestamp;
27 import java.text.SimpleDateFormat;
28 import java.util.Collection;
29 import java.util.EnumSet;
30 import java.util.Map;
31 import java.util.Map.Entry;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.function.Supplier;
36
37 import javax.json.Json;
38 import javax.ws.rs.core.MediaType;
39
40 import org.onap.aai.cl.api.Logger;
41 import org.onap.aai.cl.eelf.LoggerFactory;
42 import org.onap.aai.cl.mdc.MdcContext;
43 import org.onap.aai.restclient.client.OperationResult;
44 import org.onap.aai.sparky.config.oxm.SearchableEntityLookup;
45 import org.onap.aai.sparky.config.oxm.SearchableOxmEntityDescriptor;
46 import org.onap.aai.sparky.dal.rest.HttpMethod;
47 import org.onap.aai.sparky.logging.AaiUiMsgs;
48 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
49 import org.onap.aai.sparky.sync.IndexSynchronizer;
50 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
51 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
52 import org.onap.aai.sparky.sync.enumeration.OperationState;
53 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
54 import org.onap.aai.sparky.util.NodeUtils;
55 import org.slf4j.MDC;
56
57 import com.fasterxml.jackson.databind.JsonNode;
58 import com.fasterxml.jackson.databind.node.ArrayNode;
59
60 /**
61  * The Class HistoricalEntitySummarizer.
62  */
63 public class HistoricalEntitySummarizer extends AbstractEntitySynchronizer
64     implements IndexSynchronizer {
65
66   private static final Logger LOG = LoggerFactory.getInstance().getLogger(HistoricalEntitySummarizer.class);
67   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
68
69   private boolean allWorkEnumerated;
70   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
71   private boolean syncInProgress;
72   private Map<String, String> contextMap;
73   private ElasticSearchSchemaConfig schemaConfig;
74   private SearchableEntityLookup searchableEntityLookup;
75
76   /**
77    * Instantiates a new historical entity summarizer.
78    *
79    * @throws Exception the exception
80    */
81   public HistoricalEntitySummarizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
82       int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
83       NetworkStatisticsConfig esStatConfig, SearchableEntityLookup searchableEntityLookup)
84       throws Exception {
85     super(LOG, "HES", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(), aaiStatConfig, esStatConfig);
86
87     this.schemaConfig = schemaConfig;
88     this.allWorkEnumerated = false;
89     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
90     this.synchronizerName = "Historical Entity Summarizer";
91     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
92     this.syncInProgress = false;
93     this.contextMap = MDC.getCopyOfContextMap(); 
94     this.syncDurationInMs = -1;
95     this.searchableEntityLookup = searchableEntityLookup;
96   }
97
98   /**
99    * Collect all the work.
100    *
101    * @return the operation state
102    */
103   private OperationState collectAllTheWork() {
104         
105     Map<String, SearchableOxmEntityDescriptor> descriptorMap =
106         searchableEntityLookup.getSearchableEntityDescriptors();
107
108     if (descriptorMap.isEmpty()) {
109       LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "historical entities");
110
111       return OperationState.ERROR;
112     }
113
114     Collection<String> entityTypes = descriptorMap.keySet();
115
116     AtomicInteger asyncWoH = new AtomicInteger(0);
117
118     asyncWoH.set(entityTypes.size());
119
120     try {
121       for (String entityType : entityTypes) {
122
123         supplyAsync(new Supplier<Void>() {
124
125           @Override
126           public Void get() {
127                 MDC.setContextMap(contextMap);
128             try {
129               OperationResult typeLinksResult =
130                   aaiAdapter.getSelfLinksByEntityType(entityType);
131               updateActiveInventoryCounters(HttpMethod.GET, entityType, typeLinksResult);
132               processEntityTypeSelfLinks(entityType, typeLinksResult);
133             } catch (Exception exc) {
134               LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc.getMessage());
135               
136             }
137
138             return null;
139           }
140
141         }, aaiExecutor).whenComplete((result, error) -> {
142
143           asyncWoH.decrementAndGet();
144
145           if (error != null) {
146             LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, error.getMessage());
147           }
148
149         });
150
151       }
152
153
154       while (asyncWoH.get() > 0) {
155
156         if (LOG.isDebugEnabled()) {
157           LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + " summarizer waiting for all the links to be processed.");
158         }
159
160         Thread.sleep(250);
161       }
162
163       esWorkOnHand.set(entityCounters.size());
164
165       // start doing the real work
166       allWorkEnumerated = true;
167
168       insertEntityTypeCounters();
169
170       if (LOG.isDebugEnabled()) {
171
172         StringBuilder sb = new StringBuilder(128);
173
174         sb.append("\n\nHistorical Entity Counters:");
175
176         for (Entry<String, AtomicInteger> entry : entityCounters.entrySet()) {
177           sb.append("\n").append(entry.getKey()).append(" = ").append(entry.getValue().get());
178         }
179
180         LOG.debug(AaiUiMsgs.DEBUG_GENERIC, sb.toString());
181
182       }
183
184     } catch (Exception exc) {
185       LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, exc.getMessage());
186
187
188       esWorkOnHand.set(0);
189       allWorkEnumerated = true;
190
191       return OperationState.ERROR;
192     }
193
194     return OperationState.OK;
195
196   }
197
198   /* (non-Javadoc)
199    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
200    */
201   @Override
202   public OperationState doSync() {
203     this.syncDurationInMs = -1;
204         String txnID = NodeUtils.getRandomTxnId();
205     MdcContext.initialize(txnID, "HistoricalEntitySynchronizer", "", "Sync", "");
206         
207     if (syncInProgress) {
208       LOG.info(AaiUiMsgs.HISTORICAL_SYNC_PENDING);
209       return OperationState.PENDING;
210     }
211
212     clearCache();
213
214     syncInProgress = true;
215     this.syncStartedTimeStampInMs = System.currentTimeMillis();
216     allWorkEnumerated = false;
217
218     return collectAllTheWork();
219   }
220
221   /**
222    * Process entity type self links.
223    *
224    * @param entityType the entity type
225    * @param operationResult the operation result
226    */
227   private void processEntityTypeSelfLinks(String entityType, OperationResult operationResult) {
228
229     JsonNode rootNode = null;
230
231     final String jsonResult = operationResult.getResult();
232
233     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
234
235       try {
236         rootNode = mapper.readTree(jsonResult);
237       } catch (IOException exc) {
238         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc.getMessage());
239         return;
240       }
241
242       JsonNode resultData = rootNode.get("result-data");
243       ArrayNode resultDataArrayNode = null;
244
245       if (resultData != null && resultData.isArray()) {
246         resultDataArrayNode = (ArrayNode) resultData;
247         entityCounters.put(entityType, new AtomicInteger(resultDataArrayNode.size()));
248       }
249     }
250
251   }
252
253   /**
254    * Insert entity type counters.
255    */
256   private void insertEntityTypeCounters() {
257
258     if (esWorkOnHand.get() <= 0) {
259       return;
260     }
261
262     SimpleDateFormat dateFormat = new SimpleDateFormat(INSERTION_DATE_TIME_FORMAT);
263     Timestamp timestamp = new Timestamp(System.currentTimeMillis());
264     String currentFormattedTimeStamp = dateFormat.format(timestamp);
265
266     Set<Entry<String, AtomicInteger>> entityCounterEntries = entityCounters.entrySet();
267
268     for (Entry<String, AtomicInteger> entityCounterEntry : entityCounterEntries) {
269
270       supplyAsync(new Supplier<Void>() {
271
272         @Override
273         public Void get() {
274           MDC.setContextMap(contextMap);
275           String jsonString = Json.createObjectBuilder().add(
276               "count", entityCounterEntry.getValue().get())
277               .add("entityType", entityCounterEntry.getKey())
278               .add("timestamp", currentFormattedTimeStamp).build().toString();
279
280           String link = null;
281           try {
282             link = elasticSearchAdapter.buildElasticSearchPostUrl(indexName);
283             OperationResult or = elasticSearchAdapter.doPost(link, jsonString, MediaType.APPLICATION_JSON_TYPE);
284             updateElasticSearchCounters(HttpMethod.POST, entityCounterEntry.getKey(), or);
285           } catch (Exception exc) {
286             LOG.error(AaiUiMsgs.ES_STORE_FAILURE, exc.getMessage() );
287           }
288
289           return null;
290         }
291
292       }, esExecutor).whenComplete((result, error) -> {
293
294         esWorkOnHand.decrementAndGet();
295
296       });
297
298     }
299
300     while (esWorkOnHand.get() > 0) {
301
302       try {
303         Thread.sleep(500);
304       } catch (InterruptedException exc) {
305         LOG.error(AaiUiMsgs.INTERRUPTED, "historical Entities", exc.getMessage());
306         Thread.currentThread().interrupt();
307       }
308     }
309
310   }
311
312   @Override
313   public SynchronizerState getState() {
314
315     if (!isSyncDone()) {
316       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
317     }
318
319     return SynchronizerState.IDLE;
320
321   }
322
323   /* (non-Javadoc)
324    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
325    */
326   @Override
327   public String getStatReport(boolean showFinalReport) {
328     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
329     return this.getStatReport(syncDurationInMs, showFinalReport);
330   }
331
332   /* (non-Javadoc)
333    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
334    */
335   @Override
336   public void shutdown() {
337     this.shutdownExecutors();
338   }
339
340   @Override
341   protected boolean isSyncDone() {
342
343     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
344
345     if (LOG.isDebugEnabled()) {
346       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand
347           + " all work enumerated = " + allWorkEnumerated);
348     }
349
350     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
351       return false;
352     }
353
354     this.syncInProgress = false;
355
356     return true;
357   }
358
359   /* (non-Javadoc)
360    * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
361    */
362   @Override
363   public void clearCache() {
364
365     if (syncInProgress) {
366       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, "Historical Entity Summarizer in progress, request to clear cache ignored");
367       return;
368     }
369
370     super.clearCache();
371     this.resetCounters();
372     if (entityCounters != null) {
373       entityCounters.clear();
374     }
375
376     allWorkEnumerated = false;
377
378   }
379
380 }