2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2017 AT&T Intellectual Property. All rights
 
   7  * ================================================================================
 
   8  * Licensed under the Apache License, Version 2.0 (the "License");
 
   9  * you may not use this file except in compliance with the License.
 
  10  * You may obtain a copy of the License at
 
  12  *      http://www.apache.org/licenses/LICENSE-2.0
 
  14  * Unless required by applicable law or agreed to in writing, software
 
  15  * distributed under the License is distributed on an "AS IS" BASIS,
 
  16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  17  * See the License for the specific language governing permissions and
 
  18  * limitations under the License.
 
  19  * ============LICENSE_END=========================================================
 
  22 package org.onap.ccsdk.sli.northbound.dmaapclient;
 
  24 import com.fasterxml.jackson.databind.JsonNode;
 
  25 import com.fasterxml.jackson.databind.ObjectMapper;
 
  26 import com.fasterxml.jackson.databind.node.ObjectNode;
 
  27 import java.io.BufferedReader;
 
  29 import java.io.FileReader;
 
  30 import java.util.HashMap;
 
  31 import java.util.Iterator;
 
  33 import org.slf4j.Logger;
 
  34 import org.slf4j.LoggerFactory;
 
  37 public class SdncFlatJsonDmaapConsumer extends SdncDmaapConsumer {
 
  39     private static final Logger LOG = LoggerFactory.getLogger(SdncFlatJsonDmaapConsumer.class);
 
  41     private static final String DMAAPLISTENERROOT = "DMAAPLISTENERROOT";
 
  42     private static final String SDNC_ENDPOINT = "SDNC.endpoint";
 
  45     public void processMsg(String msg) throws InvalidMessageException {
 
  47         processMsg(msg, null);
 
  50     public void processMsg(String msg, String mapDirName) throws InvalidMessageException {
 
  53             throw new InvalidMessageException("Null message");
 
  56         ObjectMapper oMapper = new ObjectMapper();
 
  57         JsonNode instarRootNode;
 
  58         ObjectNode sdncRootNode;
 
  60         String instarMsgName = null;
 
  63             instarRootNode = oMapper.readTree(msg);
 
  64         } catch (Exception e) {
 
  65             throw new InvalidMessageException("Cannot parse json object", e);
 
  68         Iterator<Map.Entry<String, JsonNode>> instarFields = instarRootNode.fields();
 
  70         while (instarFields.hasNext()) {
 
  71             Map.Entry<String, JsonNode> entry = instarFields.next();
 
  73             instarMsgName = entry.getKey();
 
  74             instarRootNode = entry.getValue();
 
  78         Map<String, String> fieldMap = loadMap(instarMsgName, mapDirName);
 
  80         if (fieldMap == null) {
 
  81             throw new InvalidMessageException("Unable to process message - cannot load field mappings");
 
  84         if (!fieldMap.containsKey(SDNC_ENDPOINT)) {
 
  85             throw new InvalidMessageException("No SDNC endpoint known for message " + instarMsgName);
 
  88         String sdncEndpoint = fieldMap.get(SDNC_ENDPOINT);
 
  90         sdncRootNode = oMapper.createObjectNode();
 
  91         ObjectNode inputNode = oMapper.createObjectNode();
 
  93         for (Map.Entry<String, String> entry : fieldMap.entrySet()) {
 
  95             if (!SDNC_ENDPOINT.equals(entry.getKey())) {
 
  96                 JsonNode curNode = instarRootNode.get(entry.getKey());
 
  97                 if (curNode != null) {
 
  98                     String fromValue = curNode.textValue();
 
 100                     inputNode.put(entry.getValue(), fromValue);
 
 104         sdncRootNode.put("input", inputNode);
 
 107             String rpcMsgbody = oMapper.writeValueAsString(sdncRootNode);
 
 108             String odlUrlBase = getProperty("sdnc.odl.url-base");
 
 109             String odlUser = getProperty("sdnc.odl.user");
 
 110             String odlPassword = getProperty("sdnc.odl.password");
 
 112             if ((odlUrlBase != null) && (odlUrlBase.length() > 0)) {
 
 113                 SdncOdlConnection conn = SdncOdlConnection.newInstance(odlUrlBase + sdncEndpoint, odlUser, odlPassword);
 
 115                 conn.send("POST", "application/json", rpcMsgbody);
 
 117                 LOG.info("POST message body would be:\n" + rpcMsgbody);
 
 119         } catch (Exception e) {
 
 120             LOG.error("Unable to process message", e);
 
 124     private Map<String, String> loadMap(String msgType, String mapDirName) {
 
 125         Map<String, String> results = new HashMap<>();
 
 127         String dirName = mapDirName;
 
 129         if (mapDirName == null) {
 
 130             String rootdir = System.getenv(DMAAPLISTENERROOT);
 
 132             if ((rootdir == null) || (rootdir.length() == 0)) {
 
 133                 rootdir = "/opt/app/dmaap-listener";
 
 136             dirName = rootdir + "/lib";
 
 139         String mapFilename = dirName + "/" + msgType + ".map";
 
 141         File mapFile = new File(mapFilename);
 
 143         if (!mapFile.canRead()) {
 
 144             LOG.error(String.format("Cannot read map file (%s)", mapFilename));
 
 148         try (BufferedReader mapReader = new BufferedReader(new FileReader(mapFile))) {
 
 152             while ((curLine = mapReader.readLine()) != null) {
 
 153                 curLine = curLine.trim();
 
 155                 if ((curLine.length() > 0) && (!curLine.startsWith("#")) && curLine.contains("=>")) {
 
 156                     String[] entry = curLine.split("=>");
 
 157                     if (entry.length == 2) {
 
 158                         results.put(entry[0].trim(), entry[1].trim());
 
 163         } catch (Exception e) {
 
 164             LOG.error("Caught exception reading map " + mapFilename, e);