-
Notifications
You must be signed in to change notification settings - Fork 0
/
agent.py
357 lines (288 loc) · 14 KB
/
agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
import os
import re
from time import sleep
from typing import List, Dict, Any, Optional

import IPython
import pandas as pd
from recordclass import recordclass

from navigator import Navigator
from utils import flist, bcolors
# Record of a single decision taken in HAgent.step: the values returned by the model
# (score, prob, value), the chosen high-level action and its index, and the list of
# candidate actions the model chose from.
LearningInfo = recordclass('LearningInfo', 'score prob value action index possible_actions')
class HAgent:
    """
    Agent that turns game observations into high-level commands.

    On every `step` the agent collects the current inventory, room description and recipe,
    builds a textual state description plus a list of candidate high-level commands, lets
    `model` pick one of them and expands the pick into low-level game commands.

    The `hcp` level selects where the information comes from: for `hcp > 0` the inventory and
    the description are read from the `info` dict, for `hcp == 0` they are extracted from the
    raw observation text.
    """

    # Matches the articles 'an', 'a' and 'the' as whole words followed by a space.
    _ARTICLE_PATTERN = re.compile(r'\b(?:an|a|the) ')
    # Deletes the punctuation that is stripped from every state feature.
    _PUNCTUATION_TABLE = str.maketrans('', '', '.,?!:')
    # Tokens that keep their case when the state features are lower-cased.
    _SPECIAL_TOKENS = ('<SEP>', '<DIR>')

    def __init__(self, device, model, item_scorer, navigation_model, hcp=4):
        """
        Args:
            device: stored on the instance; not used by the methods of this class.
            model: callable `(state_description, possible_commands)` returning
                `(score, prob, value, high_level_command, index)`.
            item_scorer: callable `(recipe=..., inventory=...)` returning `(items, utils)`.
            navigation_model: passed on to `Navigator`.
            hcp: information level of the agent (see class docstring).
        """
        self.device = device
        self.cmd_memory = flist()
        self.item_scorer = item_scorer
        self.navigator = Navigator(navigation_model)
        self.utils = None  # dict util name -> location where it was seen (None = not seen yet)
        self.hcp = hcp

        self.step_count = 0
        self.total_score = 0
        self.current_score = 0

        self.recipe = ''
        self.reading = False
        self.model = model

        self.description = 'nothing'
        self.description_updated = True
        self.inventory = 'nothing'
        self.inventory_updated = False
        self.info = None

    def step(self, observation, info: Dict[str, Any], detailed_commands=False):
        """
        Choose the next command for the given observation.

        Args:
            observation: text returned by the game for the previous command.
            info: additional game information; for `hcp > 0` it must contain the keys
                'description' and 'inventory'.
            detailed_commands: if True, additionally return a dict mapping every candidate
                high-level command to its low-level commands.

        Returns:
            `(cmds, learning_info)` or, with `detailed_commands`, `(cmds, learning_info, hl2ll)`.
            `cmds` holds one entry: the list of low-level commands of the chosen action.
        """
        self.info = info
        self.reading = 'and start reading' in observation

        # retrieve the information about the inventory, description, recipe and location
        # (different approaches for different HCPs)
        self.inventory, self.description = self._get_inventory_and_description(observation, info)
        inventory = self._parse_inventory(self.inventory)
        self.recipe = self._get_recipe(observation)
        location = Navigator.extract_location(self.description)
        nav_commands = self.navigator.get_navigational_commands(self.description)

        items = None
        if self._know_recipe():
            items, utils = self.item_scorer(recipe=self.recipe, inventory=self.inventory)
            # update the needed utils
            self._update_util_locations(self.description, utils, location)

        state_description = self.build_state_description(self.description, items, location,
                                                         observation, inventory)
        possible_commands = self.get_commands(self.description, items, location, inventory,
                                              nav_commands)
        score, prob, value, high_level_command, index = self.model(state_description,
                                                                   possible_commands)

        cmds = flist()
        cmds.append(self.command_to_action(command=high_level_command,
                                           items=items,
                                           inventory=inventory,
                                           description=self.description))
        learning_info = LearningInfo(score=score,
                                     prob=prob,
                                     value=value,
                                     action=high_level_command,
                                     index=index,
                                     possible_actions=possible_commands)

        self.reading = (high_level_command == 'examine cookbook')
        self.step_count += 1
        self.cmd_memory.append(high_level_command)

        if detailed_commands:
            hl2ll = {hl_cmd: self.command_to_action(command=hl_cmd,
                                                    items=items,
                                                    inventory=inventory,
                                                    description=self.description)
                     for hl_cmd in possible_commands}
            return cmds, learning_info, hl2ll
        return cmds, learning_info

    def change_last_cmd(self, cmd):
        """Overwrite the most recently remembered high-level command with `cmd`."""
        self.cmd_memory[-1] = cmd

    def _parse_inventory(self, inventory_text):
        """
        Split the inventory text into a list of carried item names without articles.

        Lines containing 'carrying' (the header line) and blank lines are skipped; a blank
        line would otherwise become an empty item and later an invalid 'drop ' command.
        """
        return [self.remove_articles(line.strip())
                for line in inventory_text.strip().split('\n')
                if line.strip() and 'carrying' not in line]

    def _get_inventory_and_description(self, observation, info):
        """Return `(inventory, description)` from `info` (hcp > 0) or the observation (hcp == 0)."""
        if self.hcp > 0:
            # for hcp > 0 the inventory and description is in info
            description = info['description']
            inventory = info['inventory']
        else:
            # for hcp == 0 the information needs to be extracted (if possible) from the observation
            description = self._description_from_observation(observation)
            inventory = self._inventory_from_observation(observation)
        return inventory, description

    def _description_from_observation(self, observation):
        """
        Extract the room description, i.e. everything from the first '-= ' header on.

        If the observation carries no '-= ... =-' header the previous description is kept.
        `self.description_updated` records which of the two cases applied.
        """
        # Both markers are tested explicitly: `'-=' and '=-' in observation` only tested '=-'
        # and the split below then failed with an IndexError when '-= ' was missing.
        if '-= ' in observation and '=-' in observation:
            self.description_updated = True
            return '-= ' + observation.split('-= ')[1]
        self.description_updated = False
        return self.description

    def _inventory_from_observation(self, observation):
        """
        Use the observation as inventory text if it contains 'You are carrying'.

        Otherwise the previous inventory is kept. `self.inventory_updated` records which of
        the two cases applied.
        """
        if 'You are carrying' in observation:
            self.inventory_updated = True
            return observation
        self.inventory_updated = False
        return self.inventory

    def _update_util_locations(self, description, utils, location):
        """
        Remember `location` for every needed util that is mentioned in `description`.

        The set of utils is fixed by the first non-None `utils`; a location that was stored
        once is not overwritten.
        """
        if self.utils is None:
            if utils is None:
                # nothing known about the utils yet, so there is nothing to update
                return
            self.utils = {util: None for util in utils}
        for util, known_location in self.utils.items():
            if known_location is None and util in description:
                self.utils[util] = location

    def update_score(self, new_total_score):
        """Store the new total score and the gain relative to the previous total."""
        self.current_score = new_total_score - self.total_score
        self.total_score = new_total_score

    def _get_recipe(self, observation, explicit_recipe=None):
        """
        Return the recipe text, or '' if it is not known yet.

        A recipe that is already stored is returned unchanged. Otherwise `explicit_recipe`
        is used if given, else the recipe is cut out of the observation while the agent is
        reading and the observation contains a 'Recipe' section.
        """
        if self.recipe != '':
            return self.recipe
        if explicit_recipe is not None:
            return explicit_recipe
        marker = '\n\nRecipe '
        # Without the marker there is nothing to extract; indexing the split would fail.
        if self.reading and marker in observation:
            return '\nRecipe {}\n'.format(observation.split(marker)[1].strip())
        return ''

    def _know_recipe(self):
        """Return True once a recipe has been stored."""
        return self.recipe != ''

    def command_to_action(self, command, items, inventory, description):
        """
        Expand a high-level command into a list of low-level game commands.

        For hcp == 0 commands that change the room or the inventory are followed by 'look'
        or 'inventory'. An empty expansion is replaced by ['look'].
        """
        if command == 'drop unnecessary items':
            cmd = self.drop_unnecessary_items(items, inventory)
        elif command == 'take required items from here':
            cmd = self.take_all_required_items(items, description)
        elif command == 'open stuff':
            cmd = ['open fridge']
            if self.hcp == 0:
                cmd += ['look']
        elif 'prepare meal' in command or 'with' in command:
            cmd = [command]
            if self.hcp == 0:
                cmd += ['inventory']
        else:
            cmd = [command]

        if len(cmd) == 0:
            cmd = ['look']
        return cmd

    def get_commands(self, description, items, location, inventory, nav_commands):
        """
        Builds a list of possible commands based on the current game state and the hcp of the agent.

        Raises:
            NotImplementedError: for hcp == 5.
        """
        if self.hcp == 5:
            raise NotImplementedError('HCP 5 not supported anymore')
        if self.hcp >= 1:
            # These levels used to fall through and return None. _get_commands_hcp3 hands
            # over to the hcp4 command set as soon as the recipe is known, so it serves
            # hcp 4 as well without touching `items` before they exist.
            return self._get_commands_hcp3(description, items, location, inventory)
        return self._get_commands_hcp0(description, items, location, inventory, nav_commands)

    def _get_commands_hcp0(self, description, items, location, inventory, nav_commands):
        """Commands of hcp 3 plus 'look' / 'inventory' when stale, plus the navigation commands."""
        cmds = self._get_commands_hcp3(description, items, location, inventory)
        # for hcp 0 we need to add the look and inventory command.
        if not self.description_updated:
            cmds += ['look']
        if not self.inventory_updated:
            cmds += ['inventory']
        cmds += nav_commands
        return cmds

    def _get_commands_hcp3(self, description, items, location, inventory):
        """
        HCP 3 has the same commands as hcp4 as soon as it found the cookbook.
        """
        if self._know_recipe():
            return self._get_commands_hcp4(description, items, location, inventory)

        cmds = []
        if 'cookbook' in description:
            cmds.append('examine cookbook')
        # open fridge command
        if 'fridge' in description:
            cmds.append('open stuff')
        return cmds

    def _get_commands_hcp4(self, description, items, location, inventory):
        """
        Build the command list for a known recipe.

        Args:
            description: text of the current room.
            items: table with the columns 'item', 'already_in_inventory' and 'recipe_steps'.
            location: name of the current room.
            inventory: list of carried item names.
        """
        required_items = list(items['item'])
        # The utils stay None until the item scorer has reported some.
        utils = self.utils if self.utils is not None else {}

        standard_cmds = ['drop unnecessary items',
                         'take required items from here']
        drop_cmds = ['drop {}'.format(item)
                     for carried_item in inventory
                     for item in required_items
                     if item in carried_item]
        pickup_util_cmds = ['take {}'.format(util) for util in utils
                            if util in description and util == 'knife']
        drop_util_cmds = ['drop {}'.format(util) for util in utils if util in inventory]

        # Recipe steps of carried ingredients whose tool was seen in the current location.
        recipe_step_cmds = []
        for _, row in items.iterrows():
            if not row['already_in_inventory']:
                continue
            for step_cmd in row['recipe_steps']:
                parts = step_cmd.split('with ')
                if len(parts) < 2:
                    # step without a tool: it cannot be matched against a util location
                    continue
                util = parts[1]
                if util in utils and utils[util] == location:
                    recipe_step_cmds.append(step_cmd)
        if 'fridge' in description:
            recipe_step_cmds.append('open stuff')

        finishing_cmds = []
        if 'meal' in inventory:
            finishing_cmds.append('eat meal')
        else:
            open_steps = [step for steps in items['recipe_steps'] for step in steps]
            if len(open_steps) == 0 and location.lower() == 'kitchen':
                finishing_cmds.append('prepare meal')

        return (standard_cmds + drop_cmds + pickup_util_cmds + drop_util_cmds
                + recipe_step_cmds + finishing_cmds)

    def take_all_required_items(self, items, description):
        """
        List of take commands for all the ingredients necessary (specified by neural model) that are present in current location.
        """
        return ['take {}'.format(item)
                for item, already_in_inventory in zip(items['item'], items['already_in_inventory'])
                if item in description and not already_in_inventory]

    def _unnecessary_carried_items(self, items, inventory):
        """Return the carried items that contain none of the required item names."""
        required_items = list(items['item'])
        return [carried_item for carried_item in inventory
                if not any(item in carried_item for item in required_items)]

    def drop_unnecessary_items(self, items, inventory):
        """
        List of drop commands for all the unnecessary ingredients currently carried (specified by neural model).
        """
        return ['drop {}'.format(carried_item)
                for carried_item in self._unnecessary_carried_items(items, inventory)]

    def remove_articles(self, item):
        """
        Remove the articles 'an', 'a' and 'the' from `item` and strip surrounding whitespace.

        Only whole words are removed, so that e.g. 'a banana pepper' becomes 'banana pepper'
        (a plain substring replace of 'a ' also cut the end of 'banana').
        """
        return self._ARTICLE_PATTERN.sub('', item).strip()

    ### Input features construction
    def build_state_description(self, description, items, location, observation, inventory):
        """
        Builds the string representation of the current state of the game. The state has 8 'features' that all are
        arbitrarily long strings. Some features come directly from the agent's observation, e.g. 'observation', 'description',
        'location'. Others are constructed using the output of the neural item scorer model, e.g. 'missing items',
        'required utils'.

        Every feature is stripped of the characters '.,?!:' and lower-cased, except for the
        separator tokens '<SEP>' and '<DIR>'.
        """
        raw_features = {
            'observation': observation.split('$$$$$$$')[-1].replace('\n\n', ' ').replace('\n', ' ').strip(),
            'missing_items': self._get_missing_items(items),
            'unnecessary_items': self._get_unnecessary_items(items, inventory),
            'location': location,
            'description': self._get_description(description),
            'previous_cmds': self._get_previous_cmds(length=10),
            'required_utils': self._get_required_utils(items),
            'discovered_locations': self._get_discovered_locations(),
        }
        return {key: self._normalize_feature(text) for key, text in raw_features.items()}

    def _normalize_feature(self, text):
        """Drop punctuation, collapse whitespace and lower-case all but the special tokens."""
        words = text.translate(self._PUNCTUATION_TABLE).split()
        return ' '.join(word if word in self._SPECIAL_TOKENS else word.lower() for word in words)

    def _get_discovered_locations(self):
        """Return the locations known to the navigator joined by '<SEP>', or 'nothing'."""
        locations = self.navigator.discovered_locations
        if len(locations) == 0:
            return 'nothing'
        return ' <SEP> '.join(locations)

    def _get_required_utils(self, items):
        """Describe for every needed util where it was seen ('<util> in <location>') or that it was not found."""
        if items is None:
            return 'not determined yet'
        if not self.utils:
            # covers both "no utils reported yet" (None) and "no utils needed" (empty dict)
            return 'nothing'
        utils = ['{} not found'.format(util) if location is None else '{} in {}'.format(util, location)
                 for util, location in self.utils.items()]
        return ' <SEP> '.join(utils)

    def _get_previous_cmds(self, length):
        """Return the last `length` high-level commands, most recent first, or 'nothing'."""
        cmds = self.cmd_memory[::-1][:length]
        if len(cmds) == 0:
            return 'nothing'
        return ' <SEP> '.join(cmds)

    def _get_description(self, description):
        """Flatten the room description into a single line."""
        return description.replace('\n\n\n\n', ' ').replace('\n', ' ').strip()

    def _get_missing_items(self, items):
        """List every ingredient that is not carried yet, followed by its recipe steps."""
        if items is None:
            return 'not determined yet'
        descr = [' <DIR> '.join([row['item'], *row['recipe_steps']])
                 for _, row in items.iterrows()
                 if not row['already_in_inventory']]
        if len(descr) == 0:
            return 'nothing'
        return ' <SEP> '.join(descr)

    def _get_unnecessary_items(self, items, inventory):
        """List the carried items that are not needed for the recipe."""
        if items is None:
            return 'not determined yet'
        unnecessary_items = self._unnecessary_carried_items(items, inventory)
        if len(unnecessary_items) == 0:
            return 'nothing'
        return ' <SEP> '.join(unnecessary_items)