/**
 * ============LICENSE_START=======================================================
 * org.onap.aai
 * ================================================================================
 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * Copyright © 2017 Amdocs
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 *
 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
 */
package org.onap.aai.sparky.aggregation.sync;

import static java.util.concurrent.CompletableFuture.supplyAsync;

import java.io.IOException;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import javax.json.Json;
import javax.ws.rs.core.MediaType;

import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.cl.mdc.MdcContext;
import org.onap.aai.restclient.client.OperationResult;
import org.onap.aai.sparky.config.oxm.SearchableEntityLookup;
import org.onap.aai.sparky.config.oxm.SearchableOxmEntityDescriptor;
import org.onap.aai.sparky.dal.rest.HttpMethod;
import org.onap.aai.sparky.logging.AaiUiMsgs;
import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
import org.onap.aai.sparky.sync.IndexSynchronizer;
import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
import org.onap.aai.sparky.sync.enumeration.OperationState;
import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
import org.onap.aai.sparky.util.NodeUtils;
import org.slf4j.MDC;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
63 * The Class HistoricalEntitySummarizer.
65 public class HistoricalEntitySummarizer extends AbstractEntitySynchronizer
66 implements IndexSynchronizer {
68 private static final Logger LOG =
69 LoggerFactory.getInstance().getLogger(HistoricalEntitySummarizer.class);
70 private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
72 private boolean allWorkEnumerated;
73 private ConcurrentHashMap<String, AtomicInteger> entityCounters;
74 private boolean syncInProgress;
75 private Map<String, String> contextMap;
76 private ElasticSearchSchemaConfig schemaConfig;
79 * Instantiates a new historical entity summarizer.
81 * @param indexName the index name
82 * @throws Exception the exception
84 public HistoricalEntitySummarizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
85 int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
86 NetworkStatisticsConfig esStatConfig) throws Exception {
87 super(LOG, "HES", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(),
88 aaiStatConfig, esStatConfig);
90 this.schemaConfig = schemaConfig;
91 this.allWorkEnumerated = false;
92 this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
93 this.synchronizerName = "Historical Entity Summarizer";
94 this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
95 this.syncInProgress = false;
96 this.contextMap = MDC.getCopyOfContextMap();
97 this.syncDurationInMs = -1;
101 * Collect all the work.
103 * @return the operation state
105 private OperationState collectAllTheWork() {
107 Map<String, SearchableOxmEntityDescriptor> descriptorMap =
108 SearchableEntityLookup.getInstance().getSearchableEntityDescriptors();
110 if (descriptorMap.isEmpty()) {
111 LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "historical entities");
113 return OperationState.ERROR;
116 Collection<String> entityTypes = descriptorMap.keySet();
118 AtomicInteger asyncWoH = new AtomicInteger(0);
120 asyncWoH.set(entityTypes.size());
123 for (String entityType : entityTypes) {
125 supplyAsync(new Supplier<Void>() {
129 MDC.setContextMap(contextMap);
131 OperationResult typeLinksResult = aaiAdapter.getSelfLinksByEntityType(entityType);
132 updateActiveInventoryCounters(HttpMethod.GET, entityType, typeLinksResult);
133 processEntityTypeSelfLinks(entityType, typeLinksResult);
134 } catch (Exception exc) {
135 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc.getMessage());
142 }, aaiExecutor).whenComplete((result, error) -> {
144 asyncWoH.decrementAndGet();
147 LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, error.getMessage());
155 while (asyncWoH.get() > 0) {
157 if (LOG.isDebugEnabled()) {
158 LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
159 indexName + " summarizer waiting for all the links to be processed.");
165 esWorkOnHand.set(entityCounters.size());
167 // start doing the real work
168 allWorkEnumerated = true;
170 insertEntityTypeCounters();
172 if (LOG.isDebugEnabled()) {
174 StringBuilder sb = new StringBuilder(128);
176 sb.append("\n\nHistorical Entity Counters:");
178 for (Entry<String, AtomicInteger> entry : entityCounters.entrySet()) {
179 sb.append("\n").append(entry.getKey()).append(" = ").append(entry.getValue().get());
182 LOG.debug(AaiUiMsgs.DEBUG_GENERIC, sb.toString());
186 } catch (Exception exc) {
187 LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, exc.getMessage());
191 allWorkEnumerated = true;
193 return OperationState.ERROR;
196 return OperationState.OK;
203 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
206 public OperationState doSync() {
207 this.syncDurationInMs = -1;
208 String txnID = NodeUtils.getRandomTxnId();
209 MdcContext.initialize(txnID, "HistoricalEntitySynchronizer", "", "Sync", "");
211 if (syncInProgress) {
212 LOG.info(AaiUiMsgs.HISTORICAL_SYNC_PENDING);
213 return OperationState.PENDING;
218 syncInProgress = true;
219 this.syncStartedTimeStampInMs = System.currentTimeMillis();
220 allWorkEnumerated = false;
222 return collectAllTheWork();
226 * Process entity type self links.
228 * @param entityType the entity type
229 * @param operationResult the operation result
231 private void processEntityTypeSelfLinks(String entityType, OperationResult operationResult) {
233 JsonNode rootNode = null;
235 final String jsonResult = operationResult.getResult();
237 if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
240 rootNode = mapper.readTree(jsonResult);
241 } catch (IOException exc) {
242 LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc.getMessage());
246 JsonNode resultData = rootNode.get("result-data");
247 ArrayNode resultDataArrayNode = null;
249 if (resultData != null && resultData.isArray()) {
250 resultDataArrayNode = (ArrayNode) resultData;
251 entityCounters.put(entityType, new AtomicInteger(resultDataArrayNode.size()));
258 * Insert entity type counters.
260 private void insertEntityTypeCounters() {
262 if (esWorkOnHand.get() <= 0) {
266 SimpleDateFormat dateFormat = new SimpleDateFormat(INSERTION_DATE_TIME_FORMAT);
267 Timestamp timestamp = new Timestamp(System.currentTimeMillis());
268 String currentFormattedTimeStamp = dateFormat.format(timestamp);
270 Set<Entry<String, AtomicInteger>> entityCounterEntries = entityCounters.entrySet();
272 for (Entry<String, AtomicInteger> entityCounterEntry : entityCounterEntries) {
274 supplyAsync(new Supplier<Void>() {
278 MDC.setContextMap(contextMap);
280 Json.createObjectBuilder().add("count", entityCounterEntry.getValue().get())
281 .add("entityType", entityCounterEntry.getKey())
282 .add("timestamp", currentFormattedTimeStamp).build().toString();
286 link = getElasticFullUrl("", indexName);
288 elasticSearchAdapter.doPost(link, jsonString, MediaType.APPLICATION_JSON_TYPE);
289 updateElasticSearchCounters(HttpMethod.POST, entityCounterEntry.getKey(), or);
290 } catch (Exception exc) {
291 LOG.error(AaiUiMsgs.ES_STORE_FAILURE, exc.getMessage());
297 }, esExecutor).whenComplete((result, error) -> {
299 esWorkOnHand.decrementAndGet();
305 while (esWorkOnHand.get() > 0) {
309 } catch (InterruptedException exc) {
310 LOG.error(AaiUiMsgs.INTERRUPTED, "historical Entities", exc.getMessage());
317 public SynchronizerState getState() {
320 return SynchronizerState.PERFORMING_SYNCHRONIZATION;
323 return SynchronizerState.IDLE;
330 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
333 public String getStatReport(boolean showFinalReport) {
334 syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
335 return this.getStatReport(syncDurationInMs, showFinalReport);
341 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
344 public void shutdown() {
345 this.shutdownExecutors();
349 protected boolean isSyncDone() {
351 int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
353 if (LOG.isDebugEnabled()) {
354 LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + ", isSyncDone(), totalWorkOnHand = "
355 + totalWorkOnHand + " all work enumerated = " + allWorkEnumerated);
358 if (totalWorkOnHand > 0 || !allWorkEnumerated) {
362 this.syncInProgress = false;
370 * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
373 public void clearCache() {
375 if (syncInProgress) {
376 LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
377 "Historical Entity Summarizer in progress, request to clear cache ignored");
382 this.resetCounters();
383 if (entityCounters != null) {
384 entityCounters.clear();
387 allWorkEnumerated = false;