Source code for openalea.mtg.io

# -*- python -*-
#
#       OpenAlea.mtg
#
#       Copyright 2008-2016 INRIA - CIRAD - INRA
#
#       File author(s): Christophe Pradal <christophe.pradal.at.cirad.fr>
#
#       Distributed under the Cecill-C License.
#       See accompanying file LICENSE.txt or copy at
#           http://www.cecill.info/licences/Licence_CeCILL-C_V1-en.html
#
#       OpenAlea WebSite : http://openalea.gforge.inria.fr
#
################################################################################
"""This module provides functions to read / write mtg data structure."""

import re
from string import Template
from warnings import warn


from mtg import *
from traversal import iter_mtg, iter_mtg_with_filter

try:
    from openalea.core.logger import get_logger, logging

    logger = get_logger('openalea.mtg')
    _ch = logging.StreamHandler()
    logger.addHandler(_ch)
except:
    logger = None

debug = 0


[docs]def log(*args): if debug: if logger: logger.debug(' '.join(map(str, args))) else: print ' '.join(map(str, args))
################## UTILS
[docs]def get_expr(s, expr): res = re.search(expr, s) _str = '' if res: _str = s[res.start():res.end()] return _str
[docs]def get_label(s): name = r'[a-zA-Z0-9]+' return get_expr(s, name)
[docs]def get_name(s): name = r'[a-zA-Z]+' return get_expr(s, name)
[docs]def get_index(s): name = r'[0-9]+' return get_expr(s, name)
[docs]def get_args(s): args = r'\([0-9,-\.\+]+\)' return get_expr(s, args)
[docs]def get_float(s): args = r'[0-9-\+]+' num = get_expr(s, args) return float(num)
[docs]def replace_date(s, format): """ Replace the date / by - """ import re if format == 'DD/MM/YY': rawstr = r"""(?P<day>3[01]|[1-9]|[0-2][0-9])/(?P<month>1[012]|0[1-9]|[1-9])/(?P<year>\d\d)""" else: rawstr = r"""(?P<day>3[01]|[0-2]{0,1}\d)/(?P<month>1[012]|0\d)/(?P<year>19\d\d|20\d\d)""" def change_date(match_obj): day, month, year = match_obj.group('day'), match_obj.group('month'), match_obj.group('year') return '-'.join((day, month, year)) return re.sub(rawstr, change_date, s)
[docs]def multiscale_edit(s, symbol_at_scale = {}, class_type={}, has_date = False, mtg=None): """Construction of an MTG from a string. :Parameters: - `s`: The string representing the MTG. - `symbol_at_scale`: A dict containing the scale for each symbol name. :Optional parameters: - `class_type`: A dict containing the type of the properties. - `has_date`: Is the MTG is a Dynamic MTG? - `mtg`: An existing MTG :Return: MTG object """ def get_properties(name,vid=None, time=False): _type = dict([('INT', int), ('REAL', float), ('ALPHA', str), ('DD/MM/YY', str), ('DD/MM/YYYY', str), ('STRING', str)]) args = {} l = name.strip().split('(') if not time: label = get_label(name) index = get_index(label) if index.isdigit(): args['index'] = int(index) args['label'] = label if len(l) > 1: arg_string = l[1].strip()[:-1] if arg_string: ln = arg_string.split(',') for arg in ln: k, v = arg.split('=') klass = _type[class_type[k]] try: args[k] = klass(v) except: if vid is not None: print 'Args ', v, 'of vertex ', vid, 'of type ', k, 'is not of type ', str(klass) else: print 'Args ', v, 'of type ', k, 'is not of type ', str(klass) return args def add_dynamic_properties(mtg, vid, args): print "Existing properties at ", vid, " ", mtg.get_vertex_property(vid) print "New property: ", args # a property can be a list but not a timeserie. # Create a real timeserie object... static_properties = ['index', 'label'] is_time_series= lambda x: isinstance(x,list) date = 'Date' new_date = args.get(date) if new_date is None: return old_date = mtg.property(date)[vid] for prop in args: if prop == date: continue old = mtg.property(prop).get(vid) if is_time_series(old): old.append((new_date, args[prop])) elif old: mtg.property(prop)[vid] = [(old_date, old), (new_date, args[prop])] else: mtg.property(prop)[vid] = [(new_date, args[prop])] implicit_scale = bool(symbol_at_scale) if debug: print symbol_at_scale.keys() mtg = mtg if mtg else MTG() vid = mtg.root # vid of the support tree, i.e. at the finest scale current_vertex = mtg.root branching_stack = [] if not implicit_scale: symbols = ['/', '\\', '[', ']', '+', '<', '<<'] else: symbols = ['/', '[', ']', '+', '<', '<<'] pending_edge = '' # edge type for the next edge to be created scale = 0 # 2. add some properties to the MTG mtg.add_property('index') for k in class_type: mtg.add_property(k) # remove from the date format the / if has_date: print 'replace all the date format by -' if 'DD/MM/YY' in class_type.values(): date_format = 'DD/MM/YY' else: date_format = 'DD/MM/YYYY' s = replace_date(s, date_format) for edge_type in symbols: if edge_type != '/' or not symbol_at_scale: s = s.replace(edge_type, '\n%s'%edge_type) else: # do not consider the date format for klass in symbol_at_scale.keys(): s = s.replace('/%s'%klass, '\n/%s'%klass) s = s.replace('<\n<', '<<') # TODO: Write a regular expression to allow several spaces s = s.replace(')(', ')\n(') s = s.replace(') (', ')\n(') s = s.replace('*(', '\n(') l = filter( None, s.split('\n')) for node in l: if node.startswith('<<'): tag = '<<' name = node[2:] elif node.startswith('(') and has_date: tag = '*' name = node[:] else: tag = node[0] name = node[1:] assert tag in symbols, tag if tag == '[': branching_stack.append(vid) elif tag == ']': vid = branching_stack.pop() current_vertex = vid scale = mtg.scale(vid) elif tag == '*': args = get_properties(name, vid=vid, time=True) print vid, '*(', args, ')' # CPL Manage Dynamic_MTG add_dynamic_properties(mtg, vid, args) else: if class_type: args = get_properties(name, vid=vid) else: label = get_label(name) index = get_index(name) args = {'label':label} if index.isdigit(): args['index'] = int(index) if implicit_scale: symbol_class = get_name(name) try: new_scale = symbol_at_scale[symbol_class] except: print 'NODE ',node, bool(tag=='*') if tag == '/' and new_scale <= scale: new_scale -= 1 pending_edge = '/' while new_scale < scale: scale -= 1 current_vertex = mtg.complex(current_vertex) if tag in ['+', '<']: if mtg.scale(vid) == scale: vid = mtg.add_child(vid, edge_type=tag, **args) current_vertex = vid pending_edge = '' else: complex = mtg.complex(current_vertex) current_vertex = mtg.add_component(complex, **args) pending_edge = tag elif tag == '<<': index = args['index'] label = args['label'] previous_index = mtg.property('index')[current_vertex] pending_edge = '' _args = {} for i in range(previous_index+1, index+1): if i == index: _args = args _args['index'] = i _args['label'] = label.replace(str(index), str(i)) vid = mtg.add_child(vid, edge_type='<', **_args) current_vertex = vid elif tag == '/': if mtg.scale(vid) == scale: vid = mtg.add_component(vid, **args) current_vertex = vid scale += 1 elif mtg.scale(vid) > scale: scale += 1 component = mtg.add_component(current_vertex, **args) if mtg.scale(vid) == scale and pending_edge != '/': vid = mtg.add_child(vid, child=component, edge_type=pending_edge) assert vid == component current_vertex = vid else: current_vertex = component # two case : # 1. up and down in scales E+A/U/E # 2. /P/P if pending_edge == '/': vid = current_vertex scale = mtg.scale(vid) else: vid = mtg.add_component(current_vertex, **args) current_vertex = vid elif tag == '\\': scale -= 1 current_vertex = mtg.complex(current_vertex) mtg = fat_mtg(mtg) return mtg
[docs]def read_lsystem_string( string, symbol_at_scale, functional_symbol={}, mtg=None ): """Read a string generated by a lsystem. :Parameters: - `string`: The lsystem string representing the axial tree. - `symbol_at_scale`: A dict containing the scale for each symbol name. :Optional parameters: - `functional_symbol`: A dict containing a function for specific symbols. The args of the function have to be coherent with those in the string. The return type of the functions have to be a dictionary of properties: dict(name, value) :Return: MTG object """ import openalea.plantgl.all as pgl s = string def transform(turtle, mesh): x = turtle.getUp() z = turtle.getHeading() bo = pgl.BaseOrientation(x, z^x) matrix = pgl.Transform4(bo.getMatrix()) matrix.translate(turtle.getPosition()) mesh = mesh.transform(matrix) return mesh # 1. Create the mtg structure. if mtg is None: mtg = MTG() # 2. add some properties to the MTG mtg.add_property('index') mtg.add_property('can_label') mtg.add_property('geometry') vid = mtg.root # vid of the support tree, i.e. at the finest scale current_vertex = mtg.root branching_stack = [] pending_edge = '' # edge type for the next edge to be created scale = 0 lsys_symbols = ['[', ']', '/', '+', '^', 'f'] modules = symbol_at_scale.keys() symbols = lsys_symbols + modules index = dict(zip(symbol_at_scale.keys(), [0]*len(symbol_at_scale))) is_ramif = False # 2. Create a PlantGL Turtle... turtle = pgl.Turtle() max_scale = max(symbol_at_scale.values()) for edge_type in symbols: if edge_type != 'f': s = s.replace(edge_type, '\n%s'%edge_type) else: s = s.replace('f(', '\nf(') l = s.split() try: plant_name = [s for s in symbol_at_scale.keys() if 'plant' in s.lower()][0] except: ValueError("""Incorrect plant name (should be plant)""") for node in l: # Check if node is a module tag = node[0] if tag == '[': branching_stack.append(vid) turtle.push() is_ramif = True elif tag == ']': vid = branching_stack.pop() current_vertex = vid scale = mtg.scale(vid) turtle.pop() is_ramif = False elif tag == '/': args = get_args(node[1:]) if args: angle = get_float(args[1:-1]) turtle.rollR(angle) else: turtle.rollR() elif tag == '+': args = get_args(node[1:]) if args: angle = get_float(args[1:-1]) turtle.left(angle) else: turtle.left() elif tag == '^': args = get_args(node[1:]) if args: angle = get_float(args[1:-1]) turtle.up(angle) else: turtle.up() elif tag == 'f' and node[1] == '(': args = get_args(node[1:]) if args: length = get_float(args[1:-1]) if length > 0: turtle.f(length) else: turtle.f() else: # add new modules to the mtg (i.e. add nodes) name = get_name(node) if name not in modules: print 'Unknow element %s'% name continue module_scale = symbol_at_scale[name] if is_ramif: edge_type = '+' else: edge_type = '<' log(node, module_scale, edge_type ) if module_scale == scale: if mtg.scale(vid) == scale: vid = mtg.add_child(vid, edge_type=edge_type, label=name) current_vertex = vid pending_edge = '' log('','Cas 1.1', scale, 'mtg.scale(vid)', mtg.scale(vid), 'generated vertex', vid) assert mtg.scale(vid) == module_scale else: # add the edge to the current vertex current_vertex = mtg.add_child(current_vertex, edge_type=edge_type, label=name) log('', 'Cas 1.2', scale, 'mtg.scale(vid)', mtg.scale(vid), 'generated vertex', current_vertex) assert mtg.scale(current_vertex) == module_scale is_ramif = False elif module_scale > scale: log('', 'Cas 2', scale, 'mtg.scale(vid)', mtg.scale(vid)) old_current_vertex = current_vertex while module_scale > scale: if mtg.scale(vid) == scale: assert vid == current_vertex vid = mtg.add_component(vid) current_vertex = vid log('', '', 'Cas 2.1', scale, 'generate new component', current_vertex) scale += 1 if module_scale == scale: assert mtg.scale(current_vertex) == module_scale mtg.property('label')[current_vertex] = name break else: scale += 1 current_vertex = mtg.add_component(current_vertex) else: log(node, 'add_child(%d, child=%d)'%(old_current_vertex, current_vertex)) mtg.property('label')[current_vertex] = name if mtg.scale(vid) == scale: vid = mtg.add_child(vid, child=current_vertex, edge_type=edge_type) is_ramif = False else: assert module_scale < scale while module_scale < scale: scale -= 1 current_vertex = mtg.complex(current_vertex) else: current_vertex = mtg.add_child(current_vertex, edge_type=edge_type, label=name) assert mtg.scale(current_vertex) == module_scale # MANAGE the properties, the geometry and the indices!!! index[name] += 1 if name == plant_name: for k in index.keys(): if k != name: index[k] = 0 mtg.property('index')[current_vertex] = index[name] if name in functional_symbol: features = eval(node, functional_symbol) geom = features.get('geometry') canlabel = features.get('label') if geom: # get the transformation from the turtle geom = transform(turtle, geom) mtg.property('geometry')[current_vertex] = geom if name == 'StemElement': # parse args to know how the turtle has to move . args = get_args(node)[1:-1] list_args= args.split(',') length = float(list_args[1]) # 2nd arg if length > 0: turtle.f(length) if canlabel: canlabel.elt_id = index[name] plant_id = mtg.complex_at_scale(current_vertex, scale=1) canlabel.plant_id = mtg.property('index')[plant_id] mtg.property('can_label')[current_vertex] = canlabel mtg = fat_mtg(mtg) return mtg
[docs]def axialtree2mtg(tree, scale, scene, parameters = None): """Create an MTG from an AxialTree. Tha axial tree has been generated by LPy. It contains both modules with parameters. The geometry is provided by the scene. The shape ids are the same that the module ids in the axial tree. For each module name in the axial tree, a `scale` and a list of parameters should be defined. The `scale` dict allow to add a module at a given scale in the MTG. The `parameters` dict map for each module name a list of parameter name that are added to the MTG. :Parameters: - `tree`: The axial tree generated by the L-system - `scale`: A dict containing the scale for each symbol name. - `scene`: The scene containing the geometry. - `parameters`: list of parameter names for each module. :Return: mtg :Example: .. code-block:: python tree # axial tree scales = {} scales['P'] = 1 scales['A'] = 2 scales['GU'] = 3 params ={} params['P'] = [] params['A'] = ['length', 'radius'] params['GU'] = ['nb_flower'] g = axialtree2mtg(tree, scales, scene, params) .. seealso:: :func:`mtg2axialtree`, :func:`lpy2mtg`, :func:`mtg2lpy` """ def scene_id(scene): d = {} if scene: for sh in scene: d.setdefault(sh.id,[]).append(sh) return d def change_id(axial_id, mtg_id): """ Change the id of the shape in the scene by the id of the mtg element. """ mtg.property('_axial_id')[mtg_id] = axial_id if geoms: if geoms.has_key(axial_id): for shape in geoms[axial_id]: shape.id = mtg_id mtg.property('geometry')[mtg_id]=geoms[axial_id] else: #print 'Be careful : no id ', axial_id pass # The string represented by the axial tree... geoms = scene.todict() if scene else dict() # scene_id(scene) mtg = MTG() if scene: mtg.add_property('geometry') mtg.add_property('_axial_id') if parameters is None: parameters = {} for label in parameters: for p in parameters[label]: if p not in mtg.property_names(): mtg.add_property(p) vid = mtg.root current_vertex = vid branching_stack = [vid] pending_edge = '' # edge type for the next edge to be created max_scale = max(scale.itervalues()) for aid, module in enumerate(tree): label = module.name if label == '[': branching_stack.append(vid) pending_edge = '+' elif label == ']': vid = branching_stack.pop() current_vertex = vid pending_edge = '' elif (label not in scale) and (label not in parameters): continue else: _scale = scale[label] _params = parameters.get(label, []) params = {} params['label'] = label for p in _params: if module.hasParameter(p): params[p] = module.getParameter(p) # otherwise check in parameterset elif module.argSize() is 1: try: pset = module.args[0] if p in pset.__dict__.keys(): params[p] = pset.__dict__[p] except: pass if mtg.scale(vid) == mtg.scale(current_vertex) == _scale: # Add a vertex at the finer scale if pending_edge == '+': edge_type = '+' else: edge_type = '<' #check if the edge_type is a good one: if edge_type == '+' and current_vertex != branching_stack[-1]: edge_type = '<' params['edge_type'] = edge_type vid = mtg.add_child(vid, **params) current_vertex = vid pending_edge = '' elif mtg.scale(vid) < max_scale: assert mtg.scale(vid) == mtg.scale(current_vertex) # Descend in scale for the first time vid = mtg.add_component(vid, **params) current_vertex = vid elif mtg.scale(current_vertex) < _scale: assert mtg.scale(current_vertex) == _scale - 1 current_vertex = mtg.add_component(current_vertex, **params) if mtg.scale(vid) == _scale: if pending_edge == '+': edge_type = '+' else: edge_type = '<' params['edge_type'] = edge_type vid = mtg.add_child(vid, child=current_vertex, **params) assert vid == current_vertex pending_edge = '' else: while mtg.scale(current_vertex) >= _scale: current_vertex = mtg.complex(current_vertex) assert mtg.scale(current_vertex) == _scale - 1 current_vertex = mtg.add_component(current_vertex, **params) pending_edge = '' #assert mtg.scale(current_vertex) == _scale #if max_scale == _scale: change_id(aid,current_vertex) mtg = fat_mtg(mtg) return mtg
[docs]def mtg2axialtree(g, parameters=None, axial_tree=None): """ Create a MTG from an AxialTree with scales. :Parameters: - `axial_tree`: The axial tree managed by the L-system. Use an empty AxialTree if you do not want to concatenate this axial_tree with previous results. - `parameters`: list of parameter names for each module. :Return: mtg :Example: .. code-block:: python params = dict() params ['P'] = [] params['A'] = ['length', radius'] params['GU']=['nb_flower'] tree = mtg2axialtree(g, params) .. seealso:: :func:`axialtree2mtg`, :func:`mtg2lpy` """ edge_type = g.properties().get('edge_type', {}) if parameters is None: parameters = {} tree = axial_tree if tree is None: import openalea.lpy as lpy tree = lpy.AxialTree() # Root of the MTG at scale 0 vtx_id = g.roots_iter(scale=0).next() def axialtree_pre_order_visitor(vid, tree=tree): if vid == g.root: return True et = edge_type.get(vid, '/') if et in ('+', '/'): tree += '[' name = g.class_name(vid) if not name: return False l = [name] params = parameters.get(name, []) if 'parameter_set' in params and len(params) is 1: from openalea.lpy.parameterset import ParameterSet exclude = ['geometry','label','edge_type','_axial_id'] pset = {} for p in g.property_names(): if not p in exclude: arg = g.property(p).get(vid) if arg is None: continue pset[p] = arg l.append(ParameterSet(**pset)) else: for p in params: arg = g.property(p).get(vid) if arg is None: # Be Careful, the argument is skipped if not defined. continue l.append(arg) tree += tuple(l) return True def axialtree_post_order_visitor(vid, tree=tree): et = edge_type.get(vid, '/') if et in ('+', '/'): tree += ']' for v in traversal.iter_mtg2_with_filter(g, vtx_id, axialtree_pre_order_visitor, axialtree_post_order_visitor): pass return tree
[docs]def lpy2mtg(axial_tree, lsystem, scene = None): l = lsystem l.makeCurrent() context = l.context() modules = context.declaredModules() parameters = {} scales = {} for m in modules: label = m.name parameters[label] = m.parameterNames if 'parameter_set' in m.parameterNames and len(m.parameterNames) is 1: if axial_tree.count(label) > 0: index = axial_tree.find(label + '(p)') pset = axial_tree[index].args[0] parameters[label] = pset.__dict__.keys() scales[label] = m.scale tree = axial_tree if scene is None: scene = l.sceneInterpretation(tree) l.done() mtg = axialtree2mtg(tree, scales, scene, parameters) return mtg
[docs]def mtg2lpy(g, lsystem, axial_tree=None): """ Create an AxialTree from a MTG with scales. :Usage: .. code-block:: python tree = mtg2lpy(g,lsystem) :Parameters: - `g`: The mtg which have been generated by an LSystem. - `lsystem`: A lsystem object containing various information. The `lsystem` is only used to retrieve the context and the parameters associated with each module name. :Optional Parameters: - `axial_tree`: an empty axial tree. It is used to avoid complex import in the code. :Return: axial tree .. seealso:: :func:`mtg2axialtree` """ # Retrieve the set of modules, their label, scale and proerty names. l = lsystem l.makeCurrent() context = l.context() modules = context.declaredModules() parameters = {} for m in modules: parameters[m.name] = m.parameterNames l.done() return mtg2axialtree(g, parameters, axial_tree)
[docs]def mtg2mss(name, mtg, scene, envelop_type = 'CvxHull'): """ Convert an MTG into the multi-scale structure implemented by fractalysis. :Parameters: - `name`: name of the structure - `mtg`: the mtg to convert - `scene`: the scene containing the geometry - `envelop_type`: algorithm used to fit the geometry.between scales. :Returns: mss data structure. """ from openalea.fractalysis.light import ssFromDict l = [] for scale in range(1, mtg.nb_scales()-1): d = {} for vid in mtg.vertices_iter(scale=scale): d[vid] = mtg.components(vid) l.append(d) return ssFromDict(name, scene, l, envelop_type)
############################################################################### # Class and methods to read the famous MTG file format. ###############################################################################
[docs]class Reader(object): """ Parse a MTG string from a classic MTG file format. The mtg format is composed of a header and the mtg code. The header is used to construct and validate the mtg. The code contains topology relations and properties. """ def __init__(self, string, has_line_as_param=True, mtg=None, has_date=False): self.mtg = mtg # First implementation. # Do not store 3 time the structure (mtg, txt and lines) self.txt = string self.lines = string.splitlines() # header information self._code = "" self._symbols = {} self._description = None self._features = {} self.has_date = has_date # debug self._no_line = 0 self.warnings = [] self.has_line_as_param = has_line_as_param
[docs] def parse(self): """ """ self.header() self.code() self.errors() return self.mtg
[docs] def header(self): """ Parse an MTG header and create the mtg datastructure. An mtg header contains different parts: - code: definition - classes: symbol name and scale - description: allowed relationship between symbols - features: property name and type """ # 1. Read the file from the begining self._no_line = -1 self.code_form() self.classes() self.description() self.features()
[docs] def check(self): """ Check the validity of the MTG without building it. """ return True
#### internal methods ####
[docs] def code_form(self): """ CODE: FORM-A / FORM-B """ l = self._next_line() l = l.split('#')[0] code = l.split(':') if len(code) == 2 and 'CODE' in code[0] and 'FORM-' in code[1]: self._code = code[1] else: # error self.warnings.append((self._no_line, "Code form error"))
[docs] def classes(self): """ CLASSES: SYMBOL SCALE DECOMPOSITION INDEXATION DEFINITION ... """ decomp = ['NONE', 'FREE', 'CONNECTED', 'NOTCONNECTED', 'LINEAR', 'PURELINEAR', '<-LINEAR', '+-LINEAR'] l = self._next_line() if not l.startswith('CLASSES'): self.warnings.append((self._no_line, "CLASSES section not found.")) l = self._next_line() l = l.split('#')[0] class_header = l.split() if class_header != ['SYMBOL', 'SCALE', 'DECOMPOSITION', 'INDEXATION', 'DEFINITION']: self.warnings.append((self._no_line, "CLASS header error.")) while l: l = self._next_line() if l.startswith('DESCRIPTION'): break l = l.split('#')[0] line = l.split() if len(line) != 5: self.warnings.append((self._no_line, "CLASS error.")) break else: symbol, scale, decomposition, indexation, definition = line # validation if not symbol.isalpha() and symbol != '$': self.warnings.append((self._no_line, "Bad symbol %s."%symbol)) if not scale.isdigit(): self.warnings.append((self._no_line, "Bad scale %s."%scale)) if decomposition not in decomp: self.warnings.append((self._no_line, "Bad decomposition id %s."%decomposition)) # TODO: validate indexation if definition not in ['IMPLICIT', 'EXPLICIT']: self.warnings.append((self._no_line, "Bad definition %s."%definition)) if symbol != '$': self._symbols[symbol] = int(scale) if l.startswith('DESCRIPTION'): self._no_line -= 1
[docs] def description(self): """ DESCRIPTION: LEFT RIGHT RELTYPE MAX U U,I + ? ... """ l = self._next_line() if not l.startswith('DESCRIPTION'): self.warnings.append((self._no_line, "DESCRIPTION section not found.")) l = self._next_line() l = l.split('#')[0] desc_header = l.split() if desc_header != ['LEFT', 'RIGHT', 'RELTYPE', 'MAX']: self.warnings.append((self._no_line, "DESCRIPTION header error.")) while l : l = self._next_line() if l.startswith('FEATURES'): break l = l.split('#')[0] line = l.split() if len(line) < 2: self.warnings.append((self._no_line, "Class description error.")) continue left = line[0] if left not in self._symbols: self.warnings.append((self._no_line, "Unknown left symbol %s."%left)) right = ''.join(line[1:-2]) rights = [symbol.strip() for symbol in right.split(',')] bad_right= filter(lambda x: x not in self._symbols, rights) if bad_right: self.warnings.append((self._no_line, "Unknown right symbols %s."%bad_right)) reltype, _max = line[-2:] if reltype not in ['+', '<']: self.warnings.append((self._no_line, "Unknown relation type %s."%reltype)) if _max != '?' and not _max.isdigit(): msg = "Error in the maximum number of relationships (%s)."%_max msg += "Give a number or ?" self.warnings.append((self._no_line, msg)) if l.startswith('FEATURES'): self._no_line -= 1
[docs] def features(self): """ FEATURES: NAME TYPE nb_plant INT """ l = self._next_line() if not l.startswith('FEATURES'): self.warnings.append((self._no_line, "FEATURES section not found.")) l = self._next_line() l = l.split('#')[0] f_header = l.split() if f_header != ['NAME', 'TYPE']: self.warnings.append((self._no_line, "FEATURES header error.")) while l: l = self._next_line() if not l or l.startswith('MTG'): break l = l.split('#')[0] line = l.split() if len(line) != 2: self.warnings.append((self._no_line, "FEATURE description error.")) continue name, _type = line if '/' in _type and 'date' in name.lower(): self.has_date = True self._features[name] = _type if l.startswith('MTG'): self._no_line -= 1 # add _line feature as int if self.has_line_as_param: self._features['_line'] = 'INT'
def _next_line(self): self._no_line += 1 if self._no_line == len(self.lines): self._no_line -= 1 return "" l = self.lines[self._no_line] l1 = l.strip() if not l1 or l1[0] == '#': return self._next_line() else: return l
[docs] def next_line_iter(self): l = self._next_line() while l: yield l l = self._next_line()
[docs] def errors(self): nb_lines = len(self.lines) for id, warning in self.warnings: if id < nb_lines: print "== Line %d: %s"%(id, self.lines[id]) print warning else: print id, " ", warning
############################################################################ ### Parsing of the MTG code ### That's the real stuff... ############################################################################
[docs] def code(self): """ Parse the code and populate the MTG. """ l = self._next_line() if not l.startswith('MTG'): self.warnings.append((self._no_line, "MTG section not found.")) l = self._next_line() if not l.startswith('ENTITY-CODE') and not l.startswith('TOPO'): self.warnings.append((self._no_line, "ENTITY-CODE or TOPO not found.")) l = l.split('#')[0] features = l.split()[1:] self._nb_features = len(features) self._feature_head = [] for feature in features: if feature not in self._features: self.warnings.append((self._no_line, "Error in ENTITY-CODE: Feature %s is unknown."%feature)) else: self._feature_head.append(feature) code_topo = l[:l.find(features[0])] if self._nb_features else l[:] nb_cols = len(code_topo.split('\t')) self._feature_slice = slice(nb_cols-1, nb_cols-1+self._nb_features) self.preprocess_code() self.build_mtg()
[docs] def preprocess_line(self, s, diff_space, indent, nb_spaces, edge_type): """ Preprocess a line. """ if (debug): print 'line :%s, nb_spaces: %d, diff_space: %d, edge_type:%s, %s'%(s, nb_spaces, diff_space, str(edge_type), str(indent)) if diff_space == 0: if s.startswith('^') or s.startswith('*'): s = s[1:] elif edge_type: elt = '' if edge_type[-1] in ['+', '/']: elt = ']' if s[0] in ['+','/']: if elt == ']': edge_type.pop() edge_type.append(s[0]) s = elt+'[' + s elif s[0] in ['+']: edge_type.append(s[0]) elif diff_space > 0: # indent if s.startswith('^'): print 'ERROR %s'%s indent.append(nb_spaces) if s[0] in ['+','/']: edge_type.append(s[0]) s = "[" + s else: edge_type.append(s[0]) else: # unindent brackets = [] # Close the previous brackets while nb_spaces - indent[-1] < 0: indent.pop() if edge_type: edge = edge_type.pop() if edge in ['+','/']: brackets.append(']') # Same case as diff_space == 0 assert nb_spaces - indent[-1] == 0 if s.startswith('^'): s = s[1:] elif edge_type: elt='' if edge_type[-1] in ['+','/']: elt = ']' if s[0] in ['+','/']: if elt == ']': edge_type.pop() edge_type.append(s[0]) s = elt+'[' + s s = ''.join(brackets+[s]) return s, edge_type
[docs] def preprocess_code(self): code = [l for l in self.lines[self._no_line+1:] if l.strip() and not l.strip().startswith('#')] indent = [0] edge_type = [] new_code = [] #for l in code: for l in self.next_line_iter(): #l = l.expandtabs(4) s = l.strip() s= s.split()[0] # args args = l.split('\t')[self._feature_slice] params = [ "%s=%s"%(k,v) for k, v in zip(self._feature_head, args) if v.strip()] if self.has_line_as_param: params.append("_line=%d"%self._no_line) if params: s = s + "("+','.join(params)+")" # build nb_spaces = len(l) - len(l.lstrip('\t')) diff_space = nb_spaces - indent[-1] #s = self.preprocess_line(s, diff_space, indent, nb_spaces, edge_type) s, edge_type = self.preprocess_line(s, diff_space, indent, nb_spaces, edge_type) new_code.append(s) while edge_type: edge = edge_type.pop() if edge in ['+','/']: new_code.append(']') self._new_code = ''.join(new_code) if debug: print self._new_code
[docs] def build_mtg(self): """ """ self.mtg = multiscale_edit(self._new_code, self._symbols, self._features, self.has_date, mtg=self.mtg)
#self.mtg = multiscale_edit(self._new_code, {}, self._features)
[docs]def read_mtg(s, mtg=None, has_date=False): """ Create an MTG from its string representation in the MTG format. :Parameter: - s (string) - a multi-lines string :Return: an MTG :Example: .. code-block:: python f = open('test.mtg') txt = f.read() g = read_mtg(txt) .. seealso:: :func:`read_mtg_file`. """ reader = Reader(s, mtg=mtg, has_date=has_date) g = reader.parse() return g
[docs]def read_mtg_file(fn, mtg=None, has_date=False): """ Create an MTG from a filename. :Usage: >>> g = read_mtg_file('test.mtg') .. seealso:: :func:`read_mtg`. """ f = open(fn) txt = f.read() f.close() return read_mtg(txt, mtg=mtg, has_date=has_date)
[docs]def mtg_display(g, vtx_id, tab=' ', edge_type=None, label=None): """ Test the traversal of an mtg. A first step before writing it. """ import traversal if not edge_type: edge_type = g.properties().get('edge_type', {}) if not label: label= g.properties().get('label', {}) prev = vtx_id prev_order = 0 prev_scale = g.scale(vtx_id) for vid in traversal.iter_mtg(g, vtx_id): if prev == vid: continue name = label.get(vid, vid) if vid in edge_type: et = edge_type[vid] elif prev == g.parent(vid): et = '<' else: et = '?' space = '' scale = g.scale(vid) order = g.order(vid) if prev == g.complex(vid): et = '/' # add one blank line space = '^' elif prev_scale == scale and et == '<': space = '^' if scale < prev_scale: yield '' order = g.order(vid) prev = vid prev_scale = scale prev_order = order if order != prev_order: indent = 0 yield (order*tab) +space+et+ name
############################################################################### # Class and methods to write in the famous MTG file format. ###############################################################################
[docs]class Writer(object): """ Write a MTG string from a mtg object. The mtg format is composed of a header and the mtg code. The header is used to construct and validate the mtg. The code contains topology relations and properties. """ def __init__(self, g, header = '' ): self.g = g self._header = header
[docs] def header(self): """ Build the MTG header from the datastructure. An mtg header contains different parts: - code: definition - classes: symbol name and scale - description: allowed relationship between symbols - features: property name and type """ code = self._code() classes = self.classes() desc = self.description() features = self.features()
[docs] def code(self, property_names, nb_tab=12, display_id=False, display_scale=False, filter=None): """ Traverse the MTG and write the code. """ head = ['MTG :'] entity = ['ENTITY-CODE'] entity.extend((nb_tab-1)*['']) entity.extend(property_names) head.append('\t'.join(entity)) # Create for each line a string with code and propertie values. # TODO : duplication of code from display_mtg and mtg_display. labels = self.g.property('label') edge_type = self.g.property('edge_type') properties = self.g.properties() current_vertex = self.g.root tab = 0 prev_scale = 0 sym_at_col = [] for vtx in traversal.iter_mtg2(self.g, current_vertex): if filter and not filter(self.g, vtx): continue log('Process ',vtx, self.g.node(vtx).label) cur_scale = self.g.scale(vtx) if vtx == current_vertex: current_vertex = vtx prev_scale = cur_scale sym_at_col.append(vtx) continue # Algorithm description: # prev_scale >= cur_scale: # 1. search the parent # 2. if < same column elif + : tab = col+1 complex = self.g.complex(vtx) if current_vertex == complex: et = '/' if current_vertex != self.g.root: et = '^'+et log(' ','Cas / ',self.g.node(current_vertex).label, vtx, et) else: et = edge_type.get(vtx,'/') parent = self.g.parent(vtx) possible_et = possible_tab = None log(' ','Cas 2:', et, 'parent:',parent, 'sym_at_col: ',sym_at_col) for i in range(tab, -1, -1): vc = v = sym_at_col[i] vscale = self.g.scale(v) log(' col '+str(i),cur_scale, v,'scale',vscale) vtx_proj = vtx parent_proj = parent if vscale > cur_scale: # up for j in range(vscale-cur_scale): vc = self.g.complex(vc) #down # Even if the complex are linked together, several solution can coexist vtx_proj = self.g.component_roots_at_scale_iter(vtx,scale=vscale).next() parent_proj = self.g.parent(vtx_proj) if vc == parent and v == parent_proj: log(' ==> cas 1') if et == '<': et = '^'+et tab = i else: if i+1 < nb_tab: tab = i+1 else: et = '^'+et tab = i break elif vc == parent: log(' ==> cas 2') if et == '<': possible_et = '^'+et possible_tab = i else: if i+1 < nb_tab: possible_tab = i+1 else: possible_et = '^'+et possible_tab = i elif i == 0 and self.g.complex(vc) == self.g.complex(vtx)==self.g.root: if not possible_et: tab = 0 break else: #print sy log(' Possible Error. Use hypothetic state if possible.') if possible_et and possible_tab: et = possible_et tab = possible_tab else: print tab print sym_at_col raise Exception("Error in the MTG for vertex %d"%vtx) if tab >= nb_tab: msg = """There is not enough tabs to store the MTG code. Increase the nb_tab variable to at least %d""" raise Exception(msg%(nb_B2tab+2)) # Create a valid line with properties. label = labels.get(vtx, str(vtx)) if not display_id and not display_scale: name = '%s%s'%(et,get_label(label)) elif display_id and display_scale: name = '%s%s\t\t\t(id=%d, scale=%d)'%(et,get_label(label),vtx, self.g.scale(vtx)) elif display_id: name = '%s%s\t\t\t(id=%d)'%(et,get_label(label),vtx) else: name = '%s%s\t\t\t(scale=%d)'%(et,get_label(label),self.g.scale(vtx)) line = ['']*nb_tab line[tab] = name log(' -> Add vertex', line[:tab+1], '(%d)'%tab ) for pname in property_names: if properties[pname].has_key(vtx): p = properties[pname].get(vtx,'') line.append(str(p)) else: line.append('') head.append('\t'.join(line)) current_vertex = vtx if len(sym_at_col)==tab: sym_at_col.append(vtx) else: assert len(sym_at_col) > tab sym_at_col = sym_at_col[:tab+1] sym_at_col[tab] = vtx return head
@staticmethod def _code(code='A'): """ Define the MTG code format. """ if code not in ['A', 'B']: code = 'A' return "CODE : \tFORM-%s" % code @staticmethod def _classes(symbols): """ Define the different symbols with their scale. symbols is a list of dictionary with specific keys: - symbol is the class name - scale is an positive integer - decomposition (optional) is in [FREE, LINEAR, CONNECTED, +-LINEAR, <-LINEAR, NOTCONNECTED, NONE] - indexation (optional) is FREE or CONSECUTIVE - definition (optional) is EXPLICIT or IMPLICIT """ klass = ['CLASSES :'] head = ['SYMBOL', 'SCALE', 'DECOMPOSITION', 'INDEXATION', 'DEFINITION'] klass.append('\t'.join(head)) default = dict(decomposition='FREE', indexation='FREE', definition='EXPLICIT') template = Template('\t'.join(['$symbol', '$scale', '$decomposition', '$indexation', '$definition'])) d = default.copy() d.update(dict(symbol='$', scale='0', definition='IMPLICIT')) s = template.substitute(d) klass.append(s) for sdict in symbols: d = default.copy() d.update(sdict) s = template.substitute(d) klass.append(s) return '\n'.join(klass) @staticmethod def _description(scale_symbol): """ Generate the description header file for a MTG. scale_symbol is a dict that associate a scale integer with the different symbols at this scale. """ desc = ['DESCRIPTION :'] head = ['LEFT', 'RIGHT', 'RELTYPE', 'MAX'] desc.append('\t'.join(head)) template = Template('\t'.join(['$left', '$right', '$reltype', '$max'])) d = {} d['max'] = '?' scales = sorted(scale_symbol.keys()) for scale in scales: symbols = scale_symbol[scale] d['right'] = ','.join(symbols) for s in symbols: d['left'] = s for edge_type in ['<', '+']: d['reltype'] = edge_type l = template.substitute(d) desc.append(l) return '\n'.join(desc) @staticmethod def _features(name_type): """ Generate the Feature header. name_type is a list of tuple containing the property name and the associated property type. type is INT, REAL, ALPHA or DATE (DD/MM, DD/MM/YY, DD/MM-TIME, DD/MM/YY-TIME). """ predefined_types = ['INT', 'REAL', 'ALPHA', 'DD/MM', 'DD/MM/YY', 'DD/MM-TIME', 'DD/MM/YY-TIME', 'STRING'] features = ['FEATURES :'] features.append('\t'.join(['NAME', 'TYPE'])) for name, type_ in name_type: if type_ not in predefined_types: warn('The type %s for the feature %s is not allow'%(type_, name), SyntaxWarning) continue features.append('\t'.join([name, type_])) return '\n'.join(features) @staticmethod def _scale2symbol(scales): """ scale is a dict mapping scale to a list of symbols. Returns a list of dict with two keys symbol and scale. """ symbols = [] for s, classes in scales.iteritems(): for class_ in classes: symbols.append(dict(scale=str(s), symbol=class_)) return symbols
[docs]def write_mtg(g, properties=[], class_at_scale=None, nb_tab=12, display_id=False): """ Transform an MTG into a multi-line string in the MTG format. This method build a generic header, then traverses the MTG and transform each vertex into a line with its label, topoloical relationship and specific `properties`. :Parameters: - `g` (MTG) - `properties` (list): a list of tuples associating a property name with its type. Only these properties will be written in the out file. :Optional Parameters: - `class_at_scale` (dict(name->int)): a map between a class name and its scale. If `class _at_scale` is None, its value will be computed from `g`. - `nb_tab` (int): the number of tabs used to write the code. - `display_id` (bool): display the id for each vertex :Returns: a list of strings. :Example: .. code-block:: python # Export all the properties defined in `g`. # We consider that all the properties are real numbers. properties = [(p, 'REAL') for p in g.property_names() if p not in ['edge_type', 'index', 'label']] mtg_lines = write_mtg(g, properties) # Write the result into a file example.mtg filename = 'example.mtg' f = open(filename, 'w') f.write(mtg_lines) f.close() """ w = Writer(g) header = [w._code()] header.append('') if not class_at_scale: label = g.property('label') class_at_scale = dict(((get_name(lab),g.scale(id)) for id, lab in label.iteritems())) scales = {} for class_, scale in class_at_scale.iteritems(): scales.setdefault(scale, []).append(class_) symbols = w._scale2symbol(scales) class_str = w._classes(symbols) header.append(class_str) header.append('') header.append(w._description(scales)) header.append('') features = w._features(properties) header.append(features) header.append('') property_name = [p[0] for p in properties] code = w.code(property_name, nb_tab=nb_tab, display_id=display_id, filter=lambda g,v: True if g.scale(v) <=4 else False) header.extend(code) header.append('') return '\n'.join(header)
[docs]def display(g, max_scale=0, display_id=True, display_scale=False, nb_tab=12,**kwds): """ Display MTG """ w = Writer(g) if max_scale: f = lambda g,v: True if g.scale(v) <= max_scale else False else: f = None code = w.code([], nb_tab=nb_tab, display_id=display_id, display_scale=display_scale, filter=f) return '\n'.join(code[2:])