X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=holmes-actions%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fholmes%2Fcommon%2Fdmaap%2FDmaapService.java;h=f29f1d98d553460790ea172ea6dc09f2d0f7731c;hb=490fc3c1fafe50d5fb0e23db5cfd10730be96866;hp=fc795399210d18f90972cb280ef3e3785b45f53a;hpb=80aa5b49e5e3fb696231ce66acf198ff50d74731;p=holmes%2Fcommon.git diff --git a/holmes-actions/src/main/java/org/onap/holmes/common/dmaap/DmaapService.java b/holmes-actions/src/main/java/org/onap/holmes/common/dmaap/DmaapService.java index fc79539..f29f1d9 100644 --- a/holmes-actions/src/main/java/org/onap/holmes/common/dmaap/DmaapService.java +++ b/holmes-actions/src/main/java/org/onap/holmes/common/dmaap/DmaapService.java @@ -1,5 +1,5 @@ /* - * Copyright 2017 ZTE Corporation. + * Copyright 2017-2020 ZTE Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,139 +15,171 @@ */ package org.onap.holmes.common.dmaap; -import com.fasterxml.jackson.core.JsonProcessingException; -import java.util.List; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import javax.inject.Inject; -import lombok.extern.slf4j.Slf4j; import org.jvnet.hk2.annotations.Service; import org.onap.holmes.common.aai.AaiQuery; import org.onap.holmes.common.aai.entity.RelationshipList.Relationship; -import org.onap.holmes.common.aai.entity.RelationshipList.RelationshipData; import org.onap.holmes.common.aai.entity.VmEntity; import org.onap.holmes.common.aai.entity.VnfEntity; import org.onap.holmes.common.api.stat.VesAlarm; import org.onap.holmes.common.dcae.DcaeConfigurationsCache; import org.onap.holmes.common.dmaap.entity.PolicyMsg; import org.onap.holmes.common.dmaap.entity.PolicyMsg.EVENT_STATUS; +import org.onap.holmes.common.dmaap.store.ClosedLoopControlNameCache; +import org.onap.holmes.common.dmaap.store.UniqueRequestIdCache; import org.onap.holmes.common.exception.CorrelationException; -import org.onap.holmes.common.utils.JacksonUtil; +import org.onap.holmes.common.utils.GsonUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.UUID; -@Slf4j @Service public class DmaapService { - public static final int POLICY_MESSAGE_ABATED = 1; - public static final String SERVICE_INSTANCE = "service-instance"; - public static final String SERVICE_INSTANCE_ID = "service-instance.service-instance-id"; - @Inject + + private static final Logger log = LoggerFactory.getLogger(DmaapService.class); + private AaiQuery aaiQuery; - public static ConcurrentHashMap loopControlNames = new ConcurrentHashMap<>(); - public static ConcurrentHashMap alarmUniqueRequestID = new ConcurrentHashMap<>(); + private ClosedLoopControlNameCache closedLoopControlNameCache; + private UniqueRequestIdCache uniqueRequestIdCache; + + @Inject + public void setAaiQuery(AaiQuery aaiQuery) { + this.aaiQuery = aaiQuery; + } + + @Inject + public void setClosedLoopControlNameCache(ClosedLoopControlNameCache closedLoopControlNameCache) { + this.closedLoopControlNameCache = closedLoopControlNameCache; + } + + @Inject + public void setUniqueRequestIdCache(UniqueRequestIdCache uniqueRequestIdCache) { + this.uniqueRequestIdCache = uniqueRequestIdCache; + } public void publishPolicyMsg(PolicyMsg policyMsg, String dmaapConfigKey) { try { Publisher publisher = new Publisher(); - publisher.setUrl(DcaeConfigurationsCache.getPubSecInfo(dmaapConfigKey).getDmaapInfo().getTopicUrl()); + publisher.setUrl(DcaeConfigurationsCache.getPubSecInfo(dmaapConfigKey).getDmaapInfo() + .getTopicUrl()); publisher.publish(policyMsg); - log.info("send policyMsg: " + JacksonUtil.beanToJson(policyMsg)); + deleteRequestIdIfNecessary(policyMsg); + log.info("send policyMsg: " + GsonUtil.beanToJson(policyMsg)); } catch (CorrelationException e) { - log.error("Failed to publish policyMsg to dmaap", e.getMessage()); - } catch (JsonProcessingException e) { - log.info("Failed to convert policyMsg to json"); + log.error("Failed to publish the control loop event to DMaaP", e); } catch (NullPointerException e) { - log.error("DMaaP configurations does not exist!"); + log.error("DMaaP configurations do not exist!"); } } public PolicyMsg getPolicyMsg(VesAlarm rootAlarm, VesAlarm childAlarm, String packgeName) { return Optional.ofNullable(getVmEntity(rootAlarm.getSourceId(), rootAlarm.getSourceName())) .map(vmEntity -> getEnrichedPolicyMsg(vmEntity, rootAlarm, childAlarm, packgeName)) - .orElse(getDefaultPolicyMsg(rootAlarm.getSourceName())); - } - - private String getVserverInstanceId(VnfEntity vnfEntity) { - String vserverInstanceId = ""; - if (vnfEntity != null) { - List relationshipList = vnfEntity.getRelationshipList().getRelationships(); - Relationship relationship = null; - for(int i = 0; i < relationshipList.size(); i++) { - if (SERVICE_INSTANCE.equals(relationshipList.get(i).getRelatedTo())) { - relationship = relationshipList.get(i); - break; - } - } - if (relationship != null) { - List relationshipDataList = relationship.getRelationshipDataList(); - for(int i = 0; i < relationshipDataList.size(); i++) { - if (SERVICE_INSTANCE_ID - .equals(relationshipDataList.get(i).getRelationshipKey())) { - vserverInstanceId = relationshipDataList.get(i).getRelationshipValue(); - break; - } - } - } - } - return vserverInstanceId; + .orElse(getDefaultPolicyMsg(rootAlarm, packgeName)); } private PolicyMsg getEnrichedPolicyMsg(VmEntity vmEntity, VesAlarm rootAlarm, VesAlarm childAlarm, String packageName) { PolicyMsg policyMsg = new PolicyMsg(); - String alarmUniqueKey = ""; - if (rootAlarm.getAlarmIsCleared() == POLICY_MESSAGE_ABATED) { - policyMsg.setClosedLoopEventStatus(EVENT_STATUS.ABATED); - alarmUniqueKey = - rootAlarm.getSourceId() + ":" + rootAlarm.getEventName().replace("Cleared", ""); - } else { + policyMsg.setRequestID(getUniqueRequestId(rootAlarm)); + if (rootAlarm.getAlarmIsCleared() == PolicyMassgeConstant.POLICY_MESSAGE_ONSET) { + enrichVnfInfo(vmEntity, childAlarm, policyMsg); policyMsg.setClosedLoopEventStatus(EVENT_STATUS.ONSET); - enrichVnfInfo(childAlarm, policyMsg); - alarmUniqueKey = rootAlarm.getSourceId() + ":" + rootAlarm.getEventName(); - } - if (alarmUniqueRequestID.containsKey(alarmUniqueKey)) { - policyMsg.setRequestID(alarmUniqueRequestID.get(alarmUniqueKey)); + policyMsg.getAai().put("vserver.in-maint", vmEntity.getInMaint()); + policyMsg.getAai().put("vserver.is-closed-loop-disabled", + vmEntity.getClosedLoopDisable()); + policyMsg.getAai().put("vserver.prov-status", vmEntity.getProvStatus()); + policyMsg.getAai().put("vserver.resource-version", vmEntity.getResourceVersion()); } else { - String requestID = UUID.randomUUID().toString(); - policyMsg.setRequestID(requestID); - alarmUniqueRequestID.put(alarmUniqueKey, requestID); + policyMsg.setClosedLoopAlarmEnd(rootAlarm.getLastEpochMicrosec()); + policyMsg.setClosedLoopEventStatus(EVENT_STATUS.ABATED); } - policyMsg.setClosedLoopControlName(loopControlNames.get(packageName)); - policyMsg.setTarget(vmEntity.getVserverName()); - policyMsg.getAai().put("vserver.in-maint", String.valueOf(vmEntity.getInMaint())); - policyMsg.getAai().put("vserver.is-closed-loop-disabled", - String.valueOf(vmEntity.getClosedLoopDisable())); - policyMsg.getAai().put("vserver.prov-status", vmEntity.getProvStatus()); - policyMsg.getAai().put("vserver.resource-version", vmEntity.getResourceVersion()); + policyMsg.setClosedLoopControlName(closedLoopControlNameCache.get(packageName)); + policyMsg.setClosedLoopAlarmStart(rootAlarm.getStartEpochMicrosec()); policyMsg.getAai().put("vserver.vserver-id", vmEntity.getVserverId()); policyMsg.getAai().put("vserver.vserver-name", vmEntity.getVserverName()); policyMsg.getAai().put("vserver.vserver-name2", vmEntity.getVserverName2()); policyMsg.getAai().put("vserver.vserver-selflink", vmEntity.getVserverSelflink()); + policyMsg.setTarget("vserver.vserver-name"); return policyMsg; } - private PolicyMsg getDefaultPolicyMsg(String sourceName) { + private PolicyMsg getDefaultPolicyMsg(VesAlarm rootAlarm, String packageName) { PolicyMsg policyMsg = new PolicyMsg(); + policyMsg.setRequestID(getUniqueRequestId(rootAlarm)); + policyMsg.setClosedLoopControlName(closedLoopControlNameCache.get(packageName)); + policyMsg.setClosedLoopAlarmStart(rootAlarm.getStartEpochMicrosec()); policyMsg.setTarget("vserver.vserver-name"); policyMsg.setTargetType("VM"); - policyMsg.getAai().put("vserver.vserver-name", sourceName); + policyMsg.getAai().put("vserver.vserver-name", rootAlarm.getSourceName()); + if (rootAlarm.getAlarmIsCleared() == PolicyMassgeConstant.POLICY_MESSAGE_ABATED) { + policyMsg.setClosedLoopAlarmEnd(rootAlarm.getLastEpochMicrosec()); + policyMsg.setClosedLoopEventStatus(EVENT_STATUS.ABATED); + } return policyMsg; } - private void enrichVnfInfo(VesAlarm childAlarm, PolicyMsg policyMsg) { - VnfEntity vnfEntity = getVnfEntity(childAlarm.getSourceId(), childAlarm.getSourceName()); + private String getUniqueRequestId(VesAlarm rootAlarm) { + String alarmUniqueKey = ""; + if (rootAlarm.getAlarmIsCleared() == PolicyMassgeConstant.POLICY_MESSAGE_ABATED) { + alarmUniqueKey = + rootAlarm.getSourceId() + ":" + rootAlarm.getEventName().replace("Cleared", ""); + } else { + alarmUniqueKey = rootAlarm.getSourceId() + ":" + rootAlarm.getEventName(); + } + if (uniqueRequestIdCache.containsKey(alarmUniqueKey)) { + return uniqueRequestIdCache.get(alarmUniqueKey); + } else { + String requestID = UUID.randomUUID().toString(); + uniqueRequestIdCache.put(alarmUniqueKey, requestID); + return requestID; + } + } + + private void enrichVnfInfo(VmEntity vmEntity, VesAlarm childAlarm, PolicyMsg policyMsg) { + String vnfId = ""; + String vnfName = ""; + if (null != childAlarm) { + vnfId = childAlarm.getSourceId(); + vnfName = childAlarm.getSourceName(); + } else { + Relationship relationship = vmEntity.getRelationshipList() + .getRelationship(PolicyMassgeConstant.GENERIC_VNF); + if (null != relationship) { + vnfId = relationship.getRelationshipDataValue(PolicyMassgeConstant.GENERIC_VNF_VNF_ID); + vnfName = relationship.getRelatedToPropertyValue(PolicyMassgeConstant.GENERIC_VNF_VNF_NAME); + } + } + VnfEntity vnfEntity = getVnfEntity(vnfId, vnfName); String vserverInstatnceId = getVserverInstanceId(vnfEntity); - policyMsg.getAai().put("generic-vnf.vnf-id", childAlarm.getSourceId()); - policyMsg.getAai().put("generic-vnf.vnf-name", childAlarm.getSourceName()); + policyMsg.getAai().put("generic-vnf.vnf-id", vnfId); policyMsg.getAai().put("generic-vnf.service-instance-id", vserverInstatnceId); } + + private String getVserverInstanceId(VnfEntity vnfEntity) { + String vserverInstanceId = ""; + if (vnfEntity != null) { + Relationship relationship = vnfEntity.getRelationshipList() + .getRelationship(PolicyMassgeConstant.SERVICE_INSTANCE); + if (relationship == null) { + return vserverInstanceId; + } + vserverInstanceId = relationship + .getRelationshipDataValue(PolicyMassgeConstant.SERVICE_INSTANCE_ID); + } + return vserverInstanceId; + } + private VnfEntity getVnfEntity(String vnfId, String vnfName) { VnfEntity vnfEntity = null; try { vnfEntity = aaiQuery.getAaiVnfData(vnfId, vnfName); } catch (CorrelationException e) { - log.error("Failed to get vnf data", e.getMessage()); + log.error("Failed to get the VNF data.", e); } return vnfEntity; } @@ -157,8 +189,22 @@ public class DmaapService { try { vmEntity = aaiQuery.getAaiVmData(sourceId, sourceName); } catch (CorrelationException e) { - log.error("Failed to get vm data", e.getMessage()); + log.error("Failed to get the VM data.", e); } return vmEntity; } + + private void deleteRequestIdIfNecessary(PolicyMsg policyMsg){ + EVENT_STATUS status = policyMsg.getClosedLoopEventStatus(); + if(EVENT_STATUS.ABATED.equals(status)) { + String requestId = policyMsg.getRequestID(); + for(Entry kv: uniqueRequestIdCache.entrySet()) { + if(kv.getValue().equals(requestId)) { + uniqueRequestIdCache.remove(kv.getKey()); + break; + } + } + log.info("An alarm is cleared and the corresponding requestId is deleted successfully"); + } + } }