Fixes for security voilations
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / backends / memory / MemoryConsumerFactory.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.dmf.mr.backends.memory;
23
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.HashMap;
27
28 import com.att.dmf.mr.CambriaApiException;
29 import com.att.dmf.mr.backends.Consumer;
30 import com.att.dmf.mr.backends.ConsumerFactory;
31 /**
32  * 
33  * @author anowarul.islam
34  *
35  */
36 public class MemoryConsumerFactory implements ConsumerFactory
37 {
38         /**
39          * 
40          * Initializing constructor
41          * @param q
42          */
43         public MemoryConsumerFactory ( MemoryQueue q )
44         {
45                 fQueue = q;
46         }
47
48         /**
49          * 
50          * @param topic
51          * @param consumerGroupId
52          * @param clientId
53          * @param timeoutMs
54          * @return Consumer
55          */
56         @Override
57         public Consumer getConsumerFor ( String topic, String consumerGroupId, String clientId, int timeoutMs, String remotehost )
58         {
59                 return new MemoryConsumer ( topic, consumerGroupId );
60         }
61
62         private final MemoryQueue fQueue;
63
64         /**
65          * 
66          * Define nested inner class
67          *
68          */
69         private class MemoryConsumer implements Consumer
70         {
71                 /**
72                  * 
73                  * Initializing MemoryConsumer constructor 
74                  * @param topic
75                  * @param consumer
76                  * 
77                  */
78                 public MemoryConsumer ( String topic, String consumer )
79                 {
80                         fTopic = topic;
81                         fConsumer = consumer;
82                         fCreateMs = System.currentTimeMillis ();
83                         fLastAccessMs = fCreateMs;
84                 }
85
86                 @Override
87                 /**
88                  * 
89                  * return consumer details  
90                  */
91                 public Message nextMessage ()
92                 {
93                         return fQueue.get ( fTopic, fConsumer );
94                 }
95
96                 private final String fTopic;
97                 private final String fConsumer;
98                 private final long fCreateMs;
99                 private long fLastAccessMs;
100
101                 @Override
102                 public boolean close() {
103                         //Nothing to close/clean up.
104                         return true;
105                 }
106                 /**
107                  * 
108                  */
109                 public void commitOffsets()
110                 {
111                         // ignoring this aspect
112                 }
113                 /**
114                  * get offset
115                  */
116                 public long getOffset()
117                 {
118                         return 0;
119                 }
120
121                 @Override
122                 /**
123                  * get consumer topic name
124                  */
125                 public String getName ()
126                 {
127                         return fTopic + "/" + fConsumer;
128                 }
129
130                 @Override
131                 public long getCreateTimeMs ()
132                 {
133                         return fCreateMs;
134                 }
135
136                 @Override
137                 public long getLastAccessMs ()
138                 {
139                         return fLastAccessMs;
140                 }
141
142                 
143
144                 @Override
145                 public void setOffset(long offset) {
146                         // TODO Auto-generated method stub
147                         
148                 }
149
150                 
151         }
152
153         @Override
154         public void destroyConsumer(String topic, String consumerGroupId,
155                         String clientId) {
156                 //No cache for memory consumers, so NOOP
157         }
158
159         @Override
160         public void dropCache ()
161         {
162                 // nothing to do - there's no cache here
163         }
164
165         @Override
166         /**
167          * @return ArrayList<MemoryConsumer>
168          */
169         public Collection<? extends Consumer> getConsumers ()
170         {
171                 return new ArrayList<MemoryConsumer> ();
172         }
173
174         @Override
175         public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs,
176                         String remotehost) throws UnavailableException, CambriaApiException {
177                 // TODO Auto-generated method stub
178                 return null;
179         }
180
181         
182 }