SeAIPalette/Palette/env/engine.py

563 lines
24 KiB
Python

import pymunk
import pymunk.pygame_util
import pygame
import os
import numpy as np
import math
from copy import deepcopy
from Palette.env.utils import process_angle, asincos
from typing import List, Tuple, Union
from pygame.color import THECOLORS
from pymunk.vec2d import Vec2d
from Palette.env.collide_types import COLLISION_DESTINATION, COLLISION_OBSTACLE, COLLISION_SHIP
from Palette.constants import UP, DOWN, LEFT, RIGHT, FINISHED, BROKEN, \
FILLING_LEFT, FILLING_RIGHT, FILLING_UP, FILLING_DOWN, FILLING, FILLING_BACK,\
COLORS
class SeaEnvEngine(object):
def __init__(
self,
window_width: int,
window_height: int,
field_size: Tuple[int],
ship_radius: int,
ship_velocity: int,
sonar_spread_times: int, # the number of sonar detection points
obstacles: dict,
show_sensors: bool = False,
draw_screen: bool = True,
show_fields: bool = True,
init_ships: bool = True,
start_position=None, # [start_x, start_y]
start_r=None,
time_click: float = 1.0/10.0,
filling_time: int = 1,
cover_ratio: float = 1.0,
actual_background: bool = False,
actual_around_area: int = 2,
*args,
**kwargs
) -> None:
super().__init__()
self.max_angle, self.min_angle = None, None
# basic params
self.crashed = False # whether or not crash into an obstacle or bound
self.finished = False
self.terminate = False
self.show_sensors = show_sensors
self.draw_screen = draw_screen
self.show_fields = show_fields
self.num_steps = 0
self.window_width = window_width
self.window_height = window_height
self.sonar_spread_times = sonar_spread_times
self.ship_radius = ship_radius
self.time_click = time_click
self.filling_time = filling_time
self.cover_ratio = cover_ratio
self.actual_background = actual_background
self.actual_around_area = actual_around_area
""" 1. init pygame """
if self.draw_screen:
pygame.init()
self.screen = pygame.display.set_mode(
(window_width, window_height))
self.clock = pygame.time.Clock()
self.screen.set_alpha(None)
pygame.display.set_caption('NODE Simulation')
self.draw_options = pymunk.pygame_util.DrawOptions(self.screen)
if not actual_background:
self.ss_img = pygame.image.load(
os.path.join("pics", "sea.jpg"))
else:
self.ss_img = pygame.image.load(
os.path.join("pics", 'sea.png'))
self.ss_img = pygame.transform.scale(
self.ss_img, (self.window_width, self.window_height))
self.ship_img = pygame.image.load(os.path.join("pics", "ship.png"))
self.ship_img = pygame.transform.scale(
self.ship_img, (self.ship_radius * 4, self.ship_radius * 4))
""" 2. init gymunk """
# physics stuff
self.space = pymunk.Space()
self.space.gravity = pymunk.Vec2d(0., 0.)
# create bounds
# self.create_bounds(window_height, window_width)
# create ship
self.finished_idx = None
self.init_ships = init_ships
self.battery_capacity = None
self._ship_batteries = None
self._ship_charging = None
self._ship_charging_positions = None
self._ship_charging_time_left = None
if self.init_ships:
self.create_ship(start_position=start_position,
start_r=start_r)
self.ship_velocity = ship_velocity
assert self.window_width % field_size[0] == 0 and self.window_height % field_size[1] == 0
self.field_size = field_size
self.fields = np.zeros(
shape=(self.window_width//field_size[0], self.window_height//field_size[1]))
# create obstacles
self.obstacles = []
self.obstacles_w_h = []
for obstacle_key in obstacles.keys():
x, y, width, height = obstacles[obstacle_key]
cur_obstacle = self.create_rectangle_obstacle(x, y, width, height)
self.obstacles.append(cur_obstacle)
self.obstacles_w_h.append((width, height))
# label the position of obstacles as -1
for i in range(x//self.field_size[0], (x+width)//self.field_size[0]):
for j in range(y//self.field_size[1], (y+height)//self.field_size[1]):
self.fields[i, self.fields.shape[1]-j-1] = -1
if self.actual_background:
for i in range(
max(0, x//self.field_size[0]-self.actual_around_area),
min((x+width)//self.field_size[0] + self.actual_around_area,
self.fields.shape[0])):
for j in range(
max(0, y//self.field_size[1]-self.actual_around_area),
min((y+height)//self.field_size[1]+self.actual_around_area,
self.fields.shape[1])):
if self.fields[i, self.fields.shape[1]-j-1] != -1:
self.fields[i, self.fields.shape[1]-j-1] = 1
"""
self.obstacles_radius = []
for _ in range(n_obstacles):
cur_radius = np.random.randint(
obstacle_radius_bound[0], obstacle_radius_bound[1])
cur_init_x = np.random.randint(cur_radius, window_width-cur_radius)
cur_init_y = np.random.randint(
cur_radius, window_height-cur_radius)
cur_obstacle = self.create_obstacle(
init_x=cur_init_x, init_y=cur_init_y, obstacle_radius=cur_radius)
self.obstacles.append(cur_obstacle)
self.obstacles_radius.append(cur_radius)
"""
self.setup_collision_handler()
@property
def splited_areas(self):
return deepcopy(self._splited_areas)
@splited_areas.setter
def splited_areas(self, splited_areas):
self._splited_areas = deepcopy(splited_areas)
@property
def ship_num(self):
return len(self.ship_body)
def setup_collision_handler(self):
def post_solve_ship_obstacle(arbiter, space, data):
self.crashed = True
self.space.add_collision_handler(
COLLISION_SHIP, COLLISION_OBSTACLE).post_solve = post_solve_ship_obstacle
def create_bounds(self, window_height: float, window_width: float):
static = [
pymunk.Segment(
self.space.static_body,
(0, self.window_height-1), (0, self.window_height-window_height), 1),
pymunk.Segment(
self.space.static_body,
(1, self.window_height-window_height), (window_width, self.window_height-window_height), 1),
pymunk.Segment(
self.space.static_body,
(window_width-1, self.window_height-window_height), (window_width-1, self.window_height-1), 1),
pymunk.Segment(
self.space.static_body,
(1, self.window_height-1), (window_width, self.window_height-1), 1)
]
for s in static:
s.friction = 1.
s.group = 1
# bound is the same as obstacle for collision
s.collision_type = COLLISION_OBSTACLE
s.color = THECOLORS['blue']
self.space.add(*static)
def create_ship(self, start_position, start_r: float, battery_capacity: int):
self.ship_body, self.ship_shape = [], []
for init_xy, init_r in zip(start_position, start_r):
init_x, init_y = init_xy
inertia = pymunk.moment_for_circle(
mass=1, inner_radius=0, outer_radius=self.ship_radius, offset=(0, 0))
self.ship_body.append(pymunk.Body(1, inertia))
self.ship_body[-1].position = init_x, self.window_height - init_y
self.ship_shape.append(pymunk.Circle(
body=self.ship_body[-1], radius=self.ship_radius, offset=(0, 0)))
self.ship_shape[-1].color = THECOLORS["green"]
self.ship_shape[-1].elasticity = 1.0
self.ship_body[-1].angle = init_r
self.ship_shape[-1].collision_type = COLLISION_SHIP
self.space.add(self.ship_body[-1], self.ship_shape[-1])
self.init_ships = True
self.finished_idx = [False for _ in range(self.ship_num)]
self.battery_capacity = battery_capacity
self._ship_batteries = [
self.battery_capacity for _ in range(len(start_position))]
self._ship_charging = [False for _ in range(self.ship_num)]
self._ship_charging_positions = [[] for _ in range(self.ship_num)]
self._ship_charging_time_left = [0 for _ in range(self.ship_num)]
def create_rectangle_obstacle(self, x, y, width, height):
# points = [(0, 0), (0, height), (width, height), (width, 0)]
points = [(-width/2, -height/2), (-width/2, height/2),
(width/2, height/2), (width/2, -height/2)]
# points = [(-width, -height), (-width, 0), (0, 0), (0, -height)]
mass = 9999999999
moment = pymunk.moment_for_poly(mass, points, (0, 0))
obstacle_body = pymunk.Body(mass=mass, moment=moment)
obstacle_body.position = (x+width/2, self.window_height-y-height/2)
obstacle_shape = pymunk.Poly(obstacle_body, points)
obstacle_shape.color = THECOLORS["blue"]
obstacle_shape.collision_type = COLLISION_OBSTACLE
self.space.add(obstacle_body, obstacle_shape)
return obstacle_body
def _filling_get_last_position(self, ship_id: int):
if self._ship_charging[ship_id]:
last_pos = self._ship_charging_positions[ship_id][-1]
else:
last_pos = self.ship_body[ship_id].position
return last_pos
def get_battery(self, ship_id: int):
return self._ship_batteries[ship_id]
def frame_step(self, actions: List[int]):
"""frame update for one step
Args:
action (int): the next direction of the ship
0: up
1: down
2: left
3: right
"""
assert self.init_ships, 'Have not initialized ships!'
if self.terminate:
raise AssertionError("Cannot continue running after crash!")
for ship_id, action in enumerate(actions):
if action == UP: # up
self._ship_charging[ship_id] = False
self._ship_charging_time_left[ship_id] = 0
self.ship_body[ship_id].angle = 1.5*np.pi
elif action == DOWN: # down
self._ship_charging[ship_id] = False
self._ship_charging_time_left[ship_id] = 0
self.ship_body[ship_id].angle = 0.5*np.pi
elif action == LEFT: # left
self._ship_charging[ship_id] = False
self._ship_charging_time_left[ship_id] = 0
self.ship_body[ship_id].angle = np.pi
elif action == RIGHT: # right
self._ship_charging[ship_id] = False
self._ship_charging_time_left[ship_id] = 0
self.ship_body[ship_id].angle = 0.0
elif action == FINISHED:
self._ship_charging[ship_id] = False
self._ship_charging_time_left[ship_id] = 0
driving_direction = Vec2d(1, 0).rotated(
self.ship_body[ship_id].angle)
self.ship_body[ship_id].velocity = 0.0 * driving_direction
if not self.finished_idx[ship_id]:
self.space.remove(self.ship_body[ship_id])
self.finished_idx[ship_id] = True
elif action == FILLING_UP:
last_pos = self._filling_get_last_position(ship_id)
self._ship_charging_positions[ship_id].append(
(last_pos[0], last_pos[1]-self.ship_velocity*self.time_click))
self._ship_charging[ship_id] = True
self._ship_charging_time_left[ship_id] = self.filling_time
recent_x, recent_y = self._ship_charging_positions[ship_id][-1]
rx, ry = int(
recent_x//self.field_size[0]), int(recent_y//self.field_size[1])
if self.fields[rx, ry] >= 0:
self.fields[rx, ry] += 1
elif action == FILLING_DOWN:
last_pos = self._filling_get_last_position(ship_id)
self._ship_charging_positions[ship_id].append(
(last_pos[0], last_pos[1]+self.ship_velocity*self.time_click))
self._ship_charging[ship_id] = True
self._ship_charging_time_left[ship_id] = self.filling_time
recent_x, recent_y = self._ship_charging_positions[ship_id][-1]
rx, ry = int(
recent_x//self.field_size[0]), int(recent_y//self.field_size[1])
if self.fields[rx, ry] >= 0:
self.fields[rx, ry] += 1
elif action == FILLING_LEFT:
last_pos = self._filling_get_last_position(ship_id)
self._ship_charging_positions[ship_id].append(
(last_pos[0]-self.ship_velocity*self.time_click, last_pos[1]))
self._ship_charging[ship_id] = True
self._ship_charging_time_left[ship_id] = self.filling_time
recent_x, recent_y = self._ship_charging_positions[ship_id][-1]
rx, ry = int(
recent_x//self.field_size[0]), int(recent_y//self.field_size[1])
if self.fields[rx, ry] >= 0:
self.fields[rx, ry] += 1
elif action == FILLING_RIGHT:
last_pos = self._filling_get_last_position(ship_id)
self._ship_charging_positions[ship_id].append(
(last_pos[0]+self.ship_velocity*self.time_click, last_pos[1]))
self._ship_charging[ship_id] = True
self._ship_charging_time_left[ship_id] = self.filling_time
recent_x, recent_y = self._ship_charging_positions[ship_id][-1]
rx, ry = int(
recent_x//self.field_size[0]), int(recent_y//self.field_size[1])
if self.fields[rx, ry] >= 0:
self.fields[rx, ry] += 1
elif action == FILLING_BACK:
assert self._ship_charging[ship_id] == True
assert self._ship_charging_time_left[ship_id] == 0
self._ship_charging_positions[ship_id].pop()
if len(self._ship_charging_positions[ship_id]):
recent_x, recent_y = self._ship_charging_positions[ship_id][-1]
rx, ry = int(
recent_x//self.field_size[0]), int(recent_y//self.field_size[1])
if self.fields[rx, ry] >= 0:
self.fields[rx, ry] += 1
elif action == FILLING:
assert self._ship_charging[ship_id] == True
self._ship_charging_time_left[ship_id] -= 1
if self._ship_charging_time_left[ship_id] == 0:
self._ship_batteries[ship_id] = self.battery_capacity
else:
raise ValueError(f"invalid action: {action}")
self.ship_body[ship_id].angle = process_angle(
self.ship_body[ship_id].angle)
# calculate velocity with direction
driving_direction = Vec2d(1, 0).rotated(
self.ship_body[ship_id].angle)
if not self._ship_charging[ship_id]:
self.ship_body[ship_id].velocity = self.ship_velocity * \
driving_direction
else:
self.ship_body[ship_id].velocity = 0.0 * driving_direction
# Update the screen and stuff.
if self.draw_screen:
# self.screen.fill(THECOLORS["black"])
self.screen.blit(self.ss_img, (0, 0))
# draw(self.screen, self.space)
if self.show_fields:
for i in range(self.fields.shape[0]):
for j in range(self.fields.shape[1]):
if self.fields[i, j] > 0:
pos_x, pos_y = self.field_size[0] * i,\
self.field_size[1] * j
# the size of your rect
s = pygame.Surface(self.field_size)
# alpha level
s.set_alpha(int(96 * self.fields[i, j]))
# this fills the entire surface
color = COLORS[int(self._splited_areas[i, j] - 1)]
s.fill(color)
# (0,0) are the top-left coordinates
self.screen.blit(s, (pos_x, pos_y))
elif self.fields[i, j] == -1:
pos_x, pos_y = self.field_size[0] * i,\
self.field_size[1] * j
# the size of your rect
s = pygame.Surface(self.field_size)
# alpha level
# this fills the entire surface
if self.actual_background:
color = (0, 0, 0)
s.set_alpha(100)
else:
color = (0, 0, 255)
s.set_alpha(255)
s.fill(color)
# (0,0) are the top-left coordinates
self.screen.blit(s, (pos_x, pos_y))
for i in range(self.ship_num):
self.screen.blit(
self.ship_img, (self.ship_body[i].position[0]-2.0*self.ship_radius, self.ship_body[i].position[1]-2.0*self.ship_radius))
# draw charging trajectory
for ship_id in range(self.ship_num):
if self._ship_charging[ship_id]:
for cx, cy in self._ship_charging_positions[ship_id]:
pygame.draw.circle(
self.screen, (255, 255, 255), (cx, cy), 2)
pygame.display.flip()
self.clock.tick()
self.space.step(self.time_click)
for i in range(self.ship_num):
if not self.finished_idx[i]:
if actions[i] != FILLING:
self._ship_batteries[i] -= 1
if self._ship_batteries[i] < 0:
self.crashed = True
if self.crashed:
print('crashed!!!')
self.terminate = True
self.num_steps += 1
return self.current()
def current(self):
state = None
for ship_id in range(len(self.ship_body)):
x, y = self.ship_body[ship_id].position
if self.finished_idx[ship_id]:
direction = UP
elif self.ship_body[ship_id].angle == 1.5 * np.pi:
direction = UP
elif self.ship_body[ship_id].angle == 0.0:
direction = RIGHT
elif self.ship_body[ship_id].angle == 0.5 * np.pi:
direction = DOWN
elif self.ship_body[ship_id].angle == np.pi:
direction = LEFT
else:
raise ValueError(
f'invalid angle: {self.ship_body[ship_id].angle}')
cur_state = np.asarray([
int(x//self.field_size[0]),
int(y//self.field_size[1]),
direction])[None]
if state is None:
state = cur_state
else:
state = np.concatenate([state, cur_state], axis=0)
if not self.crashed and not self.finished_idx[ship_id] and not self._ship_charging[ship_id]:
cx, cy = int(x//self.field_size[0]), int(y//self.field_size[1])
if self.fields[cx, cy] >= 0 and 0 <= x < self.window_width \
and 0 <= y < self.window_height:
self.fields[cx, cy] += 1
else:
print(f'crash')
self.crashed = True
if np.sum(self.fields > 0) / np.sum(self.fields >= 0) >= self.cover_ratio:
self.finished = True
print('finished!!!')
self.terminate = True
info = {'fields': self.fields, 'finished': self.finished}
return state, self.terminate, info
def get_rotated_point(self, x_1, y_1, x_2, y_2, radians):
# Rotate x_2, y_2 around x_1, y_1 by angle.
x_change = (x_2 - x_1) * math.cos(radians) + \
(y_2 - y_1) * math.sin(radians)
y_change = (y_1 - y_2) * math.cos(radians) - \
(x_1 - x_2) * math.sin(radians)
new_x = x_change + x_1
# new_y = self.window_height - (y_change + y_1)
new_y = y_change + y_1
return int(new_x), int(new_y)
def get_sonar_readings(self, x, y, angle):
"""
Instead of using a grid of boolean(ish) sensors, sonar readings
simply return N "distance" readings, one for each sonar
we're simulating. The distance is a count of the first non-zero
reading starting at the object. For instance, if the fifth sensor
in a sonar "arm" is non-zero, then that arm returns a distance of 5.
"""
# Make our arms.
arm_left = self.make_sonar_arm(x, y)
# arm_middle = arm_left
# arm_right = arm_left
# Rotate them and get readings.
readings = []
for shift_angle in np.arange(-0.75, 0.75, 0.05):
readings.append(self.get_arm_distance(
arm_left, x, y, angle, shift_angle))
# print(len(readings))
# readings.append(self.get_arm_distance(arm_left, x, y, angle, 0.75))
# readings.append(self.get_arm_distance(arm_middle, x, y, angle, 0))
# readings.append(self.get_arm_distance(arm_right, x, y, angle, -0.75))
if self.draw_screen and self.show_sensors:
pygame.display.update()
return readings
def make_sonar_arm(self, x, y):
spread = 10 # Default spread.
distance = 10 # Gap before first sensor.
arm_points = []
# Make an arm. We build it flat because we'll rotate it about the
# center later.
for i in range(1, self.sonar_spread_times):
arm_points.append((distance + x + (spread * i), y))
return arm_points
def get_arm_distance(self, arm, x, y, angle, offset) -> int:
# Used to count the distance.
i = 0
# Look at each point and see if we've hit something.
for point in arm:
i += 1
# Move the point to the right spot.
rotated_p = self.get_rotated_point(
x, y, point[0], point[1], angle + offset
)
# Check if we've hit something. Return the current i (distance)
# if we did.
if rotated_p[0] <= 0 or rotated_p[1] <= 0 \
or rotated_p[0] >= self.window_width or rotated_p[1] >= self.window_height:
return i # Sensor is off the screen.
else:
for obstacle, obstacle_w_h in zip(self.obstacles, self.obstacles_w_h):
obstacle_position = obstacle.position
if 0 <= rotated_p[0] - obstacle_position[0] <= obstacle_w_h[0] \
and 0 <= rotated_p[1] - obstacle_position[1] <= obstacle_w_h[1]:
return i
if self.draw_screen and self.show_sensors:
pygame.draw.circle(
self.screen, (255, 255, 255), (rotated_p), 2)
# Return the distance for the arm.
return i