import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.event.api.EventPublisher;
import org.onap.champ.event.GraphEvent;
import org.onap.champ.event.GraphEvent.GraphEventResult;
+import org.onap.champ.event.envelope.GraphEventEnvelope;
import org.onap.champ.service.ChampThreadFactory;
import org.onap.champ.service.logging.ChampMsgs;
-import org.onap.aai.event.api.EventPublisher;
-
public class ChampAsyncResponsePublisher {
private EventPublisher asyncResponsePublisher;
/**
* Internal queue where outgoing events will be buffered.
**/
- private BlockingQueue<GraphEvent> responsePublisherEventQueue;
+ private BlockingQueue<GraphEventEnvelope> responsePublisherEventQueue;
/**
* Pool of worker threads that do the work of publishing the events to the
this.responsePublisherPoolSize = responsePublisherPoolSize;
- responsePublisherEventQueue = new ArrayBlockingQueue<GraphEvent>(responsePublisherQueueSize);
+ responsePublisherEventQueue = new ArrayBlockingQueue<GraphEventEnvelope>(responsePublisherQueueSize);
responsePublisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(responsePublisherPoolSize,
new ChampThreadFactory(CHAMP_GRAPH_RESPONSE_PUBLISH_THREAD_NAME));
responsePublisherPoolSize = DEFAULT_ASYNC_RESPONSE_PUBLISH_THREAD_POOL_SIZE;
- responsePublisherEventQueue = new ArrayBlockingQueue<GraphEvent>(responsePublisherQueueSize);
+ responsePublisherEventQueue = new ArrayBlockingQueue<GraphEventEnvelope>(responsePublisherQueueSize);
responsePublisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(responsePublisherPoolSize,
new ChampThreadFactory(CHAMP_GRAPH_RESPONSE_PUBLISH_THREAD_NAME));
+ asyncResponsePublisher.getClass().getName());
}
- public void publishResponseEvent(GraphEvent event) {
- responsePublisherEventQueue.offer(event);
+ public void publishResponseEvent(GraphEventEnvelope eventEnvelope) {
+ responsePublisherEventQueue.offer(eventEnvelope);
}
public void run() {
while (true) {
-
+ GraphEventEnvelope eventEnvelope = null;
GraphEvent event = null;
try {
-
// Get the next event to be published from the queue.
- event = responsePublisherEventQueue.take();
-
+ eventEnvelope = responsePublisherEventQueue.take();
+ event = eventEnvelope.getBody();
} catch (InterruptedException e) {
-
// Restore the interrupted status.
Thread.currentThread().interrupt();
}
// Publish the response
-
try {
event.setTimestamp(System.currentTimeMillis());
- asyncResponsePublisher.sendSync(event.toJson());
+ asyncResponsePublisher.sendSync(eventEnvelope.toJson());
if (event.getResult().equals(GraphEventResult.SUCCESS)) {
logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
"Response published for Event of type: " + event.getObjectType() + " with key: " + event.getObjectKey()