Fixing mapping file and formatting issues 97/85697/2
authorPooja03 <pm00501616@techmahindra.com>
Thu, 18 Apr 2019 14:27:48 +0000 (19:57 +0530)
committerPooja Malik <PM00501616@techmahindra.com>
Fri, 19 Apr 2019 13:22:23 +0000 (13:22 +0000)
Fixing mapping file issue by adding &amp; and formatting issues mention
in comments of https://gerrit.onap.org/r/#/c/85334/

Change-Id: If474fda9205fc66dcc0f5caa77d7f35ba14328a0
Issue-ID: DCAEGEN2-1445
Signed-off-by: Pooja03 <pm00501616@techmahindra.com>
UniversalVesAdapter/dpo/blueprints/k8s-vesmapper.yaml-template.yaml
UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java
UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/CollectorConfigPropertyRetrival.java
UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/FetchDynamicConfig.java

index 6821fa2..ad334c4 100644 (file)
 description: "This blueprint deploys the UniversalVESAdapter(UVA) as a Docker container\n"\r
 imports: \r
   - "http://www.getcloudify.org/spec/cloudify/3.4/types.yaml"\r
-  - "https://nexus.onap.org/service/local/repositories/raw/content/org.onap.dcaegen2.platform.plugins/R3/k8splugin/1.4.4/k8splugin_types.yaml"\r
+  - "https://nexus.onap.org/service/local/repositories/raw/content/org.onap.dcaegen2.platform.plugins/R4/k8splugin/1.4.12/k8splugin_types.yaml"\r
 inputs: \r
   rcc_notification_url: \r
-    default: "http://message-router.onap.svc.cluster.local:3904/events/unathenticated.DCAE_RCC_OUTPUT"\r
+    default: "http://message-router.onap.svc.cluster.local:3904/events/unauthenticated.DCAE_RCC_OUTPUT"\r
     type: string\r
   snmp_notification_url: \r
     default: "http://message-router.onap.svc.cluster.local:3904/events/unauthenticated.ONAP-COLLECTOR-SNMPTRAP"\r
     type: string\r
   tag_version: \r
-    default: "nexus3.onap.org:10001/onap/org.onap.dcaegen2.services.mapper.vesadapter.universalvesadaptor:latest"\r
+    default: "nexus3.onap.org:10001/onap/org.onap.dcaegen2.services.mapper.vesadapter.universalvesadaptor:1.0.0-SNAPSHOT"\r
     type: string\r
   universal_mapper_name: \r
     default: dcaegen2-svc-mapper\r
@@ -64,7 +64,7 @@ node_templates:
               identifier: notification-id\r
               mapping-files: \r
                 - \r
-                  defaultMappingFile-rcc-notification: "<?xml version='1.0' encoding='UTF-8'?><smooks-resource-list xmlns='http://www.milyn.org/xsd/smooks-1.1.xsd' xmlns:jb='http://www.milyn.org/xsd/smooks/javabean-1.4.xsd' xmlns:json='http://www.milyn.org/xsd/smooks/json-1.1.xsd'><json:reader rootName='vesevent' keyWhitspaceReplacement='-'><json:keyMap><json:key from='date&time' to='date-and-time' /></json:keyMap></json:reader><jb:bean class='org.onap.dcaegen2.ves.domain.ves70.VesEvent' beanId='vesEvent' createOnElement='vesevent'><jb:wiring property='event' beanIdRef='event' /></jb:bean><jb:bean class='org.onap.dcaegen2.ves.domain.ves70.Event' beanId='event' createOnElement='vesevent'><jb:wiring property='commonEventHeader' beanIdRef='commonEventHeader' /><jb:wiring property='pnfRegistrationFields' beanIdRef='pnfRegistrationFields' /></jb:bean><jb:bean class='org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader' beanId='commonEventHeader' createOnElement='vesevent'><jb:expression property='version'>org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.Version._4_0_1</jb:expression><jb:expression property='eventType'>'pnfRegistration'</jb:expression><jb:expression property='vesEventListenerVersion'>org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.VesEventListenerVersion._7_0_1</jb:expression><jb:expression property='eventId' execOnElement='vesevent'>'registration_'+commonEventHeader.ts1</jb:expression><jb:expression property='reportingEntityName'>'VESMapper'</jb:expression><jb:expression property='domain'>org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.Domain.PNF_REGISTRATION</jb:expression><jb:expression property='eventName' execOnElement='vesevent'>commonEventHeader.domain</jb:expression><jb:value property='sequence' data='0' default='0' decoder='Long' /><jb:expression property='lastEpochMicrosec' execOnElement='vesevent'>commonEventHeader.ts1</jb:expression><jb:expression property='startEpochMicrosec' execOnElement='vesevent'>commonEventHeader.ts1</jb:expression><jb:expression property='priority'>org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.Priority.NORMAL</jb:expression><jb:expression property='sourceName' execOnElement='vesevent'>pnfRegistrationFields.vendorName+'-'+pnfRegistrationFields.serialNumber</jb:expression></jb:bean><jb:bean class='org.onap.dcaegen2.ves.domain.ves70.PnfRegistrationFields' beanId='pnfRegistrationFields' createOnElement='vesevent'><jb:expression property='pnfRegistrationFieldsVersion'>org.onap.dcaegen2.ves.domain.ves70.PnfRegistrationFields.PnfRegistrationFieldsVersion._2_0</jb:expression><jb:value property='serialNumber' data='pnfRegistration/serialNumber' /><jb:value property='lastServiceDate' data='pnfRegistration/lastServiceDate' /><jb:value property='manufactureDate' data='pnfRegistration/manufactureDate' /><jb:value property='modelNumber' data='pnfRegistration/modelNumber' /><jb:value property='oamV4IpAddress' data='pnfRegistration/oamV4IpAddress' /><jb:value property='oamV6IpAddress' data='pnfRegistration/oamV6IpAddress' /><jb:value property='softwareVersion' data='pnfRegistration/softwareVersion' /><jb:value property='unitFamily' data='pnfRegistration/unitFamily' /><jb:value property='unitType' data='pnfRegistration/unitType' /><jb:value property='vendorName' data='pnfRegistration/vendorName' /><jb:wiring property='additionalFields' beanIdRef='alarmAdditionalInformation' /></jb:bean><jb:bean class='org.onap.dcaegen2.ves.domain.ves70.AlarmAdditionalInformation' beanId='alarmAdditionalInformation' createOnElement='vesevent'><jb:wiring property='additionalProperties' beanIdRef='additionalFields2' /></jb:bean><jb:bean beanId='additionalFields2' class='java.util.HashMap' createOnElement='vesevent/pnfRegistration/additionalFields'><jb:value data='pnfRegistration/additionalFields/*'/></jb:bean></smooks-resource-list>"\r
+                  defaultMappingFile-rcc-notification: "<?xml version='1.0' encoding='UTF-8'?><smooks-resource-list xmlns='http://www.milyn.org/xsd/smooks-1.1.xsd' xmlns:jb='http://www.milyn.org/xsd/smooks/javabean-1.4.xsd' xmlns:json='http://www.milyn.org/xsd/smooks/json-1.1.xsd'><json:reader rootName='vesevent' keyWhitspaceReplacement='-'><json:keyMap><json:key from='date&amp;time' to='date-and-time' /></json:keyMap></json:reader><jb:bean class='org.onap.dcaegen2.ves.domain.ves70.VesEvent' beanId='vesEvent' createOnElement='vesevent'><jb:wiring property='event' beanIdRef='event' /></jb:bean><jb:bean class='org.onap.dcaegen2.ves.domain.ves70.Event' beanId='event' createOnElement='vesevent'><jb:wiring property='commonEventHeader' beanIdRef='commonEventHeader' /><jb:wiring property='pnfRegistrationFields' beanIdRef='pnfRegistrationFields' /></jb:bean><jb:bean class='org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader' beanId='commonEventHeader' createOnElement='vesevent'><jb:expression property='version'>org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.Version._4_0_1</jb:expression><jb:expression property='eventType'>'pnfRegistration'</jb:expression><jb:expression property='vesEventListenerVersion'>org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.VesEventListenerVersion._7_0_1</jb:expression><jb:expression property='eventId' execOnElement='vesevent'>'registration_'+commonEventHeader.ts1</jb:expression><jb:expression property='reportingEntityName'>'VESMapper'</jb:expression><jb:expression property='domain'>org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.Domain.PNF_REGISTRATION</jb:expression><jb:expression property='eventName' execOnElement='vesevent'>commonEventHeader.domain</jb:expression><jb:value property='sequence' data='0' default='0' decoder='Long' /><jb:expression property='lastEpochMicrosec' execOnElement='vesevent'>commonEventHeader.ts1</jb:expression><jb:expression property='startEpochMicrosec' execOnElement='vesevent'>commonEventHeader.ts1</jb:expression><jb:expression property='priority'>org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.Priority.NORMAL</jb:expression><jb:expression property='sourceName' execOnElement='vesevent'>pnfRegistrationFields.vendorName+'-'+pnfRegistrationFields.serialNumber</jb:expression></jb:bean><jb:bean class='org.onap.dcaegen2.ves.domain.ves70.PnfRegistrationFields' beanId='pnfRegistrationFields' createOnElement='vesevent'><jb:expression property='pnfRegistrationFieldsVersion'>org.onap.dcaegen2.ves.domain.ves70.PnfRegistrationFields.PnfRegistrationFieldsVersion._2_0</jb:expression><jb:value property='serialNumber' data='pnfRegistration/serialNumber' /><jb:value property='lastServiceDate' data='pnfRegistration/lastServiceDate' /><jb:value property='manufactureDate' data='pnfRegistration/manufactureDate' /><jb:value property='modelNumber' data='pnfRegistration/modelNumber' /><jb:value property='oamV4IpAddress' data='pnfRegistration/oamV4IpAddress' /><jb:value property='oamV6IpAddress' data='pnfRegistration/oamV6IpAddress' /><jb:value property='softwareVersion' data='pnfRegistration/softwareVersion' /><jb:value property='unitFamily' data='pnfRegistration/unitFamily' /><jb:value property='unitType' data='pnfRegistration/unitType' /><jb:value property='vendorName' data='pnfRegistration/vendorName' /><jb:wiring property='additionalFields' beanIdRef='alarmAdditionalInformation' /></jb:bean><jb:bean class='org.onap.dcaegen2.ves.domain.ves70.AlarmAdditionalInformation' beanId='alarmAdditionalInformation' createOnElement='vesevent'><jb:wiring property='additionalProperties' beanIdRef='additionalFields2' /></jb:bean><jb:bean beanId='additionalFields2' class='java.util.HashMap' createOnElement='vesevent/pnfRegistration/additionalFields'><jb:value data='pnfRegistration/additionalFields/*'/></jb:bean></smooks-resource-list>"\r
               stream_publisher: ves-pnfRegistration\r
               stream_subscriber: rcc-notification\r
         streams_publishes: \r
index f2adc9b..2ac4fe9 100644 (file)
@@ -45,218 +45,208 @@ import org.springframework.stereotype.Component;
 // AdapterInitializer\r
 @Component\r
 public class VESAdapterInitializer implements CommandLineRunner, Ordered {\r
-        private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");\r
-        private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");\r
+    private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");\r
+    private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");\r
+    \r
+    @Value("${defaultConfigFilelocation}")\r
+    String defaultConfigFilelocation;\r
+    @Value("${server.port}")\r
+    String serverPort;\r
+    \r
+    private static Map<String, String> mappingFiles = new HashMap<String, String>();\r
+    private static Map<String, String> env;\r
+    \r
+    @Autowired\r
+    private ApplicationContext applicationContext;\r
+    \r
+    @Override\r
+    public void run(String... args) throws Exception {\r
+        debugLogger.info("The Default Config file Location:" + defaultConfigFilelocation.trim());\r
         \r
-        @Value("${defaultConfigFilelocation}")\r
-        String defaultConfigFilelocation;\r
-        @Value("${server.port}")\r
-        String serverPort;\r
-        \r
-        private static Map<String, String> mappingFiles = new HashMap<String, String>();\r
-        private static Map<String, String> env;\r
-        \r
-        @Autowired\r
-        private ApplicationContext applicationContext;\r
+        if (ClassLoader.getSystemResource(defaultConfigFilelocation.trim()) == null) {\r
+            errorLogger.error(\r
+                    "Default Config file " + defaultConfigFilelocation.trim() + " is missing");\r
+            System.exit(SpringApplication.exit(applicationContext, () -> {\r
+                errorLogger.error("Application stoped due to missing default Config file");\r
+                return -1;\r
+            }));\r
+        }\r
+        env = System.getenv();\r
+        for (Map.Entry<String, String> entry : env.entrySet()) {\r
+            debugLogger.debug(entry.getKey() + ":" + entry.getValue());\r
+        }\r
         \r
-        @Override\r
-        public void run(String... args) throws Exception {\r
-                debugLogger.info("The Default Config file Location:"\r
-                                + defaultConfigFilelocation.trim());\r
-                // final Path configFilePath =\r
-                // Paths.get(defaultConfigFilelocation.trim()).toAbsolutePath();\r
-                // File f = new File(configFilePath.toString());\r
-                \r
-                if (ClassLoader.getSystemResource(defaultConfigFilelocation.trim()) == null) {\r
-                        errorLogger.error("Default Config file " + defaultConfigFilelocation.trim()\r
-                                        + " is missing");\r
-                        System.exit(SpringApplication.exit(applicationContext, () -> {\r
-                                errorLogger.error(\r
-                                                "Application stoped due to missing default Config file");\r
-                                return -1;\r
-                        }));\r
-                }\r
-                env = System.getenv();\r
-                for (Map.Entry<String, String> entry : env.entrySet()) {\r
-                        debugLogger.debug(entry.getKey() + ":" + entry.getValue());\r
-                }\r
-                \r
-                // check for consul details\r
-                if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE")\r
-                                && env.containsKey("HOSTNAME")) {\r
-                        debugLogger.info(">>>Dynamic configuration to be used");\r
-                        FetchDynamicConfig.cbsCall(defaultConfigFilelocation);\r
-                        \r
-                } else {\r
-                        debugLogger.info(">>>Static configuration to be used");\r
-                        \r
-                }\r
-                readJsonToMap(defaultConfigFilelocation);\r
-                \r
-                // prepareDatabase();\r
-                // fetchMappingFile();\r
-                \r
-                debugLogger.info("Triggering controller's start url ");\r
-                executecurl("http://localhost:" + serverPort + "/start");\r
+        // check for consul details\r
+        if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE")\r
+                && env.containsKey("HOSTNAME")) {\r
+            debugLogger.info(">>>Dynamic configuration to be used");\r
+            FetchDynamicConfig.cbsCall(defaultConfigFilelocation);\r
+            \r
+        } else {\r
+            debugLogger.info(">>>Static configuration to be used");\r
+            \r
         }\r
+        readJsonToMap(defaultConfigFilelocation);\r
+        \r
+        // prepareDatabase();\r
+        // fetchMappingFile();\r
         \r
+        debugLogger.info("Triggering controller's start url ");\r
+        executecurl("http://localhost:" + serverPort + "/start");\r
+    }\r
+    \r
+    \r
+    private static String executecurl(String url) {\r
         \r
-        private static String executecurl(String url) {\r
+        debugLogger.info("Running curl command for url:{}", url);\r
+        String[] command = {"curl", "-v", url};\r
+        ProcessBuilder process = new ProcessBuilder(command);\r
+        Process p;\r
+        String result = null;\r
+        try {\r
+            p = process.start();\r
+            try (InputStreamReader ipr = new InputStreamReader(p.getInputStream());\r
+                    BufferedReader reader = new BufferedReader(ipr)) {\r
+                StringBuilder builder = new StringBuilder();\r
+                String line;\r
                 \r
-                debugLogger.info("Running curl command for url:{}", url);\r
-                String[] command = {"curl", "-v", url};\r
-                ProcessBuilder process = new ProcessBuilder(command);\r
-                Process p;\r
-                String result = null;\r
-                try {\r
-                        p = process.start();\r
-                        try (InputStreamReader ipr = new InputStreamReader(p.getInputStream());\r
-                                        BufferedReader reader = new BufferedReader(ipr)) {\r
-                                StringBuilder builder = new StringBuilder();\r
-                                String line;\r
-                                \r
-                                while ((line = reader.readLine()) != null) {\r
-                                        builder.append(line);\r
-                                }\r
-                                result = builder.toString();\r
-                        }\r
-                } catch (IOException e) {\r
-                        errorLogger.error("error", e);\r
+                while ((line = reader.readLine()) != null) {\r
+                    builder.append(line);\r
                 }\r
-                return result;\r
-                \r
+                result = builder.toString();\r
+            }\r
+        } catch (IOException e) {\r
+            errorLogger.error("error", e);\r
         }\r
+        return result;\r
         \r
-        private void readJsonToMap(String configFile) {\r
-                try {\r
-                        JSONArray collectorArray = CollectorConfigPropertyRetrival\r
-                                        .collectorConfigArray(configFile);\r
+    }\r
+    \r
+    private void readJsonToMap(String configFile) {\r
+        try {\r
+            JSONArray collectorArray =\r
+                    CollectorConfigPropertyRetrival.collectorConfigArray(configFile);\r
+            \r
+            for (int i = 0; i < collectorArray.size(); i++) {\r
+                JSONObject obj2 = (JSONObject) collectorArray.get(i);\r
+                \r
+                if (obj2.containsKey("mapping-files")) {\r
+                    \r
+                    JSONArray a1 = (JSONArray) obj2.get("mapping-files");\r
+                    \r
+                    for (int j = 0; j < a1.size(); j++) {\r
+                        JSONObject obj3 = (JSONObject) a1.get(j);\r
+                        Set<Entry<String, String>> set = obj3.entrySet();\r
                         \r
-                        for (int i = 0; i < collectorArray.size(); i++) {\r
-                                JSONObject obj2 = (JSONObject) collectorArray.get(i);\r
-                                \r
-                                if (obj2.containsKey("mapping-files")) {\r
-                                        \r
-                                        JSONArray a1 = (JSONArray) obj2.get("mapping-files");\r
-                                        \r
-                                        for (int j = 0; j < a1.size(); j++) {\r
-                                                JSONObject obj3 = (JSONObject) a1.get(j);\r
-                                                Set<Entry<String, String>> set = obj3.entrySet();\r
-                                                \r
-                                                for (Entry<String, String> entry : set) {\r
-                                                        \r
-                                                        mappingFiles.put(entry.getKey(),\r
-                                                                        entry.getValue());\r
-                                                }\r
-                                        }\r
-                                        \r
-                                }\r
+                        for (Entry<String, String> entry : set) {\r
+                            \r
+                            mappingFiles.put(entry.getKey(), entry.getValue());\r
                         }\r
-                        \r
-                } catch (Exception e) {\r
-                        e.printStackTrace();\r
-                        errorLogger.error(\r
-                                        "Exception occured while reading Collector config file cause: ",\r
-                                        e.getCause());\r
+                    }\r
+                    \r
                 }\r
-                \r
-        }\r
-        \r
-        \r
-        /*\r
-         * private void prepareDatabase() throws IOException {\r
-         * \r
-         * \r
-         * debugLogger.info("The Default Mapping file Location:" +\r
-         * defaultMappingFileLocation.trim());\r
-         * \r
-         * if (ClassLoader.getSystemResource(defaultMappingFileLocation.trim()) == null) {\r
-         * errorLogger.error( "Default mapping file " + defaultMappingFileLocation.trim() +\r
-         * " is missing"); System.exit(SpringApplication.exit(applicationContext, () -> {\r
-         * errorLogger.error("Application stoped due to missing default mapping file"); return -1;\r
-         * })); }\r
-         * \r
-         * File file = new File(\r
-         * ClassLoader.getSystemResource(defaultMappingFileLocation.trim()).getFile());\r
-         * \r
-         * try (FileInputStream fileInputStream = new FileInputStream(file)) { bytesArray = new\r
-         * byte[(int) file.length()]; fileInputStream.read(bytesArray);\r
-         * \r
-         * } catch (IOException e1) {\r
-         * errorLogger.error("Exception Occured while reading the default mapping file ,Cause: " +\r
-         * e1.getMessage(), e1); // exit on missing default mapping file\r
-         * System.exit(SpringApplication.exit(applicationContext, () -> {\r
-         * errorLogger.error("Application stoped due to missing default mapping file"); return -1;\r
-         * })); }\r
-         * \r
-         * try (Connection con = DriverManager.getConnection(dBurl, user, pwd); // creating table if\r
-         * not exist PreparedStatement pstmt11 =\r
-         * con.prepareStatement("CREATE TABLE IF NOT EXISTS public." + MappingFileTableName + "\r\n"\r
-         * + "(\r\n" +\r
-         * "    enterpriseid character varying COLLATE pg_catalog.\"default\" NOT NULL,\r\n" +\r
-         * "    mappingfilecontents bytea,\r\n" +\r
-         * "    mimetype character varying COLLATE pg_catalog.\"default\",\r\n" +\r
-         * "    file_name character varying COLLATE pg_catalog.\"default\",\r\n" +\r
-         * "    CONSTRAINT mapping_file_pkey5 PRIMARY KEY (enterpriseid)\r\n" + ")\r\n" +\r
-         * "WITH (\r\n" + "    OIDS = FALSE\r\n" + ")\r\n" + "TABLESPACE pg_default;")) {\r
-         * \r
-         * metricsLogger.info("Postgresql Connection successful...");\r
-         * debugLogger.debug("Connection object:{}" , con.toString());\r
-         * \r
-         * pstmt11.executeUpdate();\r
-         * debugLogger.info("CREATE TABLE IF NOT EXISTS executed successfully....");\r
-         * \r
-         * if ((bytesArray.length > 0) && (!Arrays.toString(bytesArray).equals(""))) {\r
-         * \r
-         * try (PreparedStatement pstmt = con.prepareStatement("INSERT INTO " + MappingFileTableName\r
-         * +\r
-         * "(enterpriseid, mappingfilecontents, mimetype,  File_Name) VALUES (?, ?, ?, ?) ON CONFLICT (enterpriseid) DO NOTHING;"\r
-         * )) { pstmt.setString(1, defaultEnterpriseId); pstmt.setBytes(2, bytesArray);\r
-         * pstmt.setString(3, "text/xml"); pstmt.setString(4, file.getName());\r
-         * \r
-         * pstmt.executeUpdate();\r
-         * debugLogger.info("Made sure that default mapping file is present in table"); } } else {\r
-         * errorLogger.error(file.getName() + " is empty"); // exit on empty mapping file\r
-         * System.exit(SpringApplication.exit(applicationContext, () -> {\r
-         * errorLogger.error("Application stoped beacuase default mapping file is empty.."); return\r
-         * -1; })); }\r
-         * \r
-         * } catch (SQLException e) { errorLogger.error("Received exception : " + e.getMessage(),\r
-         * e); // exit on SqlException System.exit(SpringApplication.exit(applicationContext, () ->\r
-         * { errorLogger.error("Application Stoped due to ", e.getCause()); return -1; })); }\r
-         * \r
-         * }\r
-         */\r
-        /*\r
-         * public void fetchMappingFile() {\r
-         * \r
-         * try (Connection con = DriverManager.getConnection(dBurl, user, pwd)) {\r
-         * debugLogger.info("Retrieving data from DB"); PreparedStatement pstmt =\r
-         * con.prepareStatement("SELECT * FROM mapping_file"); ResultSet rs = pstmt.executeQuery();\r
-         * // parsing the column each time is a linear search int column1Pos =\r
-         * rs.findColumn("enterpriseid"); int column2Pos = rs.findColumn("mappingfilecontents");\r
-         * String hexString; while (rs.next()) { String column1 = rs.getString(column1Pos); String\r
-         * column2 = rs.getString(column2Pos); hexString = column2.substring(2); byte[] bytes =\r
-         * Hex.decodeHex(hexString.toCharArray()); String data = new String(bytes, "UTF-8");\r
-         * mappingFiles.put(column1, data); }\r
-         * debugLogger.info("DB Initialization Completed, Total # Mappingfiles are" +\r
-         * mappingFiles.size()); } catch (Exception e) { errorLogger.error("Error occured due to :"\r
-         * + e.getMessage()); e.printStackTrace(); }\r
-         * \r
-         * }\r
-         */\r
-        \r
-        public static Map<String, String> getMappingFiles() {\r
-                return mappingFiles;\r
-        }\r
-        \r
-        public static void setMappingFiles(Map<String, String> mappingFiles) {\r
-                VESAdapterInitializer.mappingFiles = mappingFiles;\r
-        }\r
-        \r
-        @Override\r
-        public int getOrder() {\r
-                return 0;\r
+            }\r
+            \r
+        } catch (Exception e) {\r
+            e.printStackTrace();\r
+            errorLogger.error("Exception occured while reading Collector config file cause: ",\r
+                    e.getCause());\r
         }\r
         \r
+    }\r
+    \r
+    \r
+    /*\r
+     * private void prepareDatabase() throws IOException {\r
+     * \r
+     * \r
+     * debugLogger.info("The Default Mapping file Location:" + defaultMappingFileLocation.trim());\r
+     * \r
+     * if (ClassLoader.getSystemResource(defaultMappingFileLocation.trim()) == null) {\r
+     * errorLogger.error( "Default mapping file " + defaultMappingFileLocation.trim() +\r
+     * " is missing"); System.exit(SpringApplication.exit(applicationContext, () -> {\r
+     * errorLogger.error("Application stoped due to missing default mapping file"); return -1; }));\r
+     * }\r
+     * \r
+     * File file = new File(\r
+     * ClassLoader.getSystemResource(defaultMappingFileLocation.trim()).getFile());\r
+     * \r
+     * try (FileInputStream fileInputStream = new FileInputStream(file)) { bytesArray = new\r
+     * byte[(int) file.length()]; fileInputStream.read(bytesArray);\r
+     * \r
+     * } catch (IOException e1) {\r
+     * errorLogger.error("Exception Occured while reading the default mapping file ,Cause: " +\r
+     * e1.getMessage(), e1); // exit on missing default mapping file\r
+     * System.exit(SpringApplication.exit(applicationContext, () -> {\r
+     * errorLogger.error("Application stoped due to missing default mapping file"); return -1; }));\r
+     * }\r
+     * \r
+     * try (Connection con = DriverManager.getConnection(dBurl, user, pwd); // creating table if not\r
+     * exist PreparedStatement pstmt11 = con.prepareStatement("CREATE TABLE IF NOT EXISTS public." +\r
+     * MappingFileTableName + "\r\n" + "(\r\n" +\r
+     * "    enterpriseid character varying COLLATE pg_catalog.\"default\" NOT NULL,\r\n" +\r
+     * "    mappingfilecontents bytea,\r\n" +\r
+     * "    mimetype character varying COLLATE pg_catalog.\"default\",\r\n" +\r
+     * "    file_name character varying COLLATE pg_catalog.\"default\",\r\n" +\r
+     * "    CONSTRAINT mapping_file_pkey5 PRIMARY KEY (enterpriseid)\r\n" + ")\r\n" + "WITH (\r\n" +\r
+     * "    OIDS = FALSE\r\n" + ")\r\n" + "TABLESPACE pg_default;")) {\r
+     * \r
+     * metricsLogger.info("Postgresql Connection successful...");\r
+     * debugLogger.debug("Connection object:{}" , con.toString());\r
+     * \r
+     * pstmt11.executeUpdate();\r
+     * debugLogger.info("CREATE TABLE IF NOT EXISTS executed successfully....");\r
+     * \r
+     * if ((bytesArray.length > 0) && (!Arrays.toString(bytesArray).equals(""))) {\r
+     * \r
+     * try (PreparedStatement pstmt = con.prepareStatement("INSERT INTO " + MappingFileTableName +\r
+     * "(enterpriseid, mappingfilecontents, mimetype,  File_Name) VALUES (?, ?, ?, ?) ON CONFLICT (enterpriseid) DO NOTHING;"\r
+     * )) { pstmt.setString(1, defaultEnterpriseId); pstmt.setBytes(2, bytesArray);\r
+     * pstmt.setString(3, "text/xml"); pstmt.setString(4, file.getName());\r
+     * \r
+     * pstmt.executeUpdate();\r
+     * debugLogger.info("Made sure that default mapping file is present in table"); } } else {\r
+     * errorLogger.error(file.getName() + " is empty"); // exit on empty mapping file\r
+     * System.exit(SpringApplication.exit(applicationContext, () -> {\r
+     * errorLogger.error("Application stoped beacuase default mapping file is empty.."); return -1;\r
+     * })); }\r
+     * \r
+     * } catch (SQLException e) { errorLogger.error("Received exception : " + e.getMessage(), e); //\r
+     * exit on SqlException System.exit(SpringApplication.exit(applicationContext, () -> {\r
+     * errorLogger.error("Application Stoped due to ", e.getCause()); return -1; })); }\r
+     * \r
+     * }\r
+     */\r
+    /*\r
+     * public void fetchMappingFile() {\r
+     * \r
+     * try (Connection con = DriverManager.getConnection(dBurl, user, pwd)) {\r
+     * debugLogger.info("Retrieving data from DB"); PreparedStatement pstmt =\r
+     * con.prepareStatement("SELECT * FROM mapping_file"); ResultSet rs = pstmt.executeQuery(); //\r
+     * parsing the column each time is a linear search int column1Pos =\r
+     * rs.findColumn("enterpriseid"); int column2Pos = rs.findColumn("mappingfilecontents"); String\r
+     * hexString; while (rs.next()) { String column1 = rs.getString(column1Pos); String column2 =\r
+     * rs.getString(column2Pos); hexString = column2.substring(2); byte[] bytes =\r
+     * Hex.decodeHex(hexString.toCharArray()); String data = new String(bytes, "UTF-8");\r
+     * mappingFiles.put(column1, data); }\r
+     * debugLogger.info("DB Initialization Completed, Total # Mappingfiles are" +\r
+     * mappingFiles.size()); } catch (Exception e) { errorLogger.error("Error occured due to :" +\r
+     * e.getMessage()); e.printStackTrace(); }\r
+     * \r
+     * }\r
+     */\r
+    \r
+    public static Map<String, String> getMappingFiles() {\r
+        return mappingFiles;\r
+    }\r
+    \r
+    public static void setMappingFiles(Map<String, String> mappingFiles) {\r
+        VESAdapterInitializer.mappingFiles = mappingFiles;\r
+    }\r
+    \r
+    @Override\r
+    public int getOrder() {\r
+        return 0;\r
+    }\r
+    \r
 }\r
index cbfeead..644c348 100644 (file)
@@ -52,169 +52,143 @@ import org.springframework.stereotype.Component;
  * @author PM00501616\r
  *\r
  */\r
-/**\r
- * @author PM00501616\r
- *\r
- */\r
+\r
 @Component\r
 public class VesService {\r
+    \r
+    private static final Logger metricsLogger = LoggerFactory.getLogger("metricsLogger");\r
+    private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");\r
+    private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");\r
+    \r
+    private boolean isRunning = true;\r
+    @Value("${defaultConfigFilelocation}")\r
+    private String defaultConfigFilelocation;\r
+    @Autowired\r
+    private Creator creator;\r
+    @Autowired\r
+    private UniversalEventAdapter eventAdapter;\r
+    @Autowired\r
+    private DmaapConfig dmaapConfig;\r
+    @Autowired\r
+    private CollectorConfigPropertyRetrival collectorConfigPropertyRetrival;\r
+    private static List<String> list = new LinkedList<String>();\r
+    \r
+    \r
+    /**\r
+     * method triggers universal VES adapter module.\r
+     */\r
+    public void start() throws MapperConfigException {\r
+        debugLogger.info("Creating Subcriber and Publisher with creator.............");\r
+        String topicName = null;\r
+        String publisherTopic = null;\r
+        // Hashmap of subscriber and publisher details in correspondence to the respective\r
+        // collectors in kv file\r
+        Map<String, String> dmaapTopics = collectorConfigPropertyRetrival\r
+                .getDmaapTopics("stream_subscriber", "stream_publisher", defaultConfigFilelocation);\r
         \r
-        private static final Logger metricsLogger = LoggerFactory.getLogger("metricsLogger");\r
-        private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");\r
-        private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");\r
-        \r
-        private boolean isRunning = true;\r
-        @Value("${defaultConfigFilelocation}")\r
-        private String defaultConfigFilelocation;\r
-        @Autowired\r
-        private Creator creator;\r
-        @Autowired\r
-        private UniversalEventAdapter eventAdapter;\r
-        @Autowired\r
-        private DmaapConfig dmaapConfig;\r
-        @Autowired\r
-        private CollectorConfigPropertyRetrival collectorConfigPropertyRetrival;\r
-        private static List<String> list = new LinkedList<String>();\r
-        \r
-        \r
-        /**\r
-         * method triggers universal VES adapter module.\r
-         */\r
-        public void start() throws MapperConfigException {\r
-                debugLogger.info("Creating Subcriber and Publisher with creator.............");\r
-                String topicName = null;\r
-                String publisherTopic = null;\r
-                // Hashmap of subscriber and publisher details in correspondence to the respective\r
-                // collectors in kv file\r
-                Map<String, String> dmaapTopics = collectorConfigPropertyRetrival.getDmaapTopics(\r
-                                "stream_subscriber", "stream_publisher", defaultConfigFilelocation);\r
+        ExecutorService executorService = Executors.newFixedThreadPool(dmaapTopics.size());\r
+        for (Map.Entry<String, String> entry : dmaapTopics.entrySet()) {\r
+            String threadName = entry.getKey();\r
+            // subcriber and corresponding publisher topics in a Map\r
+            Map<String, String> subpubTopics = collectorConfigPropertyRetrival\r
+                    .getTopics(entry.getKey(), entry.getValue(), defaultConfigFilelocation);\r
+            for (Map.Entry<String, String> entry2 : subpubTopics.entrySet()) {\r
+                topicName = entry2.getKey();\r
+                publisherTopic = entry2.getValue();\r
+            }\r
+            \r
+            \r
+            // Publisher and subcriber as per each collector\r
+            DMaaPMRSubscriber subcriber = creator.getDMaaPMRSubscriber(topicName);\r
+            \r
+            DMaaPMRPublisher publisher = creator.getDMaaPMRPublisher(publisherTopic);\r
+            debugLogger.info(\r
+                    "Created scriber topic:" + topicName + "publisher topic:" + publisherTopic);\r
+            \r
+            executorService.submit(new Runnable() {\r
                 \r
-                ExecutorService executorService = Executors.newFixedThreadPool(dmaapTopics.size());\r
-                for (Map.Entry<String, String> entry : dmaapTopics.entrySet()) {\r
-                        String threadName = entry.getKey();\r
-                        // subcriber and corresponding publisher topics in a Map\r
-                        Map<String, String> subpubTopics = collectorConfigPropertyRetrival\r
-                                        .getTopics(entry.getKey(), entry.getValue(),\r
-                                                        defaultConfigFilelocation);\r
-                        for (Map.Entry<String, String> entry2 : subpubTopics.entrySet()) {\r
-                                topicName = entry2.getKey();\r
-                                publisherTopic = entry2.getValue();\r
-                        }\r
-                        \r
-                        \r
-                        // Publisher and subcriber as per each collector\r
-                        DMaaPMRSubscriber subcriber = creator.getDMaaPMRSubscriber(topicName);\r
-                        \r
-                        DMaaPMRPublisher publisher = creator.getDMaaPMRPublisher(publisherTopic);\r
-                        debugLogger.info("Created scriber topic:" + topicName + "publisher topic:"\r
-                                        + publisherTopic);\r
-                        \r
-                        executorService.submit(new Runnable() {\r
+                @Override\r
+                public void run() {\r
+                    \r
+                    Thread.currentThread().setName(threadName);\r
+                    metricsLogger.info("fetch and publish from and to Dmaap started:"\r
+                            + Thread.currentThread().getName());\r
+                    int pollingInternalInt = dmaapConfig.getPollingInterval();\r
+                    debugLogger.info(\r
+                            "The Polling Interval in Milli Second is :{}" + pollingInternalInt);\r
+                    debugLogger.info("starting subscriber & publisher thread:{}",\r
+                            Thread.currentThread().getName());\r
+                    while (true) {\r
+                        synchronized (this) {\r
+                            for (String incomingJsonString : subcriber.fetchMessages()\r
+                                    .getFetchedMessages()) {\r
+                                list.add(incomingJsonString);\r
                                 \r
-                                @Override\r
-                                public void run() {\r
-                                        \r
-                                        Thread.currentThread().setName(threadName);\r
-                                        metricsLogger.info(\r
-                                                        "fetch and publish from and to Dmaap started:"\r
-                                                                        + Thread.currentThread()\r
-                                                                                        .getName());\r
-                                        int pollingInternalInt = dmaapConfig.getPollingInterval();\r
-                                        debugLogger.info(\r
-                                                        "The Polling Interval in Milli Second is :{}"\r
-                                                                        + pollingInternalInt);\r
-                                        debugLogger.info(\r
-                                                        "starting subscriber & publisher thread:{}",\r
-                                                        Thread.currentThread().getName());\r
-                                        while (true) {\r
-                                                synchronized (this) {\r
-                                                        for (String incomingJsonString : subcriber\r
-                                                                        .fetchMessages()\r
-                                                                        .getFetchedMessages()) {\r
-                                                                list.add(incomingJsonString);\r
-                                                                \r
-                                                        }\r
-                                                        \r
-                                                        if (list.isEmpty()) {\r
-                                                                try {\r
-                                                                        Thread.sleep(pollingInternalInt);\r
-                                                                } catch (InterruptedException e) {\r
-                                                                        e.printStackTrace();\r
-                                                                }\r
-                                                        }\r
-                                                        debugLogger.debug(\r
-                                                                        "number of messages to be converted :{}",\r
-                                                                        list.size());\r
-                                                        \r
-                                                        if (!list.isEmpty()) {\r
-                                                                String val = ((LinkedList<String>) list)\r
-                                                                                .removeFirst();\r
-                                                                List<String> messages =\r
-                                                                                new ArrayList<>();\r
-                                                                String vesEvent =\r
-                                                                                processReceivedJson(\r
-                                                                                                val);\r
-                                                                if (vesEvent != null && (!(vesEvent\r
-                                                                                .isEmpty()\r
-                                                                                || vesEvent.equals(\r
-                                                                                                "")))) {\r
-                                                                        messages.add(vesEvent);\r
-                                                                        publisher.publish(messages);\r
-                                                                        \r
-                                                                        metricsLogger.info(\r
-                                                                                        "Message successfully published to DMaaP Topic-\n"\r
-                                                                                                        + vesEvent);\r
-                                                                }\r
-                                                                \r
-                                                        }\r
-                                                        \r
-                                                }\r
-                                        }\r
-                                        \r
-                                        \r
-                                        \r
+                            }\r
+                            \r
+                            if (list.isEmpty()) {\r
+                                try {\r
+                                    Thread.sleep(pollingInternalInt);\r
+                                } catch (InterruptedException e) {\r
+                                    e.printStackTrace();\r
                                 }\r
-                        });\r
+                            }\r
+                            debugLogger.debug("number of messages to be converted :{}",\r
+                                    list.size());\r
+                            \r
+                            if (!list.isEmpty()) {\r
+                                String val = ((LinkedList<String>) list).removeFirst();\r
+                                List<String> messages = new ArrayList<>();\r
+                                String vesEvent = processReceivedJson(val);\r
+                                if (vesEvent != null\r
+                                        && (!(vesEvent.isEmpty() || vesEvent.equals("")))) {\r
+                                    messages.add(vesEvent);\r
+                                    publisher.publish(messages);\r
+                                    \r
+                                    metricsLogger\r
+                                            .info("Message successfully published to DMaaP Topic-\n"\r
+                                                    + vesEvent);\r
+                                }\r
+                            }\r
+                        }\r
+                    }\r
                 }\r
-                \r
-                \r
-                \r
+            });\r
         }\r
         \r
-        /**\r
-         * method stops universal ves adapter module\r
-         */\r
-        public void stop() {\r
-                isRunning = false;\r
-        }\r
         \r
         \r
-        /**\r
-         * method for processing the incoming json to ves\r
-         * \r
-         * @param incomingJsonString\r
-         * @return ves\r
-         */\r
-        private String processReceivedJson(String incomingJsonString) {\r
-                String outgoingJsonString = null;\r
-                if (!"".equals(incomingJsonString)) {\r
-                        \r
-                        try {\r
-                                \r
-                                outgoingJsonString = eventAdapter.transform(incomingJsonString);\r
-                                \r
-                        } catch (VesException exception) {\r
-                                errorLogger.error(\r
-                                                "Received exception : {},{}"\r
-                                                                + exception.getMessage(),\r
-                                                exception);\r
-                                debugLogger.warn(\r
-                                                "APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED.");\r
-                        } catch (DMaapException e) {\r
-                                errorLogger.error("Received exception : {}", e.getMessage());\r
-                        }\r
-                }\r
-                return outgoingJsonString;\r
+    }\r
+    \r
+    /**\r
+     * method stops universal ves adapter module\r
+     */\r
+    public void stop() {\r
+        isRunning = false;\r
+    }\r
+    \r
+    \r
+    /**\r
+     * method for processing the incoming json to ves\r
+     * \r
+     * @param incomingJsonString\r
+     * @return ves\r
+     */\r
+    private String processReceivedJson(String incomingJsonString) {\r
+        String outgoingJsonString = null;\r
+        if (!"".equals(incomingJsonString)) {\r
+            \r
+            try {\r
+                \r
+                outgoingJsonString = eventAdapter.transform(incomingJsonString);\r
+                \r
+            } catch (VesException exception) {\r
+                errorLogger.error("Received exception : {},{}" + exception.getMessage(), exception);\r
+                debugLogger.warn("APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED.");\r
+            } catch (DMaapException e) {\r
+                errorLogger.error("Received exception : {}", e.getMessage());\r
+            }\r
         }\r
+        return outgoingJsonString;\r
+    }\r
 }\r
-\r
index afa5c7c..9f96c62 100644 (file)
@@ -44,129 +44,128 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 \r
 @Component\r
 public class CollectorConfigPropertyRetrival {\r
+    \r
+    \r
+    private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");\r
+    private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");\r
+    private static JSONArray array;\r
+    @Autowired\r
+    private DmaapConfig dmaapConfig;\r
+    \r
+    public static JSONArray collectorConfigArray(String configFile) {\r
+        try {\r
+            JSONParser parser = new JSONParser();\r
+            String content = readFile(configFile);\r
+            JSONObject obj = (JSONObject) parser.parse(content);\r
+            JSONObject appobj = (JSONObject) obj.get("app_preferences");\r
+            array = (JSONArray) appobj.get("collectors");\r
+            \r
+            debugLogger.info("Retrieved JsonArray from Collector Config File");\r
+            \r
+        } catch (ParseException e) {\r
+            errorLogger.error("ParseException occured at position:", e.getPosition());\r
+        }\r
         \r
         \r
-        private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");\r
-        private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");\r
-        private static JSONArray array;\r
-        @Autowired\r
-        private DmaapConfig dmaapConfig;\r
-        \r
-        public static JSONArray collectorConfigArray(String configFile) {\r
-                try {\r
-                        JSONParser parser = new JSONParser();\r
-                        String content = readFile(configFile);\r
-                        JSONObject obj = (JSONObject) parser.parse(content);\r
-                        JSONObject appobj = (JSONObject) obj.get("app_preferences");\r
-                        array = (JSONArray) appobj.get("collectors");\r
-                        \r
-                        debugLogger.info("Retrieved JsonArray from Collector Config File");\r
-                        \r
-                } catch (ParseException e) {\r
-                        errorLogger.error("ParseException occured at position:", e.getPosition());\r
-                }\r
-                \r
-                \r
-                return array;\r
-                \r
-        }\r
+        return array;\r
         \r
-        public static String[] getProperyArray(String properyName,\r
-                        String defaultConfigFilelocation) {\r
-                JSONArray jsonArray = collectorConfigArray(defaultConfigFilelocation);\r
-                \r
-                String[] propertyArray = new String[jsonArray.size()];\r
-                \r
-                for (int k = 0; k < jsonArray.size(); k++) {\r
-                        \r
-                        JSONObject collJson = (JSONObject) jsonArray.get(k);\r
-                        \r
-                        propertyArray[k] = (String) collJson.get(properyName);\r
-                }\r
-                debugLogger.info("returning " + properyName + " array from Collector Config");\r
-                return propertyArray;\r
-                \r
-        }\r
+    }\r
+    \r
+    public static String[] getProperyArray(String properyName, String defaultConfigFilelocation) {\r
+        JSONArray jsonArray = collectorConfigArray(defaultConfigFilelocation);\r
         \r
-        public Map<String, String> getDmaapTopics(String subscriber, String publisher,\r
-                        String defaultConfigFilelocation) {\r
-                JSONArray jsonArray = collectorConfigArray(defaultConfigFilelocation);\r
-                \r
-                Map<String, String> dmaapTopics = new HashMap<>();\r
-                \r
-                for (int k = 0; k < jsonArray.size(); k++) {\r
-                        \r
-                        JSONObject collJson = (JSONObject) jsonArray.get(k);\r
-                        \r
-                        dmaapTopics.put(collJson.get(subscriber).toString(),\r
-                                        collJson.get(publisher).toString());\r
-                        \r
-                }\r
-                debugLogger.info("returning Dmaap topics from Collector Config");\r
-                return dmaapTopics;\r
-                \r
-        }\r
+        String[] propertyArray = new String[jsonArray.size()];\r
         \r
-        public Map<String, String> getTopics(String subscriber, String publisher,\r
-                        String defaultConfigFilelocation) {\r
-                Map<String, String> dmaapTopics = new HashMap<>();\r
-                \r
-                try {\r
-                        \r
-                        ObjectMapper objectMapper = new ObjectMapper();\r
-                        String content = readFile(defaultConfigFilelocation);\r
-                        // read JSON like DOM Parser\r
-                        JsonNode rootNode = objectMapper.readTree(content);\r
-                        JsonNode subscriberUrl = rootNode.path("streams_subscribes")\r
-                                        .path(subscriber).path("dmaap_info").path("topic_url");\r
-                        JsonNode publisherUrl = rootNode.path("streams_publishes").path(publisher)\r
-                                        .path("dmaap_info").path("topic_url");\r
-                        \r
-                        dmaapTopics.put(getTopicName(subscriberUrl.asText()),\r
-                                        getTopicName(publisherUrl.asText()));\r
-                        setDmaapConfig(subscriberUrl.asText());\r
-                } catch (IOException ex) {\r
-                        errorLogger.error("IOException occured:" + ex.getMessage());\r
-                        \r
-                } catch (URISyntaxException e) {\r
-                        \r
-                        errorLogger.error("Invalid URI :" + e.getInput() + ": " + e.getReason());\r
-                }\r
-                \r
-                return dmaapTopics;\r
-                \r
+        for (int k = 0; k < jsonArray.size(); k++) {\r
+            \r
+            JSONObject collJson = (JSONObject) jsonArray.get(k);\r
+            \r
+            propertyArray[k] = (String) collJson.get(properyName);\r
         }\r
+        debugLogger.info("returning " + properyName + " array from Collector Config");\r
+        return propertyArray;\r
         \r
-        public String getTopicName(String url) throws URISyntaxException {\r
-                URI uri = new URI(url);\r
-                String path = uri.getPath();\r
-                String idStr = path.substring(path.lastIndexOf('/') + 1);\r
-                return idStr;\r
-                \r
+    }\r
+    \r
+    public Map<String, String> getDmaapTopics(String subscriber, String publisher,\r
+            String defaultConfigFilelocation) {\r
+        JSONArray jsonArray = collectorConfigArray(defaultConfigFilelocation);\r
+        \r
+        Map<String, String> dmaapTopics = new HashMap<>();\r
+        \r
+        for (int k = 0; k < jsonArray.size(); k++) {\r
+            \r
+            JSONObject collJson = (JSONObject) jsonArray.get(k);\r
+            \r
+            dmaapTopics.put(collJson.get(subscriber).toString(),\r
+                    collJson.get(publisher).toString());\r
+            \r
         }\r
+        debugLogger.info("returning Dmaap topics from Collector Config");\r
+        return dmaapTopics;\r
+        \r
+    }\r
+    \r
+    public Map<String, String> getTopics(String subscriber, String publisher,\r
+            String defaultConfigFilelocation) {\r
+        Map<String, String> dmaapTopics = new HashMap<>();\r
         \r
-        public void setDmaapConfig(String url) throws URISyntaxException {\r
-                URI uri = new URI(url);\r
-                dmaapConfig.setDmaaphost(uri.getHost());\r
-                dmaapConfig.setDEFAULT_PORT_NUMBER(uri.getPort());\r
-                \r
+        try {\r
+            \r
+            ObjectMapper objectMapper = new ObjectMapper();\r
+            String content = readFile(defaultConfigFilelocation);\r
+            // read JSON like DOM Parser\r
+            JsonNode rootNode = objectMapper.readTree(content);\r
+            JsonNode subscriberUrl = rootNode.path("streams_subscribes").path(subscriber)\r
+                    .path("dmaap_info").path("topic_url");\r
+            JsonNode publisherUrl = rootNode.path("streams_publishes").path(publisher)\r
+                    .path("dmaap_info").path("topic_url");\r
+            \r
+            dmaapTopics.put(getTopicName(subscriberUrl.asText()),\r
+                    getTopicName(publisherUrl.asText()));\r
+            setDmaapConfig(subscriberUrl.asText());\r
+        } catch (IOException ex) {\r
+            errorLogger.error("IOException occured:" + ex.getMessage());\r
+            \r
+        } catch (URISyntaxException e) {\r
+            \r
+            errorLogger.error("Invalid URI :" + e.getInput() + ": " + e.getReason());\r
         }\r
         \r
-        public static String readFile(String configFileName) {\r
-                String content = null;\r
-                File file = null;\r
-                \r
-                try {\r
-                        file = ResourceUtils.getFile("classpath:" + configFileName);\r
-                        content = new String(Files.readAllBytes(file.toPath()));\r
-                } catch (FileNotFoundException e) {\r
-                        errorLogger.error("colud not find file :", configFileName);\r
-                        \r
-                } catch (IOException e) {\r
-                        errorLogger.error("unable to read the file , reason:", e.getCause());\r
-                }\r
-                \r
-                return content;\r
-                \r
+        return dmaapTopics;\r
+        \r
+    }\r
+    \r
+    public String getTopicName(String url) throws URISyntaxException {\r
+        URI uri = new URI(url);\r
+        String path = uri.getPath();\r
+        String idStr = path.substring(path.lastIndexOf('/') + 1);\r
+        return idStr;\r
+        \r
+    }\r
+    \r
+    public void setDmaapConfig(String url) throws URISyntaxException {\r
+        URI uri = new URI(url);\r
+        dmaapConfig.setDmaaphost(uri.getHost());\r
+        dmaapConfig.setDEFAULT_PORT_NUMBER(uri.getPort());\r
+        \r
+    }\r
+    \r
+    public static String readFile(String configFileName) {\r
+        String content = null;\r
+        File file = null;\r
+        \r
+        try {\r
+            file = ResourceUtils.getFile("classpath:" + configFileName);\r
+            content = new String(Files.readAllBytes(file.toPath()));\r
+        } catch (FileNotFoundException e) {\r
+            errorLogger.error("colud not find file :", configFileName);\r
+            \r
+        } catch (IOException e) {\r
+            errorLogger.error("unable to read the file , reason:", e.getCause());\r
         }\r
+        \r
+        return content;\r
+        \r
+    }\r
 }\r
index 4bc66bb..bb9b127 100644 (file)
@@ -43,181 +43,176 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 \r
 @Component\r
 public class FetchDynamicConfig {\r
+    \r
+    private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");\r
+    private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");\r
+    \r
+    private static String url;\r
+    public static String retString;\r
+    public static String retCBSString;\r
+    private static Map<String, String> env;\r
+    \r
+    public FetchDynamicConfig() {}\r
+    \r
+    public static void cbsCall(String configFile) {\r
         \r
-        private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");\r
-        private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");\r
+        env = System.getenv();\r
+        Boolean areEqual;\r
+        // Call consul api and identify the CBS Service address and port\r
+        getconsul();\r
+        // Construct and invoke CBS API to get application Configuration\r
+        getCBS();\r
+        // Verify if data has changed\r
+        areEqual = verifyConfigChange(configFile);\r
         \r
-        private static String url;\r
-        public static String retString;\r
-        public static String retCBSString;\r
-        private static Map<String, String> env;\r
-        \r
-        public FetchDynamicConfig() {}\r
+        if (!areEqual) {\r
+            FetchDynamicConfig fc = new FetchDynamicConfig();\r
+            fc.writefile(retCBSString, configFile);\r
+        } else {\r
+            debugLogger\r
+                    .info("New config pull results identical -  " + configFile + " NOT refreshed");\r
+        }\r
+    }\r
+    \r
+    private static void getconsul() {\r
+        url = env.get("CONSUL_HOST") + ":8500/v1/catalog/service/"\r
+                + env.get("CONFIG_BINDING_SERVICE");\r
+        retString = executecurl(url);\r
+        debugLogger.info("CBS details fetched from Consul");\r
+    }\r
+    \r
+    public static boolean verifyConfigChange(String configFile) {\r
         \r
-        public static void cbsCall(String configFile) {\r
-                \r
-                env = System.getenv();\r
-                Boolean areEqual;\r
-                // Call consul api and identify the CBS Service address and port\r
-                getconsul();\r
-                // Construct and invoke CBS API to get application Configuration\r
-                getCBS();\r
-                // Verify if data has changed\r
-                areEqual = verifyConfigChange(configFile);\r
-                \r
-                if (!areEqual) {\r
-                        FetchDynamicConfig fc = new FetchDynamicConfig();\r
-                        fc.writefile(retCBSString, configFile);\r
-                } else {\r
-                        debugLogger.info("New config pull results identical -  " + configFile\r
-                                        + " NOT refreshed");\r
-                }\r
+        boolean areEqual = false;\r
+        // Read current data\r
+        try {\r
+            \r
+            File f = new File(ClassLoader.getSystemResource(configFile.trim()).getFile());\r
+            \r
+            if (f.exists() && !f.isDirectory()) {\r
+                debugLogger.info(\r
+                        "Comparing local configuration with the configuration fethed from CBS ");\r
+                \r
+                String jsonData = readFile(configFile);\r
+                JSONObject jsonObject = new JSONObject(jsonData);\r
+                \r
+                ObjectMapper mapper = new ObjectMapper();\r
+                \r
+                JsonNode tree1 = mapper.readTree(jsonObject.toString());\r
+                JsonNode tree2 = mapper.readTree(retCBSString);\r
+                areEqual = tree1.equals(tree2);\r
+                debugLogger.info("Comparison value:" + areEqual);\r
+            } else {\r
+                debugLogger.info("First time config file read: " + configFile);\r
+            }\r
+            \r
+        } catch (IOException e) {\r
+            errorLogger.error("Comparison with new fetched data failed" + e.getMessage());\r
+            \r
         }\r
         \r
+        return areEqual;\r
         \r
-        private static void getconsul() {\r
-                url = env.get("CONSUL_HOST") + ":8500/v1/catalog/service/"\r
-                                + env.get("CONFIG_BINDING_SERVICE");\r
-                retString = executecurl(url);\r
-                debugLogger.info("CBS details fetched from Consul");\r
-        }\r
+    }\r
+    \r
+    public static void getCBS() {\r
         \r
-        public static boolean verifyConfigChange(String configFile) {\r
-                \r
-                boolean areEqual = false;\r
-                // Read current data\r
-                try {\r
-                        \r
-                        File f = new File(\r
-                                        ClassLoader.getSystemResource(configFile.trim()).getFile());\r
-                        \r
-                        if (f.exists() && !f.isDirectory()) {\r
-                                debugLogger.info(\r
-                                                "Comparing local configuration with the configuration fethed from CBS ");\r
-                                \r
-                                String jsonData = readFile(configFile);\r
-                                JSONObject jsonObject = new JSONObject(jsonData);\r
-                                \r
-                                ObjectMapper mapper = new ObjectMapper();\r
-                                \r
-                                JsonNode tree1 = mapper.readTree(jsonObject.toString());\r
-                                JsonNode tree2 = mapper.readTree(retCBSString);\r
-                                areEqual = tree1.equals(tree2);\r
-                                debugLogger.info("Comparison value:" + areEqual);\r
-                        } else {\r
-                                debugLogger.info("First time config file read: " + configFile);\r
-                        }\r
-                        \r
-                } catch (IOException e) {\r
-                        errorLogger.error(\r
-                                        "Comparison with new fetched data failed" + e.getMessage());\r
-                        \r
-                }\r
-                \r
-                return areEqual;\r
-                \r
+        // consul return as array\r
+        JSONTokener temp = new JSONTokener(retString);\r
+        JSONObject cbsjobj = (JSONObject) new JSONArray(temp).get(0);\r
+        \r
+        String urlPart1 = null;\r
+        if (cbsjobj.has("ServiceAddress") && cbsjobj.has("ServicePort")) {\r
+            \r
+            urlPart1 = cbsjobj.getString("ServiceAddress") + ":" + cbsjobj.getInt("ServicePort");\r
+            \r
         }\r
+        debugLogger.info("CONFIG_BINDING_SERVICE HOST:PORT is " + urlPart1);\r
         \r
-        public static void getCBS() {\r
-                \r
-                // consul return as array\r
-                JSONTokener temp = new JSONTokener(retString);\r
-                JSONObject cbsjobj = (JSONObject) new JSONArray(temp).get(0);\r
-                \r
-                String urlPart1 = null;\r
-                if (cbsjobj.has("ServiceAddress") && cbsjobj.has("ServicePort")) {\r
-                        \r
-                        urlPart1 = cbsjobj.getString("ServiceAddress") + ":"\r
-                                        + cbsjobj.getInt("ServicePort");\r
-                        \r
-                }\r
-                debugLogger.info("CONFIG_BINDING_SERVICE HOST:PORT is " + urlPart1);\r
-                \r
-                if (env.containsKey("HOSTNAME")) {\r
-                        url = urlPart1 + "/service_component/" + env.get("HOSTNAME");\r
-                        retCBSString = executecurl(url);\r
-                        debugLogger.info("Configuration fetched from CBS successfully..");\r
-                } else if (env.containsKey("SERVICE_NAME")) {\r
-                        url = urlPart1 + "/service_component/" + env.get("SERVICE_NAME");\r
-                        retCBSString = executecurl(url);\r
-                        debugLogger.info("Configuration fetched from CBS successfully..");\r
-                } else {\r
-                        errorLogger.error(\r
-                                        "Service name environment variable - HOSTNAME/SERVICE_NAME not found within container ");\r
-                }\r
-                \r
+        if (env.containsKey("HOSTNAME")) {\r
+            url = urlPart1 + "/service_component/" + env.get("HOSTNAME");\r
+            retCBSString = executecurl(url);\r
+            debugLogger.info("Configuration fetched from CBS successfully..");\r
+        } else if (env.containsKey("SERVICE_NAME")) {\r
+            url = urlPart1 + "/service_component/" + env.get("SERVICE_NAME");\r
+            retCBSString = executecurl(url);\r
+            debugLogger.info("Configuration fetched from CBS successfully..");\r
+        } else {\r
+            errorLogger.error(\r
+                    "Service name environment variable - HOSTNAME/SERVICE_NAME not found within container ");\r
         }\r
         \r
-        public void writefile(String retCBSString, String configFile) {\r
-                \r
-                String indentedretstring = (new JSONObject(retCBSString)).toString(4);\r
-                File f = new File(ClassLoader.getSystemResource(configFile.trim()).getFile());\r
-                try {\r
-                        debugLogger.info("Overwriting local configuration file " + configFile\r
-                                        + " with configuartions received from CBS");\r
-                        \r
-                        \r
-                        File file2 = ResourceUtils.getFile("classpath:" + configFile);\r
-                        FileWriter fstream = new FileWriter(file2, false);\r
-                        PrintWriter printWriter = new PrintWriter(fstream);\r
-                        printWriter.print(indentedretstring);\r
-                        printWriter.close();\r
-                        \r
-                        debugLogger.info("New Config successfully written to local file to "\r
-                                        + configFile);\r
-                } catch (IOException e) {\r
-                        errorLogger.error("Error in writing configuration into local KV file "\r
-                                        + configFile + retString + e.getMessage());\r
-                        e.printStackTrace();\r
-                }\r
-                \r
+    }\r
+    \r
+    public void writefile(String retCBSString, String configFile) {\r
+        \r
+        String indentedretstring = (new JSONObject(retCBSString)).toString(4);\r
+        File f = new File(ClassLoader.getSystemResource(configFile.trim()).getFile());\r
+        try {\r
+            debugLogger.info("Overwriting local configuration file " + configFile\r
+                    + " with configuartions received from CBS");\r
+            \r
+            \r
+            File file2 = ResourceUtils.getFile("classpath:" + configFile);\r
+            FileWriter fstream = new FileWriter(file2, false);\r
+            PrintWriter printWriter = new PrintWriter(fstream);\r
+            printWriter.print(indentedretstring);\r
+            printWriter.close();\r
+            \r
+            debugLogger.info("New Config successfully written to local file to " + configFile);\r
+        } catch (IOException e) {\r
+            errorLogger.error("Error in writing configuration into local KV file " + configFile\r
+                    + retString + e.getMessage());\r
+            e.printStackTrace();\r
         }\r
         \r
-        public static String readFile(String configFileName) {\r
-                String content = null;\r
-                File file = null;\r
-                \r
-                try {\r
-                        file = ResourceUtils.getFile("classpath:" + configFileName);\r
-                        content = new String(Files.readAllBytes(file.toPath()));\r
-                } catch (FileNotFoundException e) {\r
-                        errorLogger.error("colud not find file :", file.getName());\r
-                        \r
-                } catch (IOException e) {\r
-                        errorLogger.error("unable to read the file , reason:", e.getCause());\r
-                } catch (Exception e) {\r
-                        errorLogger.error("Exception occured , reason:", e.getMessage());\r
-                }\r
-                \r
-                return content;\r
-                \r
+    }\r
+    \r
+    public static String readFile(String configFileName) {\r
+        String content = null;\r
+        File file = null;\r
+        \r
+        try {\r
+            file = ResourceUtils.getFile("classpath:" + configFileName);\r
+            content = new String(Files.readAllBytes(file.toPath()));\r
+        } catch (FileNotFoundException e) {\r
+            errorLogger.error("colud not find file :", file.getName());\r
+            \r
+        } catch (IOException e) {\r
+            errorLogger.error("unable to read the file , reason:", e.getCause());\r
+        } catch (Exception e) {\r
+            errorLogger.error("Exception occured , reason:", e.getMessage());\r
         }\r
         \r
-        private static String executecurl(String url) {\r
-                \r
-                String[] command = {"curl", "-v", url};\r
-                ProcessBuilder process = new ProcessBuilder(command);\r
-                Process p;\r
-                String result = null;\r
-                try {\r
-                        p = process.start();\r
-                        InputStreamReader ipr = new InputStreamReader(p.getInputStream());\r
-                        BufferedReader reader = new BufferedReader(ipr);\r
-                        StringBuilder builder = new StringBuilder();\r
-                        String line;\r
-                        \r
-                        while ((line = reader.readLine()) != null) {\r
-                                builder.append(line);\r
-                        }\r
-                        result = builder.toString();\r
-                        reader.close();\r
-                        ipr.close();\r
-                } catch (IOException e) {\r
-                        errorLogger.error("error", e);\r
-                        e.printStackTrace();\r
-                }\r
-                return result;\r
-                \r
+        return content;\r
+        \r
+    }\r
+    \r
+    private static String executecurl(String url) {\r
+        \r
+        String[] command = {"curl", "-v", url};\r
+        ProcessBuilder process = new ProcessBuilder(command);\r
+        Process p;\r
+        String result = null;\r
+        try {\r
+            p = process.start();\r
+            InputStreamReader ipr = new InputStreamReader(p.getInputStream());\r
+            BufferedReader reader = new BufferedReader(ipr);\r
+            StringBuilder builder = new StringBuilder();\r
+            String line;\r
+            \r
+            while ((line = reader.readLine()) != null) {\r
+                builder.append(line);\r
+            }\r
+            result = builder.toString();\r
+            reader.close();\r
+            ipr.close();\r
+        } catch (IOException e) {\r
+            errorLogger.error("error", e);\r
+            e.printStackTrace();\r
         }\r
+        return result;\r
         \r
+    }\r
+    \r
 }\r