1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 # -*- coding: utf-8 -*-
23 Provides utilities for cdap components
30 from dcae_cli.util.logger import get_logger
31 from dcae_cli.util.exc import DcaeException
32 from dcae_cli.util import discovery
# Module-level logger shared by every function in this file; its threshold is set to DEBUG.
_logger = get_logger('cdap-utils')
_logger.setLevel(logging.DEBUG)
38 def _merge_spec_config_into_broker_put(jar, config, spec, params, templated_conf):
40 The purpose of this function is to form the CDAP Broker PUT from the CDAP compponent jar, spec, config, and params, where:
42 - config is the CDAP "auxillary file"
43 - spec is the CDAP component specification
44 - params contains the subkeys "app_config", "app_preferences", "program_preferences" from the parameters config specification
45 - (this last one isn't REALLY needed because it is a subset of "spec", but some preprocessing has already been done, specifically "normalize_cdap_params"
47 The CDAP Broker API existed well before the component spec, so there is overlap with different naming.
48 In the future, if this component spec becomes production and everyone follows it,
49 I will change the broker API to use the same keys so that this mapping becomes unneccessary.
50 However, while this is still a moving project, I am simply going to do a horrible mapping here.
52 The CDAP broker PUT looks as follows:
54 "service_component_type" : ...,
56 "artifact_name" : ...,
57 "artifact_version" : ...,
59 "app_preferences" : ...,
60 "program_preferences": ...,
64 "service_endpoints" : ...
67 "So you cooked up a story and dropped the six of us into a *meat grinder*" - Arnold Schwarzenegger, Predator.
69 #RE: Streams/consumes: this is used in the designer for validation but does not lead to anything in the CDAP developers configuration.
72 #map services/provides into service_endpoints broker JSON
73 services = spec["services"]["provides"] # is [] if empty
77 se.append({"service_name" : s["service_name"], "service_endpoint" : s["service_endpoint"], "endpoint_method" : s["verb"]})
80 "cdap_application_type" : "program-flowlet", #TODO! Fix this once Hydrator apps is integrated into this CLI tool.
81 "service_component_type" : spec["self"]["component_type"],
83 "artifact_version" : config["artifact_version"],
84 "artifact_name" : config["artifact_name"],
85 "artifact_version" : config["artifact_version"],
86 "programs": config["programs"],
87 "streamname" : config["streamname"],
91 Optionals = {v : config[v] for v in [i for i in ["namespace"] if i in config]}
93 #not a fan of whatever is going on in update such that I can't return this in single line
94 BrokerPut.update(Optionals)
95 BrokerPut.update(params)
97 # NOTE: app_config comes from params
98 BrokerPut["app_config"]["services_calls"] = templated_conf["services_calls"]
99 BrokerPut["app_config"]["streams_publishes"] = templated_conf["streams_publishes"]
100 BrokerPut["app_config"]["streams_subscribes"] = templated_conf["streams_subscribes"]
def _get_broker_url_from_profile(profile):
    """
    Gets the broker URL from profile

    Asks the Consul catalog at ``profile.consul_host`` (port 8500) for the service named
    ``profile.cdap_broker`` and builds the URL from the first instance returned.

    :param profile: object exposing ``consul_host`` and ``cdap_broker`` attributes
    :returns: the broker base URL in the form ``http://<ip>:<port>``
    :raises DcaeException: if Consul has no instance registered under ``profile.cdap_broker``
    :raises requests.exceptions.HTTPError: if Consul answers with a non-2xx status
    """
    catalog_url = "http://{0}:8500/v1/catalog/service/{1}".format(profile.consul_host, profile.cdap_broker)
    response = requests.get(catalog_url)
    response.raise_for_status()  # bomb if not 2xx

    instances = response.json()
    if not instances:
        # Consul answers with an empty list for an unknown service; say so instead of an IndexError.
        raise DcaeException("The CDAP broker '{0}' is not registered in Consul at {1}".format(profile.cdap_broker, profile.consul_host))

    broker = instances[0]
    # Consul leaves ServiceAddress empty when the service is reachable on its node's address.
    ip = broker.get("ServiceAddress") or broker["Address"]
    return "http://{ip}:{port}".format(ip=ip, port=broker["ServicePort"])
def run_component(catalog, params, instance_name, profile, jar, config, spec, templated_conf):
    """
    Runs a CDAP Component

    By the time this function is called, the instance_name and instance_name:rel have already
    been pushed into consul by the parent function. instance_name will be overwritten by the
    broker and the rels key will be used by the broker to call the CBS.

    The broker PUT body is built from jar, config, spec, params and templated_conf, sent to
    ``<broker>/application/<instance_name>``, and on success the bound application
    configuration is fetched from the CDAP cluster and logged.

    :param catalog: not used by this function
    :param params: normalized CDAP parameters ("app_config", "app_preferences", "program_preferences")
    :param instance_name: name under which the application is registered with the broker
    :param profile: profile used to locate the broker through Consul
    :param jar: component jar reference forwarded to the broker
    :param config: the CDAP "auxiliary file"
    :param spec: the CDAP component specification
    :param templated_conf: dict with "services_calls", "streams_publishes", "streams_subscribes"
    :raises DcaeException: if the broker answers the deployment PUT with a non-2xx status
    :raises requests.exceptions.HTTPError: if a follow-up GET to the broker or CDAP fails
    """
    broker_url = _get_broker_url_from_profile(profile)

    # register with the broker
    broker_put = _merge_spec_config_into_broker_put(jar, config, spec, params, templated_conf)

    # helps the component developer debug their spec if CDAP throws a 400
    _logger.info("Your (unbound, bound will be shown if deployment completes) app_config is being sent as")
    _logger.info(json.dumps(broker_put["app_config"]))

    _logger.info("Your app_preferences are being sent as")
    _logger.info(json.dumps(broker_put["app_preferences"]))

    _logger.info("Your program_preferences are being sent as")
    _logger.info(json.dumps(broker_put["program_preferences"]))

    response = requests.put("{brokerurl}/application/{appname}".format(brokerurl=broker_url, appname=instance_name),
                            json=broker_put,
                            headers={'content-type': 'application/json'})

    try:
        response.raise_for_status()  # bomb if not 2xx
    except requests.exceptions.HTTPError:
        # need this to raise a dirty status code for tests to work, so not just logging
        raise DcaeException("A Deployment Error Occured. Broker Response: {0}, Broker Response Text: {1}".format(response.status_code, response.text))

    # Past this point the deployment succeeded.
    # TODO: not sure what this error handling looks like, should never happen that a deploy succeeds but this get fails
    # Get the cluster URL to tell the user to go check their application
    broker_info_response = requests.get(broker_url)
    broker_info_response.raise_for_status()  # bomb if not 2xx
    broker_info = broker_info_response.json()  # parsed once, used for both the URL and the GUI port
    cdap_cluster = broker_info["managed cdap url"]

    # Fetch the Application's AppConfig to show them what the bound config looks like:
    # TODO: This should be an endpoint in the broker. I filed an issue in the broker. For now, do the horrendous special character mapping here.
    # TODO: This only fetches AppConfig, add AppPreferences
    ns = broker_put.get("namespace", "default")
    mapped_appname = ''.join(e for e in instance_name if e.isalnum())
    app_response = requests.get("{0}/v3/namespaces/{1}/apps/{2}".format(cdap_cluster, ns, mapped_appname))
    app_response.raise_for_status()  # a clear HTTP error beats a KeyError on "configuration"
    # Kept in its own name so the `config` argument is not clobbered.
    bound_config = app_response.json()["configuration"]

    _logger.info("Deployment Complete!")
    _logger.info("The CDAP cluster API is at {0}. The *GUI* Port is {1}. You may now go check your application there to confirm it is running correctly.".format(cdap_cluster, broker_info["cdap GUI port"]))
    _logger.info("Your instance name is: {0}. In CDAP, this will appear as: {1}".format(instance_name, mapped_appname))
    _logger.info("The bound Configuration for this application is: {0}".format(bound_config))

    # TODO: Should we tell the user about metrics and healthcheck to try those too?
def normalize_cdap_params(spec):
    """
    The CDAP component specification includes some optional fields that the broker expects.
    This parses the specification, includes those fields if they are there, and sets the
    broker defaults otherwise.

    :param spec: the CDAP component specification; only ``spec["parameters"]`` is read
    :returns: dict with exactly three keys:
        - "app_preferences": {name: value} built from the spec's list, {} when absent
        - "app_config": {name: value} built from the spec's list, {} when absent
        - "program_preferences": list of {"program_id", "program_type", "program_pref"} dicts,
          where "program_pref" is again flattened to {name: value}; [] when absent
    :raises KeyError: if ``spec`` has no "parameters" entry or an entry lacks an expected key
    """
    p = spec["parameters"]

    def _section(name):
        # An optional section that is missing behaves like an empty one.
        return p[name] if name in p else []

    def _flatten(pairs):
        # [{"name": n, "value": v}, ...] -> {n: v, ...}
        return {param["name"]: param["value"] for param in pairs}

    return {
        "app_preferences": _flatten(_section("app_preferences")),
        "app_config": _flatten(_section("app_config")),
        "program_preferences": [
            {
                "program_id": tup["program_id"],
                "program_type": tup["program_type"],
                "program_pref": _flatten(tup["program_pref"]),
            }
            for tup in _section("program_preferences")
        ],
    }
def undeploy_component(profile, instance_name):
    """
    Undeploys a CDAP Component, which in CDAP terms means stop and delete

    Sends a DELETE for ``<broker>/application/<instance_name>``. Failures of that request are
    logged rather than raised.

    :param profile: profile used to locate the broker through Consul
    :param instance_name: name the application was registered under with the broker
    :returns: None
    """
    broker_url = _get_broker_url_from_profile(profile)

    try:
        response = requests.delete("{brokerurl}/application/{appname}".format(brokerurl=broker_url, appname=instance_name))
    except requests.exceptions.RequestException as e:
        # The request never produced a response, so there is no status code or body to report.
        _logger.error("An undeploy Error Occured: {0}. No response was received from the broker.".format(e))
        return

    try:
        response.raise_for_status()  # bomb if not 2xx
    except requests.exceptions.HTTPError as e:
        _logger.error("An undeploy Error Occured: {2}. Broker Response: {0}, Broker Response Text: {1}".format(response.status_code, response.text, e))
    else:
        _logger.info("Undeploy complete.")