2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.crud.service;
23 import java.util.TimerTask;
25 import javax.naming.OperationNotSupportedException;
27 import org.onap.aai.cl.api.Logger;
28 import org.onap.aai.cl.eelf.LoggerFactory;
29 import org.onap.crud.event.GraphEvent;
30 import org.onap.crud.logging.CrudServiceMsgs;
32 import org.onap.aai.event.api.EventConsumer;
34 public class CrudAsyncResponseConsumer extends TimerTask {
36 private static Logger logger = LoggerFactory.getInstance().getLogger(CrudAsyncResponseConsumer
39 private static Logger auditLogger = LoggerFactory.getInstance()
40 .getAuditLogger(CrudAsyncResponseConsumer.class.getName());
42 private EventConsumer asyncResponseConsumer;
45 public CrudAsyncResponseConsumer(EventConsumer asyncResponseConsumer) {
46 this.asyncResponseConsumer = asyncResponseConsumer;
47 logger.info(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_INFO,
48 "CrudAsyncResponseConsumer initialized SUCCESSFULLY! with event consumer "
49 + asyncResponseConsumer.getClass().getName());
56 logger.info(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_INFO, "Listening for graph events");
58 if (asyncResponseConsumer == null) {
59 logger.error(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_ERROR,
60 "Unable to initialize CrudAsyncRequestProcessor");
63 Iterable<String> events = null;
65 events = asyncResponseConsumer.consume();
66 } catch (Exception e) {
67 logger.error(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_ERROR, e.getMessage());
71 if (events == null || !events.iterator().hasNext()) {
72 logger.info(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_INFO, "No events recieved");
76 for (String event : events) {
79 GraphEvent graphEvent = GraphEvent.fromJson(event);
80 auditLogger.info(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_INFO,
81 "Event received of type: " + graphEvent.getObjectType() + " with key: "
82 + graphEvent.getObjectKey() + " , transaction-id: "
83 + graphEvent.getTransactionId() + " , operation: "
84 + graphEvent.getOperation().toString());
85 logger.info(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_INFO,
86 "Event received of type: " + graphEvent.getObjectType() + " with key: "
87 + graphEvent.getObjectKey() + " , transaction-id: "
88 + graphEvent.getTransactionId() + " , operation: "
89 + graphEvent.getOperation().toString());
90 logger.debug(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_INFO,
91 "Event received with payload:" + event);
93 if (CrudAsyncGraphEventCache.get(graphEvent.getTransactionId()) != null) {
94 CrudAsyncGraphEventCache.get(graphEvent.getTransactionId())
95 .populateGraphEvent(graphEvent);
97 logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
98 "Request timed out. Not sending response for transaction-id: "
99 + graphEvent.getTransactionId());
102 } catch (Exception e) {
103 logger.error(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_ERROR, e.getMessage());
108 asyncResponseConsumer.commitOffsets();
110 catch(OperationNotSupportedException e) {
111 //Dmaap doesnt support commit with offset
112 logger.debug(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_ERROR, e.getMessage());
114 catch (Exception e) {
115 logger.error(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_ERROR, e.getMessage());