ad925b1c5c662392da9200915c70b168ff8080e2
[dmaap/messagerouter/dmaapclient.git] / src / main / java / com / att / nsa / mr / client / impl / MRSimplerBatchPublisher.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.mr.client.impl;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
28 import java.net.URI;
29 import java.net.URISyntaxException;
30 import java.util.Collection;
31 import java.util.HashMap;
32 import java.util.LinkedList;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Properties;
36 import java.util.concurrent.LinkedBlockingQueue;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.zip.GZIPOutputStream;
40
41 import javax.ws.rs.core.MultivaluedMap;
42
43 import org.apache.http.HttpException;
44 import org.apache.http.HttpStatus;
45 import org.json.JSONArray;
46 import org.json.JSONObject;
47
48 import com.att.aft.dme2.api.DME2Client;
49 import com.att.aft.dme2.api.DME2Exception;
50 import com.att.nsa.mr.client.HostSelector;
51 import com.att.nsa.mr.client.MRBatchingPublisher;
52 import com.att.nsa.mr.client.response.MRPublisherResponse;
53 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
54
55 public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher
56 {
57         public static class Builder 
58         {
59                 public Builder ()
60                 {
61                 }
62
63                 public Builder againstUrls ( Collection<String> baseUrls )
64                 {
65                         fUrls = baseUrls;
66                         return this;
67                 }
68
69                 public Builder onTopic ( String topic )
70                 {
71                         fTopic = topic;
72                         return this;
73                 }
74
75                 public Builder batchTo ( int maxBatchSize, long maxBatchAgeMs )
76                 {
77                         fMaxBatchSize = maxBatchSize;
78                         fMaxBatchAgeMs = maxBatchAgeMs;
79                         return this;
80                 }
81
82                 public Builder compress ( boolean compress )
83                 {
84                         fCompress = compress;
85                         return this;
86                 }
87                 
88                 public Builder httpThreadTime ( int threadOccuranceTime )
89                 {
90                         this.threadOccuranceTime = threadOccuranceTime;
91                         return this;
92                 }
93                 
94                 public Builder allowSelfSignedCertificates( boolean allowSelfSignedCerts )
95                 {
96                         fAllowSelfSignedCerts = allowSelfSignedCerts;
97                         return this;
98                 }
99                 
100                 public Builder withResponse ( boolean withResponse)
101                 {
102                         fWithResponse = withResponse;
103                         return this;
104                 }
105                 public MRSimplerBatchPublisher build ()
106                 {
107                         if(!fWithResponse) 
108                         {
109                                 try {
110                                         return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts,threadOccuranceTime);
111                                 } catch (MalformedURLException e) {
112                                         throw new RuntimeException(e);
113                                 }
114                         } else 
115                         {
116                                 try {
117                                         return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts, fMaxBatchSize);
118                                 } catch (MalformedURLException e) {
119                                         throw new RuntimeException(e);
120                                 }
121                         }
122                                 
123                 }
124
125                 private Collection<String> fUrls;
126                 private String fTopic;
127                 private int fMaxBatchSize = 100;
128                 private long fMaxBatchAgeMs = 1000;
129                 private boolean fCompress = false;
130                 private int threadOccuranceTime = 50;
131                 private boolean fAllowSelfSignedCerts = false;
132                 private boolean fWithResponse = false;
133                 
134         };
135
136         @Override
137         public int send ( String partition, String msg )
138         {
139                 return send ( new message ( partition, msg ) );
140         }
141         @Override
142         public int send ( String msg )
143         {
144                 return send ( new message ( null, msg ) );
145         }
146
147
148         @Override
149         public int send ( message msg )
150         {
151                 final LinkedList<message> list = new LinkedList<message> ();
152                 list.add ( msg );
153                 return send ( list );
154         }
155         
156         
157
158         @Override
159         public synchronized int send ( Collection<message> msgs )
160         {
161                 if ( fClosed )
162                 {
163                         throw new IllegalStateException ( "The publisher was closed." );
164                 }
165                 
166                 for ( message userMsg : msgs )
167                 {
168                         fPending.add ( new TimestampedMessage ( userMsg ) );
169                 }
170                 return getPendingMessageCount ();
171         }
172
173         @Override
174         public synchronized int getPendingMessageCount ()
175         {
176                 return fPending.size ();
177         }
178
179         @Override
180         public void close ()
181         {
182                 try
183                 {
184                         final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS );
185                         if ( remains.size() > 0 )
186                         {
187                                 getLog().warn ( "Closing publisher with " + remains.size() + " messages unsent. "
188                                         + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close." );
189                         }
190                 }
191                 catch ( InterruptedException e )
192                 {
193                         getLog().warn ( "Possible message loss. " + e.getMessage(), e );
194                 }
195                 catch ( IOException e )
196                 {
197                         getLog().warn ( "Possible message loss. " + e.getMessage(), e );
198                 }
199         }
200
201         @Override
202         public List<message> close ( long time, TimeUnit unit ) throws IOException, InterruptedException
203         {
204                 synchronized ( this )
205                 {
206                         fClosed = true;
207
208                         // stop the background sender
209                         fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false );
210                         fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false );
211                         fExec.shutdown ();
212                 }
213
214                 final long now = Clock.now ();
215                 final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit );
216                 final long timeoutAtMs = now + waitInMs;
217
218                 while ( Clock.now() < timeoutAtMs && getPendingMessageCount() > 0 )
219                 {
220                         send ( true );
221                         Thread.sleep ( 250 );
222                 }
223
224                 synchronized ( this )
225                 {
226                         final LinkedList<message> result = new LinkedList<message> ();
227                         fPending.drainTo ( result );
228                         return result;
229                 }
230         }
231
232         /**
233          * Possibly send a batch to the MR server. This is called by the background thread
234          * and the close() method
235          * 
236          * @param force
237          */
238         private synchronized void send ( boolean force )
239         {
240                 if ( force || shouldSendNow () )
241                 {
242                         if ( !sendBatch () )
243                         {
244                                 getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
245
246                                 // note the time for back-off
247                                 fDontSendUntilMs = sfWaitAfterError + Clock.now ();
248                         }
249                 }
250         }
251
252         private synchronized boolean shouldSendNow ()
253         {
254                 boolean shouldSend = false;
255                 if ( fPending.size () > 0 )
256                 {
257                         final long nowMs = Clock.now ();
258
259                         shouldSend = ( fPending.size() >= fMaxBatchSize );
260                         if ( !shouldSend )
261                         {
262                                 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
263                                 shouldSend = sendAtMs <= nowMs;
264                         }
265
266                         // however, wait after an error
267                         shouldSend = shouldSend && nowMs >= fDontSendUntilMs; 
268                 }
269                 return shouldSend;
270         }
271
272         private synchronized boolean sendBatch ()
273         {
274                 // it's possible for this call to be made with an empty list. in this case, just return.
275                 if ( fPending.size() < 1 )
276                 {
277                         return true;
278                 }
279
280                 final long nowMs = Clock.now ();
281                 
282                 host = this.fHostSelector.selectBaseHost();
283                 
284                 final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
285                 
286
287                 try
288                 {
289                         /*final String contentType =
290                                 fCompress ?
291                                         MRFormat.CAMBRIA_ZIP.toString () :
292                                         MRFormat.CAMBRIA.toString () 
293                         ;*/
294             
295                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
296                         OutputStream os = baseStream;
297                         final String contentType = props.getProperty("contenttype");
298                         if(contentType.equalsIgnoreCase("application/json")){
299                                 JSONArray jsonArray = new JSONArray();
300                                 for ( TimestampedMessage m : fPending )
301                                 {
302                                         JSONObject jsonObject = new JSONObject(m.fMsg);
303                                                                 
304                                                 jsonArray.put(jsonObject);
305                                 
306                                 }
307                                 os.write (jsonArray.toString().getBytes() );    
308                                 os.close();
309
310                                 }else if (contentType.equalsIgnoreCase("text/plain")){
311                                         for ( TimestampedMessage m : fPending )
312                                         {                                                                               
313                                                 os.write ( m.fMsg.getBytes() );
314                                                 os.write ( '\n' );
315                                         }
316                                         os.close ();
317                                 } else if (contentType.equalsIgnoreCase("application/cambria") ||  (contentType.equalsIgnoreCase("application/cambria-zip"))){
318                                         if ( contentType.equalsIgnoreCase("application/cambria-zip") )
319                                         {
320                                                 os = new GZIPOutputStream ( baseStream );
321                                         }
322                                         for ( TimestampedMessage m : fPending )
323                                         {
324                                                 
325                                                 os.write ( ( "" + m.fPartition.length () ).getBytes() );
326                                                 os.write ( '.' );
327                                                 os.write ( ( "" + m.fMsg.length () ).getBytes() );
328                                                 os.write ( '.' );
329                                                 os.write ( m.fPartition.getBytes() );
330                                                 os.write ( m.fMsg.getBytes() );
331                                                 os.write ( '\n' );
332                                         }
333                                         os.close ();
334                                 }else{
335                                         for ( TimestampedMessage m : fPending )
336                                         {                                                                               
337                                                 os.write ( m.fMsg.getBytes() );
338                                         
339                                         }
340                                         os.close ();
341                                 }
342              
343                 
344
345                         final long startMs = Clock.now ();
346                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
347                                 
348                         
349                                 DME2Configue();
350                                 
351                                 Thread.sleep(5);
352                                 getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
353                                 sender.setPayload(os.toString());               
354                                 String dmeResponse = sender.sendAndWait(5000L);
355                                 
356                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):"
357                                                 + dmeResponse.toString();
358                                 getLog().info(logLine);
359                                 fPending.clear();
360                                 return true;
361                         } 
362                         
363                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
364                                 getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
365                                 final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
366                                 //System.out.println(result.getInt("status"));
367                                 //Here we are checking for error response. If HTTP status
368                                 //code is not within the http success response code
369                                 //then we consider this as error and return false
370                                 if(result.getInt("status") < 200 || result.getInt("status") > 299) {
371                                         return false;
372                                 }
373                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
374                                 getLog().info(logLine);
375                                 fPending.clear();
376                                 return true;
377                         } 
378                         
379                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
380                                 getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
381                                 final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
382                                 
383                                 
384                                 //System.out.println(result.getInt("status"));
385                                 //Here we are checking for error response. If HTTP status
386                                 //code is not within the http success response code
387                                 //then we consider this as error and return false
388                                 if(result.getInt("status") < 200 || result.getInt("status") > 299) {
389                                         return false;
390                                 }
391                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
392                                 getLog().info(logLine);
393                                 fPending.clear();
394                                 return true;
395                         }
396                 }
397                 catch ( IllegalArgumentException x ) {
398                         getLog().warn ( x.getMessage(), x );
399                 } catch ( IOException x ) {
400                         getLog().warn ( x.getMessage(), x );
401                 } catch (HttpException x) {
402                         getLog().warn ( x.getMessage(), x );
403                 } catch (Exception x) {
404                         getLog().warn(x.getMessage(), x);
405                 }
406                 return false;
407         }
408
409         public synchronized MRPublisherResponse sendBatchWithResponse () 
410         {
411                 // it's possible for this call to be made with an empty list. in this case, just return.
412                 if ( fPending.size() < 1 )
413                 {
414                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
415                         pubResponse.setResponseMessage("No Messages to send");
416                         return pubResponse;
417                 }
418
419                 final long nowMs = Clock.now ();
420                 
421                 host = this.fHostSelector.selectBaseHost();
422                 
423                 final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
424                 OutputStream os=null;
425                 try
426                 {
427                         
428                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
429                          os = baseStream;
430                         final String contentType = props.getProperty("contenttype");
431                         if(contentType.equalsIgnoreCase("application/json")){
432                                 JSONArray jsonArray = new JSONArray();
433                                 for ( TimestampedMessage m : fPending )
434                                 {
435                                         JSONObject jsonObject = new JSONObject(m.fMsg);
436                                                                 
437                                                 jsonArray.put(jsonObject);
438                                 
439                                 }
440                                 os.write (jsonArray.toString().getBytes() );    
441                                 }else if (contentType.equalsIgnoreCase("text/plain")){
442                                         for ( TimestampedMessage m : fPending )
443                                         {                                                                               
444                                                 os.write ( m.fMsg.getBytes() );
445                                                 os.write ( '\n' );
446                                         }
447                                 } else if (contentType.equalsIgnoreCase("application/cambria") ||  (contentType.equalsIgnoreCase("application/cambria-zip"))){
448                                         if ( contentType.equalsIgnoreCase("application/cambria-zip") )
449                                         {
450                                                 os = new GZIPOutputStream ( baseStream );
451                                         }
452                                         for ( TimestampedMessage m : fPending )
453                                         {
454                                                 
455                                                 os.write ( ( "" + m.fPartition.length () ).getBytes() );
456                                                 os.write ( '.' );
457                                                 os.write ( ( "" + m.fMsg.length () ).getBytes() );
458                                                 os.write ( '.' );
459                                                 os.write ( m.fPartition.getBytes() );
460                                                 os.write ( m.fMsg.getBytes() );
461                                                 os.write ( '\n' );
462                                         }
463                                         os.close ();
464                                 }else{
465                                         for ( TimestampedMessage m : fPending )
466                                         {                                                                               
467                                                 os.write ( m.fMsg.getBytes() );
468                                         
469                                         }
470                                 }
471              
472                 
473
474                         final long startMs = Clock.now ();
475                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
476                                 
477                         
478                                 try {
479                                 DME2Configue();
480                                 
481                                 Thread.sleep(5);
482                                 getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
483                                 sender.setPayload(os.toString());               
484                                                 
485                                 
486                                 String dmeResponse = sender.sendAndWait(5000L);
487                                 System.out.println("dmeres->"+dmeResponse);             
488                                 
489                                 
490                                 pubResponse = createMRPublisherResponse(dmeResponse,pubResponse);
491                                 
492                                 if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
493                                         
494                                         return pubResponse;
495                                 }
496                                 final String logLine = String.valueOf((Clock.now() - startMs))
497                                                 + dmeResponse.toString();
498                                 getLog().info(logLine);
499                                 fPending.clear();
500                                 
501                                 }
502                                 catch (DME2Exception x) {
503                                         getLog().warn(x.getMessage(), x);
504                                         pubResponse.setResponseCode(x.getErrorCode());
505                                         pubResponse.setResponseMessage(x.getErrorMessage());
506                                 } catch (URISyntaxException x) {
507                                         
508                                         getLog().warn(x.getMessage(), x);
509                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
510                                         pubResponse.setResponseMessage(x.getMessage());
511                                 } catch (Exception x) {
512
513                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
514                                         pubResponse.setResponseMessage(x.getMessage());
515                                         
516                                 }
517                                 
518                                 return pubResponse;
519                         } 
520                         
521                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
522                                 getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
523                                 final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
524                                 //System.out.println(result.getInt("status"));
525                                 //Here we are checking for error response. If HTTP status
526                                 //code is not within the http success response code
527                                 //then we consider this as error and return false
528                                 
529                                 
530                                 pubResponse = createMRPublisherResponse(result,pubResponse);
531                                 
532                                 if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
533                                         
534                                         return pubResponse;
535                                 }
536                                 
537                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
538                                 getLog().info(logLine);
539                                 fPending.clear();
540                                 return pubResponse;
541                         } 
542                         
543                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
544                                 getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
545                                 final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
546                                 
547                                 //System.out.println(result.getInt("status"));
548                                 //Here we are checking for error response. If HTTP status
549                                 //code is not within the http success response code
550                                 //then we consider this as error and return false
551                                 pubResponse = createMRPublisherResponse(result,pubResponse);
552                                 
553                                 if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
554                                         
555                                         return pubResponse;
556                                 }
557                                 
558                                 final String logLine = String.valueOf((Clock.now() - startMs));
559                                 getLog().info(logLine);
560                                 fPending.clear();
561                                 return pubResponse;
562                         }
563                 }
564                 catch ( IllegalArgumentException x ) {
565                         getLog().warn ( x.getMessage(), x );
566                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
567                         pubResponse.setResponseMessage(x.getMessage());
568                         
569                 } catch ( IOException x ) {
570                         getLog().warn ( x.getMessage(), x );
571                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
572                         pubResponse.setResponseMessage(x.getMessage());
573                         
574                 } catch (HttpException x) {
575                         getLog().warn ( x.getMessage(), x );
576                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
577                         pubResponse.setResponseMessage(x.getMessage());
578                         
579                 } catch (Exception x) {
580                         getLog().warn(x.getMessage(), x);
581                         
582                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
583                         pubResponse.setResponseMessage(x.getMessage());
584                         
585                 }
586                 
587                 finally {
588                         if (fPending.size()>0) {
589                                 getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
590                                 pubResponse.setPendingMsgs(fPending.size());
591                         }
592                         if (os != null) {
593                                 try {
594                                 os.close();
595                                 } catch (Exception x) {
596                                         getLog().warn(x.getMessage(), x);
597                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
598                                         pubResponse.setResponseMessage("Error in closing Output Stream");
599                                 }
600                                 }
601                 }
602                 
603                 return pubResponse;
604         }
605         
606 private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
607                 
608          if (reply.isEmpty()) 
609          {
610                  
611                  mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
612                  mrPubResponse.setResponseMessage("Please verify the Producer properties");
613          }
614          else if(reply.startsWith("{"))
615          {
616                         JSONObject jObject = new JSONObject(reply);
617                         if(jObject.has("message") && jObject.has("status"))
618                         {
619                                 String message = jObject.getString("message");
620                                 if(null != message)
621                                 {
622                                         mrPubResponse.setResponseMessage(message);      
623                                 }
624                                 mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
625                         }
626                         else
627                          {
628                                         mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
629                                         mrPubResponse.setResponseMessage(reply);        
630                          }
631      }
632          else if (reply.startsWith("<"))
633          {
634                  String responseCode = getHTTPErrorResponseCode(reply);
635                  if( responseCode.contains("403"))
636                         {
637                          responseCode = "403";
638                         }       
639                  mrPubResponse.setResponseCode(responseCode);
640                         mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));   
641          }
642          
643                 return mrPubResponse;
644         }
645
646         private final String fTopic;
647         private final int fMaxBatchSize;
648         private final long fMaxBatchAgeMs;
649         private final boolean fCompress;
650         private int threadOccuranceTime;
651         private boolean fClosed;
652         private String username;
653         private String password;
654         private String host;
655         
656         //host selector
657         private HostSelector fHostSelector = null;
658         
659         private final LinkedBlockingQueue<TimestampedMessage> fPending;
660         private long fDontSendUntilMs;
661         private final ScheduledThreadPoolExecutor fExec;
662
663         private String latitude;
664         private String longitude;
665         private String version;
666         private String serviceName;
667         private String env;
668         private String partner;
669         private String routeOffer;
670         private String subContextPath;
671         private String protocol;
672         private String methodType;
673         private String url;
674         private String dmeuser;
675         private String dmepassword;
676         private String contentType;
677         private static final long sfWaitAfterError = 10000;
678         private HashMap<String, String> DMETimeOuts;
679         private DME2Client sender;
680         public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
681         public String producerFilePath;
682         private String authKey;
683         private String authDate;
684         private String handlers;
685         private Properties props;
686         public static String routerFilePath;
687         public static Map<String, String> headers=new HashMap<String, String>();
688         public static MultivaluedMap<String, Object> headersMap;
689         
690         
691         private MRPublisherResponse pubResponse;
692         
693         public MRPublisherResponse getPubResponse() {
694                 return pubResponse;
695         }
696         public void setPubResponse(MRPublisherResponse pubResponse) {
697                 this.pubResponse = pubResponse;
698         }
699         
700         public static String getRouterFilePath() {
701                 return routerFilePath;
702         }
703
704         public static void setRouterFilePath(String routerFilePath) {
705                 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
706         }
707
708         public Properties getProps() {
709                 return props;
710         }
711
712         public void setProps(Properties props) {
713                 this.props = props;
714         }
715
716         public String getProducerFilePath() {
717                 return producerFilePath;
718         }
719
720         public void setProducerFilePath(String producerFilePath) {
721                 this.producerFilePath = producerFilePath;
722         }
723
724         public String getProtocolFlag() {
725                 return protocolFlag;
726         }
727
728         public void setProtocolFlag(String protocolFlag) {
729                 this.protocolFlag = protocolFlag;
730         }
731         
732         
733         private void DME2Configue() throws Exception {
734                 try {
735                         
736                 /*      FileReader reader = new FileReader(new File (producerFilePath));
737                         Properties props = new Properties();            
738                         props.load(reader);*/
739                         latitude = props.getProperty("Latitude");
740                         longitude = props.getProperty("Longitude");
741                         version = props.getProperty("Version");
742                         serviceName = props.getProperty("ServiceName");
743                         env = props.getProperty("Environment");
744                         partner = props.getProperty("Partner");
745                         routeOffer = props.getProperty("routeOffer");
746                         subContextPath = props.getProperty("SubContextPath")+fTopic;
747                         /*if(props.getProperty("partition")!=null && !props.getProperty("partition").equalsIgnoreCase("")){
748                                 subContextPath=subContextPath+"?partitionKey="+props.getProperty("partition");
749                         }*/                     
750                         protocol = props.getProperty("Protocol");
751                         methodType = props.getProperty("MethodType");
752                         dmeuser = props.getProperty("username");
753                         dmepassword = props.getProperty("password");
754                         contentType = props.getProperty("contenttype");
755                         handlers = props.getProperty("sessionstickinessrequired");
756                         routerFilePath= props.getProperty("DME2preferredRouterFilePath");
757                         
758                         /**
759                          * Changes to DME2Client url to use Partner for auto failover between data centers
760                          * When Partner value is not provided use the routeOffer value for auto failover within a cluster 
761                          */
762                         
763
764                         String partitionKey = props.getProperty("partition");
765                         
766                         if (partner != null && !partner.isEmpty() ) 
767                         { 
768                                 url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner; 
769                 if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
770                     url = url + "&partitionKey=" + partitionKey;
771                 }
772                         }
773                         else if (routeOffer!=null && !routeOffer.isEmpty()) 
774                         { 
775                                 url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer;
776                 if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
777                     url = url + "&partitionKey=" + partitionKey;
778                 }
779                         }
780                          
781                         DMETimeOuts = new HashMap<String, String>();
782                         DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
783                         DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
784                         DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
785                         DMETimeOuts.put("Content-Type", contentType);
786                         System.setProperty("AFT_LATITUDE", latitude);
787                         System.setProperty("AFT_LONGITUDE", longitude);
788                         System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT"));
789                         //System.setProperty("DME2.DEBUG", "true");
790                 //      System.setProperty("AFT_DME2_HTTP_EXCHANGE_TRACE_ON", "true");
791                         //System.out.println("XXXXXX"+url);
792                         
793                         //SSL changes
794                         System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
795                                         "SSLv3,TLSv1,TLSv1.1");
796                         System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
797                         System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
798                         
799                         //SSL changes
800                         
801                         sender = new DME2Client(new URI(url), 5000L);
802                                 
803                         sender.setAllowAllHttpReturnCodes(true);
804                         sender.setMethod(methodType);
805                         sender.setSubContext(subContextPath);   
806                         sender.setCredentials(dmeuser, dmepassword);
807                         sender.setHeaders(DMETimeOuts);
808                         if(handlers.equalsIgnoreCase("yes")){
809                                 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
810                                 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
811                                 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
812                                 }else{
813                                         sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
814                                 }
815                 } catch (DME2Exception x) {
816                         getLog().warn(x.getMessage(), x);
817                         throw new DME2Exception(x.getErrorCode(),x.getErrorMessage());
818                 } catch (URISyntaxException x) {
819                         
820                         getLog().warn(x.getMessage(), x);
821                         throw new URISyntaxException(url,x.getMessage());
822                 } catch (Exception x) {
823
824                         getLog().warn(x.getMessage(), x);
825                         throw new Exception(x.getMessage());
826                 }
827         }
828         
829         private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress) throws MalformedURLException
830         {
831                 super ( hosts );
832
833                 if ( topic == null || topic.length() < 1 )
834                 {
835                         throw new IllegalArgumentException ( "A topic must be provided." );
836                 }
837                 
838                 fHostSelector = new HostSelector(hosts, null);
839                 fClosed = false;
840                 fTopic = topic;
841                 fMaxBatchSize = maxBatchSize;
842                 fMaxBatchAgeMs = maxBatchAgeMs;
843                 fCompress = compress;
844
845                 fPending = new LinkedBlockingQueue<TimestampedMessage> ();
846                 fDontSendUntilMs = 0;
847                 fExec = new ScheduledThreadPoolExecutor ( 1 );
848                 pubResponse = new MRPublisherResponse();
849                 
850         }
851         
852         private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace ) throws MalformedURLException
853         {
854                 super ( hosts );
855
856                 if ( topic == null || topic.length() < 1 )
857                 {
858                         throw new IllegalArgumentException ( "A topic must be provided." );
859                 }
860                 
861                 fHostSelector = new HostSelector(hosts, null);
862                 fClosed = false;
863                 fTopic = topic;
864                 fMaxBatchSize = maxBatchSize;
865                 fMaxBatchAgeMs = maxBatchAgeMs;
866                 fCompress = compress;
867                 threadOccuranceTime=httpThreadOccurnace;
868                 fPending = new LinkedBlockingQueue<TimestampedMessage> ();
869                 fDontSendUntilMs = 0;
870                 fExec = new ScheduledThreadPoolExecutor ( 1 );
871                 fExec.scheduleAtFixedRate ( new Runnable()
872                 {
873                         @Override
874                         public void run ()
875                         {
876                                 send ( false );
877                         }
878                 }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS );
879         }
880
881         private static class TimestampedMessage extends message
882         {
883                 public TimestampedMessage ( message m )
884                 {
885                         super ( m );
886                         timestamp = Clock.now();
887                 }
888                 public final long timestamp;
889         }
890
891         public String getUsername() {
892                 return username;
893         }
894
895         public void setUsername(String username) {
896                 this.username = username;
897         }
898
899         public String getPassword() {
900                 return password;
901         }
902
903         public void setPassword(String password) {
904                 this.password = password;
905         }
906
907         public String getHost() {
908                 return host;
909         }
910
911         public void setHost(String host) {
912                 this.host = host;
913         }
914
915         public String getContentType() {
916                 return contentType;
917         }
918
919         public void setContentType(String contentType) {
920                 this.contentType = contentType;
921         }
922
923         public String getAuthKey() {
924                 return authKey;
925         }
926
927         public void setAuthKey(String authKey) {
928                 this.authKey = authKey;
929         }
930
931         public String getAuthDate() {
932                 return authDate;
933         }
934
935         public void setAuthDate(String authDate) {
936                 this.authDate = authDate;
937         }
938         
939 }