1 # -------------------------------------------------------------------------
2 # Copyright (c) 2015-2017 AT&T Intellectual Property
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 # -------------------------------------------------------------------------
20 OSDF Manager Main Flask Application
24 from threading import Thread # for scaling up, may need celery with RabbitMQ or redis
26 from flask import Flask, request, Response, g
31 import osdf.adapters.policy.interface
32 import osdf.config.credentials
33 import osdf.config.loader
34 import osdf.operation.error_handling
35 import osdf.operation.responses
37 from osdf.adapters.policy.interface import get_policies
38 from osdf.config.base import osdf_config
39 from osdf.optimizers.placementopt.conductor.remote_opt_processor import process_placement_opt
40 from osdf.webapp.appcontroller import auth_basic
41 from optparse import OptionParser
42 from osdf.operation.exceptions import BusinessException
43 from osdf.operation.error_handling import request_exception_to_json_body, internal_error_message
44 from requests import RequestException
45 from schematics.exceptions import DataError
46 from osdf.logging.osdf_logging import MH, audit_log, error_log, debug_log
47 from osdf.models.api.placementRequest import PlacementAPI
49 ERROR_TEMPLATE = osdf.ERROR_TEMPLATE
53 BAD_CLIENT_REQUEST_MESSAGE = 'Client sent an invalid request'
56 @app.errorhandler(BusinessException)
57 def handle_business_exception(e):
58 """An exception explicitly raised due to some business rule"""
59 error_log.error("Synchronous error for request id {} {}".format(g.request_id, traceback.format_exc()))
60 err_msg = ERROR_TEMPLATE.render(description=str(e))
61 response = Response(err_msg, content_type='application/json; charset=utf-8')
62 response.status_code = 400
66 @app.errorhandler(RequestException)
67 def handle_request_exception(e):
68 """Returns a detailed synchronous message to the calling client
69 when osdf fails due to a remote call to another system"""
70 error_log.error("Synchronous error for request id {} {}".format(g.request_id, traceback.format_exc()))
71 err_msg = request_exception_to_json_body(e)
72 response = Response(err_msg, content_type='application/json; charset=utf-8')
73 response.status_code = 400
77 @app.errorhandler(DataError)
78 def handle_data_error(e):
79 """Returns a detailed message to the calling client when the initial synchronous message is invalid"""
80 error_log.error("Synchronous error for request id {} {}".format(g.request_id, traceback.format_exc()))
84 "text": BAD_CLIENT_REQUEST_MESSAGE,
85 "exceptionMessage": str(e.errors),
86 "errorType": "InvalidClientRequest"
90 body_as_json = json.dumps(body_dictionary)
91 response = Response(body_as_json, content_type='application/json; charset=utf-8')
92 response.status_code = 400
96 @app.route("/api/oof/v1/placement", methods=["POST"])
97 @auth_basic.login_required
98 def do_placement_opt():
99 """Perform placement optimization after validating the request and fetching policies
100 Make a call to the call-back URL with the output of the placement request.
101 Note: Call to Conductor for placement optimization may have redirects, so account for them
103 request_json = request.get_json()
104 req_id = request_json['requestInfo']['requestId']
105 g.request_id = req_id
106 audit_log.info(MH.received_request(request.url, request.remote_addr, json.dumps(request_json)))
108 PlacementAPI(request_json).validate()
110 # Currently policies are being used only during placement, so only fetch them if placement demands is not empty
111 policies, prov_status = {}, None
113 if 'placementDemand' in request_json['placementInfo']['demandInfo']:
114 policies, prov_status = get_policies(request_json, "placement")
116 audit_log.info(MH.new_worker_thread(req_id, "[for placement]"))
117 t = Thread(target=process_placement_opt, args=(request_json, policies, osdf_config, prov_status))
119 audit_log.info(MH.accepted_valid_request(req_id, request))
120 return osdf.operation.responses.osdf_response_for_request_accept(
121 req_id=req_id, text="Accepted placement request. Response will be posted to callback URL")
124 @app.errorhandler(500)
125 def internal_failure(error):
126 """Returned when unexpected coding errors occur during initial synchronous processing"""
127 error_log.error("Synchronous error for request id {} {}".format(g.request_id, traceback.format_exc()))
128 response = Response(internal_error_message, content_type='application/json; charset=utf-8')
129 response.status_code = 500
133 def get_options(argv):
134 program_version_string = '%%prog %s' % "v1.0"
135 program_longdesc = ""
138 parser = OptionParser(version=program_version_string, epilog=program_longdesc, description=program_license)
139 parser.add_option("-l", "--local", dest="local", help="run locally", action="store_true", default=False)
140 parser.add_option("-t", "--devtest", dest="devtest", help="run in dev/test environment", action="store_true",
142 parser.add_option("-d", "--debughost", dest="debughost", help="IP Address of host running debug server", default='')
143 parser.add_option("-p", "--debugport", dest="debugport", help="Port number of debug server", type=int, default=5678)
144 opts, args = parser.parse_args(argv)
147 debug_log.debug('pydevd.settrace({}, port={})'.format(opts.debughost, opts.debugport))
148 pydevd.settrace(opts.debughost, port=opts.debugport)
152 if __name__ == "__main__":
154 sys_conf = osdf_config['core']['osdf_system']
155 ports = sys_conf['osdf_ports']
156 internal_port, external_port = ports['internal'], ports['external']
158 local_host = sys_conf['osdf_ip_default']
159 common_app_opts = dict(host=local_host, threaded=True, use_reloader=False)
161 ssl_opts = sys_conf.get('ssl_context')
163 common_app_opts.update({'ssl_context': tuple(ssl_opts)})
165 opts = get_options(sys.argv)
166 if not opts.local and not opts.devtest: # normal deployment
167 app.run(port=internal_port, debug=False, **common_app_opts)
169 port = internal_port if opts.local else external_port
170 app.run(port=port, debug=True, **common_app_opts)