import numpy as np
from scipy.ndimage import gaussian_filter1d
from .base_behavior import BaseBehavior
from rtgym.dataclass import RawTrajectory, RawAgentState
[docs]
class AutonomousBehavior(BaseBehavior):
def __init__(self, gym, config):
super().__init__(gym, config)
self.raw_agent_state = RawAgentState()
def _check_params(self):
assert self.config.velocity_mean > 0, 'velocity_mean must be positive'
assert self.config.random_drift_magnitude >= 0, 'random_drift_magnitude must be non-negative'
assert 0 <= self.config.switch_direction_prob <= 1, 'switch_direction_prob must be in [0, 1]'
assert 0 <= self.config.switch_velocity_prob <= 1, 'switch_velocity_prob must be in [0, 1]'
[docs]
def init_from_profile(self, raw_profile):
required_keys = ['velocity_mean', 'random_drift_magnitude', 'switch_direction_prob', 'switch_velocity_prob']
if all(key in raw_profile for key in required_keys):
self.config.velocity_mean = raw_profile['velocity_mean']
self.config.velocity_sd = max(raw_profile.get('velocity_sd', 0), 0)
self.config.random_drift_magnitude = raw_profile['random_drift_magnitude']
self.config.switch_direction_prob = raw_profile['switch_direction_prob']
self.config.switch_velocity_prob = raw_profile['switch_velocity_prob']
self.config.avoid_boundary_dist = raw_profile.get('avoid_boundary_dist', -1)
self._check_params()
self._recompute_maps()
self.initialized = True
print("Autonomous behavior initialized")
else:
self.initialized = False
missing = [key for key in required_keys if key not in raw_profile]
print(f"Autonomous behavior not initialized: missing keys {missing}")
@BaseBehavior.require_init
def generate_trajectory(self, duration: float, batch_size: int, init_pos=None, init_state=None):
"""Generate a trajectory and return generated data.
Creates autonomous agent trajectories based on the configured behavior
parameters including random walks and boundary avoidance.
Args:
duration (float): Duration of the trial in seconds.
batch_size (int): Number of trials to generate.
init_pos (np.ndarray, optional): Initial position of the agent.
init_state (AgentState, optional): Initial state of the agent.
If both init_pos and init_state are provided, init_state will be used.
Returns:
Trajectory: Generated trajectory as a rtgym.dataclass.Trajectory object.
"""
self.dur_ts = self.gym.to_ts(duration)
self.batch_size = batch_size
# Initialize the raw trajectory
raw_traj = RawTrajectory(
dur_ts=self.dur_ts,
batch_size=self.batch_size
)
self.raw_agent_state = self._init_raw_agent_state(init_pos=init_pos, init_state=init_state)
raw_traj.update_state(0, self.raw_agent_state)
# Generate the trajectory
self._generate_trajectory(raw_traj)
self._compute_displacements(raw_traj)
self._generate_hds(raw_traj)
return raw_traj.to_trajectory(), self.raw_agent_state
def _random_velocity(self, size):
# Solve for the mean and sigma of the underlying Gaussian
# distribution with which we used to generate log-normal
m = self.config.velocity_mean
s = self.config.velocity_sd
sigma = np.sqrt(np.log(1 + (s**2 / m**2)))
mu = np.log(m**2 / np.sqrt(m**2 + s**2))
dist = np.random.lognormal(mean=mu, sigma=sigma, size=size)
return dist / 1e3 * self.t_res
def _compute_displacements(self, raw_traj):
raw_traj.displacements = np.diff(raw_traj.coord, axis=1)
raw_traj.displacements = np.concatenate([
raw_traj.displacements,
np.zeros((raw_traj.batch_size, 1, 2))
], axis=1)
def _generate_hds(self, raw_traj):
"""Generate head directions based on movement direction.
Computes head direction as the angle of movement direction,
aligning the agent's heading with its movement vector.
Args:
raw_traj (RawTrajectory): Raw trajectory object to update with head directions.
"""
# # TODO: Change scale
# raw_traj.hd[:] = np.random.normal(loc=0, scale=0.2, size=raw_traj.hd.shape)
# raw_traj.hd[:] = gaussian_filter1d(raw_traj.hd, sigma=5, axis=1)
# # Add the angle of the movement to the head direction
# angle_rad = np.arctan2(raw_traj.displacements[..., 0], raw_traj.disp[..., 1])
# raw_traj.hd += angle_rad[..., np.newaxis]
angle_rad = np.arctan2(raw_traj.displacements[..., 0], raw_traj.disp[..., 1])
raw_traj.hd[:] = angle_rad[..., np.newaxis]
def _generate_trajectory(self, raw_traj):
for ts in range(1, self.dur_ts):
self._update_coord(self.raw_agent_state)
self._update_drifting_direction(self.raw_agent_state)
self._update_velocity(self.raw_agent_state)
# Update the displacement, direction, and velocity
self.raw_agent_state.disp = self._rad_vel_to_disp(
direction=self.raw_agent_state.mv_dir,
velocity_norm=self.raw_agent_state.vel_norm,
drift=self.raw_agent_state.drift
)
self.raw_agent_state.mv_dir = self._avoid_boundary(
direction=self._compute_direction(self.raw_agent_state.disp),
coords=self.raw_agent_state.coord
)
raw_traj.update_state(ts, self.raw_agent_state)
def _update_drifting_direction(self, raw_agent_state):
"""Update the drifting direction of the agent.
Args:
raw_agent_state: Generator state of the agent. A rtgym.dataclass.GeneratorState object.
"""
# Randomly switch the direction of the random drift
switch_direction_idx = self._flip_coin(self.config.switch_direction_prob, raw_agent_state.batch_size)
n_switch = np.sum(switch_direction_idx)
if n_switch > 0:
raw_agent_state.drift[switch_direction_idx] = self._rad_vel_to_disp(
direction=self._random_rad(n_switch),
velocity_norm=raw_agent_state.vel_norm[switch_direction_idx],
scale=self.config.random_drift_magnitude
)
def _update_velocity(self, raw_agent_state):
"""Update the velocity of the agent.
Args:
raw_agent_state: Generator state of the agent. A rtgym.dataclass.GeneratorState object.
"""
# Randomly switch the velocity
switch_velocity_idx = self._flip_coin(self.config.switch_velocity_prob, raw_agent_state.batch_size)
n_switch = np.sum(switch_velocity_idx)
if n_switch > 0:
raw_agent_state.vel_norm[switch_velocity_idx] = self._random_velocity(n_switch)
def _compute_boundary_avoidance_adjustment(self, direction, perpend_angle):
"""Compute the boundary avoidance adjustment for the agent.
Args:
direction: Direction of the agent. A np.ndarray of shape (batch_size, 1).
perpend_angle: Perpendicular angle of the agent. This is used to determine which side the agent
should be drifted away from such that the agent will not make a sharp turn when near
the boundary.
"""
# Compute angle difference
angle_diff = perpend_angle[..., np.newaxis] - direction
angle_diff = (angle_diff + np.pi) % (2 * np.pi) - np.pi # Normalize to [-pi, pi)
# Determine pointing boundary
pointing_boundary = (angle_diff >= -np.pi / 2) & (angle_diff <= np.pi / 2)
pointing_boundary = pointing_boundary.astype(int)
# Compute complement angle
complement_angle = -np.sign(angle_diff) * (np.pi / 2 - np.abs(angle_diff))
return pointing_boundary, complement_angle
def _init_raw_agent_state(self, init_pos=None, init_state=None):
"""
This function will initialize the generator state of the agent. If the initial position is provided,
or the initial state is provided, the agent will start from the given initial position or state.
Otherwise, the agent will start from a random position.
When starting from a random position, the velocity and drift are sampled similar to the random explore
behavior used throughout this script.
In the generator state, the displacement and current coordinates are aligned in the current timestep.
That is, the coordinates are the current position of the agent, and the displacement is the displacement
that the agent will make in the next timestep.
Args:
init_pos: Initial position of the agent. A np.ndarray of shape (batch_size, 2).
init_state: Initial state of the agent. A rtgym.dataclass.AgentState object.
If None, the agent will start from a random position.
Else, the agent will start from the given initial state.
Returns:
raw_agent_state: Generator state of the agent. A rtgym.dataclass.RawAgentState object.
"""
raw_agent_state = RawAgentState()
if init_state is not None:
raw_agent_state = init_state
else:
if init_pos is not None:
raw_agent_state.coord = init_pos
else:
raw_agent_state.coord = self.arena.generate_random_pos(self.batch_size)
raw_agent_state.vel_norm = self._random_velocity(self.batch_size)
raw_agent_state.drift = self._rad_vel_to_disp(
direction=self._random_rad(self.batch_size),
velocity_norm=raw_agent_state.vel_norm,
scale=self.config.random_drift_magnitude
)
raw_agent_state.disp = self._rad_vel_to_disp(
direction=self._random_rad(self.batch_size),
velocity_norm=raw_agent_state.vel_norm,
drift=raw_agent_state.drift
)
gt_direction = self._compute_direction(raw_agent_state.disp)
raw_agent_state.mv_dir = self._avoid_boundary(gt_direction, raw_agent_state.coord)
return raw_agent_state
def _avoid_boundary(self, direction, coords):
"""
This will adjust the direction of the agent to avoid the boundary. This is different from
the boundary collision avoidance in the _update_coord function. This will apply a "force" to
push the agent away from the boundary, while the _update_coord function will only move the
agent if the next position is not valid.
Args:
direction: Direction of the agent. A np.ndarray of shape (batch_size, 1).
coords: Coordinates of the agent. A np.ndarray of shape (batch_size, 2).
"""
if self.config.avoid_boundary_dist <= 0:
return direction
int_coords = coords.astype(int)
avoid_coef = self.distance_map[int_coords[:, 0], int_coords[:, 1]]
avoid_idx = np.where(avoid_coef > 1e-2)[0] # Batches that are close to the boundary
if len(avoid_idx) > 0:
# If there are batches that are close to the boundary, adjust the direction
perpend_angle = self.perpend_angle_map[int_coords[avoid_idx, 0], int_coords[avoid_idx, 1]]
pointing_boundary, complement_angle = self._compute_boundary_avoidance_adjustment(
direction[avoid_idx], perpend_angle
)
# Compute the adjustment coefficient
avoid_coef = avoid_coef[avoid_idx][..., np.newaxis]
avoid_coef = avoid_coef * pointing_boundary # Not adjust if pointing away from the boundary
# Adjust the direction
adjusted_direction = (1 - avoid_coef) * direction[avoid_idx]
boundary_adjustment = avoid_coef * (direction[avoid_idx] + complement_angle)
direction[avoid_idx] = adjusted_direction + boundary_adjustment
return direction