/*
- * Copyright 2017 ZTE Corporation.
+ * Copyright 2017-2020 ZTE Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*/
package org.onap.holmes.common.dmaap;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import javax.inject.Inject;
-import lombok.extern.slf4j.Slf4j;
import org.jvnet.hk2.annotations.Service;
import org.onap.holmes.common.aai.AaiQuery;
import org.onap.holmes.common.aai.entity.RelationshipList.Relationship;
import org.onap.holmes.common.dcae.DcaeConfigurationsCache;
import org.onap.holmes.common.dmaap.entity.PolicyMsg;
import org.onap.holmes.common.dmaap.entity.PolicyMsg.EVENT_STATUS;
+import org.onap.holmes.common.dmaap.store.ClosedLoopControlNameCache;
+import org.onap.holmes.common.dmaap.store.UniqueRequestIdCache;
import org.onap.holmes.common.exception.CorrelationException;
import org.onap.holmes.common.utils.GsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.UUID;
-@Slf4j
@Service
public class DmaapService {
- @Inject
+ private static final Logger log = LoggerFactory.getLogger(DmaapService.class);
+
private AaiQuery aaiQuery;
- public static final ConcurrentHashMap<String, String> loopControlNames = new ConcurrentHashMap<>();
- public static final ConcurrentHashMap<String, String> alarmUniqueRequestID = new ConcurrentHashMap<>();
+ private ClosedLoopControlNameCache closedLoopControlNameCache;
+ private UniqueRequestIdCache uniqueRequestIdCache;
+
+ @Inject
+ public void setAaiQuery(AaiQuery aaiQuery) {
+ this.aaiQuery = aaiQuery;
+ }
+
+ @Inject
+ public void setClosedLoopControlNameCache(ClosedLoopControlNameCache closedLoopControlNameCache) {
+ this.closedLoopControlNameCache = closedLoopControlNameCache;
+ }
+
+ @Inject
+ public void setUniqueRequestIdCache(UniqueRequestIdCache uniqueRequestIdCache) {
+ this.uniqueRequestIdCache = uniqueRequestIdCache;
+ }
public void publishPolicyMsg(PolicyMsg policyMsg, String dmaapConfigKey) {
try {
publisher.setUrl(DcaeConfigurationsCache.getPubSecInfo(dmaapConfigKey).getDmaapInfo()
.getTopicUrl());
publisher.publish(policyMsg);
- deleteRequestId(policyMsg);
+ deleteRequestIdIfNecessary(policyMsg);
log.info("send policyMsg: " + GsonUtil.beanToJson(policyMsg));
} catch (CorrelationException e) {
log.error("Failed to publish the control loop event to DMaaP", e);
if (rootAlarm.getAlarmIsCleared() == PolicyMassgeConstant.POLICY_MESSAGE_ONSET) {
enrichVnfInfo(vmEntity, childAlarm, policyMsg);
policyMsg.setClosedLoopEventStatus(EVENT_STATUS.ONSET);
- try {
- policyMsg.getAai().put("vserver.in-maint", Boolean.valueOf(vmEntity.getInMaint()).booleanValue());
- } catch (Exception e) {
- log.error("Failed to parse the field \"in-maint\". A boolean string (\"true\"/\"false\")"
- + " is expected but the actual value is " + vmEntity.getInMaint() + ".", e);
- }
- try {
- policyMsg.getAai().put("vserver.is-closed-loop-disabled",
- Boolean.valueOf(vmEntity.getClosedLoopDisable()).booleanValue());
- } catch (Exception e) {
- log.error("Failed to parse the field \"is-closed-loop-disabled\". A boolean string (\"true\"/\"false\")"
- + " is expected but the actual value is " + vmEntity.getClosedLoopDisable() + ".", e);
- }
+ policyMsg.getAai().put("vserver.in-maint", vmEntity.getInMaint());
+ policyMsg.getAai().put("vserver.is-closed-loop-disabled",
+ vmEntity.getClosedLoopDisable());
policyMsg.getAai().put("vserver.prov-status", vmEntity.getProvStatus());
policyMsg.getAai().put("vserver.resource-version", vmEntity.getResourceVersion());
} else {
policyMsg.setClosedLoopAlarmEnd(rootAlarm.getLastEpochMicrosec());
policyMsg.setClosedLoopEventStatus(EVENT_STATUS.ABATED);
}
- policyMsg.setClosedLoopControlName(loopControlNames.get(packageName));
+ policyMsg.setClosedLoopControlName(closedLoopControlNameCache.get(packageName));
policyMsg.setClosedLoopAlarmStart(rootAlarm.getStartEpochMicrosec());
policyMsg.getAai().put("vserver.vserver-id", vmEntity.getVserverId());
policyMsg.getAai().put("vserver.vserver-name", vmEntity.getVserverName());
private PolicyMsg getDefaultPolicyMsg(VesAlarm rootAlarm, String packageName) {
PolicyMsg policyMsg = new PolicyMsg();
policyMsg.setRequestID(getUniqueRequestId(rootAlarm));
- policyMsg.setClosedLoopControlName(loopControlNames.get(packageName));
+ policyMsg.setClosedLoopControlName(closedLoopControlNameCache.get(packageName));
policyMsg.setClosedLoopAlarmStart(rootAlarm.getStartEpochMicrosec());
policyMsg.setTarget("vserver.vserver-name");
policyMsg.setTargetType("VM");
} else {
alarmUniqueKey = rootAlarm.getSourceId() + ":" + rootAlarm.getEventName();
}
- if (alarmUniqueRequestID.containsKey(alarmUniqueKey)) {
- return alarmUniqueRequestID.get(alarmUniqueKey);
+ if (uniqueRequestIdCache.containsKey(alarmUniqueKey)) {
+ return uniqueRequestIdCache.get(alarmUniqueKey);
} else {
String requestID = UUID.randomUUID().toString();
- alarmUniqueRequestID.put(alarmUniqueKey, requestID);
+ uniqueRequestIdCache.put(alarmUniqueKey, requestID);
return requestID;
}
}
return vmEntity;
}
- private void deleteRequestId(PolicyMsg policyMsg){
+ private void deleteRequestIdIfNecessary(PolicyMsg policyMsg){
EVENT_STATUS status = policyMsg.getClosedLoopEventStatus();
if(EVENT_STATUS.ABATED.equals(status)) {
String requestId = policyMsg.getRequestID();
- for(Entry<String, String> kv: alarmUniqueRequestID.entrySet()) {
+ for(Entry<String, String> kv: uniqueRequestIdCache.entrySet()) {
if(kv.getValue().equals(requestId)) {
- alarmUniqueRequestID.remove(kv.getKey());
+ uniqueRequestIdCache.remove(kv.getKey());
break;
}
}
- log.info("Clear alarm, requestId deleted successful");
+ log.info("An alarm is cleared and the corresponding requestId is deleted successfully");
}
}
}