import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-public class SdncFlatJsonDmaapConsumer extends SdncDmaapConsumer {
-
- private static final Logger LOG = LoggerFactory
- .getLogger(SdncFlatJsonDmaapConsumer.class);
-
- private static final String DMAAPLISTENERROOT = "DMAAPLISTENERROOT";
- private static final String SDNC_ENDPOINT = "SDNC.endpoint";
-
-
-
- @Override
- public void processMsg(String msg) throws InvalidMessageException {
-
- processMsg(msg, null);
- }
-
- public void processMsg(String msg, String mapDirName) throws InvalidMessageException {
-
- if (msg == null) {
- throw new InvalidMessageException("Null message");
- }
-
- ObjectMapper oMapper = new ObjectMapper();
- JsonNode instarRootNode = null;
- ObjectNode sdncRootNode = null;
+public class SdncFlatJsonDmaapConsumer extends SdncDmaapConsumerImpl {
- String instarMsgName = null;
+ private static final Logger LOG = LoggerFactory.getLogger(SdncFlatJsonDmaapConsumer.class);
- try {
- instarRootNode = oMapper.readTree(msg);
- } catch (Exception e) {
- throw new InvalidMessageException("Cannot parse json object", e);
- }
+ private static final String DMAAPLISTENERROOT = "DMAAPLISTENERROOT";
+ private static final String SDNC_ENDPOINT = "SDNC.endpoint";
- Iterator<Map.Entry<String, JsonNode>> instarFields = instarRootNode.fields();
+ @Override
+ public void processMsg(String msg) throws InvalidMessageException {
- while (instarFields.hasNext()) {
- Map.Entry<String, JsonNode> entry = instarFields.next();
+ processMsg(msg, null);
+ }
- instarMsgName = entry.getKey();
- instarRootNode = entry.getValue();
- break;
- }
+ public void processMsg(String msg, String mapDirName) throws InvalidMessageException {
- Map<String,String> fieldMap = loadMap(instarMsgName, mapDirName);
+ if (msg == null) {
+ throw new InvalidMessageException("Null message");
+ }
- if (fieldMap == null) {
- throw new InvalidMessageException("Unable to process message - cannot load field mappings");
- }
+ ObjectMapper oMapper = new ObjectMapper();
+ JsonNode instarRootNode;
+ ObjectNode sdncRootNode;
- if (!fieldMap.containsKey(SDNC_ENDPOINT)) {
- throw new InvalidMessageException("No SDNC endpoint known for message "+instarMsgName);
- }
+ String instarMsgName = null;
- String sdncEndpoint = fieldMap.get(SDNC_ENDPOINT);
+ try {
+ instarRootNode = oMapper.readTree(msg);
+ } catch (Exception e) {
+ throw new InvalidMessageException("Cannot parse json object", e);
+ }
- sdncRootNode = oMapper.createObjectNode();
- ObjectNode inputNode = oMapper.createObjectNode();
+ Iterator<Map.Entry<String, JsonNode>> instarFields = instarRootNode.fields();
+ while (instarFields.hasNext()) {
+ Map.Entry<String, JsonNode> entry = instarFields.next();
- for (Map.Entry<String, String> entry: fieldMap.entrySet()) {
+ instarMsgName = entry.getKey();
+ instarRootNode = entry.getValue();
+ break;
+ }
- if (!SDNC_ENDPOINT.equals(entry.getKey())) {
- JsonNode curNode = instarRootNode.get(entry.getKey());
- if (curNode != null) {
- String fromValue = curNode.textValue();
+ Map<String, String> fieldMap = loadMap(instarMsgName, mapDirName);
- inputNode.put(entry.getValue(), fromValue);
- }
- }
- }
- sdncRootNode.put("input", inputNode);
+ if (fieldMap == null) {
+ throw new InvalidMessageException("Unable to process message - cannot load field mappings");
+ }
- try {
- String rpcMsgbody = oMapper.writeValueAsString(sdncRootNode);
- String odlUrlBase = getProperty("sdnc.odl.url-base");
- String odlUser = getProperty("sdnc.odl.user");
- String odlPassword = getProperty("sdnc.odl.password");
+ if (!fieldMap.containsKey(SDNC_ENDPOINT)) {
+ throw new InvalidMessageException("No SDNC endpoint known for message " + instarMsgName);
+ }
- if ((odlUrlBase != null) && (odlUrlBase.length() > 0)) {
- SdncOdlConnection conn = SdncOdlConnection.newInstance(odlUrlBase + sdncEndpoint, odlUser, odlPassword);
+ String sdncEndpoint = fieldMap.get(SDNC_ENDPOINT);
- conn.send("POST", "application/json", rpcMsgbody);
- } else {
- LOG.info("POST message body would be:\n"+rpcMsgbody);
- }
- } catch (Exception e) {
- LOG.error("Unable to process message", e);
- }
+ sdncRootNode = oMapper.createObjectNode();
+ ObjectNode inputNode = oMapper.createObjectNode();
- }
+ for (Map.Entry<String, String> entry : fieldMap.entrySet()) {
- private Map<String,String> loadMap(String msgType, String mapDirName) {
- Map<String, String> results = new HashMap<>();
+ if (!SDNC_ENDPOINT.equals(entry.getKey())) {
+ JsonNode curNode = instarRootNode.get(entry.getKey());
+ if (curNode != null) {
+ String fromValue = curNode.textValue();
+ inputNode.put(entry.getValue(), fromValue);
+ }
+ }
+ }
+ sdncRootNode.put("input", inputNode);
- if (mapDirName == null) {
- String rootdir = System.getenv(DMAAPLISTENERROOT);
+ try {
+ String rpcMsgbody = oMapper.writeValueAsString(sdncRootNode);
+ String odlUrlBase = getProperty("sdnc.odl.url-base");
+ String odlUser = getProperty("sdnc.odl.user");
+ String odlPassword = getProperty("sdnc.odl.password");
- if ((rootdir == null) || (rootdir.length() == 0)) {
- rootdir = "/opt/app/dmaap-listener";
- }
+ if ((odlUrlBase != null) && (odlUrlBase.length() > 0)) {
+ SdncOdlConnection conn = SdncOdlConnection.newInstance(odlUrlBase + sdncEndpoint, odlUser, odlPassword);
- mapDirName = rootdir + "/lib";
+ conn.send("POST", "application/json", rpcMsgbody);
+ } else {
+ LOG.info("POST message body would be:\n" + rpcMsgbody);
+ }
+ } catch (Exception e) {
+ LOG.error("Unable to process message", e);
+ }
+ }
- }
+ private Map<String, String> loadMap(String msgType, String mapDirName) {
+ Map<String, String> results = new HashMap<>();
- String mapFilename = mapDirName + "/" + msgType + ".map";
+ String dirName = mapDirName;
- File mapFile = new File(mapFilename);
+ if (mapDirName == null) {
+ String rootdir = System.getenv(DMAAPLISTENERROOT);
- if (!mapFile.canRead()) {
- LOG.error("Cannot read map file ("+mapFilename+")");
- return(null);
- }
+ if ((rootdir == null) || (rootdir.length() == 0)) {
+ rootdir = "/opt/app/dmaap-listener";
+ }
- try (BufferedReader mapReader = new BufferedReader(new FileReader(mapFile))) {
+ dirName = rootdir + "/lib";
+ }
- String curLine;
+ String mapFilename = dirName + "/" + msgType + ".map";
- while ((curLine = mapReader.readLine()) != null) {
- curLine = curLine.trim();
+ File mapFile = new File(mapFilename);
- if ((curLine.length() > 0) && (!curLine.startsWith("#"))) {
+ if (!mapFile.canRead()) {
+ LOG.error(String.format("Cannot read map file (%s)", mapFilename));
+ return null;
+ }
- if (curLine.contains("=>")) {
- String[] entry = curLine.split("=>");
- if (entry.length == 2) {
- results.put(entry[0].trim(), entry[1].trim());
- }
- }
- }
- }
- mapReader.close();
- } catch (Exception e) {
- LOG.error("Caught exception reading map "+mapFilename, e);
- return(null);
- }
+ try (BufferedReader mapReader = new BufferedReader(new FileReader(mapFile))) {
- return(results);
- }
+ String curLine;
+ while ((curLine = mapReader.readLine()) != null) {
+ curLine = curLine.trim();
+ if ((curLine.length() > 0) && (!curLine.startsWith("#")) && curLine.contains("=>")) {
+ String[] entry = curLine.split("=>");
+ if (entry.length == 2) {
+ results.put(entry[0].trim(), entry[1].trim());
+ }
+ }
+ }
+ mapReader.close();
+ } catch (Exception e) {
+ LOG.error("Caught exception reading map " + mapFilename, e);
+ return null;
+ }
+ return results;
+ }
}