# Data Router SUbscrihers
- def add_subscriber(self, feed_id, location, delivery_url, username, password, status=None):
+ def add_subscriber(self, feed_id, location, delivery_url, username, password, decompress, privileged, status=None):
'''
Add a publisher to feed feed_id at location location with user, pass, and status
'''
'dcaeLocationName' : location,
'deliveryURL' : delivery_url,
'username' : username,
- 'userpwd' : password
+ 'userpwd' : password,
+ 'decompress': decompress,
+ 'privilegedSubscriber': privileged
}
if status:
# Get the parameters for the call
feed_id = ctx.target.instance.runtime_properties["feed_id"]
- location = ctx.source.instance.runtime_properties[target_feed]["location"]
- delivery_url = ctx.source.instance.runtime_properties[target_feed]["delivery_url"]
- username = ctx.source.instance.runtime_properties[target_feed]["username"]
- password = ctx.source.instance.runtime_properties[target_feed]["password"]
+ feed = ctx.source.instance.runtime_properties[target_feed]
+ location = feed["location"]
+ delivery_url = feed["delivery_url"]
+ username = feed["username"]
+ password = feed["password"]
+ decompress = feed["decompress"] if "decompress" in feed else False
+ privileged = feed["privileged"] if "privileged" in feed else False
# Make the request to add the subscriber to the feed
dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
- add_sub = dmc.add_subscriber(feed_id, location, delivery_url,username, password)
+ add_sub = dmc.add_subscriber(feed_id, location, delivery_url,username, password, decompress, privileged)
add_sub.raise_for_status()
subscriber_info = add_sub.json()
subscriber_id = subscriber_info["subId"]
"location" : location,
"delivery_url" : delivery_url,
"username" : username,
- "password" : password
+ "password" : password,
+ "decompress": decompress,
+ "privilegedSubscriber": privileged
}
ctx.logger.info("on source: {0}".format(ctx.source.instance.runtime_properties[target_feed]))