2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.crud.service;
23 import java.util.TimerTask;
25 import javax.naming.OperationNotSupportedException;
27 import org.onap.aai.cl.api.Logger;
28 import org.onap.aai.cl.eelf.LoggerFactory;
29 import org.onap.crud.event.GraphEvent;
30 import org.onap.crud.event.envelope.GraphEventEnvelope;
31 import org.onap.crud.logging.CrudServiceMsgs;
33 import org.onap.aai.event.api.EventConsumer;
35 public class CrudAsyncResponseConsumer extends TimerTask {
37 private static Logger logger = LoggerFactory.getInstance().getLogger(CrudAsyncResponseConsumer
40 private static Logger auditLogger = LoggerFactory.getInstance()
41 .getAuditLogger(CrudAsyncResponseConsumer.class.getName());
43 private EventConsumer asyncResponseConsumer;
46 public CrudAsyncResponseConsumer(EventConsumer asyncResponseConsumer) {
47 this.asyncResponseConsumer = asyncResponseConsumer;
48 logger.info(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_INFO,
49 "CrudAsyncResponseConsumer initialized SUCCESSFULLY! with event consumer "
50 + asyncResponseConsumer.getClass().getName());
57 logger.info(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_INFO, "Listening for graph events");
59 if (asyncResponseConsumer == null) {
60 logger.error(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_ERROR,
61 "Unable to initialize CrudAsyncRequestProcessor");
64 Iterable<String> events = null;
66 events = asyncResponseConsumer.consume();
67 } catch (Exception e) {
68 logger.error(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_ERROR, e.getMessage());
72 if (events == null || !events.iterator().hasNext()) {
73 logger.info(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_INFO, "No events recieved");
77 for (String event : events) {
80 GraphEventEnvelope graphEventEnvelope = GraphEventEnvelope.fromJson(event);
81 GraphEvent graphEvent = graphEventEnvelope.getBody();
82 auditLogger.info(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_INFO,
83 "Event received of type: " + graphEvent.getObjectType() + " with key: "
84 + graphEvent.getObjectKey() + " , transaction-id: "
85 + graphEvent.getTransactionId() + " , operation: "
86 + graphEvent.getOperation().toString());
87 logger.info(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_INFO,
88 "Event received of type: " + graphEvent.getObjectType() + " with key: "
89 + graphEvent.getObjectKey() + " , transaction-id: "
90 + graphEvent.getTransactionId() + " , operation: "
91 + graphEvent.getOperation().toString());
92 logger.debug(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_INFO,
93 "Event received with payload:" + event);
95 if (CrudAsyncGraphEventCache.get(graphEvent.getTransactionId()) != null) {
96 CrudAsyncGraphEventCache.get(graphEvent.getTransactionId())
97 .populateGraphEventEnvelope(graphEventEnvelope);
99 logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
100 "Request timed out. Not sending response for transaction-id: "
101 + graphEvent.getTransactionId());
104 } catch (Exception e) {
105 logger.error(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_ERROR, e.getMessage());
110 asyncResponseConsumer.commitOffsets();
112 catch(OperationNotSupportedException e) {
113 //Dmaap doesnt support commit with offset
114 logger.debug(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_ERROR, e.getMessage());
116 catch (Exception e) {
117 logger.error(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_ERROR, e.getMessage());