* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
*
*******************************************************************************/
-package com.att.nsa.cambria.beans;
+package com.att.dmf.mr.beans;
+import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.KafkaFuture;
+import org.json.JSONObject;
+import org.json.JSONArray;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+import com.att.dmf.mr.CambriaApiException;
+import com.att.dmf.mr.constants.CambriaConstants;
+import com.att.dmf.mr.metabroker.Broker;
+import com.att.dmf.mr.metabroker.Broker1;
+import com.att.dmf.mr.metabroker.Topic;
+import com.att.dmf.mr.utils.ConfigurationReader;
//import org.apache.log4j.Logger;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-
-import org.json.JSONArray;
-import org.json.JSONObject;
-import org.springframework.beans.factory.annotation.Qualifier;
-
-import com.att.nsa.cambria.CambriaApiException;
-import com.att.nsa.cambria.metabroker.Broker;
-import com.att.nsa.cambria.metabroker.Topic;
-import com.att.nsa.cambria.utils.ConfigurationReader;
+//import com.att.dmf.mr.backends.kafka.kafka011.SettingsUtil;
import com.att.nsa.configs.ConfigDb;
import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.configs.ConfigPath;
import com.att.nsa.security.NsaAclUtils;
import com.att.nsa.security.NsaApiKey;
-import kafka.admin.AdminUtils;
-import kafka.utils.ZKStringSerializer$;
/**
* class performing all topic operations
*
- * @author author
+ * @author anowarul.islam
*
*/
-
-public class DMaaPKafkaMetaBroker implements Broker {
+//@Component
+public class DMaaPKafkaMetaBroker implements Broker1 {
+
+ public DMaaPKafkaMetaBroker() {
+ fZk = null;
+ fCambriaConfig = null;
+ fBaseTopicData = null;
+ final Properties props = new Properties ();
+ String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ "kafka.metadata.broker.list");
+ if (null == fkafkaBrokers) {
+
+ fkafkaBrokers = "localhost:9092";
+ }
+
+
+
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
+ /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
+ props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ props.put("sasl.mechanism", "PLAIN");*/
+ fKafkaAdminClient=AdminClient.create ( props );
+ // fKafkaAdminClient = null;
+ }
//private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class);
private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
+ private final AdminClient fKafkaAdminClient;
+
/**
fZk = zk;
fCambriaConfig = configDb;
fBaseTopicData = configDb.parse("/topics");
+ final Properties props = new Properties ();
+ String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ "kafka.metadata.broker.list");
+ if (null == fkafkaBrokers) {
+
+ fkafkaBrokers = "localhost:9092";
+ }
+
+
+
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
+ /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
+ props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ props.put("sasl.mechanism", "PLAIN");*/
+ fKafkaAdminClient=AdminClient.create ( props );
+ // fKafkaAdminClient = null;
+
+
+ }
+
+ public DMaaPKafkaMetaBroker( rrNvReadable settings,
+ ZkClient zk, ConfigDb configDb,AdminClient client) {
+ //fSettings = settings;
+ fZk = zk;
+ fCambriaConfig = configDb;
+ fBaseTopicData = configDb.parse("/topics");
+ fKafkaAdminClient= client;
+ // fKafkaAdminClient = null;
+
+
}
@Override
for (String topic : topics) {
result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
}
-
JSONObject dataObj = new JSONObject();
dataObj.put("topics", new JSONObject());
*/
@Override
public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
- boolean transactionEnabled) throws TopicExistsException, CambriaApiException {
+ boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
log.info("Creating topic: " + topic);
try {
log.info("Check if topic [" + topic + "] exist.");
}
// create via kafka
- try {
- ZkClient zkClient = null;
- try {
- log.info("Loading zookeeper client for creating topic.");
- // FIXME: use of this scala module$ thing is a goofy hack to
- // make Kafka aware of the
- // topic creation. (Otherwise, the topic is only partially
- // created in ZK.)
- zkClient = ZkClientFactory.createZkClient();
- log.info("Zookeeper client loaded successfully. Creating topic.");
- AdminUtils.createTopic(zkClient, topic, partitions, replicas, new Properties());
- } catch (kafka.common.TopicExistsException e) {
- log.error("Topic [" + topic + "] could not be created. " + e.getMessage(), e);
- throw new TopicExistsException(topic);
- } catch (ZkNoNodeException e) {
- log.error("Topic [" + topic + "] could not be created. The Kafka cluster is not setup.", e);
- // Kafka throws this when the server isn't running (and perhaps
- // hasn't ever run)
- throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
- "The Kafka cluster is not setup.");
- } catch (kafka.admin.AdminOperationException e) {
- // Kafka throws this when the server isn't running (and perhaps
- // hasn't ever run)
- log.error("The Kafka cluster can't handle your request. Talk to the administrators: " + e.getMessage(),
- e);
- throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
- "The Kafka cluster can't handle your request. Talk to the administrators.");
- } finally {
- log.info("Closing zookeeper connection.");
- if (zkClient != null)
- zkClient.close();
+
+ try
+ {
+ final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue () );
+ final CreateTopicsResult ctr = fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
+ final KafkaFuture<Void> ctrResult = ctr.all ();
+ ctrResult.get ();
+ // underlying Kafka topic created. now setup our API info
+ return createTopicEntry ( topic, desc, ownerApiKey, transactionEnabled );
}
-
- log.info("Creating topic entry for topic: " + topic);
- // underlying Kafka topic created. now setup our API info
- return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
- } catch (ConfigDbException excp) {
- log.error("Failed to create topic data. Talk to the administrators: " + excp.getMessage(), excp);
- throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
- "Failed to create topic data. Talk to the administrators.");
- }
+ catch ( InterruptedException e )
+ {
+ //timer.fail ( "Timeout" );
+ log.warn ( "Execution of describeTopics timed out." );
+ throw new ConfigDbException ( e );
+ }
+ catch ( ExecutionException e )
+ {
+ //timer.fail ( "ExecutionError" );
+ log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () );
+ throw new ConfigDbException ( e.getCause () );
+ }
+
}
@Override
- public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException {
+ public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException {
log.info("Deleting topic: " + topic);
ZkClient zkClient = null;
try {
log.info("Loading zookeeper client for topic deletion.");
- // FIXME: use of this scala module$ thing is a goofy hack to make
- // Kafka aware of the
- // topic creation. (Otherwise, the topic is only partially created
+ // topic creation. (Otherwise, the topic is only partially created
// in ZK.)
- zkClient = ZkClientFactory.createZkClient();
-
+ /*zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000,
+ ZKStringSerializer$.MODULE$);
+ String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers);
+ if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers;
+ ZkUtils zkutils =new ZkUtils(zkClient , new ZkConnection(strkSettings_KafkaZookeeper),false);
+ */
+
+ fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
log.info("Zookeeper client loaded successfully. Deleting topic.");
- AdminUtils.deleteTopic(zkClient, topic);
- } catch (kafka.common.TopicExistsException e) {
+ //AdminUtils.deleteTopic(zkutils, topic);
+ } catch (Exception e) {
log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
- throw new TopicExistsException(topic);
- } catch (ZkNoNodeException e) {
- log.error("Failed to delete topic [" + topic + "]. The Kafka cluster is not setup." + e.getMessage(), e);
- // Kafka throws this when the server isn't running (and perhaps
- // hasn't ever run)
- throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, "The Kafka cluster is not setup.");
- } catch (kafka.admin.AdminOperationException e) {
- // Kafka throws this when the server isn't running (and perhaps
- // hasn't ever run)
- log.error("The Kafka cluster can't handle your request. Talk to the administrators." + e.getMessage(), e);
- throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
- "The Kafka cluster can't handle your request. Talk to the administrators.");
- } finally {
+ throw new ConfigDbException(e);
+ } finally {
log.info("Closing zookeeper connection.");
if (zkClient != null)
zkClient.close();
// throw new UnsupportedOperationException ( "We can't programmatically
// delete Kafka topics yet." );
}
-
-
//private final rrNvReadable fSettings;
private final ZkClient fZk;
* class performing all user opearation like user is eligible to read,
* write. permitting a user to write and read,
*
- * @author author
+ * @author anowarul.islam
*
*/
public static class KafkaTopic implements Topic {
// created topics.
JSONObject readers = o.optJSONObject ( "readers" );
if ( readers == null && fOwner.length () > 0 ) readers = kEmptyAcl;
- fReaders = fromJson ( readers );
+ fReaders = fromJson ( readers );
JSONObject writers = o.optJSONObject ( "writers" );
if ( writers == null && fOwner.length () > 0 ) writers = kEmptyAcl;
fWriters = fromJson ( writers );
}
- private NsaAcl fromJson(JSONObject o) {
- NsaAcl acl = new NsaAcl();
- if (o != null) {
- JSONArray a = o.optJSONArray("allowed");
- if (a != null) {
- for (int i = 0; i < a.length(); ++i) {
- String user = a.getString(i);
- acl.add(user);
- }
+
+ private NsaAcl fromJson(JSONObject o) {
+ NsaAcl acl = new NsaAcl();
+ if (o != null) {
+ JSONArray a = o.optJSONArray("allowed");
+ if (a != null) {
+ for (int i = 0; i < a.length(); ++i) {
+ String user = a.getString(i);
+ acl.add(user);
}
}
- return acl;
}
+ return acl;
+ }
+
@Override
public String getName() {
return fName;
private final NsaAcl fReaders;
private final NsaAcl fWriters;
private boolean fTransactionEnabled;
-
+
public boolean isTransactionEnabled() {
return fTransactionEnabled;
}