2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2017 AT&T Intellectual Property. All rights
 
   7  * ================================================================================
 
   8  * Licensed under the Apache License, Version 2.0 (the "License");
 
   9  * you may not use this file except in compliance with the License.
 
  10  * You may obtain a copy of the License at
 
  12  *      http://www.apache.org/licenses/LICENSE-2.0
 
  14  * Unless required by applicable law or agreed to in writing, software
 
  15  * distributed under the License is distributed on an "AS IS" BASIS,
 
  16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  17  * See the License for the specific language governing permissions and
 
  18  * limitations under the License.
 
  19  * ============LICENSE_END=========================================================
 
  22 package org.onap.ccsdk.sli.northbound.dmaapclient;
 
  24 import java.io.BufferedReader;
 
  26 import java.io.FileReader;
 
  27 import java.util.HashMap;
 
  28 import java.util.Iterator;
 
  31 import org.slf4j.Logger;
 
  32 import org.slf4j.LoggerFactory;
 
  34 import com.fasterxml.jackson.databind.JsonNode;
 
  35 import com.fasterxml.jackson.databind.ObjectMapper;
 
  36 import com.fasterxml.jackson.databind.node.ObjectNode;
 
  39 public class SdncFlatJsonDmaapConsumer extends SdncDmaapConsumer {
 
  41         private static final Logger LOG = LoggerFactory
 
  42                         .getLogger(SdncFlatJsonDmaapConsumer.class);
 
  44         private static final String DMAAPLISTENERROOT = "DMAAPLISTENERROOT";
 
  45         private static final String SDNC_ENDPOINT = "SDNC.endpoint";
 
  50         public void processMsg(String msg) throws InvalidMessageException {
 
  52                 processMsg(msg, null);
 
  55         public void processMsg(String msg, String mapDirName) throws InvalidMessageException {
 
  58                         throw new InvalidMessageException("Null message");
 
  61                 ObjectMapper oMapper = new ObjectMapper();
 
  62                 JsonNode instarRootNode ;
 
  63                 ObjectNode sdncRootNode;
 
  65                 String instarMsgName = null;
 
  68                          instarRootNode = oMapper.readTree(msg);
 
  69                 } catch (Exception e) {
 
  70                         throw new InvalidMessageException("Cannot parse json object", e);
 
  73                 Iterator<Map.Entry<String, JsonNode>> instarFields = instarRootNode.fields();
 
  75                 while (instarFields.hasNext()) {
 
  76                         Map.Entry<String, JsonNode> entry = instarFields.next();
 
  78                         instarMsgName = entry.getKey();
 
  79                         instarRootNode = entry.getValue();
 
  83                 Map<String,String> fieldMap = loadMap(instarMsgName, mapDirName);
 
  85                 if (fieldMap == null) {
 
  86                         throw new InvalidMessageException("Unable to process message - cannot load field mappings");
 
  89                 if (!fieldMap.containsKey(SDNC_ENDPOINT)) {
 
  90                         throw new InvalidMessageException("No SDNC endpoint known for message "+instarMsgName);
 
  93                 String sdncEndpoint = fieldMap.get(SDNC_ENDPOINT);
 
  95                 sdncRootNode = oMapper.createObjectNode();
 
  96                 ObjectNode inputNode = oMapper.createObjectNode();
 
  99                 for (Map.Entry<String, String> entry: fieldMap.entrySet()) {
 
 101                         if (!SDNC_ENDPOINT.equals(entry.getKey())) {
 
 102                                 JsonNode curNode = instarRootNode.get(entry.getKey());
 
 103                                 if (curNode != null) {
 
 104                                         String fromValue = curNode.textValue();
 
 106                                         inputNode.put(entry.getValue(), fromValue);
 
 110                 sdncRootNode.put("input", inputNode);
 
 113                         String rpcMsgbody = oMapper.writeValueAsString(sdncRootNode);
 
 114                         String odlUrlBase = getProperty("sdnc.odl.url-base");
 
 115                         String odlUser = getProperty("sdnc.odl.user");
 
 116                         String odlPassword = getProperty("sdnc.odl.password");
 
 118                         if ((odlUrlBase != null) && (odlUrlBase.length() > 0)) {
 
 119                                 SdncOdlConnection conn = SdncOdlConnection.newInstance(odlUrlBase + sdncEndpoint, odlUser, odlPassword);
 
 121                                 conn.send("POST", "application/json", rpcMsgbody);
 
 123                                 LOG.info("POST message body would be:\n"+rpcMsgbody);
 
 125                 } catch (Exception e) {
 
 126                         LOG.error("Unable to process message", e);
 
 131         private Map<String,String> loadMap(String msgType, String mapDirName) {
 
 132                 Map<String, String> results = new HashMap<>();
 
 135                 if (mapDirName == null) {
 
 136                         String rootdir = System.getenv(DMAAPLISTENERROOT);
 
 138                         if ((rootdir == null) || (rootdir.length() == 0)) {
 
 139                                 rootdir = "/opt/app/dmaap-listener";
 
 142                         mapDirName = rootdir + "/lib";
 
 146                 String mapFilename = mapDirName + "/" + msgType + ".map";
 
 148                 File mapFile = new File(mapFilename);
 
 150                 if (!mapFile.canRead()) {
 
 151                         LOG.error("Cannot read map file ("+mapFilename+")");
 
 155                 try (BufferedReader mapReader = new BufferedReader(new FileReader(mapFile))) {
 
 159                         while ((curLine = mapReader.readLine()) != null) {
 
 160                                 curLine = curLine.trim();
 
 162                                 if ((curLine.length() > 0) && (!curLine.startsWith("#"))) {
 
 164                                         if (curLine.contains("=>")) {
 
 165                                                 String[] entry = curLine.split("=>");
 
 166                                                 if (entry.length == 2) {
 
 167                                                         results.put(entry[0].trim(), entry[1].trim());
 
 173                 } catch (Exception e) {
 
 174                         LOG.error("Caught exception reading map "+mapFilename, e);