* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
*
*******************************************************************************/
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
-import com.att.nsa.cambria.CambriaApiException;
-import com.att.nsa.cambria.beans.DMaaPContext;
-import com.att.nsa.cambria.beans.TopicBean;
-import com.att.nsa.cambria.constants.CambriaConstants;
-import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
-import com.att.nsa.cambria.exception.DMaaPErrorMessages;
-import com.att.nsa.cambria.exception.DMaaPResponseCode;
-import com.att.nsa.cambria.exception.ErrorResponse;
-import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
-import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
-import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
-import com.att.nsa.cambria.service.TopicService;
-import com.att.nsa.cambria.utils.ConfigurationReader;
+import com.att.dmf.mr.CambriaApiException;
+import com.att.dmf.mr.beans.DMaaPContext;
+import com.att.dmf.mr.beans.TopicBean;
+import com.att.dmf.mr.constants.CambriaConstants;
+import com.att.dmf.mr.exception.DMaaPAccessDeniedException;
+import com.att.dmf.mr.exception.DMaaPErrorMessages;
+import com.att.dmf.mr.exception.DMaaPResponseCode;
+import com.att.dmf.mr.exception.ErrorResponse;
+import com.att.dmf.mr.metabroker.Broker.TopicExistsException;
+import com.att.dmf.mr.security.DMaaPAAFAuthenticator;
+import com.att.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
+import com.att.dmf.mr.service.TopicService;
+import com.att.dmf.mr.utils.ConfigurationReader;
import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
/**
- * This class is a CXF REST service which acts as gateway for MR Topic Service.
- *
- * @author author
+ * This class is a CXF REST service which acts
+ * as gateway for MR Topic Service.
+ * @author Ramkumar Sembaiyan
*
*/
/**
* Logger obj
*/
- // private static final Logger LOGGER = Logger
- // .getLogger(TopicRestService.class);
+ //private static final Logger LOGGER = Logger .getLogger(TopicRestService.class);
private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicRestService.class);
/**
* Config Reader
* TopicService obj
*/
@Autowired
- private TopicService tService;
-
+ private TopicService topicService;
+
/**
* DMaaPErrorMessages obj
*/
@Autowired
private DMaaPErrorMessages errorMessages;
-
- private DMaaPContext dmaapContext = new DMaaPContext();
-
+
/**
* mrNamespace
*/
- // @Value("${msgRtr.namespace.aaf}")
- // private String mrNamespace;
+ //@Value("${msgRtr.namespace.aaf}")
+// private String mrNamespace;
+
/**
* Fetches a list of topics from the current kafka instance and converted
* into json object.
*
* @return list of the topics in json format
- * @throws AccessDeniedException
- * @throws CambriaApiException
+ * @throws AccessDeniedException
+ * @throws CambriaApiException
* @throws IOException
* @throws JSONException
- */
+ * */
@GET
- // @Produces(MediaType.TEXT_PLAIN)
+ //@Produces(MediaType.TEXT_PLAIN)
public void getTopics() throws CambriaApiException {
try {
-
+
LOGGER.info("Authenticating the user before fetching the topics");
- // String permission = "com.att.dmaap.mr.topic|*|view";
- String mrNameS = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
- "msgRtr.namespace.aaf");
- String permission = mrNameS + "|" + "*" + "|" + "view";
+ //String permission = "com.att.dmaap.mr.topic|*|view";
+ String mrNameS= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf");
+ String permission =mrNameS+"|"+"*"+"|"+"view";
DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
- // Check if client is using AAF CADI Basic Authorization
- // If yes then check for AAF role authentication else display all
- // topics
- if (null != getDmaapContext().getRequest().getHeader("Authorization")) {
- if (!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2());
+ //Check if client is using AAF CADI Basic Authorization
+ //If yes then check for AAF role authentication else display all topics
+ if(null!=getDmaapContext().getRequest().getHeader("Authorization"))
+ {
+ if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
+ {
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2());
LOGGER.info(errRes.toString());
throw new DMaaPAccessDeniedException(errRes);
-
+
+
}
- }
-
- LOGGER.info("Fetching all Topics");
-
- tService.getTopics(getDmaapContext());
-
- LOGGER.info("Returning List of all Topics");
-
+ }
+
+ LOGGER.info("Fetching all Topics");
+ //topicService = new com.att.dmf.mr.service.impl.TopicServiceImpl();
+ topicService.getTopics(getDmaapContext());
+
+ LOGGER.info("Returning List of all Topics");
+
+
} catch (JSONException | ConfigDbException | IOException excp) {
- LOGGER.error("Failed to retrieve list of all topics: " + excp.getMessage(), excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
- errorMessages.getTopicsfailure() + excp.getMessage());
+ LOGGER.error(
+ "Failed to retrieve list of all topics: "
+ + excp.getMessage(), excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
+ errorMessages.getTopicsfailure()+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
+
}
}
* into json object.
*
* @return list of the topics in json format
- * @throws AccessDeniedException
- * @throws CambriaApiException
+ * @throws AccessDeniedException
+ * @throws CambriaApiException
* @throws IOException
* @throws JSONException
- */
+ * */
@GET
@Path("/listAll")
- // @Produces(MediaType.TEXT_PLAIN)
+ //@Produces(MediaType.TEXT_PLAIN)
public void getAllTopics() throws CambriaApiException {
try {
-
+
LOGGER.info("Authenticating the user before fetching the topics");
- // String permission = "com.att.dmaap.mr.topic|*|view";
- String mrNameS = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
- "msgRtr.namespace.aaf");
- String permission = mrNameS + "|" + "*" + "|" + "view";
+ //String permission = "com.att.dmaap.mr.topic|*|view";
+ String mrNameS= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf");
+ String permission =mrNameS+"|"+"*"+"|"+"view";
DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
- // Check if client is using AAF CADI Basic Authorization
- // If yes then check for AAF role authentication else display all
- // topics
- if (null != getDmaapContext().getRequest().getHeader("Authorization")) {
- if (!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2());
+ //Check if client is using AAF CADI Basic Authorization
+ //If yes then check for AAF role authentication else display all topics
+ if(null!=getDmaapContext().getRequest().getHeader("Authorization"))
+ {
+ if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
+ {
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2());
LOGGER.info(errRes.toString());
throw new DMaaPAccessDeniedException(errRes);
-
+
+
}
- }
-
- LOGGER.info("Fetching all Topics");
-
- tService.getAllTopics(getDmaapContext());
-
- LOGGER.info("Returning List of all Topics");
-
+ }
+
+ LOGGER.info("Fetching all Topics");
+
+ topicService.getAllTopics(getDmaapContext());
+
+ LOGGER.info("Returning List of all Topics");
+
+
} catch (JSONException | ConfigDbException | IOException excp) {
- LOGGER.error("Failed to retrieve list of all topics: " + excp.getMessage(), excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
- errorMessages.getTopicsfailure() + excp.getMessage());
+ LOGGER.error(
+ "Failed to retrieve list of all topics: "
+ + excp.getMessage(), excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
+ errorMessages.getTopicsfailure()+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
+
}
}
+
/**
* Returns details of the topic whose name is passed as a parameter
*
* - name of the topic
* @return details of a topic whose name is mentioned in the request in json
* format.
- * @throws AccessDeniedException
- * @throws DMaaPAccessDeniedException
+ * @throws AccessDeniedException
+ * @throws DMaaPAccessDeniedException
* @throws IOException
- */
+ * */
@GET
@Path("/{topicName}")
- // @Produces(MediaType.TEXT_PLAIN)
+ //@Produces(MediaType.TEXT_PLAIN)
public void getTopic(@PathParam("topicName") String topicName) throws CambriaApiException {
try {
-
- LOGGER.info("Authenticating the user before fetching the details about topic = " + topicName);
+
+ LOGGER.info("Authenticating the user before fetching the details about topic = "+ topicName);
DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
-
- // String permission=
- // "com.att.ecomp_test.crm.mr.topic|:topic.com.att.ecomp_test.crm.preDemo|view";
-
- // Check if client is using AAF CADI Basic Authorization
- // If yes then check for AAF role authentication else display all
- // topics
- if (null != getDmaapContext().getRequest().getHeader("Authorization")) {
+
+ //String permission= "com.att.ecomp_test.crm.mr.topic|:topic.com.att.ecomp_test.crm.preDemo|view";
+
+ //Check if client is using AAF CADI Basic Authorization
+ //If yes then check for AAF role authentication else display all topics
+ if(null!=getDmaapContext().getRequest().getHeader("Authorization"))
+ {
String permission = aaf.aafPermissionString(topicName, "view");
- if (!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2());
- LOGGER.info(errRes.toString());
- throw new DMaaPAccessDeniedException(errRes);
- }
- }
-
- LOGGER.info("Fetching Topic: " + topicName);
-
- tService.getTopic(getDmaapContext(), topicName);
-
- LOGGER.info("Fetched details of topic: " + topicName);
-
+ if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
+ {
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2());
+ LOGGER.info(errRes.toString());
+ throw new DMaaPAccessDeniedException(errRes);
+ }
+ }
+
+ LOGGER.info("Fetching Topic: " + topicName);
+
+ topicService.getTopic(getDmaapContext(), topicName);
+
+ LOGGER.info("Fetched details of topic: " + topicName);
+
} catch (ConfigDbException | IOException | TopicExistsException excp) {
- LOGGER.error("Failed to retrieve details of topic: " + topicName, excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.GET_TOPICS_DETAILS_FAIL.getResponseCode(),
- errorMessages.getTopicDetailsFail() + topicName + excp.getMessage());
+ LOGGER.error("Failed to retrieve details of topic: " + topicName,
+ excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GET_TOPICS_DETAILS_FAIL.getResponseCode(),
+ errorMessages.getTopicDetailsFail()+topicName+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
+
+
}
}
+
+
/**
* This method is still not working. Need to check on post call and how to
* accept parameters for post call
* it will have the bean object
* @throws TopicExistsException
* @throws CambriaApiException
- * @throws JSONException
+ * @throws JSONException
* @throws IOException
* @throws AccessDeniedException
*
- */
+ * */
@POST
@Path("/create")
@Consumes({ MediaType.APPLICATION_JSON })
- // @Produces(MediaType.TEXT_PLAIN)
- public void createTopic(TopicBean topicBean) throws CambriaApiException{
- try {
- LOGGER.info("Creating Topic." + topicBean.getTopicName());
-
- tService.createTopic(getDmaapContext(), topicBean);
+ //@Produces(MediaType.TEXT_PLAIN)
+ public void createTopic(TopicBean topicBean) throws CambriaApiException, JSONException {
+ try {
+ LOGGER.info("Creating Topic."+topicBean.getTopicName());
+
+ topicService.createTopic(getDmaapContext(), topicBean);
LOGGER.info("Topic created Successfully.");
- } catch (TopicExistsException ex) {
-
- LOGGER.error("Error while creating a topic: " + ex.getMessage(), ex);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
- DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getCreateTopicFail() + ex.getMessage());
- LOGGER.info(errRes.toString());
- throw new CambriaApiException(errRes);
-
- } catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
- LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getCreateTopicFail() + excp.getMessage());
+ }
+ catch (TopicExistsException ex){
+
+ LOGGER.error("Error while creating a topic: " + ex.getMessage(),
+ ex);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail()+ ex.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+
+
+
+ }catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(),
+ excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail()+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
- } catch (CambriaApiException | IOException excp) {
- LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getCreateTopicFail() + excp.getMessage());
+
+ }catch (CambriaApiException | IOException excp) {
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(),
+ excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail()+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
+
}
}
*
* @param topicName
* topic
- * @throws CambriaApiException
+ * @throws CambriaApiException
* @throws IOException
- */
+ * */
@DELETE
@Path("/{topicName}")
- // @Produces(MediaType.TEXT_PLAIN)
+ //@Produces(MediaType.TEXT_PLAIN)
public void deleteTopic(@PathParam("topicName") String topicName) throws CambriaApiException {
try {
LOGGER.info("Deleting Topic: " + topicName);
- tService.deleteTopic(getDmaapContext(), topicName);
+ topicService.deleteTopic(getDmaapContext(), topicName);
LOGGER.info("Topic [" + topicName + "] deleted successfully.");
- } catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
- LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getCreateTopicFail() + excp.getMessage());
+ } catch (DMaaPAccessDeniedException| AccessDeniedException excp) {
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(),
+ excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail()+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
- } catch (IOException | ConfigDbException | CambriaApiException | TopicExistsException excp) {
+
+ }catch (IOException | ConfigDbException
+ | CambriaApiException | TopicExistsException excp) {
LOGGER.error("Error while deleting topic: " + topicName, excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.DELETE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getDeleteTopicFail() + topicName + excp.getMessage());
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.DELETE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getDeleteTopicFail()+ topicName + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
+
}
}
private DMaaPContext getDmaapContext() {
+ DMaaPContext dmaapContext = new DMaaPContext();
dmaapContext.setRequest(request);
dmaapContext.setResponse(response);
dmaapContext.setConfigReader(configReader);
* This method will fetch the details of publisher by giving topic name
*
* @param topicName
- * @throws CambriaApiException
- * @throws AccessDeniedException
+ * @throws CambriaApiException
+ * @throws AccessDeniedException
*/
@GET
@Path("/{topicName}/producers")
- // @Produces(MediaType.TEXT_PLAIN)
- public void getPublishersByTopicName(@PathParam("topicName") String topicName) throws CambriaApiException {
+ //@Produces(MediaType.TEXT_PLAIN)
+ public void getPublishersByTopicName(
+ @PathParam("topicName") String topicName) throws CambriaApiException {
try {
-
- // String permission =
- // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
- // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
- // String permission = aaf.aafPermissionString(topicName, "view");
- // if(aaf.aafAuthentication(getDmaapContext().getRequest(),
- // permission))
- // {
- LOGGER.info("Fetching list of all the publishers for topic " + topicName);
-
- tService.getPublishersByTopicName(getDmaapContext(), topicName);
-
- LOGGER.info("Returning list of all the publishers for topic " + topicName);
- // }else{
- // LOGGER.error("Error while fetching list of publishers for topic
- // "+ topicName);
- //
- // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- // errorMessages.getNotPermitted1()+" fetch list of publishers
- // "+errorMessages.getNotPermitted2());
- // LOGGER.info(errRes);
- // throw new DMaaPAccessDeniedException(errRes);
- //
- // }
-
+
+// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
+// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+// String permission = aaf.aafPermissionString(topicName, "view");
+// if(aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
+// {
+ LOGGER.info("Fetching list of all the publishers for topic "
+ + topicName);
+
+ topicService.getPublishersByTopicName(getDmaapContext(), topicName);
+
+ LOGGER.info("Returning list of all the publishers for topic "
+ + topicName);
+// }else{
+// LOGGER.error("Error while fetching list of publishers for topic "+ topicName);
+//
+// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+// errorMessages.getNotPermitted1()+" fetch list of publishers "+errorMessages.getNotPermitted2());
+// LOGGER.info(errRes);
+// throw new DMaaPAccessDeniedException(errRes);
+//
+// }
+
} catch (IOException | ConfigDbException | TopicExistsException excp) {
- LOGGER.error("Error while fetching list of publishers for topic " + topicName, excp);
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.GET_PUBLISHERS_BY_TOPIC.getResponseCode(),
- "Error while fetching list of publishers for topic: " + topicName + excp.getMessage());
+ LOGGER.error("Error while fetching list of publishers for topic "
+ + topicName, excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GET_PUBLISHERS_BY_TOPIC.getResponseCode(),
+ "Error while fetching list of publishers for topic: "
+ + topicName + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
+
}
}
*
* @param topicName
* @param producerId
- * @throws CambriaApiException
+ * @throws CambriaApiException
*/
@PUT
@Path("/{topicName}/producers/{producerId}")
- public void permitPublisherForTopic(@PathParam("topicName") String topicName,
+ public void permitPublisherForTopic(
+ @PathParam("topicName") String topicName,
@PathParam("producerId") String producerId) throws CambriaApiException {
try {
- LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
+ LOGGER.info("Granting write access to producer [" + producerId
+ + "] for topic " + topicName);
- tService.permitPublisherForTopic(getDmaapContext(), topicName, producerId);
+ topicService.permitPublisherForTopic(getDmaapContext(), topicName,
+ producerId);
- LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic " + topicName);
+ LOGGER.info("Write access has been granted to producer ["
+ + producerId + "] for topic " + topicName);
} catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
- LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getCreateTopicFail() + excp.getMessage());
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(),
+ excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail()+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
- } catch (ConfigDbException | IOException | TopicExistsException excp) {
- LOGGER.error("Error while granting write access to producer [" + producerId + "] for topic " + topicName,
- excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.PERMIT_PUBLISHER_FOR_TOPIC.getResponseCode(),
- "Error while granting write access to producer [" + producerId + "] for topic " + topicName
- + excp.getMessage());
+
+ }catch ( ConfigDbException | IOException
+ | TopicExistsException excp) {
+ LOGGER.error("Error while granting write access to producer ["
+ + producerId + "] for topic " + topicName, excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.PERMIT_PUBLISHER_FOR_TOPIC.getResponseCode(),
+ "Error while granting write access to producer ["
+ + producerId + "] for topic " + topicName + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
+
}
}
*
* @param topicName
* @param producerId
- * @throws CambriaApiException
+ * @throws CambriaApiException
*/
@DELETE
@Path("/{topicName}/producers/{producerId}")
public void denyPublisherForTopic(@PathParam("topicName") String topicName,
@PathParam("producerId") String producerId) throws CambriaApiException {
try {
- LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
+ LOGGER.info("Revoking write access to producer [" + producerId
+ + "] for topic " + topicName);
- tService.denyPublisherForTopic(getDmaapContext(), topicName, producerId);
+ topicService.denyPublisherForTopic(getDmaapContext(), topicName,
+ producerId);
- LOGGER.info("Write access revoked for producer [" + producerId + "] for topic " + topicName);
+ LOGGER.info("Write access revoked for producer [" + producerId
+ + "] for topic " + topicName);
} catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
- LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getCreateTopicFail() + excp.getMessage());
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(),
+ excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail()+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
- } catch (ConfigDbException | IOException | TopicExistsException excp) {
- LOGGER.error("Error while revoking write access for producer [" + producerId + "] for topic " + topicName,
- excp);
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.REVOKE_PUBLISHER_FOR_TOPIC.getResponseCode(),
- "Error while revoking write access to producer [" + producerId + "] for topic " + topicName
- + excp.getMessage());
+
+ }catch ( ConfigDbException | IOException
+ | TopicExistsException excp) {
+ LOGGER.error("Error while revoking write access for producer ["
+ + producerId + "] for topic " + topicName, excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.REVOKE_PUBLISHER_FOR_TOPIC.getResponseCode(),
+ "Error while revoking write access to producer ["
+ + producerId + "] for topic " + topicName + excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
}
* Get the consumer details by the topic name
*
* @param topicName
- * @throws AccessDeniedException
- * @throws CambriaApiException
+ * @throws AccessDeniedException
+ * @throws CambriaApiException
*/
@GET
@Path("/{topicName}/consumers")
- // @Produces(MediaType.TEXT_PLAIN)
- public void getConsumersByTopicName(@PathParam("topicName") String topicName)
- throws CambriaApiException {
+ //@Produces(MediaType.TEXT_PLAIN)
+ public void getConsumersByTopicName(@PathParam("topicName") String topicName) throws AccessDeniedException,
+ CambriaApiException {
try {
-
- // String permission =
- // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"view";
- // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
- // String permission = aaf.aafPermissionString(topicName, "view");
- // if(aaf.aafAuthentication(getDmaapContext().getRequest(),
- // permission))
- // {
- LOGGER.info("Fetching list of all consumers for topic " + topicName);
-
- tService.getConsumersByTopicName(getDmaapContext(), topicName);
-
- LOGGER.info("Returning list of all consumers for topic " + topicName);
-
- // }else{
- // LOGGER.error(
- // "Error while fetching list of all consumers for topic "
- // + topicName);
- // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- // errorMessages.getNotPermitted1()+" fetch list of consumers
- // "+errorMessages.getNotPermitted2());
- // LOGGER.info(errRes);
- // throw new DMaaPAccessDeniedException(errRes);
- //
- //
- // }
-
+
+
+// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"view";
+// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+// String permission = aaf.aafPermissionString(topicName, "view");
+// if(aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
+// {
+ LOGGER.info("Fetching list of all consumers for topic " + topicName);
+
+ topicService.getConsumersByTopicName(getDmaapContext(), topicName);
+
+ LOGGER.info("Returning list of all consumers for topic "
+ + topicName);
+
+// }else{
+// LOGGER.error(
+// "Error while fetching list of all consumers for topic "
+// + topicName);
+// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+// errorMessages.getNotPermitted1()+" fetch list of consumers "+errorMessages.getNotPermitted2());
+// LOGGER.info(errRes);
+// throw new DMaaPAccessDeniedException(errRes);
+//
+//
+// }
+
+
+
} catch (IOException | ConfigDbException | TopicExistsException excp) {
- LOGGER.error("Error while fetching list of all consumers for topic " + topicName, excp);
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.GET_CONSUMERS_BY_TOPIC.getResponseCode(),
- "Error while fetching list of all consumers for topic: " + topicName + excp.getMessage());
+ LOGGER.error(
+ "Error while fetching list of all consumers for topic "
+ + topicName, excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.GET_CONSUMERS_BY_TOPIC.getResponseCode(),
+ "Error while fetching list of all consumers for topic: "
+ + topicName+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
+
}
}
*
* @param topicName
* @param consumerId
- * @throws CambriaApiException
+ * @throws CambriaApiException
*/
@PUT
@Path("/{topicName}/consumers/{consumerId}")
- public void permitConsumerForTopic(@PathParam("topicName") String topicName,
+ public void permitConsumerForTopic(
+ @PathParam("topicName") String topicName,
@PathParam("consumerId") String consumerId) throws CambriaApiException {
try {
- LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
-
- tService.permitConsumerForTopic(getDmaapContext(), topicName, consumerId);
-
- LOGGER.info("Read access granted to consumer [" + consumerId + "] for topic " + topicName);
- } catch (AccessDeniedException | ConfigDbException | IOException | TopicExistsException excp) {
- LOGGER.error("Error while granting read access to consumer [" + consumerId + "] for topic " + topicName,
- excp);
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.PERMIT_CONSUMER_FOR_TOPIC.getResponseCode(),
- "Error while granting read access to consumer [" + consumerId + "] for topic " + topicName
- + excp.getMessage());
+ LOGGER.info("Granting read access to consumer [" + consumerId
+ + "] for topic " + topicName);
+
+ topicService.permitConsumerForTopic(getDmaapContext(), topicName,
+ consumerId);
+
+ LOGGER.info("Read access granted to consumer [" + consumerId
+ + "] for topic " + topicName);
+ } catch (AccessDeniedException | ConfigDbException | IOException
+ | TopicExistsException excp) {
+ LOGGER.error("Error while granting read access to consumer ["
+ + consumerId + "] for topic " + topicName, excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.PERMIT_CONSUMER_FOR_TOPIC.getResponseCode(),
+ "Error while granting read access to consumer ["
+ + consumerId + "] for topic " + topicName+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
+
}
}
*
* @param topicName
* @param consumerId
- * @throws CambriaApiException
+ * @throws CambriaApiException
*/
@DELETE
@Path("/{topicName}/consumers/{consumerId}")
public void denyConsumerForTopic(@PathParam("topicName") String topicName,
@PathParam("consumerId") String consumerId) throws CambriaApiException {
try {
- LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
-
- tService.denyConsumerForTopic(getDmaapContext(), topicName, consumerId);
-
- LOGGER.info("Read access revoked to consumer [" + consumerId + "] for topic " + topicName);
- } catch (ConfigDbException | IOException | TopicExistsException excp) {
- LOGGER.error("Error while revoking read access to consumer [" + consumerId + "] for topic " + topicName,
- excp);
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.REVOKE_CONSUMER_FOR_TOPIC.getResponseCode(),
- "Error while revoking read access to consumer [" + consumerId + "] for topic " + topicName
- + excp.getMessage());
+ LOGGER.info("Revoking read access to consumer [" + consumerId
+ + "] for topic " + topicName);
+
+ topicService.denyConsumerForTopic(getDmaapContext(), topicName,
+ consumerId);
+
+ LOGGER.info("Read access revoked to consumer [" + consumerId
+ + "] for topic " + topicName);
+ } catch ( ConfigDbException | IOException
+ | TopicExistsException excp) {
+ LOGGER.error("Error while revoking read access to consumer ["
+ + consumerId + "] for topic " + topicName, excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.REVOKE_CONSUMER_FOR_TOPIC.getResponseCode(),
+ "Error while revoking read access to consumer ["
+ + consumerId + "] for topic " + topicName+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
- } catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
- LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
- errorMessages.getCreateTopicFail() + excp.getMessage());
+ }catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(),
+ excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail()+ excp.getMessage());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
+
+ }
+ }
- }
+ public TopicService getTopicService() {
+ return topicService;
+ }
+
+ public void setTopicService(TopicService topicService) {
+ this.topicService = topicService;
}
+
+
+
}