+
+ }
+
+ @Test
+ public void testSendBatchWithResponseConText() throws Exception {
+
+ setUp("text/plain");
+
+ final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
+ .createBatchingPublisher(outFile.getPath());
+
+ // publish some messages
+ final JSONObject msg1 = new JSONObject();
+ pub.send("MyPartitionKey", msg1.toString());
+
+ final List<message> stuck = pub.close(1, TimeUnit.SECONDS);
+ Assert.assertEquals(1, stuck.size());
+
+ }
+
+ @Test
+ public void testSendBatchWithResponseContCambria() throws Exception {
+
+ setUp("application/cambria-zip");
+
+ final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
+ .createBatchingPublisher(outFile.getPath());
+
+ // publish some messages
+ final JSONObject msg1 = new JSONObject();
+ pub.send("MyPartitionKey", msg1.toString());
+
+ final List<message> stuck = pub.close(1, TimeUnit.SECONDS);
+ Assert.assertEquals(1, stuck.size());
+
+ }
+
+ @Test
+ public void testSendBatchWithResponseProtKey() throws Exception {
+
+ setUp(null);
+
+ final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
+ .createBatchingPublisher(outFile.getPath());
+ pub.setProtocolFlag(ProtocolTypeConstants.AUTH_KEY.getValue());
+ // publish some messages
+ final JSONObject msg1 = new JSONObject();
+ pub.send("MyPartitionKey", msg1.toString());
+
+ final List<message> stuck = pub.close(1, TimeUnit.SECONDS);
+ Assert.assertEquals(1, stuck.size());
+
+ }
+
+ @Test
+ public void testSendBatchWithResponseProtAaf() throws Exception {
+
+ setUp(null);
+
+ final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
+ .createBatchingPublisher(outFile.getPath());
+ pub.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
+ // publish some messages
+ final JSONObject msg1 = new JSONObject();
+ pub.send("MyPartitionKey", msg1.toString());
+
+ final List<message> stuck = pub.close(1, TimeUnit.SECONDS);
+ Assert.assertEquals(1, stuck.size());
+
+ }
+
+ @Test
+ public void testSendBatchWithResponseProtNoAuth() throws Exception {
+
+ setUp(null);
+
+ final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
+ .createBatchingPublisher(outFile.getPath());
+ pub.setProtocolFlag(ProtocolTypeConstants.HTTPNOAUTH.getValue());
+ // publish some messages
+ final JSONObject msg1 = new JSONObject();
+ pub.send("MyPartitionKey", msg1.toString());
+
+ final List<message> stuck = pub.close(1, TimeUnit.SECONDS);
+ Assert.assertEquals(1, stuck.size());
+
+ }
+
+ @Test
+ public void testSendBatchWithResponsecontypeText() throws Exception {
+
+ setUp("text/plain");
+
+ final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
+ .createBatchingPublisher(outFile.getPath(), true);
+
+ // publish some messages
+ final JSONObject msg1 = new JSONObject();
+ pub.send("MyPartitionKey", "payload");
+ MRPublisherResponse pubResponse = new MRPublisherResponse();
+ pub.setPubResponse(pubResponse);
+
+ MRPublisherResponse mrPublisherResponse = pub.sendBatchWithResponse();
+ Assert.assertEquals(1, mrPublisherResponse.getPendingMsgs());
+
+ }
+
+ @Test
+ public void testSendBatchWithResponsecontypeCambria() throws Exception {
+
+ setUp("application/cambria-zip");
+
+ final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
+ .createBatchingPublisher(outFile.getPath(), true);
+
+ // publish some messages
+ final JSONObject msg1 = new JSONObject();
+ pub.send("MyPartitionKey", "payload");
+ MRPublisherResponse pubResponse = new MRPublisherResponse();
+ pub.setPubResponse(pubResponse);
+
+ MRPublisherResponse mrPublisherResponse = pub.sendBatchWithResponse();
+ Assert.assertEquals(1, mrPublisherResponse.getPendingMsgs());
+
+ }
+
+ @Test
+ public void testSendBatchWithResponsePrAuthKey() throws Exception {
+
+ setUp(null);
+
+ final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
+ .createBatchingPublisher(outFile.getPath(), true);
+ pub.setProtocolFlag(ProtocolTypeConstants.AUTH_KEY.getValue());
+
+ // publish some messages
+ final JSONObject msg1 = new JSONObject();
+ pub.send("MyPartitionKey", msg1.toString());
+ MRPublisherResponse pubResponse = new MRPublisherResponse();
+ pub.setPubResponse(pubResponse);
+
+ MRPublisherResponse mrPublisherResponse = pub.sendBatchWithResponse();
+ Assert.assertEquals(1, mrPublisherResponse.getPendingMsgs());
+
+ }
+
+ @Test
+ public void testSendBatchWithResponsePrAaf() throws Exception {
+
+ setUp(null);
+
+ final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
+ .createBatchingPublisher(outFile.getPath(), true);
+ pub.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
+
+ // publish some messages
+ final JSONObject msg1 = new JSONObject();
+ pub.send("MyPartitionKey", msg1.toString());
+ MRPublisherResponse pubResponse = new MRPublisherResponse();
+ pub.setPubResponse(pubResponse);
+
+ MRPublisherResponse mrPublisherResponse = pub.sendBatchWithResponse();
+ Assert.assertEquals(1, mrPublisherResponse.getPendingMsgs());
+
+ }
+
+ @Test
+ public void testSendBatchWithResponsePrNoauth() throws Exception {
+
+ setUp(null);
+
+ final MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
+ .createBatchingPublisher(outFile.getPath(), true);
+ pub.setProtocolFlag(ProtocolTypeConstants.HTTPNOAUTH.getValue());
+
+ // publish some messages
+ final JSONObject msg1 = new JSONObject();
+ pub.send("MyPartitionKey", msg1.toString());
+ MRPublisherResponse pubResponse = new MRPublisherResponse();
+ pub.setPubResponse(pubResponse);
+
+ MRPublisherResponse mrPublisherResponse = pub.sendBatchWithResponse();
+ Assert.assertEquals(1, mrPublisherResponse.getPendingMsgs());
+
+ }
+
+ @Test
+ public void createPublisherResponse() throws Exception{
+ setUp(null);
+ MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
+ .createBatchingPublisher(outFile.getPath(), true);
+
+ MRPublisherResponse response=pub.createMRPublisherResponse("{\"message\": \"published the message\", \"status\": \"200\"}", new MRPublisherResponse());
+ assertEquals("200", response.getResponseCode());
+
+ }
+
+ @Test
+ public void createPublisherResponseSucc() throws Exception{
+ setUp(null);
+ MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
+ .createBatchingPublisher(outFile.getPath(), true);
+
+ MRPublisherResponse response=pub.createMRPublisherResponse("{\"fakemessage\": \"published the message\", \"fakestatus\": \"200\"}", new MRPublisherResponse());
+ assertEquals("200", response.getResponseCode());
+
+ }
+
+ @Test
+ public void createPublisherResponseError() throws Exception{
+ setUp(null);
+ MRSimplerBatchPublisher pub = (MRSimplerBatchPublisher) MRClientFactory
+ .createBatchingPublisher(outFile.getPath(), true);
+
+ MRPublisherResponse response=pub.createMRPublisherResponse("", new MRPublisherResponse());
+ assertEquals("400", response.getResponseCode());