X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fcom%2Fatt%2Fdmf%2Fmr%2Fbeans%2FDMaaPKafkaMetaBroker.java;fp=src%2Fmain%2Fjava%2Fcom%2Fatt%2Fnsa%2Fcambria%2Fbeans%2FDMaaPKafkaMetaBroker.java;h=643eae9e269e0dca154fdea316e9b3a25e678e4c;hb=b32effcaf5684d5e2f338a4537b71a2375c534e5;hp=e7d777e4304bfb4bc00cf84b1bf8d7f334427439;hpb=0823cb186012c8e6b7de3d979dfabb9f838da7c2;p=dmaap%2Fmessagerouter%2Fmsgrtr.git diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaMetaBroker.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java similarity index 71% rename from src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaMetaBroker.java rename to src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java index e7d777e..643eae9 100644 --- a/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaMetaBroker.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java @@ -8,39 +8,49 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 - * +* * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= - * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. * *******************************************************************************/ -package com.att.nsa.cambria.beans; +package com.att.dmf.mr.beans; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Properties; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ExecutionException; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNoNodeException; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.KafkaFuture; +import org.json.JSONObject; +import org.json.JSONArray; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +import com.att.dmf.mr.CambriaApiException; +import com.att.dmf.mr.constants.CambriaConstants; +import com.att.dmf.mr.metabroker.Broker; +import com.att.dmf.mr.metabroker.Broker1; +import com.att.dmf.mr.metabroker.Topic; +import com.att.dmf.mr.utils.ConfigurationReader; //import org.apache.log4-j.Logger; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; - -import org.json.JSONArray; -import org.json.JSONObject; -import org.springframework.beans.factory.annotation.Qualifier; - -import com.att.nsa.cambria.CambriaApiException; -import com.att.nsa.cambria.metabroker.Broker; -import com.att.nsa.cambria.metabroker.Topic; -import com.att.nsa.cambria.utils.ConfigurationReader; +//import com.att.dmf.mr.backends.kafka.kafka011.SettingsUtil; import com.att.nsa.configs.ConfigDb; import com.att.nsa.configs.ConfigDbException; import com.att.nsa.configs.ConfigPath; @@ -50,20 +60,42 @@ import com.att.nsa.security.NsaAcl; import com.att.nsa.security.NsaAclUtils; import com.att.nsa.security.NsaApiKey; -import kafka.admin.AdminUtils; -import kafka.utils.ZKStringSerializer$; /** * class performing all topic operations * - * @author author + * @author anowarul.islam * */ - -public class DMaaPKafkaMetaBroker implements Broker { +//@Component +public class DMaaPKafkaMetaBroker implements Broker1 { + + public DMaaPKafkaMetaBroker() { + fZk = null; + fCambriaConfig = null; + fBaseTopicData = null; + final Properties props = new Properties (); + String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + "kafka.metadata.broker.list"); + if (null == fkafkaBrokers) { + + fkafkaBrokers = "localhost:9092"; + } + + + + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); + /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); + props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + props.put("sasl.mechanism", "PLAIN");*/ + fKafkaAdminClient=AdminClient.create ( props ); + // fKafkaAdminClient = null; + } //private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class); private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class); + private final AdminClient fKafkaAdminClient; + /** @@ -79,6 +111,36 @@ public class DMaaPKafkaMetaBroker implements Broker { fZk = zk; fCambriaConfig = configDb; fBaseTopicData = configDb.parse("/topics"); + final Properties props = new Properties (); + String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + "kafka.metadata.broker.list"); + if (null == fkafkaBrokers) { + + fkafkaBrokers = "localhost:9092"; + } + + + + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); + /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); + props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + props.put("sasl.mechanism", "PLAIN");*/ + fKafkaAdminClient=AdminClient.create ( props ); + // fKafkaAdminClient = null; + + + } + + public DMaaPKafkaMetaBroker( rrNvReadable settings, + ZkClient zk, ConfigDb configDb,AdminClient client) { + //fSettings = settings; + fZk = zk; + fCambriaConfig = configDb; + fBaseTopicData = configDb.parse("/topics"); + fKafkaAdminClient= client; + // fKafkaAdminClient = null; + + } @Override @@ -91,7 +153,6 @@ public class DMaaPKafkaMetaBroker implements Broker { for (String topic : topics) { result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData)); } - JSONObject dataObj = new JSONObject(); dataObj.put("topics", new JSONObject()); @@ -132,7 +193,7 @@ public class DMaaPKafkaMetaBroker implements Broker { */ @Override public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas, - boolean transactionEnabled) throws TopicExistsException, CambriaApiException { + boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException { log.info("Creating topic: " + topic); try { log.info("Check if topic [" + topic + "] exist."); @@ -162,78 +223,53 @@ public class DMaaPKafkaMetaBroker implements Broker { } // create via kafka - try { - ZkClient zkClient = null; - try { - log.info("Loading zookeeper client for creating topic."); - // FIXME: use of this scala module$ thing is a goofy hack to - // make Kafka aware of the - // topic creation. (Otherwise, the topic is only partially - // created in ZK.) - zkClient = ZkClientFactory.createZkClient(); - log.info("Zookeeper client loaded successfully. Creating topic."); - AdminUtils.createTopic(zkClient, topic, partitions, replicas, new Properties()); - } catch (kafka.common.TopicExistsException e) { - log.error("Topic [" + topic + "] could not be created. " + e.getMessage(), e); - throw new TopicExistsException(topic); - } catch (ZkNoNodeException e) { - log.error("Topic [" + topic + "] could not be created. The Kafka cluster is not setup.", e); - // Kafka throws this when the server isn't running (and perhaps - // hasn't ever run) - throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, - "The Kafka cluster is not setup."); - } catch (kafka.admin.AdminOperationException e) { - // Kafka throws this when the server isn't running (and perhaps - // hasn't ever run) - log.error("The Kafka cluster can't handle your request. Talk to the administrators: " + e.getMessage(), - e); - throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, - "The Kafka cluster can't handle your request. Talk to the administrators."); - } finally { - log.info("Closing zookeeper connection."); - if (zkClient != null) - zkClient.close(); + + try + { + final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue () ); + final CreateTopicsResult ctr = fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) ); + final KafkaFuture ctrResult = ctr.all (); + ctrResult.get (); + // underlying Kafka topic created. now setup our API info + return createTopicEntry ( topic, desc, ownerApiKey, transactionEnabled ); } - - log.info("Creating topic entry for topic: " + topic); - // underlying Kafka topic created. now setup our API info - return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled); - } catch (ConfigDbException excp) { - log.error("Failed to create topic data. Talk to the administrators: " + excp.getMessage(), excp); - throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, - "Failed to create topic data. Talk to the administrators."); - } + catch ( InterruptedException e ) + { + //timer.fail ( "Timeout" ); + log.warn ( "Execution of describeTopics timed out." ); + throw new ConfigDbException ( e ); + } + catch ( ExecutionException e ) + { + //timer.fail ( "ExecutionError" ); + log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () ); + throw new ConfigDbException ( e.getCause () ); + } + } @Override - public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException { + public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException { log.info("Deleting topic: " + topic); ZkClient zkClient = null; try { log.info("Loading zookeeper client for topic deletion."); - // FIXME: use of this scala module$ thing is a goofy hack to make - // Kafka aware of the - // topic creation. (Otherwise, the topic is only partially created + // topic creation. (Otherwise, the topic is only partially created // in ZK.) - zkClient = ZkClientFactory.createZkClient(); - + /*zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000, + ZKStringSerializer$.MODULE$); + String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers); + if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers; + ZkUtils zkutils =new ZkUtils(zkClient , new ZkConnection(strkSettings_KafkaZookeeper),false); + */ + + fKafkaAdminClient.deleteTopics(Arrays.asList(topic)); log.info("Zookeeper client loaded successfully. Deleting topic."); - AdminUtils.deleteTopic(zkClient, topic); - } catch (kafka.common.TopicExistsException e) { + //AdminUtils.deleteTopic(zkutils, topic); + } catch (Exception e) { log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e); - throw new TopicExistsException(topic); - } catch (ZkNoNodeException e) { - log.error("Failed to delete topic [" + topic + "]. The Kafka cluster is not setup." + e.getMessage(), e); - // Kafka throws this when the server isn't running (and perhaps - // hasn't ever run) - throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, "The Kafka cluster is not setup."); - } catch (kafka.admin.AdminOperationException e) { - // Kafka throws this when the server isn't running (and perhaps - // hasn't ever run) - log.error("The Kafka cluster can't handle your request. Talk to the administrators." + e.getMessage(), e); - throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, - "The Kafka cluster can't handle your request. Talk to the administrators."); - } finally { + throw new ConfigDbException(e); + } finally { log.info("Closing zookeeper connection."); if (zkClient != null) zkClient.close(); @@ -242,8 +278,6 @@ public class DMaaPKafkaMetaBroker implements Broker { // throw new UnsupportedOperationException ( "We can't programmatically // delete Kafka topics yet." ); } - - //private final rrNvReadable fSettings; private final ZkClient fZk; @@ -295,7 +329,7 @@ public class DMaaPKafkaMetaBroker implements Broker { * class performing all user opearation like user is eligible to read, * write. permitting a user to write and read, * - * @author author + * @author anowarul.islam * */ public static class KafkaTopic implements Topic { @@ -328,25 +362,27 @@ public class DMaaPKafkaMetaBroker implements Broker { // created topics. JSONObject readers = o.optJSONObject ( "readers" ); if ( readers == null && fOwner.length () > 0 ) readers = kEmptyAcl; - fReaders = fromJson ( readers ); + fReaders = fromJson ( readers ); JSONObject writers = o.optJSONObject ( "writers" ); if ( writers == null && fOwner.length () > 0 ) writers = kEmptyAcl; fWriters = fromJson ( writers ); } - private NsaAcl fromJson(JSONObject o) { - NsaAcl acl = new NsaAcl(); - if (o != null) { - JSONArray a = o.optJSONArray("allowed"); - if (a != null) { - for (int i = 0; i < a.length(); ++i) { - String user = a.getString(i); - acl.add(user); - } + + private NsaAcl fromJson(JSONObject o) { + NsaAcl acl = new NsaAcl(); + if (o != null) { + JSONArray a = o.optJSONArray("allowed"); + if (a != null) { + for (int i = 0; i < a.length(); ++i) { + String user = a.getString(i); + acl.add(user); } } - return acl; } + return acl; + } + @Override public String getName() { return fName; @@ -445,7 +481,7 @@ public class DMaaPKafkaMetaBroker implements Broker { private final NsaAcl fReaders; private final NsaAcl fWriters; private boolean fTransactionEnabled; - + public boolean isTransactionEnabled() { return fTransactionEnabled; }