* ============LICENSE_START=======================================================
* org.onap.dcaegen2.collectors.ves
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017,2020 AT&T Intellectual Property. All rights reserved.
* Copyright (C) 2018 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
class DMaaPEventPublisher implements EventPublisher {
private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100;
private static final String VES_UNIQUE_ID = "VESuniqueId";
+ private static final String EVENT = "event";
+ private static final String COMMON_EVENT_HEADER = "commonEventHeader";
+ private static final String PARTITION_KEY = "sourceName";
private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class);
private final DMaaPPublishersCache publishersCache;
private final Logger outputLogger;
private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher)
throws IOException {
- int pendingMsgs = publisher.send("MyPartitionKey", event.toString());
+
+ String pk = event.getJSONObject(EVENT).getJSONObject(COMMON_EVENT_HEADER).get(PARTITION_KEY).toString();
+ int pendingMsgs = publisher.send(pk, event.toString());
if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) {
log.info("Pending messages count: " + pendingMsgs);
}
* org.onap.dcaegen2.collectors.ves
* ================================================================================
* Copyright (C) 2018 Nokia. All rights reserved.
+ * Copyright (C) 2020 AT&T. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@Test
public void shouldSendEventToTopic() throws Exception {
// given
- JSONObject event = new JSONObject("{}");
+ JSONObject event = new JSONObject("{\"event\":{\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019,\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\",\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\",\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312,\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\",\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}");
+
// when
eventPublisher.sendEvent(event, STREAM_ID);
// then
- verify(cambriaPublisher).send("MyPartitionKey", event.toString());
+ verify(cambriaPublisher).send("dns01cmd004", event.toString());
}
+
@Test
public void shouldRemoveInternalVESUIDBeforeSending() throws Exception {
// given
JSONObject event = new JSONObject(
- "{\"VESuniqueId\": \"362e0146-ec5f-45f3-8d8f-bfe877c3f58e\", \"another\": 8}");
+ "{\"VESuniqueId\": \"362e0146-ec5f-45f3-8d8f-bfe877c3f58e\",\"event\":{\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019,\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\",\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\",\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312,\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\",\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}");
// when
eventPublisher.sendEvent(event, STREAM_ID);
// then
- verify(cambriaPublisher).send("MyPartitionKey", new JSONObject("{\"another\": 8}").toString());
+ verify(cambriaPublisher).send("dns01cmd004", new JSONObject("{\"event\":{\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019,\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\",\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\",\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312,\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\",\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}").toString());
}
@Test
// then
verify(DMaaPPublishersCache).closePublisherFor(STREAM_ID);
}
-}
\ No newline at end of file
+}