Renaming openecomp to onap
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / synchronizer / HistoricalEntitySummarizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.synchronizer;
24
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
26
27 import java.io.IOException;
28 import java.sql.Timestamp;
29 import java.text.SimpleDateFormat;
30 import java.util.Collection;
31 import java.util.EnumSet;
32 import java.util.Map;
33 import java.util.Map.Entry;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.atomic.AtomicInteger;
37 import java.util.function.Supplier;
38
39 import javax.json.Json;
40
41 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
42 import org.onap.aai.sparky.dal.rest.HttpMethod;
43 import org.onap.aai.sparky.dal.rest.OperationResult;
44 import org.onap.aai.sparky.logging.AaiUiMsgs;
45 import org.onap.aai.sparky.synchronizer.enumeration.OperationState;
46 import org.onap.aai.sparky.synchronizer.enumeration.SynchronizerState;
47 import org.onap.aai.sparky.util.NodeUtils;
48 import org.onap.aai.cl.api.Logger;
49 import org.onap.aai.cl.eelf.LoggerFactory;
50 import org.onap.aai.cl.mdc.MdcContext;
51 import org.slf4j.MDC;
52
53 import com.fasterxml.jackson.databind.JsonNode;
54 import com.fasterxml.jackson.databind.node.ArrayNode;
55
56 /**
57  * The Class HistoricalEntitySummarizer.
58  */
59 public class HistoricalEntitySummarizer extends AbstractEntitySynchronizer
60     implements IndexSynchronizer {
61
62   private static final Logger LOG = LoggerFactory.getInstance().getLogger(HistoricalEntitySummarizer.class);
63   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
64
65   private boolean allWorkEnumerated;
66   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
67   private boolean syncInProgress;
68   private Map<String, String> contextMap;
69
70   /**
71    * Instantiates a new historical entity summarizer.
72    *
73    * @param indexName the index name
74    * @throws Exception the exception
75    */
76   public HistoricalEntitySummarizer(String indexName) throws Exception {
77     super(LOG, "HES", 2, 5, 5, indexName);
78
79     this.allWorkEnumerated = false;
80     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
81     this.synchronizerName = "Historical Entity Summarizer";
82     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
83     this.syncInProgress = false;
84     this.contextMap = MDC.getCopyOfContextMap(); 
85     this.syncDurationInMs = -1;
86   }
87
88   /**
89    * Collect all the work.
90    *
91    * @return the operation state
92    */
93   private OperationState collectAllTheWork() {
94         
95     Map<String, OxmEntityDescriptor> descriptorMap =
96         oxmModelLoader.getSearchableEntityDescriptors();
97
98     if (descriptorMap.isEmpty()) {
99       LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "historical entities");
100
101       return OperationState.ERROR;
102     }
103
104     Collection<String> entityTypes = descriptorMap.keySet();
105
106     AtomicInteger asyncWoH = new AtomicInteger(0);
107
108     asyncWoH.set(entityTypes.size());
109
110     try {
111       for (String entityType : entityTypes) {
112
113         supplyAsync(new Supplier<Void>() {
114
115           @Override
116           public Void get() {
117                 MDC.setContextMap(contextMap);
118             try {
119               OperationResult typeLinksResult =
120                   aaiDataProvider.getSelfLinksByEntityType(entityType);
121               updateActiveInventoryCounters(HttpMethod.GET, entityType, typeLinksResult);
122               processEntityTypeSelfLinks(entityType, typeLinksResult);
123             } catch (Exception exc) {
124               LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc.getMessage());
125               
126             }
127
128             return null;
129           }
130
131         }, aaiExecutor).whenComplete((result, error) -> {
132
133           asyncWoH.decrementAndGet();
134
135           if (error != null) {
136             LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, error.getMessage());
137           }
138
139         });
140
141       }
142
143
144       while (asyncWoH.get() > 0) {
145
146         if (LOG.isDebugEnabled()) {
147           LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + " summarizer waiting for all the links to be processed.");
148         }
149
150         Thread.sleep(250);
151       }
152
153       esWorkOnHand.set(entityCounters.size());
154
155       // start doing the real work
156       allWorkEnumerated = true;
157
158       insertEntityTypeCounters();
159
160       if (LOG.isDebugEnabled()) {
161
162         StringBuilder sb = new StringBuilder(128);
163
164         sb.append("\n\nHistorical Entity Counters:");
165
166         for (Entry<String, AtomicInteger> entry : entityCounters.entrySet()) {
167           sb.append("\n").append(entry.getKey()).append(" = ").append(entry.getValue().get());
168         }
169
170         LOG.debug(AaiUiMsgs.DEBUG_GENERIC, sb.toString());
171
172       }
173
174     } catch (Exception exc) {
175       LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, exc.getMessage());
176
177
178       esWorkOnHand.set(0);
179       allWorkEnumerated = true;
180
181       return OperationState.ERROR;
182     }
183
184     return OperationState.OK;
185
186   }
187
188   /* (non-Javadoc)
189    * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#doSync()
190    */
191   @Override
192   public OperationState doSync() {
193         String txnID = NodeUtils.getRandomTxnId();
194     MdcContext.initialize(txnID, "HistoricalEntitySynchronizer", "", "Sync", "");
195         
196     if (syncInProgress) {
197       LOG.info(AaiUiMsgs.HISTORICAL_SYNC_PENDING);
198       return OperationState.PENDING;
199     }
200
201     clearCache();
202
203     syncInProgress = true;
204     this.syncStartedTimeStampInMs = System.currentTimeMillis();
205     allWorkEnumerated = false;
206
207     return collectAllTheWork();
208   }
209
210   /**
211    * Process entity type self links.
212    *
213    * @param entityType the entity type
214    * @param operationResult the operation result
215    */
216   private void processEntityTypeSelfLinks(String entityType, OperationResult operationResult) {
217
218     JsonNode rootNode = null;
219
220     final String jsonResult = operationResult.getResult();
221
222     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
223
224       try {
225         rootNode = mapper.readTree(jsonResult);
226       } catch (IOException exc) {
227         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc.getMessage());
228         return;
229       }
230
231       JsonNode resultData = rootNode.get("result-data");
232       ArrayNode resultDataArrayNode = null;
233
234       if (resultData != null && resultData.isArray()) {
235         resultDataArrayNode = (ArrayNode) resultData;
236         entityCounters.put(entityType, new AtomicInteger(resultDataArrayNode.size()));
237       }
238     }
239
240   }
241
242   /**
243    * Insert entity type counters.
244    */
245   private void insertEntityTypeCounters() {
246
247     if (esWorkOnHand.get() <= 0) {
248       return;
249     }
250
251     SimpleDateFormat dateFormat = new SimpleDateFormat(INSERTION_DATE_TIME_FORMAT);
252     Timestamp timestamp = new Timestamp(System.currentTimeMillis());
253     String currentFormattedTimeStamp = dateFormat.format(timestamp);
254
255     Set<Entry<String, AtomicInteger>> entityCounterEntries = entityCounters.entrySet();
256
257     for (Entry<String, AtomicInteger> entityCounterEntry : entityCounterEntries) {
258
259       supplyAsync(new Supplier<Void>() {
260
261         @Override
262         public Void get() {
263           MDC.setContextMap(contextMap);
264           String jsonString = Json.createObjectBuilder().add(
265               "count", entityCounterEntry.getValue().get())
266               .add("entityType", entityCounterEntry.getKey())
267               .add("timestamp", currentFormattedTimeStamp).build().toString();
268
269           String link = null;
270           try {
271             link = getElasticFullUrl("", indexName);
272             OperationResult or = esDataProvider.doPost(link, jsonString, "application/json");
273             updateElasticSearchCounters(HttpMethod.POST, entityCounterEntry.getKey(), or);
274           } catch (Exception exc) {
275             LOG.error(AaiUiMsgs.ES_STORE_FAILURE, exc.getMessage() );
276           }
277
278           return null;
279         }
280
281       }, esExecutor).whenComplete((result, error) -> {
282
283         esWorkOnHand.decrementAndGet();
284
285       });
286
287     }
288
289     while (esWorkOnHand.get() > 0) {
290
291       try {
292         Thread.sleep(500);
293       } catch (InterruptedException exc) {
294         LOG.error(AaiUiMsgs.INTERRUPTED, "historical Entities", exc.getMessage());
295       }
296     }
297
298   }
299
300   @Override
301   public SynchronizerState getState() {
302
303     if (!isSyncDone()) {
304       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
305     }
306
307     return SynchronizerState.IDLE;
308
309   }
310
311   /* (non-Javadoc)
312    * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
313    */
314   @Override
315   public String getStatReport(boolean showFinalReport) {
316           syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
317           return this.getStatReport(syncDurationInMs, showFinalReport);
318   }
319
320   /* (non-Javadoc)
321    * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#shutdown()
322    */
323   @Override
324   public void shutdown() {
325     this.shutdownExecutors();
326   }
327
328   @Override
329   protected boolean isSyncDone() {
330
331     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
332
333     if (LOG.isDebugEnabled()) {
334       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand
335           + " all work enumerated = " + allWorkEnumerated);
336     }
337
338     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
339       return false;
340     }
341
342     this.syncInProgress = false;
343
344     return true;
345   }
346
347   /* (non-Javadoc)
348    * @see org.onap.aai.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
349    */
350   @Override
351   public void clearCache() {
352
353     if (syncInProgress) {
354       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, "Historical Entity Summarizer in progress, request to clear cache ignored");
355       return;
356     }
357
358     super.clearCache();
359     this.resetCounters();
360     if (entityCounters != null) {
361       entityCounters.clear();
362     }
363
364     allWorkEnumerated = false;
365
366   }
367
368 }