Skip to content

Commit

Permalink
refactor model processing to aggregate section's data instead of over…
Browse files Browse the repository at this point in the history
…writing
  • Loading branch information
Marketa Opichalova committed Feb 22, 2024
1 parent 1bce72e commit 8546c4e
Showing 1 changed file with 142 additions and 75 deletions.
217 changes: 142 additions & 75 deletions eBCSgen/Parsing/ParseBCSL.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,27 @@ def load_TS_from_json(json_file: str) -> TransitionSystem:
with open(json_file) as json_file:
data = json.load(json_file)

ordering = SortedList(map(lambda agent: complex_parser.parse(agent).data.children[0], data['ordering']))
ts = TransitionSystem(ordering, data['bound'])
ordering = SortedList(
map(
lambda agent: complex_parser.parse(agent).data.children[0],
data["ordering"],
)
)
ts = TransitionSystem(ordering, data["bound"])
ts.states_encoding = dict()
for node_id in data['nodes']:
vector = np.array(eval(data['nodes'][node_id]))
for node_id in data["nodes"]:
vector = np.array(eval(data["nodes"][node_id]))
is_hell = True if vector[0] == inf else False
ts.states_encoding[int(node_id)] = State(Vector(vector), Memory(0), is_hell)
ts.edges = {edge_from_dict(edge) for edge in data['edges']}
ts.init = data['initial']
if 'parameters' in data:
ts.params = data['parameters']

ts.unprocessed = {State(Vector(np.array(eval(state))), Memory(0)) for state in data.get('unprocessed', list())}
ts.edges = {edge_from_dict(edge) for edge in data["edges"]}
ts.init = data["initial"]
if "parameters" in data:
ts.params = data["parameters"]

ts.unprocessed = {
State(Vector(np.array(eval(state))), Memory(0))
for state in data.get("unprocessed", list())
}
ts.states = set(ts.states_encoding.values()) - ts.unprocessed
return ts

Expand All @@ -77,13 +85,20 @@ def __init__(self):
self.counter = 0

def __str__(self):
return " | ".join([str(self.seq), str(self.comp), str(self.complexes), str(self.counter)])
return " | ".join(
[str(self.seq), str(self.comp), str(self.complexes), str(self.counter)]
)

def __repr__(self):
return str(self)

def to_side(self):
return Side([Complex(self.seq[c[0]:c[1] + 1], self.comp[c[0]]) for c in self.complexes])
return Side(
[
Complex(self.seq[c[0] : c[1] + 1], self.comp[c[0]])
for c in self.complexes
]
)


GRAMMAR = r"""
Expand Down Expand Up @@ -189,7 +204,7 @@ def to_side(self):

class TransformRegulations(Transformer):
def regulation(self, matches):
return {'regulation': matches[1]}
return {"regulation": matches[1]}

def regulation_def(self, matches):
return matches[0]
Expand Down Expand Up @@ -255,27 +270,28 @@ def cmplx_dfn(self, matches):
def rules(self, matches):
new_rules = [matches[0]]
for rule in matches[1:]:
if rule.children[-1].data == 'variable':
if rule.children[-1].data == "variable":
variables = rule.children[-1].children[1:]
for variable in variables:
replacer = ReplaceVariables(variable)
new_rule = Tree('rule', deepcopy(rule.children[:-1]))
new_rule = Tree("rule", deepcopy(rule.children[:-1]))
new_rules.append(replacer.transform(new_rule))
else:
new_rules.append(rule)
return Tree('rules', new_rules)
return Tree("rules", new_rules)


def remove_nested_complex_aliases(complex_defns):
"""
Removes nested complex aliases from their definitions.
"""

def replace_aliases(complex_defns):
new_definitions = dict()
for defn in complex_defns:
result = []
for child in complex_defns[defn]:
if child.data == 'cmplx_name':
if child.data == "cmplx_name":
result += complex_defns[str(child.children[0])]
else:
result.append(child)
Expand All @@ -289,7 +305,7 @@ def replace_aliases(complex_defns):
complex_defns = new_defns
new_defns = replace_aliases(complex_defns)

complex_defns = {key: Tree('value', complex_defns[key]) for key in complex_defns}
complex_defns = {key: Tree("value", complex_defns[key]) for key in complex_defns}
return complex_defns


Expand Down Expand Up @@ -350,24 +366,28 @@ def insert_atomic_to_struct(self, atomic, struct):
"""
if len(struct.children) == 2:
for i in range(len(struct.children[1].children)):
if self.get_name(atomic) == self.get_name(struct.children[1].children[i]):
if self.get_name(atomic) == self.get_name(
struct.children[1].children[i]
):
if self.is_empty(struct.children[1].children[i]):
struct.children[1].children[i] = atomic
return struct
raise ComplexParsingError(f"Illegal nesting sequence: {atomic}:{struct}", struct)
raise ComplexParsingError(
f"Illegal nesting sequence: {atomic}:{struct}", struct
)
struct.children[1].children.append(atomic)
else:
struct.children.append(Tree("composition", [atomic]))
return struct

def insert_struct_to_complex(self, struct, complex):
"""
Adds a struct subtree to a complex tree, or merges it with an existing struct subtree. This method first
searches for a struct in the complex with the same name as the input struct. If found, it then checks for
Adds a struct subtree to a complex tree, or merges it with an existing struct subtree. This method first
searches for a struct in the complex with the same name as the input struct. If found, it then checks for
atomic incompatibility within the structs.
The method ensures that the struct being added does not contain atomics with names that match any atomics
in the corresponding struct in the complex. This step is crucial to maintain the integrity of the complex
The method ensures that the struct being added does not contain atomics with names that match any atomics
in the corresponding struct in the complex. This step is crucial to maintain the integrity of the complex
by avoiding conflicting or duplicate atomic structures.
Args:
Expand All @@ -385,8 +405,14 @@ def insert_struct_to_complex(self, struct, complex):
struct_found = True
# search same name structs - if they contain atomics with matching names, they are considered incompatible
for j in range(len(struct.children[1].children)):
for k in range(len(complex.children[i].children[0].children[1].children)):
if self.get_name(struct.children[1].children[j]) == self.get_name(complex.children[i].children[0].children[1].children[k]):
for k in range(
len(complex.children[i].children[0].children[1].children)
):
if self.get_name(
struct.children[1].children[j]
) == self.get_name(
complex.children[i].children[0].children[1].children[k]
):
struct_found = False
break
# if no same name atomic found in the struct, we found the suitable complex's struct
Expand All @@ -399,10 +425,14 @@ def insert_struct_to_complex(self, struct, complex):
complex.children[i] = Tree("agent", [struct])
else:
# if the complex's struct is not empty merge the struct's children into the complex's struct
complex.children[i].children[0].children[1].children += struct.children[1].children
complex.children[i].children[0].children[
1
].children += struct.children[1].children
return complex

raise ComplexParsingError(f"Illegal struct nesting or duplication: {struct}:{complex}", complex)
raise ComplexParsingError(
f"Illegal struct nesting or duplication: {struct}:{complex}", complex
)

def insert_atomic_to_complex(self, atomic, complex):
"""
Expand All @@ -424,15 +454,17 @@ def insert_atomic_to_complex(self, atomic, complex):
if self.is_empty(complex.children[i].children[0]):
complex.children[i] = Tree("agent", [atomic])
return complex
raise ComplexParsingError(f"Illegal atomic nesting or duplication: {atomic}:{complex}", complex)
raise ComplexParsingError(
f"Illegal atomic nesting or duplication: {atomic}:{complex}", complex
)

def get_name(self, agent):
return str(agent.children[0].children[0])

def complex(self, matches):
result = []
for match in matches[0].children:
if match.data == 'value':
if match.data == "value":
result += match.children
else:
result.append(match)
Expand Down Expand Up @@ -540,7 +572,11 @@ def rule(self, matches):
mid = lhs.counter
compartments = lhs.comp + rhs.comp
complexes = lhs.complexes + list(
map(lambda item: (item[0] + lhs.counter, item[1] + lhs.counter), rhs.complexes))
map(
lambda item: (item[0] + lhs.counter, item[1] + lhs.counter),
rhs.complexes,
)
)
pairs = [(i, i + lhs.counter) for i in range(min(lhs.counter, rhs.counter))]
if lhs.counter > rhs.counter:
pairs += [(i, None) for i in range(rhs.counter, lhs.counter)]
Expand All @@ -556,9 +592,17 @@ def rule(self, matches):
pairs += [(None, i + lhs.counter)]

reversible = False
if arrow == '<=>':
if arrow == "<=>":
reversible = True
return reversible, Rule(agents, mid, compartments, complexes, pairs, Rate(rate) if rate else None, label)
return reversible, Rule(
agents,
mid,
compartments,
complexes,
pairs,
Rate(rate) if rate else None,
label,
)

def rules(self, matches):
rules = []
Expand All @@ -569,14 +613,14 @@ def rules(self, matches):
rules.append(reversible_rule)
else:
rules.append(rule)
return {'rules': rules}
return {"rules": rules}

def definitions(self, matches):
result = dict()
for definition in matches[1:]:
pair = definition.children
result[pair[0]] = pair[1]
return {'definitions': result}
return {"definitions": result}

def init(self, matches):
return matches
Expand All @@ -588,7 +632,7 @@ def inits(self, matches):
result[init[1].children[0]] = int(init[0])
else:
result[init[0].children[0]] = 1
return {'inits': result}
return {"inits": result}

def param(self, matches):
self.params.add(str(matches[0]))
Expand All @@ -602,52 +646,61 @@ def model(self, matches):
for match in matches:
if type(match) == dict:
key, value = list(match.items())[0]
if key == 'rules':
if key == "rules":
rules.update(value)
if key == 'inits':
inits = value
if key == 'definitions':
definitions = value
if key == 'regulation':
if key == "inits":
inits.update(value)
if key == "definitions":
definitions.update(value)
if key == "regulation":
regulation = value
elif isinstance(match, Tree) and match.data == 'sections':
elif isinstance(match, Tree) and match.data == "sections":
if isinstance(match.children[0], Tree):
continue
key, value = list(match.children[0].items())[0]
if key == 'rules':
if key == "rules":
rules.update(value)
if key == 'inits':
inits = value
if key == 'definitions':
definitions = value
if key == 'regulation':
regulation = value
if key == "inits":
inits.update(value)
if key == "definitions":
definitions.update(value)
if key == "regulation":
regulation.update(value)
params = self.params - set(definitions)
return Model(rules, inits, definitions, params, regulation)


class Parser:
def __init__(self, start):
grammar = "start: " + start + GRAMMAR + COMPLEX_GRAMMAR + EXTENDED_GRAMMAR + REGULATIONS_GRAMMAR
self.parser = Lark(grammar, parser='lalr',
propagate_positions=False,
maybe_placeholders=False
)
grammar = (
"start: "
+ start
+ GRAMMAR
+ COMPLEX_GRAMMAR
+ EXTENDED_GRAMMAR
+ REGULATIONS_GRAMMAR
)
self.parser = Lark(
grammar, parser="lalr", propagate_positions=False, maybe_placeholders=False
)

self.terminals = dict((v, k) for k, v in _TERMINAL_NAMES.items())
self.terminals.update({"COM": "//",
"ARROW": "=>, <=>",
"POW": "**",
"DOUBLE_COLON": "::",
"RULES_START": "#! rules",
"INITS_START": "#! inits",
"DEFNS_START": "#! definitions",
"COMPLEXES_START": "#! complexes",
"REGULATION_START": "#! regulation",
"CNAME": "name",
"NAME": "agent_name",
"VAR": "?"
})
self.terminals.update(
{
"COM": "//",
"ARROW": "=>, <=>",
"POW": "**",
"DOUBLE_COLON": "::",
"RULES_START": "#! rules",
"INITS_START": "#! inits",
"DEFNS_START": "#! definitions",
"COMPLEXES_START": "#! complexes",
"REGULATION_START": "#! regulation",
"CNAME": "name",
"NAME": "agent_name",
"VAR": "?",
}
)
self.terminals.pop("$END", None)

def replace(self, expected: set) -> set:
Expand Down Expand Up @@ -685,7 +738,9 @@ def transform(self, tree: Tree) -> Result:
try:
complexer = ExtractComplexNames()
tree = complexer.transform(tree)
complexer.complex_defns = remove_nested_complex_aliases(complexer.complex_defns)
complexer.complex_defns = remove_nested_complex_aliases(
complexer.complex_defns
)
de_abstracter = TransformAbstractSyntax(complexer.complex_defns)
tree = de_abstracter.transform(tree)
tree = TreeToComplex().transform(tree)
Expand All @@ -707,11 +762,23 @@ def syntax_check(self, expression: str) -> Result:
try:
tree = self.parser.parse(expression)
except UnexpectedCharacters as u:
return Result(False, {"unexpected": expression[u.pos_in_stream],
"expected": self.replace(u.allowed),
"line": u.line, "column": u.column})
return Result(
False,
{
"unexpected": expression[u.pos_in_stream],
"expected": self.replace(u.allowed),
"line": u.line,
"column": u.column,
},
)
except UnexpectedToken as u:
return Result(False, {"unexpected": str(u.token),
"expected": self.replace(u.expected),
"line": u.line, "column": u.column})
return Result(
False,
{
"unexpected": str(u.token),
"expected": self.replace(u.expected),
"line": u.line,
"column": u.column,
},
)
return Result(True, tree)

0 comments on commit 8546c4e

Please sign in to comment.