From e456b4d6e2f37873b5c967735cff8fb6f29e03b7 Mon Sep 17 00:00:00 2001 From: Pooja03 Date: Thu, 18 Apr 2019 19:57:48 +0530 Subject: [PATCH] Fixing mapping file and formatting issues Fixing mapping file issue by adding & and formatting issues mention in comments of https://gerrit.onap.org/r/#/c/85334/ Change-Id: If474fda9205fc66dcc0f5caa77d7f35ba14328a0 Issue-ID: DCAEGEN2-1445 Signed-off-by: Pooja03 --- .../blueprints/k8s-vesmapper.yaml-template.yaml | 8 +- .../service/VESAdapterInitializer.java | 390 ++++++++++----------- .../universalvesadapter/service/VesService.java | 280 +++++++-------- .../utils/CollectorConfigPropertyRetrival.java | 225 ++++++------ .../utils/FetchDynamicConfig.java | 315 ++++++++--------- 5 files changed, 588 insertions(+), 630 deletions(-) diff --git a/UniversalVesAdapter/dpo/blueprints/k8s-vesmapper.yaml-template.yaml b/UniversalVesAdapter/dpo/blueprints/k8s-vesmapper.yaml-template.yaml index 6821fa2..ad334c4 100644 --- a/UniversalVesAdapter/dpo/blueprints/k8s-vesmapper.yaml-template.yaml +++ b/UniversalVesAdapter/dpo/blueprints/k8s-vesmapper.yaml-template.yaml @@ -19,16 +19,16 @@ description: "This blueprint deploys the UniversalVESAdapter(UVA) as a Docker container\n" imports: - "http://www.getcloudify.org/spec/cloudify/3.4/types.yaml" - - "https://nexus.onap.org/service/local/repositories/raw/content/org.onap.dcaegen2.platform.plugins/R3/k8splugin/1.4.4/k8splugin_types.yaml" + - "https://nexus.onap.org/service/local/repositories/raw/content/org.onap.dcaegen2.platform.plugins/R4/k8splugin/1.4.12/k8splugin_types.yaml" inputs: rcc_notification_url: - default: "http://message-router.onap.svc.cluster.local:3904/events/unathenticated.DCAE_RCC_OUTPUT" + default: "http://message-router.onap.svc.cluster.local:3904/events/unauthenticated.DCAE_RCC_OUTPUT" type: string snmp_notification_url: default: "http://message-router.onap.svc.cluster.local:3904/events/unauthenticated.ONAP-COLLECTOR-SNMPTRAP" type: string tag_version: - default: "nexus3.onap.org:10001/onap/org.onap.dcaegen2.services.mapper.vesadapter.universalvesadaptor:latest" + default: "nexus3.onap.org:10001/onap/org.onap.dcaegen2.services.mapper.vesadapter.universalvesadaptor:1.0.0-SNAPSHOT" type: string universal_mapper_name: default: dcaegen2-svc-mapper @@ -64,7 +64,7 @@ node_templates: identifier: notification-id mapping-files: - - defaultMappingFile-rcc-notification: "org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.Version._4_0_1'pnfRegistration'org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.VesEventListenerVersion._7_0_1'registration_'+commonEventHeader.ts1'VESMapper'org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.Domain.PNF_REGISTRATIONcommonEventHeader.domaincommonEventHeader.ts1commonEventHeader.ts1org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.Priority.NORMALpnfRegistrationFields.vendorName+'-'+pnfRegistrationFields.serialNumberorg.onap.dcaegen2.ves.domain.ves70.PnfRegistrationFields.PnfRegistrationFieldsVersion._2_0" + defaultMappingFile-rcc-notification: "org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.Version._4_0_1'pnfRegistration'org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.VesEventListenerVersion._7_0_1'registration_'+commonEventHeader.ts1'VESMapper'org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.Domain.PNF_REGISTRATIONcommonEventHeader.domaincommonEventHeader.ts1commonEventHeader.ts1org.onap.dcaegen2.ves.domain.ves70.CommonEventHeader.Priority.NORMALpnfRegistrationFields.vendorName+'-'+pnfRegistrationFields.serialNumberorg.onap.dcaegen2.ves.domain.ves70.PnfRegistrationFields.PnfRegistrationFieldsVersion._2_0" stream_publisher: ves-pnfRegistration stream_subscriber: rcc-notification streams_publishes: diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java index f2adc9b..2ac4fe9 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VESAdapterInitializer.java @@ -45,218 +45,208 @@ import org.springframework.stereotype.Component; // AdapterInitializer @Component public class VESAdapterInitializer implements CommandLineRunner, Ordered { - private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger"); - private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger"); + private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger"); + private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger"); + + @Value("${defaultConfigFilelocation}") + String defaultConfigFilelocation; + @Value("${server.port}") + String serverPort; + + private static Map mappingFiles = new HashMap(); + private static Map env; + + @Autowired + private ApplicationContext applicationContext; + + @Override + public void run(String... args) throws Exception { + debugLogger.info("The Default Config file Location:" + defaultConfigFilelocation.trim()); - @Value("${defaultConfigFilelocation}") - String defaultConfigFilelocation; - @Value("${server.port}") - String serverPort; - - private static Map mappingFiles = new HashMap(); - private static Map env; - - @Autowired - private ApplicationContext applicationContext; + if (ClassLoader.getSystemResource(defaultConfigFilelocation.trim()) == null) { + errorLogger.error( + "Default Config file " + defaultConfigFilelocation.trim() + " is missing"); + System.exit(SpringApplication.exit(applicationContext, () -> { + errorLogger.error("Application stoped due to missing default Config file"); + return -1; + })); + } + env = System.getenv(); + for (Map.Entry entry : env.entrySet()) { + debugLogger.debug(entry.getKey() + ":" + entry.getValue()); + } - @Override - public void run(String... args) throws Exception { - debugLogger.info("The Default Config file Location:" - + defaultConfigFilelocation.trim()); - // final Path configFilePath = - // Paths.get(defaultConfigFilelocation.trim()).toAbsolutePath(); - // File f = new File(configFilePath.toString()); - - if (ClassLoader.getSystemResource(defaultConfigFilelocation.trim()) == null) { - errorLogger.error("Default Config file " + defaultConfigFilelocation.trim() - + " is missing"); - System.exit(SpringApplication.exit(applicationContext, () -> { - errorLogger.error( - "Application stoped due to missing default Config file"); - return -1; - })); - } - env = System.getenv(); - for (Map.Entry entry : env.entrySet()) { - debugLogger.debug(entry.getKey() + ":" + entry.getValue()); - } - - // check for consul details - if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE") - && env.containsKey("HOSTNAME")) { - debugLogger.info(">>>Dynamic configuration to be used"); - FetchDynamicConfig.cbsCall(defaultConfigFilelocation); - - } else { - debugLogger.info(">>>Static configuration to be used"); - - } - readJsonToMap(defaultConfigFilelocation); - - // prepareDatabase(); - // fetchMappingFile(); - - debugLogger.info("Triggering controller's start url "); - executecurl("http://localhost:" + serverPort + "/start"); + // check for consul details + if (env.containsKey("CONSUL_HOST") && env.containsKey("CONFIG_BINDING_SERVICE") + && env.containsKey("HOSTNAME")) { + debugLogger.info(">>>Dynamic configuration to be used"); + FetchDynamicConfig.cbsCall(defaultConfigFilelocation); + + } else { + debugLogger.info(">>>Static configuration to be used"); + } + readJsonToMap(defaultConfigFilelocation); + + // prepareDatabase(); + // fetchMappingFile(); + debugLogger.info("Triggering controller's start url "); + executecurl("http://localhost:" + serverPort + "/start"); + } + + + private static String executecurl(String url) { - private static String executecurl(String url) { + debugLogger.info("Running curl command for url:{}", url); + String[] command = {"curl", "-v", url}; + ProcessBuilder process = new ProcessBuilder(command); + Process p; + String result = null; + try { + p = process.start(); + try (InputStreamReader ipr = new InputStreamReader(p.getInputStream()); + BufferedReader reader = new BufferedReader(ipr)) { + StringBuilder builder = new StringBuilder(); + String line; - debugLogger.info("Running curl command for url:{}", url); - String[] command = {"curl", "-v", url}; - ProcessBuilder process = new ProcessBuilder(command); - Process p; - String result = null; - try { - p = process.start(); - try (InputStreamReader ipr = new InputStreamReader(p.getInputStream()); - BufferedReader reader = new BufferedReader(ipr)) { - StringBuilder builder = new StringBuilder(); - String line; - - while ((line = reader.readLine()) != null) { - builder.append(line); - } - result = builder.toString(); - } - } catch (IOException e) { - errorLogger.error("error", e); + while ((line = reader.readLine()) != null) { + builder.append(line); } - return result; - + result = builder.toString(); + } + } catch (IOException e) { + errorLogger.error("error", e); } + return result; - private void readJsonToMap(String configFile) { - try { - JSONArray collectorArray = CollectorConfigPropertyRetrival - .collectorConfigArray(configFile); + } + + private void readJsonToMap(String configFile) { + try { + JSONArray collectorArray = + CollectorConfigPropertyRetrival.collectorConfigArray(configFile); + + for (int i = 0; i < collectorArray.size(); i++) { + JSONObject obj2 = (JSONObject) collectorArray.get(i); + + if (obj2.containsKey("mapping-files")) { + + JSONArray a1 = (JSONArray) obj2.get("mapping-files"); + + for (int j = 0; j < a1.size(); j++) { + JSONObject obj3 = (JSONObject) a1.get(j); + Set> set = obj3.entrySet(); - for (int i = 0; i < collectorArray.size(); i++) { - JSONObject obj2 = (JSONObject) collectorArray.get(i); - - if (obj2.containsKey("mapping-files")) { - - JSONArray a1 = (JSONArray) obj2.get("mapping-files"); - - for (int j = 0; j < a1.size(); j++) { - JSONObject obj3 = (JSONObject) a1.get(j); - Set> set = obj3.entrySet(); - - for (Entry entry : set) { - - mappingFiles.put(entry.getKey(), - entry.getValue()); - } - } - - } + for (Entry entry : set) { + + mappingFiles.put(entry.getKey(), entry.getValue()); } - - } catch (Exception e) { - e.printStackTrace(); - errorLogger.error( - "Exception occured while reading Collector config file cause: ", - e.getCause()); + } + } - - } - - - /* - * private void prepareDatabase() throws IOException { - * - * - * debugLogger.info("The Default Mapping file Location:" + - * defaultMappingFileLocation.trim()); - * - * if (ClassLoader.getSystemResource(defaultMappingFileLocation.trim()) == null) { - * errorLogger.error( "Default mapping file " + defaultMappingFileLocation.trim() + - * " is missing"); System.exit(SpringApplication.exit(applicationContext, () -> { - * errorLogger.error("Application stoped due to missing default mapping file"); return -1; - * })); } - * - * File file = new File( - * ClassLoader.getSystemResource(defaultMappingFileLocation.trim()).getFile()); - * - * try (FileInputStream fileInputStream = new FileInputStream(file)) { bytesArray = new - * byte[(int) file.length()]; fileInputStream.read(bytesArray); - * - * } catch (IOException e1) { - * errorLogger.error("Exception Occured while reading the default mapping file ,Cause: " + - * e1.getMessage(), e1); // exit on missing default mapping file - * System.exit(SpringApplication.exit(applicationContext, () -> { - * errorLogger.error("Application stoped due to missing default mapping file"); return -1; - * })); } - * - * try (Connection con = DriverManager.getConnection(dBurl, user, pwd); // creating table if - * not exist PreparedStatement pstmt11 = - * con.prepareStatement("CREATE TABLE IF NOT EXISTS public." + MappingFileTableName + "\r\n" - * + "(\r\n" + - * " enterpriseid character varying COLLATE pg_catalog.\"default\" NOT NULL,\r\n" + - * " mappingfilecontents bytea,\r\n" + - * " mimetype character varying COLLATE pg_catalog.\"default\",\r\n" + - * " file_name character varying COLLATE pg_catalog.\"default\",\r\n" + - * " CONSTRAINT mapping_file_pkey5 PRIMARY KEY (enterpriseid)\r\n" + ")\r\n" + - * "WITH (\r\n" + " OIDS = FALSE\r\n" + ")\r\n" + "TABLESPACE pg_default;")) { - * - * metricsLogger.info("Postgresql Connection successful..."); - * debugLogger.debug("Connection object:{}" , con.toString()); - * - * pstmt11.executeUpdate(); - * debugLogger.info("CREATE TABLE IF NOT EXISTS executed successfully...."); - * - * if ((bytesArray.length > 0) && (!Arrays.toString(bytesArray).equals(""))) { - * - * try (PreparedStatement pstmt = con.prepareStatement("INSERT INTO " + MappingFileTableName - * + - * "(enterpriseid, mappingfilecontents, mimetype, File_Name) VALUES (?, ?, ?, ?) ON CONFLICT (enterpriseid) DO NOTHING;" - * )) { pstmt.setString(1, defaultEnterpriseId); pstmt.setBytes(2, bytesArray); - * pstmt.setString(3, "text/xml"); pstmt.setString(4, file.getName()); - * - * pstmt.executeUpdate(); - * debugLogger.info("Made sure that default mapping file is present in table"); } } else { - * errorLogger.error(file.getName() + " is empty"); // exit on empty mapping file - * System.exit(SpringApplication.exit(applicationContext, () -> { - * errorLogger.error("Application stoped beacuase default mapping file is empty.."); return - * -1; })); } - * - * } catch (SQLException e) { errorLogger.error("Received exception : " + e.getMessage(), - * e); // exit on SqlException System.exit(SpringApplication.exit(applicationContext, () -> - * { errorLogger.error("Application Stoped due to ", e.getCause()); return -1; })); } - * - * } - */ - /* - * public void fetchMappingFile() { - * - * try (Connection con = DriverManager.getConnection(dBurl, user, pwd)) { - * debugLogger.info("Retrieving data from DB"); PreparedStatement pstmt = - * con.prepareStatement("SELECT * FROM mapping_file"); ResultSet rs = pstmt.executeQuery(); - * // parsing the column each time is a linear search int column1Pos = - * rs.findColumn("enterpriseid"); int column2Pos = rs.findColumn("mappingfilecontents"); - * String hexString; while (rs.next()) { String column1 = rs.getString(column1Pos); String - * column2 = rs.getString(column2Pos); hexString = column2.substring(2); byte[] bytes = - * Hex.decodeHex(hexString.toCharArray()); String data = new String(bytes, "UTF-8"); - * mappingFiles.put(column1, data); } - * debugLogger.info("DB Initialization Completed, Total # Mappingfiles are" + - * mappingFiles.size()); } catch (Exception e) { errorLogger.error("Error occured due to :" - * + e.getMessage()); e.printStackTrace(); } - * - * } - */ - - public static Map getMappingFiles() { - return mappingFiles; - } - - public static void setMappingFiles(Map mappingFiles) { - VESAdapterInitializer.mappingFiles = mappingFiles; - } - - @Override - public int getOrder() { - return 0; + } + + } catch (Exception e) { + e.printStackTrace(); + errorLogger.error("Exception occured while reading Collector config file cause: ", + e.getCause()); } + } + + + /* + * private void prepareDatabase() throws IOException { + * + * + * debugLogger.info("The Default Mapping file Location:" + defaultMappingFileLocation.trim()); + * + * if (ClassLoader.getSystemResource(defaultMappingFileLocation.trim()) == null) { + * errorLogger.error( "Default mapping file " + defaultMappingFileLocation.trim() + + * " is missing"); System.exit(SpringApplication.exit(applicationContext, () -> { + * errorLogger.error("Application stoped due to missing default mapping file"); return -1; })); + * } + * + * File file = new File( + * ClassLoader.getSystemResource(defaultMappingFileLocation.trim()).getFile()); + * + * try (FileInputStream fileInputStream = new FileInputStream(file)) { bytesArray = new + * byte[(int) file.length()]; fileInputStream.read(bytesArray); + * + * } catch (IOException e1) { + * errorLogger.error("Exception Occured while reading the default mapping file ,Cause: " + + * e1.getMessage(), e1); // exit on missing default mapping file + * System.exit(SpringApplication.exit(applicationContext, () -> { + * errorLogger.error("Application stoped due to missing default mapping file"); return -1; })); + * } + * + * try (Connection con = DriverManager.getConnection(dBurl, user, pwd); // creating table if not + * exist PreparedStatement pstmt11 = con.prepareStatement("CREATE TABLE IF NOT EXISTS public." + + * MappingFileTableName + "\r\n" + "(\r\n" + + * " enterpriseid character varying COLLATE pg_catalog.\"default\" NOT NULL,\r\n" + + * " mappingfilecontents bytea,\r\n" + + * " mimetype character varying COLLATE pg_catalog.\"default\",\r\n" + + * " file_name character varying COLLATE pg_catalog.\"default\",\r\n" + + * " CONSTRAINT mapping_file_pkey5 PRIMARY KEY (enterpriseid)\r\n" + ")\r\n" + "WITH (\r\n" + + * " OIDS = FALSE\r\n" + ")\r\n" + "TABLESPACE pg_default;")) { + * + * metricsLogger.info("Postgresql Connection successful..."); + * debugLogger.debug("Connection object:{}" , con.toString()); + * + * pstmt11.executeUpdate(); + * debugLogger.info("CREATE TABLE IF NOT EXISTS executed successfully...."); + * + * if ((bytesArray.length > 0) && (!Arrays.toString(bytesArray).equals(""))) { + * + * try (PreparedStatement pstmt = con.prepareStatement("INSERT INTO " + MappingFileTableName + + * "(enterpriseid, mappingfilecontents, mimetype, File_Name) VALUES (?, ?, ?, ?) ON CONFLICT (enterpriseid) DO NOTHING;" + * )) { pstmt.setString(1, defaultEnterpriseId); pstmt.setBytes(2, bytesArray); + * pstmt.setString(3, "text/xml"); pstmt.setString(4, file.getName()); + * + * pstmt.executeUpdate(); + * debugLogger.info("Made sure that default mapping file is present in table"); } } else { + * errorLogger.error(file.getName() + " is empty"); // exit on empty mapping file + * System.exit(SpringApplication.exit(applicationContext, () -> { + * errorLogger.error("Application stoped beacuase default mapping file is empty.."); return -1; + * })); } + * + * } catch (SQLException e) { errorLogger.error("Received exception : " + e.getMessage(), e); // + * exit on SqlException System.exit(SpringApplication.exit(applicationContext, () -> { + * errorLogger.error("Application Stoped due to ", e.getCause()); return -1; })); } + * + * } + */ + /* + * public void fetchMappingFile() { + * + * try (Connection con = DriverManager.getConnection(dBurl, user, pwd)) { + * debugLogger.info("Retrieving data from DB"); PreparedStatement pstmt = + * con.prepareStatement("SELECT * FROM mapping_file"); ResultSet rs = pstmt.executeQuery(); // + * parsing the column each time is a linear search int column1Pos = + * rs.findColumn("enterpriseid"); int column2Pos = rs.findColumn("mappingfilecontents"); String + * hexString; while (rs.next()) { String column1 = rs.getString(column1Pos); String column2 = + * rs.getString(column2Pos); hexString = column2.substring(2); byte[] bytes = + * Hex.decodeHex(hexString.toCharArray()); String data = new String(bytes, "UTF-8"); + * mappingFiles.put(column1, data); } + * debugLogger.info("DB Initialization Completed, Total # Mappingfiles are" + + * mappingFiles.size()); } catch (Exception e) { errorLogger.error("Error occured due to :" + + * e.getMessage()); e.printStackTrace(); } + * + * } + */ + + public static Map getMappingFiles() { + return mappingFiles; + } + + public static void setMappingFiles(Map mappingFiles) { + VESAdapterInitializer.mappingFiles = mappingFiles; + } + + @Override + public int getOrder() { + return 0; + } + } diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java index cbfeead..644c348 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java @@ -52,169 +52,143 @@ import org.springframework.stereotype.Component; * @author PM00501616 * */ -/** - * @author PM00501616 - * - */ + @Component public class VesService { + + private static final Logger metricsLogger = LoggerFactory.getLogger("metricsLogger"); + private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger"); + private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger"); + + private boolean isRunning = true; + @Value("${defaultConfigFilelocation}") + private String defaultConfigFilelocation; + @Autowired + private Creator creator; + @Autowired + private UniversalEventAdapter eventAdapter; + @Autowired + private DmaapConfig dmaapConfig; + @Autowired + private CollectorConfigPropertyRetrival collectorConfigPropertyRetrival; + private static List list = new LinkedList(); + + + /** + * method triggers universal VES adapter module. + */ + public void start() throws MapperConfigException { + debugLogger.info("Creating Subcriber and Publisher with creator............."); + String topicName = null; + String publisherTopic = null; + // Hashmap of subscriber and publisher details in correspondence to the respective + // collectors in kv file + Map dmaapTopics = collectorConfigPropertyRetrival + .getDmaapTopics("stream_subscriber", "stream_publisher", defaultConfigFilelocation); - private static final Logger metricsLogger = LoggerFactory.getLogger("metricsLogger"); - private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger"); - private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger"); - - private boolean isRunning = true; - @Value("${defaultConfigFilelocation}") - private String defaultConfigFilelocation; - @Autowired - private Creator creator; - @Autowired - private UniversalEventAdapter eventAdapter; - @Autowired - private DmaapConfig dmaapConfig; - @Autowired - private CollectorConfigPropertyRetrival collectorConfigPropertyRetrival; - private static List list = new LinkedList(); - - - /** - * method triggers universal VES adapter module. - */ - public void start() throws MapperConfigException { - debugLogger.info("Creating Subcriber and Publisher with creator............."); - String topicName = null; - String publisherTopic = null; - // Hashmap of subscriber and publisher details in correspondence to the respective - // collectors in kv file - Map dmaapTopics = collectorConfigPropertyRetrival.getDmaapTopics( - "stream_subscriber", "stream_publisher", defaultConfigFilelocation); + ExecutorService executorService = Executors.newFixedThreadPool(dmaapTopics.size()); + for (Map.Entry entry : dmaapTopics.entrySet()) { + String threadName = entry.getKey(); + // subcriber and corresponding publisher topics in a Map + Map subpubTopics = collectorConfigPropertyRetrival + .getTopics(entry.getKey(), entry.getValue(), defaultConfigFilelocation); + for (Map.Entry entry2 : subpubTopics.entrySet()) { + topicName = entry2.getKey(); + publisherTopic = entry2.getValue(); + } + + + // Publisher and subcriber as per each collector + DMaaPMRSubscriber subcriber = creator.getDMaaPMRSubscriber(topicName); + + DMaaPMRPublisher publisher = creator.getDMaaPMRPublisher(publisherTopic); + debugLogger.info( + "Created scriber topic:" + topicName + "publisher topic:" + publisherTopic); + + executorService.submit(new Runnable() { - ExecutorService executorService = Executors.newFixedThreadPool(dmaapTopics.size()); - for (Map.Entry entry : dmaapTopics.entrySet()) { - String threadName = entry.getKey(); - // subcriber and corresponding publisher topics in a Map - Map subpubTopics = collectorConfigPropertyRetrival - .getTopics(entry.getKey(), entry.getValue(), - defaultConfigFilelocation); - for (Map.Entry entry2 : subpubTopics.entrySet()) { - topicName = entry2.getKey(); - publisherTopic = entry2.getValue(); - } - - - // Publisher and subcriber as per each collector - DMaaPMRSubscriber subcriber = creator.getDMaaPMRSubscriber(topicName); - - DMaaPMRPublisher publisher = creator.getDMaaPMRPublisher(publisherTopic); - debugLogger.info("Created scriber topic:" + topicName + "publisher topic:" - + publisherTopic); - - executorService.submit(new Runnable() { + @Override + public void run() { + + Thread.currentThread().setName(threadName); + metricsLogger.info("fetch and publish from and to Dmaap started:" + + Thread.currentThread().getName()); + int pollingInternalInt = dmaapConfig.getPollingInterval(); + debugLogger.info( + "The Polling Interval in Milli Second is :{}" + pollingInternalInt); + debugLogger.info("starting subscriber & publisher thread:{}", + Thread.currentThread().getName()); + while (true) { + synchronized (this) { + for (String incomingJsonString : subcriber.fetchMessages() + .getFetchedMessages()) { + list.add(incomingJsonString); - @Override - public void run() { - - Thread.currentThread().setName(threadName); - metricsLogger.info( - "fetch and publish from and to Dmaap started:" - + Thread.currentThread() - .getName()); - int pollingInternalInt = dmaapConfig.getPollingInterval(); - debugLogger.info( - "The Polling Interval in Milli Second is :{}" - + pollingInternalInt); - debugLogger.info( - "starting subscriber & publisher thread:{}", - Thread.currentThread().getName()); - while (true) { - synchronized (this) { - for (String incomingJsonString : subcriber - .fetchMessages() - .getFetchedMessages()) { - list.add(incomingJsonString); - - } - - if (list.isEmpty()) { - try { - Thread.sleep(pollingInternalInt); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - debugLogger.debug( - "number of messages to be converted :{}", - list.size()); - - if (!list.isEmpty()) { - String val = ((LinkedList) list) - .removeFirst(); - List messages = - new ArrayList<>(); - String vesEvent = - processReceivedJson( - val); - if (vesEvent != null && (!(vesEvent - .isEmpty() - || vesEvent.equals( - "")))) { - messages.add(vesEvent); - publisher.publish(messages); - - metricsLogger.info( - "Message successfully published to DMaaP Topic-\n" - + vesEvent); - } - - } - - } - } - - - + } + + if (list.isEmpty()) { + try { + Thread.sleep(pollingInternalInt); + } catch (InterruptedException e) { + e.printStackTrace(); } - }); + } + debugLogger.debug("number of messages to be converted :{}", + list.size()); + + if (!list.isEmpty()) { + String val = ((LinkedList) list).removeFirst(); + List messages = new ArrayList<>(); + String vesEvent = processReceivedJson(val); + if (vesEvent != null + && (!(vesEvent.isEmpty() || vesEvent.equals("")))) { + messages.add(vesEvent); + publisher.publish(messages); + + metricsLogger + .info("Message successfully published to DMaaP Topic-\n" + + vesEvent); + } + } + } + } } - - - + }); } - /** - * method stops universal ves adapter module - */ - public void stop() { - isRunning = false; - } - /** - * method for processing the incoming json to ves - * - * @param incomingJsonString - * @return ves - */ - private String processReceivedJson(String incomingJsonString) { - String outgoingJsonString = null; - if (!"".equals(incomingJsonString)) { - - try { - - outgoingJsonString = eventAdapter.transform(incomingJsonString); - - } catch (VesException exception) { - errorLogger.error( - "Received exception : {},{}" - + exception.getMessage(), - exception); - debugLogger.warn( - "APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED."); - } catch (DMaapException e) { - errorLogger.error("Received exception : {}", e.getMessage()); - } - } - return outgoingJsonString; + } + + /** + * method stops universal ves adapter module + */ + public void stop() { + isRunning = false; + } + + + /** + * method for processing the incoming json to ves + * + * @param incomingJsonString + * @return ves + */ + private String processReceivedJson(String incomingJsonString) { + String outgoingJsonString = null; + if (!"".equals(incomingJsonString)) { + + try { + + outgoingJsonString = eventAdapter.transform(incomingJsonString); + + } catch (VesException exception) { + errorLogger.error("Received exception : {},{}" + exception.getMessage(), exception); + debugLogger.warn("APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED."); + } catch (DMaapException e) { + errorLogger.error("Received exception : {}", e.getMessage()); + } } + return outgoingJsonString; + } } - diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/CollectorConfigPropertyRetrival.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/CollectorConfigPropertyRetrival.java index afa5c7c..9f96c62 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/CollectorConfigPropertyRetrival.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/CollectorConfigPropertyRetrival.java @@ -44,129 +44,128 @@ import com.fasterxml.jackson.databind.ObjectMapper; @Component public class CollectorConfigPropertyRetrival { + + + private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger"); + private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger"); + private static JSONArray array; + @Autowired + private DmaapConfig dmaapConfig; + + public static JSONArray collectorConfigArray(String configFile) { + try { + JSONParser parser = new JSONParser(); + String content = readFile(configFile); + JSONObject obj = (JSONObject) parser.parse(content); + JSONObject appobj = (JSONObject) obj.get("app_preferences"); + array = (JSONArray) appobj.get("collectors"); + + debugLogger.info("Retrieved JsonArray from Collector Config File"); + + } catch (ParseException e) { + errorLogger.error("ParseException occured at position:", e.getPosition()); + } - private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger"); - private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger"); - private static JSONArray array; - @Autowired - private DmaapConfig dmaapConfig; - - public static JSONArray collectorConfigArray(String configFile) { - try { - JSONParser parser = new JSONParser(); - String content = readFile(configFile); - JSONObject obj = (JSONObject) parser.parse(content); - JSONObject appobj = (JSONObject) obj.get("app_preferences"); - array = (JSONArray) appobj.get("collectors"); - - debugLogger.info("Retrieved JsonArray from Collector Config File"); - - } catch (ParseException e) { - errorLogger.error("ParseException occured at position:", e.getPosition()); - } - - - return array; - - } + return array; - public static String[] getProperyArray(String properyName, - String defaultConfigFilelocation) { - JSONArray jsonArray = collectorConfigArray(defaultConfigFilelocation); - - String[] propertyArray = new String[jsonArray.size()]; - - for (int k = 0; k < jsonArray.size(); k++) { - - JSONObject collJson = (JSONObject) jsonArray.get(k); - - propertyArray[k] = (String) collJson.get(properyName); - } - debugLogger.info("returning " + properyName + " array from Collector Config"); - return propertyArray; - - } + } + + public static String[] getProperyArray(String properyName, String defaultConfigFilelocation) { + JSONArray jsonArray = collectorConfigArray(defaultConfigFilelocation); - public Map getDmaapTopics(String subscriber, String publisher, - String defaultConfigFilelocation) { - JSONArray jsonArray = collectorConfigArray(defaultConfigFilelocation); - - Map dmaapTopics = new HashMap<>(); - - for (int k = 0; k < jsonArray.size(); k++) { - - JSONObject collJson = (JSONObject) jsonArray.get(k); - - dmaapTopics.put(collJson.get(subscriber).toString(), - collJson.get(publisher).toString()); - - } - debugLogger.info("returning Dmaap topics from Collector Config"); - return dmaapTopics; - - } + String[] propertyArray = new String[jsonArray.size()]; - public Map getTopics(String subscriber, String publisher, - String defaultConfigFilelocation) { - Map dmaapTopics = new HashMap<>(); - - try { - - ObjectMapper objectMapper = new ObjectMapper(); - String content = readFile(defaultConfigFilelocation); - // read JSON like DOM Parser - JsonNode rootNode = objectMapper.readTree(content); - JsonNode subscriberUrl = rootNode.path("streams_subscribes") - .path(subscriber).path("dmaap_info").path("topic_url"); - JsonNode publisherUrl = rootNode.path("streams_publishes").path(publisher) - .path("dmaap_info").path("topic_url"); - - dmaapTopics.put(getTopicName(subscriberUrl.asText()), - getTopicName(publisherUrl.asText())); - setDmaapConfig(subscriberUrl.asText()); - } catch (IOException ex) { - errorLogger.error("IOException occured:" + ex.getMessage()); - - } catch (URISyntaxException e) { - - errorLogger.error("Invalid URI :" + e.getInput() + ": " + e.getReason()); - } - - return dmaapTopics; - + for (int k = 0; k < jsonArray.size(); k++) { + + JSONObject collJson = (JSONObject) jsonArray.get(k); + + propertyArray[k] = (String) collJson.get(properyName); } + debugLogger.info("returning " + properyName + " array from Collector Config"); + return propertyArray; - public String getTopicName(String url) throws URISyntaxException { - URI uri = new URI(url); - String path = uri.getPath(); - String idStr = path.substring(path.lastIndexOf('/') + 1); - return idStr; - + } + + public Map getDmaapTopics(String subscriber, String publisher, + String defaultConfigFilelocation) { + JSONArray jsonArray = collectorConfigArray(defaultConfigFilelocation); + + Map dmaapTopics = new HashMap<>(); + + for (int k = 0; k < jsonArray.size(); k++) { + + JSONObject collJson = (JSONObject) jsonArray.get(k); + + dmaapTopics.put(collJson.get(subscriber).toString(), + collJson.get(publisher).toString()); + } + debugLogger.info("returning Dmaap topics from Collector Config"); + return dmaapTopics; + + } + + public Map getTopics(String subscriber, String publisher, + String defaultConfigFilelocation) { + Map dmaapTopics = new HashMap<>(); - public void setDmaapConfig(String url) throws URISyntaxException { - URI uri = new URI(url); - dmaapConfig.setDmaaphost(uri.getHost()); - dmaapConfig.setDEFAULT_PORT_NUMBER(uri.getPort()); - + try { + + ObjectMapper objectMapper = new ObjectMapper(); + String content = readFile(defaultConfigFilelocation); + // read JSON like DOM Parser + JsonNode rootNode = objectMapper.readTree(content); + JsonNode subscriberUrl = rootNode.path("streams_subscribes").path(subscriber) + .path("dmaap_info").path("topic_url"); + JsonNode publisherUrl = rootNode.path("streams_publishes").path(publisher) + .path("dmaap_info").path("topic_url"); + + dmaapTopics.put(getTopicName(subscriberUrl.asText()), + getTopicName(publisherUrl.asText())); + setDmaapConfig(subscriberUrl.asText()); + } catch (IOException ex) { + errorLogger.error("IOException occured:" + ex.getMessage()); + + } catch (URISyntaxException e) { + + errorLogger.error("Invalid URI :" + e.getInput() + ": " + e.getReason()); } - public static String readFile(String configFileName) { - String content = null; - File file = null; - - try { - file = ResourceUtils.getFile("classpath:" + configFileName); - content = new String(Files.readAllBytes(file.toPath())); - } catch (FileNotFoundException e) { - errorLogger.error("colud not find file :", configFileName); - - } catch (IOException e) { - errorLogger.error("unable to read the file , reason:", e.getCause()); - } - - return content; - + return dmaapTopics; + + } + + public String getTopicName(String url) throws URISyntaxException { + URI uri = new URI(url); + String path = uri.getPath(); + String idStr = path.substring(path.lastIndexOf('/') + 1); + return idStr; + + } + + public void setDmaapConfig(String url) throws URISyntaxException { + URI uri = new URI(url); + dmaapConfig.setDmaaphost(uri.getHost()); + dmaapConfig.setDEFAULT_PORT_NUMBER(uri.getPort()); + + } + + public static String readFile(String configFileName) { + String content = null; + File file = null; + + try { + file = ResourceUtils.getFile("classpath:" + configFileName); + content = new String(Files.readAllBytes(file.toPath())); + } catch (FileNotFoundException e) { + errorLogger.error("colud not find file :", configFileName); + + } catch (IOException e) { + errorLogger.error("unable to read the file , reason:", e.getCause()); } + + return content; + + } } diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/FetchDynamicConfig.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/FetchDynamicConfig.java index 4bc66bb..bb9b127 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/FetchDynamicConfig.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/utils/FetchDynamicConfig.java @@ -43,181 +43,176 @@ import com.fasterxml.jackson.databind.ObjectMapper; @Component public class FetchDynamicConfig { + + private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger"); + private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger"); + + private static String url; + public static String retString; + public static String retCBSString; + private static Map env; + + public FetchDynamicConfig() {} + + public static void cbsCall(String configFile) { - private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger"); - private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger"); + env = System.getenv(); + Boolean areEqual; + // Call consul api and identify the CBS Service address and port + getconsul(); + // Construct and invoke CBS API to get application Configuration + getCBS(); + // Verify if data has changed + areEqual = verifyConfigChange(configFile); - private static String url; - public static String retString; - public static String retCBSString; - private static Map env; - - public FetchDynamicConfig() {} + if (!areEqual) { + FetchDynamicConfig fc = new FetchDynamicConfig(); + fc.writefile(retCBSString, configFile); + } else { + debugLogger + .info("New config pull results identical - " + configFile + " NOT refreshed"); + } + } + + private static void getconsul() { + url = env.get("CONSUL_HOST") + ":8500/v1/catalog/service/" + + env.get("CONFIG_BINDING_SERVICE"); + retString = executecurl(url); + debugLogger.info("CBS details fetched from Consul"); + } + + public static boolean verifyConfigChange(String configFile) { - public static void cbsCall(String configFile) { - - env = System.getenv(); - Boolean areEqual; - // Call consul api and identify the CBS Service address and port - getconsul(); - // Construct and invoke CBS API to get application Configuration - getCBS(); - // Verify if data has changed - areEqual = verifyConfigChange(configFile); - - if (!areEqual) { - FetchDynamicConfig fc = new FetchDynamicConfig(); - fc.writefile(retCBSString, configFile); - } else { - debugLogger.info("New config pull results identical - " + configFile - + " NOT refreshed"); - } + boolean areEqual = false; + // Read current data + try { + + File f = new File(ClassLoader.getSystemResource(configFile.trim()).getFile()); + + if (f.exists() && !f.isDirectory()) { + debugLogger.info( + "Comparing local configuration with the configuration fethed from CBS "); + + String jsonData = readFile(configFile); + JSONObject jsonObject = new JSONObject(jsonData); + + ObjectMapper mapper = new ObjectMapper(); + + JsonNode tree1 = mapper.readTree(jsonObject.toString()); + JsonNode tree2 = mapper.readTree(retCBSString); + areEqual = tree1.equals(tree2); + debugLogger.info("Comparison value:" + areEqual); + } else { + debugLogger.info("First time config file read: " + configFile); + } + + } catch (IOException e) { + errorLogger.error("Comparison with new fetched data failed" + e.getMessage()); + } + return areEqual; - private static void getconsul() { - url = env.get("CONSUL_HOST") + ":8500/v1/catalog/service/" - + env.get("CONFIG_BINDING_SERVICE"); - retString = executecurl(url); - debugLogger.info("CBS details fetched from Consul"); - } + } + + public static void getCBS() { - public static boolean verifyConfigChange(String configFile) { - - boolean areEqual = false; - // Read current data - try { - - File f = new File( - ClassLoader.getSystemResource(configFile.trim()).getFile()); - - if (f.exists() && !f.isDirectory()) { - debugLogger.info( - "Comparing local configuration with the configuration fethed from CBS "); - - String jsonData = readFile(configFile); - JSONObject jsonObject = new JSONObject(jsonData); - - ObjectMapper mapper = new ObjectMapper(); - - JsonNode tree1 = mapper.readTree(jsonObject.toString()); - JsonNode tree2 = mapper.readTree(retCBSString); - areEqual = tree1.equals(tree2); - debugLogger.info("Comparison value:" + areEqual); - } else { - debugLogger.info("First time config file read: " + configFile); - } - - } catch (IOException e) { - errorLogger.error( - "Comparison with new fetched data failed" + e.getMessage()); - - } - - return areEqual; - + // consul return as array + JSONTokener temp = new JSONTokener(retString); + JSONObject cbsjobj = (JSONObject) new JSONArray(temp).get(0); + + String urlPart1 = null; + if (cbsjobj.has("ServiceAddress") && cbsjobj.has("ServicePort")) { + + urlPart1 = cbsjobj.getString("ServiceAddress") + ":" + cbsjobj.getInt("ServicePort"); + } + debugLogger.info("CONFIG_BINDING_SERVICE HOST:PORT is " + urlPart1); - public static void getCBS() { - - // consul return as array - JSONTokener temp = new JSONTokener(retString); - JSONObject cbsjobj = (JSONObject) new JSONArray(temp).get(0); - - String urlPart1 = null; - if (cbsjobj.has("ServiceAddress") && cbsjobj.has("ServicePort")) { - - urlPart1 = cbsjobj.getString("ServiceAddress") + ":" - + cbsjobj.getInt("ServicePort"); - - } - debugLogger.info("CONFIG_BINDING_SERVICE HOST:PORT is " + urlPart1); - - if (env.containsKey("HOSTNAME")) { - url = urlPart1 + "/service_component/" + env.get("HOSTNAME"); - retCBSString = executecurl(url); - debugLogger.info("Configuration fetched from CBS successfully.."); - } else if (env.containsKey("SERVICE_NAME")) { - url = urlPart1 + "/service_component/" + env.get("SERVICE_NAME"); - retCBSString = executecurl(url); - debugLogger.info("Configuration fetched from CBS successfully.."); - } else { - errorLogger.error( - "Service name environment variable - HOSTNAME/SERVICE_NAME not found within container "); - } - + if (env.containsKey("HOSTNAME")) { + url = urlPart1 + "/service_component/" + env.get("HOSTNAME"); + retCBSString = executecurl(url); + debugLogger.info("Configuration fetched from CBS successfully.."); + } else if (env.containsKey("SERVICE_NAME")) { + url = urlPart1 + "/service_component/" + env.get("SERVICE_NAME"); + retCBSString = executecurl(url); + debugLogger.info("Configuration fetched from CBS successfully.."); + } else { + errorLogger.error( + "Service name environment variable - HOSTNAME/SERVICE_NAME not found within container "); } - public void writefile(String retCBSString, String configFile) { - - String indentedretstring = (new JSONObject(retCBSString)).toString(4); - File f = new File(ClassLoader.getSystemResource(configFile.trim()).getFile()); - try { - debugLogger.info("Overwriting local configuration file " + configFile - + " with configuartions received from CBS"); - - - File file2 = ResourceUtils.getFile("classpath:" + configFile); - FileWriter fstream = new FileWriter(file2, false); - PrintWriter printWriter = new PrintWriter(fstream); - printWriter.print(indentedretstring); - printWriter.close(); - - debugLogger.info("New Config successfully written to local file to " - + configFile); - } catch (IOException e) { - errorLogger.error("Error in writing configuration into local KV file " - + configFile + retString + e.getMessage()); - e.printStackTrace(); - } - + } + + public void writefile(String retCBSString, String configFile) { + + String indentedretstring = (new JSONObject(retCBSString)).toString(4); + File f = new File(ClassLoader.getSystemResource(configFile.trim()).getFile()); + try { + debugLogger.info("Overwriting local configuration file " + configFile + + " with configuartions received from CBS"); + + + File file2 = ResourceUtils.getFile("classpath:" + configFile); + FileWriter fstream = new FileWriter(file2, false); + PrintWriter printWriter = new PrintWriter(fstream); + printWriter.print(indentedretstring); + printWriter.close(); + + debugLogger.info("New Config successfully written to local file to " + configFile); + } catch (IOException e) { + errorLogger.error("Error in writing configuration into local KV file " + configFile + + retString + e.getMessage()); + e.printStackTrace(); } - public static String readFile(String configFileName) { - String content = null; - File file = null; - - try { - file = ResourceUtils.getFile("classpath:" + configFileName); - content = new String(Files.readAllBytes(file.toPath())); - } catch (FileNotFoundException e) { - errorLogger.error("colud not find file :", file.getName()); - - } catch (IOException e) { - errorLogger.error("unable to read the file , reason:", e.getCause()); - } catch (Exception e) { - errorLogger.error("Exception occured , reason:", e.getMessage()); - } - - return content; - + } + + public static String readFile(String configFileName) { + String content = null; + File file = null; + + try { + file = ResourceUtils.getFile("classpath:" + configFileName); + content = new String(Files.readAllBytes(file.toPath())); + } catch (FileNotFoundException e) { + errorLogger.error("colud not find file :", file.getName()); + + } catch (IOException e) { + errorLogger.error("unable to read the file , reason:", e.getCause()); + } catch (Exception e) { + errorLogger.error("Exception occured , reason:", e.getMessage()); } - private static String executecurl(String url) { - - String[] command = {"curl", "-v", url}; - ProcessBuilder process = new ProcessBuilder(command); - Process p; - String result = null; - try { - p = process.start(); - InputStreamReader ipr = new InputStreamReader(p.getInputStream()); - BufferedReader reader = new BufferedReader(ipr); - StringBuilder builder = new StringBuilder(); - String line; - - while ((line = reader.readLine()) != null) { - builder.append(line); - } - result = builder.toString(); - reader.close(); - ipr.close(); - } catch (IOException e) { - errorLogger.error("error", e); - e.printStackTrace(); - } - return result; - + return content; + + } + + private static String executecurl(String url) { + + String[] command = {"curl", "-v", url}; + ProcessBuilder process = new ProcessBuilder(command); + Process p; + String result = null; + try { + p = process.start(); + InputStreamReader ipr = new InputStreamReader(p.getInputStream()); + BufferedReader reader = new BufferedReader(ipr); + StringBuilder builder = new StringBuilder(); + String line; + + while ((line = reader.readLine()) != null) { + builder.append(line); + } + result = builder.toString(); + reader.close(); + ipr.close(); + } catch (IOException e) { + errorLogger.error("error", e); + e.printStackTrace(); } + return result; + } + } -- 2.16.6