Source code for concepts.benchmark.gridworld.crafting_world.crafting_world_env

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : crafting_world_env.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 09/23/2023
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

import numpy as np
import os.path as osp
from PIL import Image

from concepts.benchmark.gridworld.crafting_world.crafting_world_rules import MINING_RULES, CRAFTING_RULES, get_all_locations
from concepts.benchmark.gridworld.crafting_world.utils import underline_to_pascal

SKIP_CRAFTING_LOCATION_CHECK = True

__all__ = ['CraftingWorldSimulator', 'CraftingWorldRenderer']


[docs] def set_skip_crafting_location_check(value: bool = True): global SKIP_CRAFTING_LOCATION_CHECK SKIP_CRAFTING_LOCATION_CHECK = value
[docs] class CraftingWorldSimulator(object):
[docs] def __init__(self): self.nr_grids = 15 self.nr_inventory = 3 self.agent_pos = 1 self.objects = dict() # str: (str, int), name: (type, pos) self.inventory = dict() # int: Optional[Tuple[str, str]] # (type, name) self.hypothetical = set() # str
[docs] def reset_from_pds_state(self, objects, state): agent_at = list(state['agent-at'])[0][0] self.agent_pos = int(agent_at[1:]) self.nr_grids = len(objects['tile']) self.nr_inventory = nr_inventory = len(objects['inventory']) self.objects = dict() self.inventory = {i: None for i in range(1, 1 + nr_inventory)} for obj_name, obj_loc in state.get('object-at', []): obj_type = None for obj_name2, obj_type2 in state['object-of-type']: if obj_name2 == obj_name: obj_type = obj_type2 break assert obj_type is not None self.objects[obj_name] = (obj_type, int(obj_loc[1:])) for inv_id, obj_name in state.get('inventory-holding', []): obj_type = None for obj_name2, obj_type2 in state['object-of-type']: if obj_name2 == obj_name: obj_type = obj_type2 assert obj_type is not None self.inventory[int(inv_id[1:])] = (obj_type, obj_name) for obj_name, obj_type in state['object-of-type']: if obj_type == 'Hypothetical': self.hypothetical.add(obj_name)
[docs] def reset_from_crow_state(self, objects, state): raise NotImplementedError()
[docs] def move_to(self, pos): self.agent_pos = max(1, min(self.nr_grids, pos)) return True
[docs] def move_left(self): self.agent_pos = max(1, self.agent_pos - 1) return True
[docs] def move_right(self): self.agent_pos = min(self.nr_grids, self.agent_pos + 1) return True
[docs] def pick_up(self, inventory, obj_name): if self.inventory[inventory] is not None: return False if self.objects[obj_name][1] != self.agent_pos: return False self.inventory[inventory] = self.objects[obj_name][0], obj_name del self.objects[obj_name] return True
[docs] def place_down(self, inventory): if self.inventory[inventory] is None: return False obj_type, obj_name = self.inventory[inventory] self.objects[obj_name] = obj_type, self.agent_pos
[docs] def mine(self, obj_name, inventory, hypothetical_object_name, tool_inventory=None): if self.objects[obj_name][1] != self.agent_pos: return False if self.inventory[inventory] is not None: return False if hypothetical_object_name not in self.hypothetical: return False if tool_inventory is not None and self.inventory[tool_inventory] is None: return False obj_type, _ = self.objects[obj_name] for rule in MINING_RULES: if underline_to_pascal(rule['location']) == obj_type: if tool_inventory is None: if len(rule['holding']) == 0: new_obj_type = underline_to_pascal(rule['create']) self.inventory[inventory] = (new_obj_type, hypothetical_object_name) self.hypothetical.remove(hypothetical_object_name) return True else: tool_type, _ = self.inventory[tool_inventory] if len(rule['holding']) == 0 or (len(rule['holding']) == 1 and underline_to_pascal(rule['holding'][0]) == tool_type): new_obj_type = underline_to_pascal(rule['create']) self.inventory[inventory] = (new_obj_type, hypothetical_object_name) self.hypothetical.remove(hypothetical_object_name) return True return False
[docs] def craft(self, obj_name, inventory, hypothetical_object_name, ingredients_inventory, target_type=None): if SKIP_CRAFTING_LOCATION_CHECK: if self.objects[obj_name][1] != self.agent_pos: return False if self.inventory[inventory] is not None: return False if hypothetical_object_name not in self.hypothetical: return False for ingredient_inventory in ingredients_inventory: if self.inventory[ingredient_inventory] is None: return False obj_type, _ = self.objects[obj_name] # print('Checking crafting', inventory, hypothetical_object_name, target_type, ingredients_inventory) for rule in CRAFTING_RULES: if target_type is not None and underline_to_pascal(rule['create']) != target_type: continue # print(' checking crafting rule', rule['location'], rule['recipe'], rule['create']) # if not SKIP_CRAFTING_LOCATION_CHECK: # print(f' matching crafting location', underline_to_pascal(rule['location']), obj_type) if underline_to_pascal(rule['location']) == obj_type or SKIP_CRAFTING_LOCATION_CHECK: if len(rule['recipe']) == len(ingredients_inventory): current_holding_types = set() for ingredient_inventory in ingredients_inventory: ingredient_type, _ = self.inventory[ingredient_inventory] current_holding_types.add(ingredient_type) target_holding_types = set() for ingredient_type in rule['recipe']: target_holding_types.add(underline_to_pascal(ingredient_type)) # print(f' matching crafting recipe current={current_holding_types}, target={target_holding_types}') if current_holding_types == target_holding_types: new_obj_type = underline_to_pascal(rule['create']) self.inventory[inventory] = (new_obj_type, hypothetical_object_name) self.hypothetical.remove(hypothetical_object_name) for ingredient_inventory in ingredients_inventory: self.hypothetical.add(self.inventory[ingredient_inventory][1]) self.inventory[ingredient_inventory] = None return True return False
[docs] class CraftingWorldRenderer(object):
[docs] def __init__(self, map_w: int, map_h: int, max_inventory: int): self._map_w = map_w self._map_h = map_h self.basic_canvas = np.zeros((map_h * 17 + 1, map_w * 17 + 1, 3), dtype=np.uint8) + 255 self._inventory_h = int(max_inventory / map_w) + (1 if max_inventory % map_w != 0 else 0) self.inventory_canvas = np.zeros((self._inventory_h * 17 + 1, map_w * 17 + 1, 3), dtype=np.uint8) + 255 self._init_basic_canvas() self._block_images = dict() self._init_resource_png()
def _init_basic_canvas(self): # Draw grid for i in range(self._map_w + 1): self.basic_canvas[:, i * 17, :] = 128 for i in range(self._map_h + 1): self.basic_canvas[i * 17, :, :] = 128 # Draw grid for the inventory for i in range(self._inventory_h + 1): self.inventory_canvas[i * 17, :, :] = 128 for i in range(self._map_w + 1): self.inventory_canvas[:, i * 17, :] = 128 def _init_resource_png(self): filename = osp.join(osp.dirname(__file__), 'assets', 'BlockCSS.png') block_image = np.array(Image.open(filename)) filename = osp.join(osp.dirname(__file__), 'assets', 'ItemCSS.png') item_image = np.array(Image.open(filename)) filename = osp.join(osp.dirname(__file__), 'assets', 'EntityCSS.png') entity_image = np.array(Image.open(filename)) def extract(image, x, y): return image[y:y+16, x:x+16, :] def make_small_item_image(image): item_image = np.zeros_like(image) # item_image[8:8+8, :8, :] = image[::2, ::2, :] item_image[8:8+8, :8, :3] = image[::2, ::2, :3] * (image[::2, ::2, 3:4] / 255) + 255 * (1 - image[::2, ::2, 3:4] / 255) item_image[8:8+8, :8, 3] = 255 # add a small border item_image[8:8+8, 0, :] = 128 item_image[8:8+8, 7, :] = 128 item_image[8, :8, :] = 128 item_image[15, :8, :] = 128 return item_image def make_small_villager_image(image): villager_image = np.zeros((image.shape[0], image.shape[1], 4)) villager_image[8:8+8, 8:8+8, :3] = image[::2, ::2, :] villager_image[8:8+8, 8:8+8, 3] = (image[::2, ::2].mean(axis=-1) < 255-32) * 255 return villager_image self._block_images['GoldOreVein'] = extract(block_image, 752, 448) self._block_images['CoalOreVein'] = extract(block_image, 704, 448) self._block_images['IronOreVein'] = extract(block_image, 0, 464) self._block_images['CobblestoneStash'] = extract(block_image, 32, 416) self._block_images['BeetrootCrop'] = extract(item_image, 96, 480) self._block_images['Chicken'] = extract(entity_image, 32, 432) self._block_images['Sheep'] = extract(entity_image, 144, 432) self._block_images['Tree'] = np.array(Image.open(osp.join(osp.dirname(__file__), 'assets', 'Jungle_Tree.png')).resize((16, 16))) self._block_images['SugarCanePlant'] = extract(block_image, 432, 576) self._block_images['PotatoPlant'] = extract(block_image, 304, 576) self._block_images['WorkStation'] = extract(block_image, 0, 512) self._block_images['Furnace'] = extract(block_image, 464, 464) self._block_images['Coal'] = extract(item_image, 176, 544) self._block_images['Coal/Small'] = make_small_item_image(self._block_images['Coal']) self._block_images['Feather'] = extract(item_image, 240, 544) self._block_images['Feather/Small'] = make_small_item_image(self._block_images['Feather']) self._block_images['Axe'] = extract(item_image, 160, 464) # Stone Axe self._block_images['Axe/Small'] = make_small_item_image(self._block_images['Axe']) self._block_images['Pickaxe'] = extract(item_image, 192, 464) # Stone Pickaxe self._block_images['Pickaxe/Small'] = make_small_item_image(self._block_images['Pickaxe']) self._block_images['SugarCane'] = extract(item_image, 176, 576) self._block_images['SugarCane/Small'] = make_small_item_image(self._block_images['SugarCane']) self._block_images['Potato'] = extract(item_image, 224, 496) self._block_images['Potato/Small'] = make_small_item_image(self._block_images['Potato']) self._block_images['CookedPotato'] = extract(item_image, 64, 480) self._block_images['CookedPotato/Small'] = make_small_item_image(self._block_images['CookedPotato']) self._block_images['Beetroot'] = extract(item_image, 96, 480) self._block_images['Beetroot/Small'] = make_small_item_image(self._block_images['Beetroot']) self._block_images['BeetrootSoup'] = extract(item_image, 112, 480) self._block_images['BeetrootSoup/Small'] = make_small_item_image(self._block_images['BeetrootSoup']) self._block_images['Bed'] = extract(block_image, 208, 528) self._block_images['Bed/Small'] = make_small_item_image(self._block_images['Bed']) self._block_images['IronOre'] = extract(item_image, 0, 880) self._block_images['IronOre/Small'] = make_small_item_image(self._block_images['IronOre']) self._block_images['IronIngot'] = extract(item_image, 112, 560) self._block_images['IronIngot/Small'] = make_small_item_image(self._block_images['IronIngot']) self._block_images['GoldOre'] = extract(item_image, 240, 864) self._block_images['GoldOre/Small'] = make_small_item_image(self._block_images['GoldOre']) self._block_images['GoldIngot'] = extract(item_image, 48, 560) self._block_images['GoldIngot/Small'] = make_small_item_image(self._block_images['GoldIngot']) self._block_images['Cobblestone'] = extract(item_image, 240, 800) # actually Netherite Ingot self._block_images['Cobblestone/Small'] = make_small_item_image(self._block_images['Cobblestone']) self._block_images['Sword'] = extract(item_image, 0, 448) # Iron Sword self._block_images['Sword/Small'] = make_small_item_image(self._block_images['Sword']) self._block_images['Shears'] = extract(item_image, 144, 464) self._block_images['Shears/Small'] = make_small_item_image(self._block_images['Shears']) self._block_images['Stick'] = extract(item_image, 80, 48) self._block_images['Stick/Small'] = make_small_item_image(self._block_images['Stick']) self._block_images['Boat'] = extract(item_image, 112, 592) # Oak Boat self._block_images['Boat/Small'] = make_small_item_image(self._block_images['Boat']) self._block_images['Bowl'] = extract(item_image, 208, 528) self._block_images['Bowl/Small'] = make_small_item_image(self._block_images['Bowl']) self._block_images['Wood'] = extract(block_image, 352, 416) # Oak Log self._block_images['Wood/Small'] = make_small_item_image(self._block_images['Wood']) self._block_images['WoodPlank'] = extract(block_image, 224, 464) # Oak Planks self._block_images['WoodPlank/Small'] = make_small_item_image(self._block_images['WoodPlank']) self._block_images['Wool'] = extract(block_image, 0, 32) # White Wool self._block_images['Wool/Small'] = make_small_item_image(self._block_images['Wool']) self._block_images['Arrow'] = extract(item_image, 80, 32) self._block_images['Arrow/Small'] = make_small_item_image(self._block_images['Arrow']) self._block_images['Paper'] = extract(item_image, 96, 544) self._block_images['Paper/Small'] = make_small_item_image(self._block_images['Paper']) self._block_images['agent'] = np.array(Image.open(osp.join(osp.dirname(__file__), 'assets', 'Plains_Villager_Base.png')).resize((16, 16))) self._block_images['agent/Small'] = make_small_villager_image(self._block_images['agent'])
[docs] def render(self, simulator: CraftingWorldSimulator) -> np.ndarray: """Render the current state of the simulator. Args: simulator: the simulator. The function will read out the current state of the simulator. Returns: the rendered image. """ map_canvas = self.basic_canvas.copy() inventory_canvas = self.inventory_canvas.copy() def draw_primitive(canvas, obj_pos, image, mask=None): obj_x = obj_pos % self._map_w obj_y = int(obj_pos / self._map_w) if isinstance(image, np.ndarray) and image.shape[2] == 4: image, mask = image[:, :, :3], image[:, :, 3] if mask is not None: canvas[obj_y * 17 + 1: (obj_y + 1) * 17, obj_x * 17 + 1: (obj_x + 1) * 17, :] = np.clip(( image * (mask[:, :, None] / 255) + canvas[obj_y * 17 + 1: (obj_y + 1) * 17, obj_x * 17 + 1: (obj_x + 1) * 17, :] * (1 - mask[:, :, None] / 255) ), 0, 255).astype('uint8') else: canvas[obj_y * 17 + 1: (obj_y + 1) * 17, obj_x * 17 + 1: (obj_x + 1) * 17, :] = image def draw(canvas, obj_name, obj_type, obj_pos, use_small=True): if obj_pos > self._map_w * self._map_h: raise ValueError(f'Object {obj_name} is out of bounds') obj_pos = obj_pos - 1 if obj_type.endswith('Station'): obj_type = 'WorkStation' if use_small and obj_type + '/Small' in self._block_images: image = self._block_images[obj_type + '/Small'] elif obj_type in self._block_images: image = self._block_images[obj_type] else: print(f'Warning: unknown object type {obj_type}.') image = 0 draw_primitive(canvas, obj_pos, image) # Draw objects location_types = get_all_locations() for obj_name, (obj_type, obj_pos) in simulator.objects.items(): if obj_type in location_types: draw(map_canvas, obj_name, obj_type, obj_pos) for obj_name, (obj_type, obj_pos) in simulator.objects.items(): if obj_type not in location_types: draw(map_canvas, obj_name, obj_type, obj_pos) for i, content in simulator.inventory.items(): if content is not None: obj_type, obj_name = content draw(inventory_canvas, obj_name, obj_type, i, use_small=False) # Highlight the border of the agent position agent_x = (simulator.agent_pos - 1) % self._map_w agent_y = int((simulator.agent_pos - 1) / self._map_w) map_canvas[agent_y * 17: (agent_y + 1) * 17 + 1, agent_x * 17, :] = (29, 209, 77) map_canvas[agent_y * 17: (agent_y + 1) * 17 + 1, (agent_x + 1) * 17, :] = (29, 209, 77) map_canvas[agent_y * 17, agent_x * 17: (agent_x + 1) * 17 + 1, :] = (29, 209, 77) map_canvas[(agent_y + 1) * 17, agent_x * 17: (agent_x + 1) * 17 + 1, :] = (29, 209, 77) draw_primitive(map_canvas, simulator.agent_pos - 1, self._block_images['agent/Small']) # Concatenate the two canvases canvas = np.concatenate((map_canvas, np.zeros((8, map_canvas.shape[1], 3), dtype='uint8') + 255, inventory_canvas), axis=0) return canvas