Add dcae-cli and component-json-schemas projects
[dcaegen2/platform/cli.git] / dcae-cli / dcae_cli / catalog / mock / catalog.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #      http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
18 #
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
20
21 # -*- coding: utf-8 -*-
22 """
23 Provides the mock catalog
24 """
25 import os
26 import json
27 import contextlib
28 import itertools
29 from functools import partial
30 from datetime import datetime
31
32 import six
33
34 from sqlalchemy import create_engine as create_engine_, event, and_, or_
35 from sqlalchemy.exc import IntegrityError
36 from sqlalchemy.orm import sessionmaker
37 from sqlalchemy.orm.exc import NoResultFound
38 from sqlalchemy_utils import database_exists, create_database, drop_database
39
40 from dcae_cli import _version as cli_version
41 from dcae_cli.catalog.mock.tables import Component, Format, FormatPair, Base
42 from dcae_cli.catalog.mock.schema import validate_component, validate_format, apply_defaults_docker_config
43 from dcae_cli.util import reraise_with_msg, get_app_dir
44 from dcae_cli.util.config import get_config, get_path_component_spec, \
45     get_path_data_format
46 from dcae_cli.util.logger import get_logger
47 from dcae_cli.util.docker_util import image_exists
48 from dcae_cli.catalog.exc import CatalogError, DuplicateEntry, MissingEntry, FrozenEntry
49 from dcae_cli.util.cdap_util import normalize_cdap_params
50
51
52 logger = get_logger('Catalog')
53
54
55 #INTERNAL HELPERS
56 def _get_component(session, name, version):
57     '''Returns a single component ORM'''
58     try:
59         if not version:
60             query = session.query(Component).filter(Component.name==name).order_by(Component.version.desc()).limit(1)
61         else:
62             query = session.query(Component).filter(Component.name==name, Component.version==version)
63         return query.one()
64     except NoResultFound:
65         comp_msg = "{}:{}".format(name, version) if version else name
66         raise MissingEntry("Component '{}' was not found in the catalog".format(comp_msg))
67
68 def _get_docker_image_from_spec(spec):
69     images = [ art["uri"] for art in spec["artifacts"] if art["type"] == "docker image" ]
70     return images[0]
71
72 def _get_docker_image(session, name, version):
73     '''Returns the docker image name of a given component'''
74     comp = _get_component(session, name, version)
75     return _get_docker_image_from_spec(comp.get_spec_as_dict())
76
77 def _add_docker_component(session, user, spec, update, enforce_image=True):
78     '''Adds/updates a docker component to the catalog'''
79     image = _get_docker_image_from_spec(spec)
80
81     if enforce_image and not image_exists(image):
82         raise CatalogError("Specified image '{}' does not exist locally.".format(image))
83
84     comp = build_generic_component(session, user, spec, update)
85     session.commit()
86
87 def _get_cdap_jar_from_spec(spec):
88     jars = [ art["uri"] for art in spec["artifacts"] if art["type"] == "jar" ]
89     return jars[0]
90
91 def _add_cdap_component(session, user, spec, update):
92     '''Adds/updates a cdap component to the catalog'''
93     comp = build_generic_component(session, user, spec, update)
94     session.commit()
95
96
97 #PUBLIC FUNCTIONS
98 @contextlib.contextmanager
99 def SessionTransaction(engine):
100     '''Provides a transactional scope around a series of operations'''
101     Session = sessionmaker(engine)
102     try:
103         session = Session()
104         yield session
105         session.commit()
106     except IntegrityError as e:
107         session.rollback()
108         _raise_if_duplicate(str(engine.url), e)
109         raise
110     except Exception:
111         session.rollback()
112         raise
113     finally:
114         session.close()
115
116
117 _dup_e = DuplicateEntry('Entry already exists. Try using the --update flag.')
118
119 def _raise_if_duplicate(url, e):
120     '''Raises if the exception relates to duplicate entries'''
121     if 'sqlite' in url:
122         if 'UNIQUE' in e.orig.args[0].upper():
123             raise _dup_e
124     elif 'postgres' in url:
125         # e.orig is of type psycopg2.IntegrityError that has 
126         # pgcode which uses the following:
127         #
128         # https://www.postgresql.org/docs/current/static/errcodes-appendix.html#ERRCODES-TABLE
129         #
130         # 23505 means "unique_violation"
131         if e.orig.pgcode == "23505":
132             raise _dup_e
133
134 def create_engine(base, db_name=None, purge_existing=False, db_url=None):
135     '''Returns an initialized database engine'''
136     if db_url is None:
137         if db_name is None:
138             # no url or db name indicates we want to use the tool's configured db
139             config = get_config()
140             url = config['db_url']
141         else:
142             # if only a db name is given, interpret as a sqlite db in the app dir. this maintains backwards compat with existing tests.
143             db_path = os.path.join(get_app_dir(), db_name)
144             url = ''.join(('sqlite:///', db_path))
145     else:
146         # a full db url is the most explicit input and should be used over other inputs if provided
147         url = db_url
148
149     if not database_exists(url):
150         create_database(url)
151     elif purge_existing:
152         drop_database(url)
153         create_database(url)
154
155     engine = create_engine_(url)
156     _configure_engine(engine)
157     base.metadata.create_all(engine)
158     return engine
159
160
161 def _configure_engine(engine):
162     '''Performs additional db-specific configurations'''
163     str_url = str(engine.url)
164     if 'sqlite' in str_url:
165         event.listen(engine, 'connect', lambda conn, record: conn.execute('pragma foreign_keys=ON'))
166
167
168 def get_format(session, name, version):
169     '''Returns a single data format ORM'''
170     try:
171         if not version:
172             query = session.query(Format).filter(Format.name==name).order_by(Format.version.desc()).limit(1)
173         else:
174             query = session.query(Format).filter(Format.name==name, Format.version==version)
175         return query.one()
176     except NoResultFound:
177         msg = "{}:{}".format(name, version) if version else name
178         raise MissingEntry("Data format '{}' was not found in the catalog.".format(msg))
179
180 def _create_format_tuple(entry):
181     '''Create tuple to identify format'''
182     return (entry['format'], entry['version'])
183
184
185 def _get_format_pair(session, req_name, req_version, resp_name, resp_version, create=True):
186     '''Returns a single data format pair ORM'''
187     req = get_format(session, req_name, req_version)
188     resp = get_format(session, resp_name, resp_version)
189
190     query = session.query(FormatPair).filter(and_(FormatPair.req == req, FormatPair.resp == resp))
191     try:
192         return query.one()
193     except NoResultFound:
194         if not create:
195             raise MissingEntry("Data format pair with request '{}:{}' and response '{}:{}' was not found in the catalog.".format(req.name, req.version, resp.name, resp.version))
196
197     pair = FormatPair(req=req, resp=resp)
198     session.add(pair)
199     return pair
200
201 def _create_format_pair_tuple(entry):
202     '''Create tuple to identify format pair'''
203     req_name, req_version = entry['request']['format'], entry['request']['version']
204     resp_name, resp_version = entry['response']['format'], entry['response']['version']
205     return (req_name, req_version, resp_name, resp_version)
206
207 def _get_unique_format_things(create_tuple, get_func, entries):
208     '''Get unique format things (formats, format pairs, ..)
209
210     Args
211     ----
212     create_tuple: Function that has the signature dict->tuple
213     get_func: Function that has the signature *tuple->orm
214     entries: list of dicts that have data format details that come from
215     streams.publishes, streams.subscribes, services.calls, services.provides
216
217     Return
218     ------
219     List of unique orms
220     '''
221     src = set(create_tuple(entry) for entry in entries)
222     return [get_func(*yo) for yo in src]
223
224
225 def verify_component(session, name, version):
226     '''Returns the orm name and version of a given component'''
227     orm = _get_component(session, name, version)
228     return orm.name, orm.version
229
230
231 def get_component_type(session, name, version):
232     '''Returns the component_type of a given component'''
233     return _get_component(session, name, version).component_type
234
235
236 def get_component_spec(session, name, version):
237     '''Returns the spec dict of a given component'''
238     return json.loads(_get_component(session, name, version).spec)
239
240
241 def get_format_spec(session, name, version):
242     '''Returns the spec dict of a given data format'''
243     return json.loads(get_format(session, name, version).spec)
244
245
246 def build_generic_component(session, user, spec, update):
247     '''Builds, adds, and returns a generic component ORM. Does not commit changes.'''
248     attrs = spec['self'].copy()
249     attrs['spec'] = json.dumps(spec)
250
251     # TODO: This should really come from the spec too
252     attrs['owner'] = user
253
254     # grab existing or create a new component
255     name, version = attrs['name'], attrs['version']
256     if update:
257         comp = _get_component(session, name, version)
258         if comp.is_published():
259             raise FrozenEntry("Component '{}:{}' has been pushed and cannot be updated".format(name, version))
260     else:
261         comp = Component()
262         session.add(comp)
263
264     # REVIEW: Inject these parameters as function arguments instead of this
265     # hidden approach?
266     # WATCH: This has to be done here before the code below because there is a
267     # commit somewhere below and since these fields are not nullable, you'll get a
268     # violation.
269     comp.cli_version = cli_version.__version__
270     comp.schema_path = get_path_component_spec()
271
272     # build the ORM
273     for attr, val in six.iteritems(attrs):
274         setattr(comp, attr, val)
275
276     # update relationships
277     get_format_local = partial(get_format, session)
278     get_unique_formats = partial(_get_unique_format_things, _create_format_tuple,
279             get_format_local)
280
281     try:
282         comp.publishes = get_unique_formats(spec['streams']['publishes'])
283     except MissingEntry as e:
284         reraise_with_msg(e, 'Add failed while traversing "publishes"')
285
286     try:
287         comp.subscribes = get_unique_formats(spec['streams']['subscribes'])
288     except MissingEntry as e:
289         reraise_with_msg(e, 'Add failed while traversing "subscribes"')
290
291     get_format_pairs = partial(_get_format_pair, session)
292     get_unique_format_pairs = partial(_get_unique_format_things,
293             _create_format_pair_tuple, get_format_pairs)
294
295     try:
296         comp.provides = get_unique_format_pairs(spec['services']['provides'])
297     except MissingEntry as e:
298         reraise_with_msg(e, 'Add failed while traversing "provides"')
299
300     try:
301         comp.calls = get_unique_format_pairs(spec['services']['calls'])
302     except MissingEntry as e:
303         reraise_with_msg(e, 'Add failed while traversing "calls"')
304
305     return comp
306
307
308 def add_format(session, spec, user, update):
309     '''Helper function which adds a data format to the catalog'''
310     attrs = spec['self'].copy()
311     attrs['spec'] = json.dumps(spec)
312     name, version = attrs['name'], attrs['version']
313
314     # TODO: This should really come from the spec too
315     attrs['owner'] = user
316
317     if update:
318         data_format = get_format(session, name, version)
319         if data_format.is_published():
320             raise FrozenEntry("Data format {}:{} has been pushed and cannot be updated".format(name, version))
321     else:
322         data_format = Format()
323         session.add(data_format)
324
325     # build the ORM
326     for attr, val in six.iteritems(attrs):
327         setattr(data_format, attr, val)
328
329     # REVIEW: Inject these parameters as function arguments instead of this
330     # hidden approach?
331     data_format.cli_version = cli_version.__version__
332     data_format.schema_path = get_path_data_format()
333
334     session.commit()
335
336
337 def _filter_neighbors(session, neighbors=None):
338     '''Returns a Component query filtered by available neighbors'''
339     if neighbors is None:
340         query = session.query(Component)
341     else:
342         subfilt = or_(and_(Component.name==n, Component.version==v) for n,v in neighbors)
343         query = session.query(Component).filter(subfilt)
344     return query
345
346
347 def get_subscribers(session, orm, neighbors=None):
348     '''Returns a list of component ORMs which subscribe to the specified format'''
349     query = _filter_neighbors(session, neighbors)
350     return query.filter(Component.subscribes.contains(orm)).all()
351
352
353 def get_providers(session, orm, neighbors=None):
354     '''Returns a list of component ORMs which provide the specified format pair'''
355     query = _filter_neighbors(session, neighbors)
356     return query.filter(Component.provides.contains(orm)).all()
357
358
359 def _match_pub(entries, orms):
360     '''Aligns the publishes orms with spec entries to get the config key'''
361     lookup = {(orm.name, orm.version): orm for orm in orms}
362     for entry in entries:
363         if "http" not in entry["type"]:
364             continue
365
366         key = (entry['format'], entry['version'])
367         yield entry['config_key'], lookup[key]
368
369
370 def _match_call(entries, orms):
371     '''Aligns the calls orms with spec entries to get the config key'''
372     lookup = {(orm.req.name, orm.req.version, orm.resp.name, orm.resp.version): orm for orm in orms}
373     for entry in entries:
374         key = (entry['request']['format'], entry['request']['version'], entry['response']['format'], entry['response']['version'])
375         yield entry['config_key'], lookup[key]
376
377 def get_discovery(get_params_func, session, name, version,  neighbors=None):
378     '''Returns the parameters and interface map for a given component and considering its neighbors'''
379     comp = _get_component(session, name, version)
380     spec = json.loads(comp.spec)
381     interfaces = dict()
382     for key, orm in _match_pub(spec['streams']['publishes'], comp.publishes):
383         interfaces[key] = [(c.name, c.version) for c in get_subscribers(session, orm, neighbors) if not c is comp]
384
385     for key, orm in _match_call(spec['services']['calls'], comp.calls):
386         interfaces[key] = [(c.name, c.version) for c in get_providers(session, orm, neighbors) if not c is comp]
387
388     params = get_params_func(spec)
389     return params, interfaces
390
391 _get_discovery_for_cdap = partial(get_discovery, normalize_cdap_params)
392 _get_discovery_for_docker = partial(get_discovery,
393         lambda spec: {param['name']: param['value'] for param in spec['parameters']})
394
395
396 def _get_discovery_for_dmaap(get_component_spec_func, name, version):
397     """Get all config keys that are for dmaap streams
398
399     Returns:
400     --------
401     Tuple of message router config keys list, data router config keys list
402     """
403     spec = get_component_spec_func(name, version)
404
405     all_streams = spec["streams"].get("publishes", []) \
406             + spec["streams"].get("subscribes", [])
407
408     def is_for_message_router(stream):
409         return stream["type"] == "message router" \
410                 or stream["type"] == "message_router"
411
412     mr_keys = [ stream["config_key"] for stream in filter(is_for_message_router, all_streams) ]
413
414     def is_for_data_router(stream):
415         return stream["type"] == "data router" \
416                 or stream["type"] == "data_router"
417
418     dr_keys = [ stream["config_key"] for stream in filter(is_for_data_router, all_streams) ]
419     return mr_keys, dr_keys
420
421
422 def _filter_latest(orms):
423     '''Filters and yields only (name, version, *) orm tuples with the highest version'''
424     get_first_key_func = lambda x: x[0]
425     # itertools.groupby requires the input to be sorted
426     sorted_orms = sorted(orms, key=get_first_key_func)
427     for _, g in itertools.groupby(sorted_orms, get_first_key_func):
428         yield max(g, key=lambda x: x[1])
429
430
431 def list_components(session, user, only_published, subscribes=None, publishes=None,
432         provides=None, calls=None, latest=True):
433     """Get list of components
434
435     Returns:
436     --------
437     List of component orms as dicts
438     """
439     filters = list()
440     if subscribes:
441         filters.extend(Component.subscribes.contains(get_format(session, n, v)) for n, v in subscribes)
442     if publishes:
443         filters.extend(Component.publishes.contains(get_format(session, n, v)) for n, v in publishes)
444     if provides:
445         filters.extend(Component.provides.contains(_get_format_pair(session, reqn, reqv, respn, respv, create=False))
446                        for (reqn, reqv), (respn, respv) in provides)
447     if calls:
448         filters.extend(Component.calls.contains(_get_format_pair(session, reqn, reqv, respn, respv, create=False))
449                        for (reqn, reqv), (respn, respv) in calls)
450     if filters:
451         query = session.query(Component).filter(or_(*filters))
452     else:
453         query = session.query(Component)
454
455     if user:
456         query = query.filter(Component.owner==user)
457     if only_published:
458         query = query.filter(Component.when_published!=None)
459
460     orms = ((orm.name, orm.version, orm.component_type, orm) for orm in query)
461
462     if latest:
463         orms = _filter_latest(orms)
464
465     return [ orm.__dict__ for _, _, _, orm in orms ]
466
467
468 def _list_formats(session, user, only_published, latest=True):
469     """Get list of data formats
470
471     Returns
472     -------
473     List of data format orms as dicts
474     """
475     query = session.query(Format).order_by(Format.modified.desc())
476
477     if user:
478         query = query.filter(Format.owner==user)
479     if only_published:
480         query = query.filter(Format.when_published!=None)
481
482     orms = [ (orm.name, orm.version, orm) for orm in query ]
483
484     if latest:
485         orms = _filter_latest(orms)
486     return [ orm.__dict__ for _, _, orm in orms ]
487
488
489 def build_config_keys_map(spec):
490     """Build config keys map
491
492     Return
493     ------
494     Dict where each item:
495
496         <config_key>: { "group": <grouping>, "type": <http|message_router|data_router> }
497
498     where grouping includes "streams_publishes", "streams_subscribes", "services_calls"
499     """
500     # subscribing as http doesn't have config key
501     ss = [ (s["config_key"], { "group": "streams_subscribes", "type": s["type"] })
502         for s in spec["streams"]["subscribes"] if "config_key" in s]
503     sp = [ (s["config_key"], { "group": "streams_publishes", "type": s["type"] })
504         for s in spec["streams"]["publishes"] ]
505     sc = [ (s["config_key"], { "group": "services_calls" })
506         for s in spec["services"]["calls"] ]
507     return dict(ss+sp+sc)
508
509
510 def get_data_router_subscriber_route(spec, config_key):
511     """Get route by config key for data router subscriber
512
513     Utility method that parses the component spec
514     """
515     for s in spec["streams"].get("subscribes", []):
516         if s["type"] in ["data_router", "data router"] \
517                 and s["config_key"] == config_key:
518             return s["route"]
519
520     raise MissingEntry("No data router subscriber for {0}".format(config_key))
521
522
523 class MockCatalog(object):
524
525     def __init__(self, purge_existing=False, enforce_image=True, db_name=None, engine=None, db_url=None):
526         self.engine = create_engine(Base, db_name=db_name, purge_existing=purge_existing, db_url=db_url) if engine is None else engine
527         self.enforce_image = enforce_image
528
529     def add_component(self, user, spec, update=False):
530         '''Validates component specification and adds component to the mock catalog'''
531         validate_component(spec)
532
533         component_type = spec["self"]["component_type"]
534
535         with SessionTransaction(self.engine) as session:
536             if component_type == "cdap":
537                 _add_cdap_component(session, user, spec, update)
538             elif component_type == "docker":
539                 _add_docker_component(session, user, spec, update,
540                         enforce_image=self.enforce_image)
541             else:
542                 raise CatalogError("Unknown component type: {0}".format(component_type))
543
544     def get_docker_image(self, name, version):
545         '''Returns the docker image name associated with this component'''
546         with SessionTransaction(self.engine) as session:
547             return _get_docker_image(session, name, version)
548
549     def get_docker(self, name, version):
550         with SessionTransaction(self.engine) as session:
551             comp = _get_component(session, name, version)
552             spec = comp.get_spec_as_dict()
553             # NOTE: Defaults are being applied for docker config here at read
554             # time. Not completely sure that this is the correct approach. The
555             # benefit is that defaults can be changed without altering the stored
556             # specs. It's a nice layering.
557             docker_config = apply_defaults_docker_config(spec["auxilary"])
558             return _get_docker_image_from_spec(spec), docker_config, spec
559
560     def get_docker_config(self, name, version):
561         _, docker_config, _ = self.get_docker(name, version)
562         return docker_config
563
564     def get_cdap(self, name, version):
565         '''Returns a tuple representing this cdap component
566
567         Returns
568         -------
569         tuple(jar, config, spec)
570             jar: string
571                 URL where the CDAP jar is located.
572             config: dict
573                 A dictionary loaded from the CDAP JSON configuration file.
574             spec: dict
575                 The dcae-cli component specification file.
576         '''
577         with SessionTransaction(self.engine) as session:
578             comp = _get_component(session, name, version)
579             spec = comp.get_spec_as_dict()
580             cdap_config = spec["auxilary"]
581             return _get_cdap_jar_from_spec(spec), cdap_config, spec
582
583     def get_component_type(self, name, version):
584         '''Returns the component type associated with this component'''
585         with SessionTransaction(self.engine) as session:
586             return get_component_type(session, name, version)
587
588     def get_component_spec(self, name, version):
589         '''Returns the spec dict associated with this component'''
590         with SessionTransaction(self.engine) as session:
591             return get_component_spec(session, name, version)
592
593     def get_format_spec(self, name, version):
594         '''Returns the spec dict associated with this data format'''
595         with SessionTransaction(self.engine) as session:
596             return get_format_spec(session, name, version)
597
598     def add_format(self, spec, user, update=False):
599         '''Validates data format specification and adds data format to the mock catalog'''
600         validate_format(spec)
601         with SessionTransaction(self.engine) as session:
602             add_format(session, spec, user, update)
603
604     def get_discovery_for_cdap(self, name, version, neighbors=None):
605         '''Returns the parameters and interface map for a given component and considering its neighbors'''
606         with SessionTransaction(self.engine) as session:
607             return _get_discovery_for_cdap(session, name, version, neighbors)
608
609     def get_discovery_for_docker(self, name, version, neighbors=None):
610         '''Returns the parameters and interface map for a given component and considering its neighbors'''
611         with SessionTransaction(self.engine) as session:
612             return _get_discovery_for_docker(session, name, version, neighbors)
613
614     def get_discovery_for_dmaap(self, name, version):
615         with SessionTransaction(self.engine) as session:
616             get_component_spec_func = partial(get_component_spec, session)
617             return _get_discovery_for_dmaap(get_component_spec_func, name, version)
618
619     def get_discovery_from_spec(self, user, target_spec, neighbors=None):
620         '''Get pieces to generate configuration for the given target spec
621
622         This function is used to obtain the pieces needed to generate
623         the application configuration json: parameters map, interfaces map, dmaap
624         map. Where the input is a provided specification that hasn't been added to
625         the catalog - prospective specs - which includes a component that doesn't
626         exist or a new version of an existing spec.
627
628         Returns
629         -------
630         Tuple of three elements:
631
632         - Dict of parameter name to parameter value
633         - Dict of "config_key" to list of (component.name, component.version)
634           known as "interface_map"
635         - Tuple of lists of "config_key" the first for message router the second
636           for data router known as "dmaap_map"
637         '''
638         validate_component(target_spec)
639
640         with SessionTransaction(self.engine) as session:
641             # The following approach was taken in order to:
642             # 1. Re-use existing functionality e.g. implement fast
643             # 2. In order to make ORM-specific queries, I need the entire ORM
644             # in SQLAlchemy meaning I cannot do arbitrary DataFormatPair queries
645             # without Component.
646             name = target_spec["self"]["name"]
647             version = target_spec["self"]["version"]
648
649             try:
650                 # Build a component with update to True first because you may
651                 # want to run this for an existing component
652                 build_generic_component(session, user, target_spec, True)
653             except MissingEntry:
654                 # Since it doesn't exist already, build a new component
655                 build_generic_component(session, user, target_spec, False)
656
657             # This is needed so that subsequent queries will "see" the component
658             session.flush()
659
660             ctype = target_spec["self"]["component_type"]
661
662             if ctype == "cdap":
663                 params, interface_map = _get_discovery_for_cdap(session, name,
664                         version, neighbors)
665             elif ctype == "docker":
666                 params, interface_map = _get_discovery_for_docker(session, name,
667                         version, neighbors)
668
669             # Don't want to commit these changes so rollback.
670             session.rollback()
671
672             # Use the target spec as the source to compile the config keys from
673             dmaap_config_keys = _get_discovery_for_dmaap(
674                     lambda name, version: target_spec, name, version)
675
676             return params, interface_map, dmaap_config_keys
677
678     def verify_component(self, name, version):
679         '''Returns the component's name and version if it exists and raises an exception otherwise'''
680         with SessionTransaction(self.engine) as session:
681             return verify_component(session, name, version)
682
683     def list_components(self, subscribes=None, publishes=None, provides=None,
684             calls=None, latest=True, user=None, only_published=False):
685         '''Returns a list of component names which match the specified filter sequences'''
686         with SessionTransaction(self.engine) as session:
687             return list_components(session, user, only_published, subscribes,
688                     publishes, provides, calls, latest)
689
690     def list_formats(self, latest=True, user=None, only_published=False):
691         """Get list of data formats
692
693         Returns
694         -------
695         List of data formats as dicts
696         """
697         with SessionTransaction(self.engine) as session:
698             return _list_formats(session, user, only_published, latest)
699
700     def get_format(self, name, version):
701         """Get data format
702
703         Throws MissingEntry exception if no matches found.
704
705         Returns
706         -------
707         Dict representation of data format
708         """
709         with SessionTransaction(self.engine) as session:
710             return get_format(session, name, version).__dict__
711
712     def _publish(self, get_func, user, name, version):
713         """Publish data format
714
715         Args:
716         -----
717         get_func: Function that takes a session, name, version and outputs a data
718             object either Component or Format
719
720         Returns:
721         --------
722         True upon success else False
723         """
724         # TODO: To make function composeable, it should take in the data format
725         # object
726         with SessionTransaction(self.engine) as session:
727             obj = get_func(session, name, version)
728
729             if obj:
730                 if obj.owner != user:
731                     logger.error("Not authorized to modify component or data format")
732                     return False
733                 elif obj.when_published:
734                     logger.warn("Component or data format has already been published")
735                     return False
736                 else:
737                     obj.when_published = datetime.utcnow()
738                     session.commit()
739             else:
740                 logger.error("Component or data format not found: {0}, {1}".format(name, version))
741                 return False
742
743         return True
744
745     def publish_format(self, user, name, version):
746         """Publish data format
747
748         Returns
749         -------
750         True upon success else False
751         """
752         return self._publish(get_format, user, name, version)
753
754     def get_unpublished_formats(self, comp_name, comp_version):
755         """Get unpublished formats for given component
756
757         Returns:
758         --------
759         List of unique data format name, version pairs
760         """
761         with SessionTransaction(self.engine) as session:
762             comp = _get_component(session, comp_name, comp_version)
763
764             dfs = comp.publishes + comp.subscribes
765             dfs += [ p.req for p in comp.provides]
766             dfs += [ p.resp for p in comp.provides]
767             dfs += [ c.req for c in comp.calls]
768             dfs += [ c.resp for c in comp.calls]
769
770             def is_not_published(orm):
771                 return orm.when_published == None
772
773             formats = [(df.name, df.version) for df in filter(is_not_published, dfs)]
774             return list(set(formats))
775
776     def publish_component(self, user, name, version):
777         """Publish component
778
779         Returns
780         -------
781         True upon success else False
782         """
783         return self._publish(_get_component, user, name, version)