-
Notifications
You must be signed in to change notification settings - Fork 37
/
planner.py
127 lines (113 loc) · 4.54 KB
/
planner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
""" Task and Motion Planning tools using PDDLStream. """
import os
from pddlstream.algorithms.focused import solve_adaptive
from pddlstream.language.constants import And, PDDLProblem
from pddlstream.utils import read
from .utils import (
get_default_stream_info_fn,
get_default_stream_map_fn,
process_goal_specification,
world_to_pddlstream_init,
pddlstream_solution_to_plan,
)
class PDDLStreamPlanner:
    """Task and Motion Planner using PDDLStream."""

    def __init__(
        self,
        world,
        domain_folder,
        stream_map_fn=get_default_stream_map_fn(),
        stream_info_fn=get_default_stream_info_fn(),
    ):
        """
        Creates a new PDDLStream based planner.

        The PDDL domain is read from ``domain.pddl`` and the stream
        definitions from ``streams.pddl``, both inside ``domain_folder``.

        :param world: World object to use for planning.
        :type world: :class:`pyrobosim.core.world.World`
        :param domain_folder: Path to folder containing PDDL domain and streams
        :type domain_folder: str
        :param stream_map_fn: Function that accepts a World and Robot object and returns a dictionary of stream mappings.
        :type stream_map_fn: function
        :param stream_info_fn: Function that returns a dictionary of stream information.
        :type stream_info_fn: function
        """
        # Set the world model
        self.world = world

        # Planning configuration parameters
        domain_pddl_file = os.path.join(domain_folder, "domain.pddl")
        self.domain_pddl = read(domain_pddl_file)
        stream_pddl_file = os.path.join(domain_folder, "streams.pddl")
        self.stream_pddl = read(stream_pddl_file)
        self.stream_map_fn = stream_map_fn
        self.stream_info_fn = stream_info_fn

        # Bookkeeping variables: the most recent goal and resulting plan
        self.latest_specification = None
        self.latest_plan = None

    def plan(
        self,
        robot,
        goal_literals,
        max_attempts=1,
        verbose=False,
        **planner_config,
    ):
        r"""
        Searches for a set of actions that completes a goal specification
        given an initial state of the world.

        This uses the "adaptive" planner in PDDLStream, which demonstrates the best performance
        for most problems.

        The goal and the resulting plan are also stored in the
        ``latest_specification`` and ``latest_plan`` attributes.

        :param robot: Robot to use for planning.
        :type robot: :class:`pyrobosim.core.robot.Robot`
        :param goal_literals: List of literals describing a goal specification.
        :type goal_literals: list[tuple]
        :param max_attempts: Maximum planning attempts. Defaults to 1.
            If less than 1, no planning is attempted and ``None`` is returned.
        :type max_attempts: int, optional
        :param verbose: If True, prints additional information. Defaults to False.
        :type verbose: bool, optional
        :param \*\*planner_config: Additional keyword arguments to pass to the PDDLStream planner.
        :return: A task plan object ready to use with ``pyrobosim``,
            or ``None`` if no attempt produced a plan.
        :rtype: :class:`pyrobosim.planning.actions.TaskPlan`
        """
        # Set the initial and goal states for PDDLStream
        init = world_to_pddlstream_init(self.world, robot)
        process_goal_specification(goal_literals, self.world)
        goal = And(*goal_literals)
        self.latest_specification = goal

        # Set up the PDDL problem.
        # The stream mappings are built by ``self.stream_map_fn`` from the
        # world and robot, so new stream functionality should be added there.
        external_pddl = [self.stream_pddl]
        constant_map = {}
        prob = PDDLProblem(
            self.domain_pddl,
            constant_map,
            external_pddl,
            self.stream_map_fn(self.world, robot),
            init,
            goal,
        )

        # Start from "no plan" so that a non-positive ``max_attempts``
        # yields None instead of referencing an unassigned variable.
        plan_out = None
        for _ in range(max_attempts):
            # Solve the problem using the "adaptive" PDDLStream algorithm.
            solution = solve_adaptive(
                prob,
                stream_info=self.stream_info_fn(),
                verbose=verbose,
                **planner_config,
            )

            # Convert the solution to a TaskPlan object.
            # If the solution is valid, no need to try again
            # TODO: Could later consider an option to execute all the attempts
            # and take the minimum-cost plan from that batch.
            plan_out = pddlstream_solution_to_plan(solution, robot.name)
            if plan_out is not None:
                break

        self.latest_plan = plan_out
        if verbose:
            print("\nInitial conditions:")
            for fact in init:
                print(f"\t{fact}")
            print("\nGoal Specification:")
            for g in goal_literals:
                print(f"\t{g}")
            print("")
            print(self.latest_plan)
        return self.latest_plan