X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=cps-ncmp-service%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fcps%2Fncmp%2Fapi%2Fimpl%2Fevents%2Fcmsubscription%2FCmNotificationSubscriptionNcmpOutEventProducer.java;fp=cps-ncmp-service%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fcps%2Fncmp%2Fapi%2Fimpl%2Fevents%2Fcmsubscription%2FCmNotificationSubscriptionNcmpOutEventProducer.java;h=e7af69e500d1ab4db74c9543648c01d1572e8800;hb=f679ac737984cfbbf10873f1d178fb718b876de2;hp=76ee08e64cb67fdd880f6c39027912b88d2e5cc6;hpb=2830723b8c5d5bb40c171c88f055adbc1a808f68;p=cps.git diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducer.java index 76ee08e64..e7af69e50 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducer.java @@ -34,7 +34,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.events.EventsPublisher; import org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper.CmNotificationSubscriptionNcmpOutEventMapper; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails; import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent; import org.onap.cps.utils.JsonObjectMapper; import org.springframework.beans.factory.annotation.Value; @@ -55,8 +54,8 @@ public class CmNotificationSubscriptionNcmpOutEventProducer { private final EventsPublisher eventsPublisher; private final JsonObjectMapper jsonObjectMapper; - private final Map> cmNotificationSubscriptionCache; private final CmNotificationSubscriptionNcmpOutEventMapper cmNotificationSubscriptionNcmpOutEventMapper; + private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private static final Map> scheduledTasksPerSubscriptionId = new ConcurrentHashMap<>(); @@ -72,8 +71,9 @@ public class CmNotificationSubscriptionNcmpOutEventProducer { * or published now */ public void publishCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId, final String eventType, - final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent, - final boolean isScheduledEvent) { + final CmNotificationSubscriptionNcmpOutEvent + cmNotificationSubscriptionNcmpOutEvent, + final boolean isScheduledEvent) { if (isScheduledEvent && !scheduledTasksPerSubscriptionId.containsKey(subscriptionId)) { final ScheduledFuture scheduledFuture = @@ -86,16 +86,15 @@ public class CmNotificationSubscriptionNcmpOutEventProducer { cmNotificationSubscriptionNcmpOutEvent); log.info("Published CmNotificationSubscriptionEvent on demand for subscriptionId : {}", subscriptionId); } - } private ScheduledFuture scheduleAndPublishCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId, - final String eventType) { + final String eventType) { final CmNotificationSubscriptionNcmpOutEventPublishingTask cmNotificationSubscriptionNcmpOutEventPublishingTask = new CmNotificationSubscriptionNcmpOutEventPublishingTask(cmNotificationSubscriptionNcmpOutEventTopic, - subscriptionId, eventType, eventsPublisher, jsonObjectMapper, cmNotificationSubscriptionCache, - cmNotificationSubscriptionNcmpOutEventMapper); + subscriptionId, eventType, eventsPublisher, jsonObjectMapper, + cmNotificationSubscriptionNcmpOutEventMapper, dmiCmNotificationSubscriptionCacheHandler); return scheduledExecutorService.schedule(cmNotificationSubscriptionNcmpOutEventPublishingTask, cmNotificationSubscriptionDmiOutEventTimeoutInMs, TimeUnit.MILLISECONDS); } @@ -112,12 +111,15 @@ public class CmNotificationSubscriptionNcmpOutEventProducer { private void publishCmNotificationSubscriptionNcmpOutEventNow(final String subscriptionId, final String eventType, - final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent) { + final CmNotificationSubscriptionNcmpOutEvent + cmNotificationSubscriptionNcmpOutEvent) { final CloudEvent cmNotificationSubscriptionNcmpOutEventAsCloudEvent = buildAndGetCmNotificationNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType, cmNotificationSubscriptionNcmpOutEvent); eventsPublisher.publishCloudEvent(cmNotificationSubscriptionNcmpOutEventTopic, subscriptionId, cmNotificationSubscriptionNcmpOutEventAsCloudEvent); + dmiCmNotificationSubscriptionCacheHandler + .removeAcceptedAndRejectedDmiCmNotificationSubscriptionEntries(subscriptionId); } protected static CloudEvent buildAndGetCmNotificationNcmpOutEventAsCloudEvent( @@ -125,9 +127,9 @@ public class CmNotificationSubscriptionNcmpOutEventProducer { final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent) { return CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withType(eventType) - .withSource(URI.create("NCMP")).withDataSchema(URI.create("org.onap.ncmp.cm.subscription:1.0.0")) - .withExtension("correlationid", subscriptionId) - .withData(jsonObjectMapper.asJsonBytes(cmNotificationSubscriptionNcmpOutEvent)).build(); + .withSource(URI.create("NCMP")).withDataSchema(URI.create("org.onap.ncmp.cm.subscription:1.0.0")) + .withExtension("correlationid", subscriptionId) + .withData(jsonObjectMapper.asJsonBytes(cmNotificationSubscriptionNcmpOutEvent)).build(); } }