Variables at t(2):
– Matrix size → 256 × 256
– Energy types → 4
– Agent classes → 8
– Class assignment per agent → 4
– Execution time → 10,713,600 seconds
Execution Time
– start ->01:30:41 FEB 03, 2026 (utc)
– end -> 01:30:41 Jun 07, 2026 (UTC)
Key:
– void -> Black
– seed -> Brown
– producer -> Green
– consumer -> Red
– nomad -> Orange
– architect -> Blue
– predator -> Purple
– defender -> White
– parasite -> Chartreuse
– symbiont ->Pink
– Light -> Sun
– Spots -> Rain
Connecting…
Scripts for t(2):
MATRIX_f:defines the grid structure and its dimensions.
matrix_a: Abstract agent state, containing per‑cell attributes.
fifo: Flow system for intentions and update application.
# matrix/matrix_f.py
# 29-12-2025
import numpy as np
import copy
from time_energy.world_energy_state import WorldEnergyState
class MatrixF:
def __init__(self, size=256, channels=4, dtype=np.float32):
self.size = size
self.channels = channels
self.dtype = dtype
self.data = np.zeros((channels, size, size), dtype=dtype)
self.sphere_coords = self.compute_sphere_coords()
self.env_state = WorldEnergyState()
self.tmp_rain = np.zeros((size, size), dtype=np.float32)
self.tmp_wind = np.full((size, size), 5, dtype=np.uint8)
self.tmp_strength = np.zeros((size, size), dtype=np.float32)
self.tmp_change = np.zeros((size, size), dtype=np.float32)
self.tmp_strength_change = np.zeros((size, size), dtype=np.float32)
def compute_sphere_coords(self):
size = self.size
theta = np.linspace(-np.pi/2, np.pi/2, size, dtype=np.float32)
phi = np.linspace(0, 2*np.pi, size, dtype=np.float32)
theta_grid, phi_grid = np.meshgrid(theta, phi, indexing='ij')
cos_t = np.cos(theta_grid)
X = cos_t * np.cos(phi_grid)
Y = np.sin(theta_grid)
Z = cos_t * np.sin(phi_grid)
return np.stack((X, Y, Z), axis=-1).astype(np.float32)
def copy_env(self, state):
self.env_state = copy.deepcopy(state)
# 02-01-2026
def move_on_sphere(self, x, y, vx, vy):
size = self.size
new_x = (x + vx) % size
new_y = y + vy
if new_y < 0:
new_y = -new_y
new_x = (new_x + size//2) % size
vy = -vy
elif new_y >= size:
new_y = 2*(size-1) - new_y
new_x = (new_x + size//2) % size
vy = -vy
return new_x, new_y, vx, vy
# matrix/matrix_a.py
# 05-01-2026
import numpy as np
# 16-01-2026
from vars.vars import agent_fields, agent_class
# 17-01-2026
import msgpack
import os
class MatrixA:
def __init__(self, size=256):
self.size = size
self.agent_class = agent_class
self.agent_dtype = np.dtype([(name, dtype) for name, dtype in agent_fields.items()])
self.agent = np.zeros((size, size), dtype=self.agent_dtype)
self.env_view = {
"tick": 0,
"day_phase": 0.0,
"year_phase": 0.0,
"season": 0,
"sun_intensity": 0.0,
"sun_day_factor": 0.0,
"sun_season_factor": 0.0,
"clouds_north": [],
"clouds_south": [],
"axial_tilt": 23.5,
"day_length": 256,
"year_length": 256 * 256,
"simulation_years" : 0.0
}
self.type_stats = np.zeros(9, dtype=np.uint32)
self.global_count = np.zeros(1, dtype=np.uint64)
# 17-01-2026
@classmethod
def load_backup(cls, path_base):
path_data = path_base + ".msgpack"
path_arrays = path_base + ".npz"
if not os.path.exists(path_data) or not os.path.exists(path_arrays):
return cls()
self = cls()
with open(path_data, "rb") as f:
data = msgpack.unpackb(f.read(), raw=False, strict_map_key=False)
if "env_view" in data:
self.env_view = data["env_view"]
if "type_stats" in data:
self.type_stats = np.array(data["type_stats"], dtype=np.uint32)
if "global_count" in data:
self.global_count = np.array(data["global_count"], dtype=np.uint64)
arr = np.load(path_arrays)
if "agent" in arr:
self.agent[:] = arr["agent"]
return self
# matrix/fifo_matrix.py
# 28-12-2025
from queue import Queue
from matrix.matrix_f import MatrixF
class FifoMatrix:
def __init__(self, buffer_size=3, size=256, channels=4, dtype=None):
self.fifoTimeEnergy = Queue(maxsize=buffer_size)
self.fifoPopulation = Queue(maxsize=buffer_size)
for _ in range(buffer_size):
m = ( MatrixF(size=size, channels=channels) if dtype is None else MatrixF(size=size, channels=channels, dtype=dtype) )
self.fifoTimeEnergy.put(m)
def pop_for_timeenergy(self):
if not self.fifoTimeEnergy.empty():
return self.fifoTimeEnergy.get()
return None
def push_from_timeenergy(self, matrix):
if not self.fifoPopulation.full():
self.fifoPopulation.put(matrix)
def pop_for_population(self):
if not self.fifoPopulation.empty():
return self.fifoPopulation.get()
return None
def push_from_population(self, matrix):
if not self.fifoTimeEnergy.full():
self.fifoTimeEnergy.put(matrix)
# matrix/fifo_population_sse.py
# 06-01-2026
from queue import Queue
from matrix.matrix_a import MatrixA
class FifoPopulationSSE:
def __init__(self, buffer_size=3, size=256):
self.fifoPopulation = Queue(maxsize=buffer_size)
self.fifoSSE = Queue(maxsize=buffer_size)
for _ in range(buffer_size):
m = MatrixA(size=size)
self.fifoPopulation.put(m)
def pop_for_population(self):
if not self.fifoPopulation.empty():
return self.fifoPopulation.get()
return None
def push_from_population(self, matrix):
if not self.fifoSSE.full():
self.fifoSSE.put(matrix)
def pop_for_sse(self):
if not self.fifoSSE.empty():
return self.fifoSSE.get()
return None
def push_from_sse(self, matrix):
if not self.fifoPopulation.full():
self.fifoPopulation.put(matrix)
Energy cycle → distribution and flow of energy across cells.
# time_energy/world_energy_state.py
# 28-12-2025
import os
from vars.vars import backup_world_energy_state, matrix_size
import numpy as np
# 16-01-2026
import msgpack
class WorldEnergyState:
def __init__(self, size=matrix_size):
self.size = size
self.tick = 0
self.day_length = 256
self.year_length = 256 * 256
self.day_phase = 0.0
self.year_phase = 0.0
self.season_length = self.year_length // 4
self.season = 0
self._last_season = 0
self.axial_tilt = 23.5 * np.pi / 180.0
self.sun_intensity = 1.0
self.sun_day_factor = 1.0
self.sun_season_factor = 1.0
# 08-01-2026
self.simulation_years = 0.0
# 30-12-2025
self.cloud_params = {
"global": {
"radius_min": 0.0,
"respiration_speed": 0.01
},
"seasonal": {
0: {
"count": 12,
"radius_max": 14.0,
"speed": 0.4,
"intensity": 0.8
},
1: {
"count": 10,
"radius_max": 10.0,
"speed": 0.32,
"intensity": 0.4
},
2: {
"count": 10,
"radius_max": 8.0,
"speed": 0.24,
"intensity": 1.0
},
3: {
"count": 12,
"radius_max": 12.0,
"speed": 0.32,
"intensity": 0.6
}
}
}
self.clouds_north = []
self.clouds_south = []
# 02-01-2026
self.wind_params = {
"node_count": 6,
"base_drift": 0.0005,
"osc_speed": 0.02,
"osc_amp": 1.0,
"strength_min": 0.5,
"strength_max": 1.0,
"radius": int(self.size * 0.25)
}
self.wind_nodes = []
# 02-01-2026
self.change_params = {
"zone_count": 6,
"radius_factor": 0.25,
"levels": [1.0, 0.6, 0.2],
"cycle_years": 2,
"invert_fraction": 0.5,
"drift_speed": 0.0001
}
self.change_zones = []
self.last_change_cycle = 0
# 16-01-2026
self.noise64 = np.zeros((64, 64), dtype=np.float32)
self.noise32 = np.zeros((32, 32), dtype=np.float32)
self.flow64_x = None
self.flow64_y = None
self.flow32_x = None
self.flow32_y = None
def update_phases(self):
self.tick += 1
self.day_phase = ( 2 * np.pi * (self.tick % self.day_length) / self.day_length )
self.year_phase = ( 2 * np.pi * (self.tick % self.year_length) / self.year_length )
self.season = (self.tick // self.season_length) % 4
self.simulation_years = self.tick / self.year_length
def encode_numpy(self, obj):
if isinstance(obj, (np.float32, np.float64)):
return float(obj)
if isinstance(obj, (np.int32, np.int64)):
return int(obj)
return obj
def save_backup(self, path_base=backup_world_energy_state):
path_data = path_base + ".msgpack"
path_arrays = path_base + ".npz"
with open(path_data, "wb") as f:
f.write(msgpack.packb(
self.to_dict(),
default=self.encode_numpy,
use_bin_type=True
))
np.savez_compressed(path_arrays,
noise64=self.noise64,
noise32=self.noise32,
flow64_x=self.flow64_x,
flow64_y=self.flow64_y,
flow32_x=self.flow32_x,
flow32_y=self.flow32_y,
)
@classmethod
def load_backup(cls, path_base=backup_world_energy_state):
path_data = path_base + ".msgpack"
path_arrays = path_base + ".npz"
if not os.path.exists(path_data):
return cls()
self = cls()
with open(path_data, "rb") as f:
data = msgpack.unpackb(f.read(), raw=False, strict_map_key=False)
self.from_dict(data)
if os.path.exists(path_arrays):
arr = np.load(path_arrays)
self.noise64 = arr["noise64"]
self.noise32 = arr["noise32"]
self.flow64_x = arr["flow64_x"]
self.flow64_y = arr["flow64_y"]
self.flow32_x = arr["flow32_x"]
self.flow32_y = arr["flow32_y"]
return self
def to_dict(self):
return {
"size": self.size,
"tick": self.tick,
"day_length": self.day_length,
"year_length": self.year_length,
"day_phase": self.day_phase,
"year_phase": self.year_phase,
"season_length": self.season_length,
"season": self.season,
"_last_season": self._last_season,
"axial_tilt": self.axial_tilt,
"sun_intensity": self.sun_intensity,
"sun_day_factor": self.sun_day_factor,
"sun_season_factor": self.sun_season_factor,
"simulation_years": self.simulation_years,
"cloud_params": self.cloud_params,
"clouds_north": self.clouds_north,
"clouds_south": self.clouds_south,
"wind_params": self.wind_params,
"wind_nodes": self.wind_nodes,
"change_params": self.change_params,
"change_zones": self.change_zones,
"last_change_cycle": self.last_change_cycle,
}
def from_dict(self, data):
for key, value in data.items():
setattr(self, key, value)
# time_energy/time_energy.py
# 28-12-2025
import time
from time_energy.world_energy_state import WorldEnergyState
#29-12-2025
import numpy as np
from vars.vars import sun_channel, rain_channel, air_channel, change_channel, backup_world_energy_state
# 08-01-2026
from concurrent.futures import ThreadPoolExecutor
# 14-01-2026
from numba import njit, prange
class TimeEnergy:
def __init__(self, fifo_matrix, sleep_min=0.001, sleep_max=0.005):
self.fm = fifo_matrix
self.sleep_min = sleep_min
self.sleep_max = sleep_max
self.running = True
self.state = WorldEnergyState.load_backup(path_base=backup_world_energy_state)
self.pool = ThreadPoolExecutor(max_workers=4)
self.rng = np.random.default_rng()
def compute(self):
while self.running:
matrix = self.fm.pop_for_timeenergy()
if matrix is None:
time.sleep(self.rng.uniform(self.sleep_min, self.sleep_max))
continue
f_sun = self.pool.submit(self.apply_sun, matrix)
f_rain = self.pool.submit(self.apply_rain, matrix)
f_wind = self.pool.submit(self.apply_wind, matrix)
f_change = self.pool.submit(self.apply_change, matrix)
for f in (f_sun, f_rain, f_wind, f_change):
try:
f.result()
except Exception as e:
print(e)
matrix.copy_env(self.state)
self.fm.push_from_timeenergy(matrix)
self.state.update_phases()
def stop(self):
self.running = False
self.pool.shutdown(wait=True)
# 29-12-2025
def apply_sun(self, matrix):
state = self.state
sun_lat = state.axial_tilt * np.sin(state.year_phase)
sun_lon = state.day_phase
sx = np.cos(sun_lat) * np.cos(sun_lon)
sy = np.sin(sun_lat)
sz = np.cos(sun_lat) * np.sin(sun_lon)
sun_dir = np.array([sx, sy, sz], dtype=np.float32)
out = matrix.data[sun_channel]
sun_kernel(matrix.sphere_coords, sun_dir, out)
out *= state.sun_intensity
out *= state.sun_day_factor
out *= state.sun_season_factor
# 02-01-2026
def apply_rain(self, matrix):
state = self.state
grid_size = state.size
max_idx = grid_size - 1
sc = matrix.sphere_coords
season_north = state.season
season_south = (state.season + 2) % 4
params_north = state.cloud_params["seasonal"][season_north]
params_south = state.cloud_params["seasonal"][season_south]
season_progress = (state.tick % state.season_length) / state.season_length
if state.season != state._last_season:
state.clouds_north = []
state.clouds_south = []
state._last_season = state.season
if len(state.clouds_north) == 0 or len(state.clouds_south) == 0:
params = state.cloud_params["seasonal"][state.season]
scale64 = params["radius_max"] * 0.3
scale32 = params["radius_max"] * 0.6
self.state.noise64 = self.generate_noise_matrix(
64,
scale=scale64,
seed=int((state.simulation_years + 1) * 1000 + (state.season + 1) * 100 + 1)
)
self.state.noise32 = self.generate_noise_matrix(
32,
scale=scale32,
seed=int((state.simulation_years + 1) * 1000 + (state.season + 1) * 100 + 2)
)
self.state.flow64_x, self.state.flow64_y = compute_curl(self.state.noise64)
self.state.flow32_x, self.state.flow32_y = compute_curl(self.state.noise32)
for _ in range(params_north["count"]):
state.clouds_north.append({
"x": self.rng.uniform(0, grid_size),
"y": self.rng.uniform(0, grid_size/2),
"vx": 0.0,
"vy": 0.0,
"radius_max": params_north["radius_max"],
"phase_offset": 0.0,
"radius": 0.0,
})
for _ in range(params_south["count"]):
state.clouds_south.append({
"x": self.rng.uniform(0, grid_size),
"y": self.rng.uniform(grid_size/2, grid_size),
"vx": 0.0,
"vy": 0.0,
"radius_max": params_south["radius_max"],
"phase_offset": 0.0,
"radius": 0.0,
})
for cloud_list in (state.clouds_north, state.clouds_south):
for c in cloud_list:
nx = c["x"] / grid_size
ny = c["y"] / grid_size
ix64 = int(nx * 64) % 64
iy64 = int(ny * 64) % 64
ix32 = int(nx * 32) % 32
iy32 = int(ny * 32) % 32
wind_x = ( self.state.flow64_x[iy64, ix64] * 0.7 + self.state.flow32_x[iy32, ix32] * 0.3 )
wind_y = ( self.state.flow64_y[iy64, ix64] * 0.7 + self.state.flow32_y[iy32, ix32] * 0.3 )
speed = params_north["speed"] if c in state.clouds_north else params_south["speed"]
c["vx"] += wind_x * speed * 0.001
c["vy"] += wind_y * speed * 0.001
c["x"], c["y"], c["vx"], c["vy"] = matrix.move_on_sphere(
c["x"], c["y"], c["vx"], c["vy"]
)
rain = matrix.tmp_rain
rain.fill(0.0)
cloud_groups = [
(state.clouds_north, params_north),
(state.clouds_south, params_south),
]
for cloud_list, params in cloud_groups:
for c in cloud_list:
phase = np.sin(np.pi * (season_progress + c["phase_offset"]))
if phase <= 0.0:
continue
radius = c["radius_max"] * phase
c["radius"] = max(radius, 0.0)
if radius <= 0.0:
continue
sigma = (radius / grid_size) * np.pi
if sigma <= 0.0:
continue
sigma2 = 2.0 * sigma * sigma
k = 1.0 / (2.0 * sigma2)
cx = int(c["x"]) % grid_size
cy = int(c["y"]) % grid_size
lat = (cy / grid_size) * 2.0 - 1.0
lon_scale = 1.0 / max(0.01, np.cos(lat * np.pi * 0.5))
radius_y = int(radius)
radius_x = int(radius * lon_scale)
radius_y = min(radius_y, max_idx)
radius_x = min(radius_x, max_idx)
rain_kernel(
sc,
rain,
cx, cy,
radius_y, radius_x,
k,
params["intensity"]
)
np.maximum(rain, 0.0, out=rain)
np.minimum(rain, 1.0, out=rain)
matrix.data[rain_channel][:] = rain
# 02-01-2026
def apply_wind(self, matrix):
state = self.state
grid_size = state.size
max_idx = grid_size - 1
sc = matrix.sphere_coords
wp = state.wind_params
if len(state.wind_nodes) == 0:
for _ in range(wp["node_count"]):
node = {
"x": self.rng.uniform(0, grid_size),
"y": self.rng.uniform(0, grid_size),
"base_dir": self.rng.uniform(1, 9),
"osc_phase": self.rng.uniform(0, 2*np.pi),
"osc_speed": wp["osc_speed"] * self.rng.uniform(0.5, 1.5),
"osc_amp": wp["osc_amp"],
"vx": (self.rng.random() - 0.5) * wp["base_drift"],
"vy": (self.rng.random() - 0.5) * wp["base_drift"],
"strength": self.rng.uniform(wp["strength_min"], wp["strength_max"]),
"dir": 5,
}
state.wind_nodes.append(node)
for node in state.wind_nodes:
node["x"], node["y"], node["vx"], node["vy"] = matrix.move_on_sphere(
node["x"], node["y"], node["vx"], node["vy"]
)
node["osc_phase"] += node["osc_speed"]
osc = np.sin(node["osc_phase"]) * node["osc_amp"]
d = node["base_dir"] + osc
d = int(round(d))
d = ((d - 1) % 9) + 1
node["dir"] = d
wind = matrix.tmp_wind
wind.fill(5)
strength_map = matrix.tmp_strength
strength_map.fill(0.0)
if wp["radius"] <= 0:
sigma = (1.0 / grid_size) * np.pi
else:
sigma = (wp["radius"] / grid_size) * np.pi
sigma2 = 2.0 * sigma * sigma
k = 1.0 / (2.0 * sigma2)
for node in state.wind_nodes:
cx = int(node["x"]) % grid_size
cy = int(node["y"]) % grid_size
lat = (cy / grid_size) * 2.0 - 1.0
lon_scale = 1.0 / max(0.01, np.cos(lat * np.pi * 0.5))
radius_y = min(int(wp["radius"]), max_idx)
radius_x = min(int(wp["radius"] * lon_scale), max_idx)
wind_kernel(
sc,
strength_map,
wind,
cx, cy,
radius_y, radius_x,
k,
node["strength"],
node["dir"]
)
matrix.data[air_channel] = wind
def apply_change(self, matrix):
state = self.state
grid_size = state.size
max_idx = grid_size - 1
cp = state.change_params
sc = matrix.sphere_coords
if len(state.change_zones) == 0:
radius = int(grid_size * cp["radius_factor"])
for _ in range(cp["zone_count"]):
zone = {
"x": self.rng.uniform(0, grid_size),
"y": self.rng.uniform(0, grid_size),
"radius": radius,
"level": self.rng.choice(cp["levels"]),
"vx": (self.rng.random() - 0.5) * cp["drift_speed"],
"vy": (self.rng.random() - 0.5) * cp["drift_speed"],
}
state.change_zones.append(zone)
for z in state.change_zones:
z["x"], z["y"], z["vx"], z["vy"] = matrix.move_on_sphere(
z["x"], z["y"], z["vx"], z["vy"]
)
ticks_per_cycle = state.year_length * cp["cycle_years"]
if state.tick - state.last_change_cycle >= ticks_per_cycle:
state.last_change_cycle = state.tick
zone_indices = list(range(len(state.change_zones)))
self.rng.shuffle(zone_indices)
count_to_invert = int(len(zone_indices) * cp["invert_fraction"])
for idx in zone_indices[:count_to_invert]:
z = state.change_zones[idx]
z["level"] = 0.2 if z["level"] == 1.0 else 1.0
change_map = matrix.tmp_change
strength_map = matrix.tmp_strength_change
change_map.fill(0.0)
strength_map.fill(0.0)
for z in state.change_zones:
radius = z["radius"]
if radius <= 0:
continue
sigma = (radius / grid_size) * np.pi
if sigma <= 0.0:
continue
sigma2 = 2.0 * sigma * sigma
k = 1.0 / (2.0 * sigma2)
cx = int(z["x"]) % grid_size
cy = int(z["y"]) % grid_size
lat = (cy / grid_size) * 2.0 - 1.0
lon_scale = 1.0 / max(0.01, np.cos(lat * np.pi * 0.5))
radius_y = min(int(radius), max_idx)
radius_x = min(int(radius * lon_scale), max_idx)
change_kernel(
sc,
strength_map,
change_map,
cx, cy,
radius_y, radius_x,
k,
z["level"]
)
matrix.data[change_channel][:] = change_map
# 15-01-2026
def generate_noise_matrix(self, size, scale=0.05, seed=0):
rng = np.random.default_rng(seed)
perm = np.arange(256, dtype=np.int32)
rng.shuffle(perm)
perm = np.concatenate([perm, perm])
mat = generate_simplex_noise(size, scale, perm)
mn = mat.min()
mx = mat.max()
return (mat - mn) / (mx - mn)
grad3 = np.array([
[1,1], [-1,1], [1,-1], [-1,-1],
[1,0], [-1,0], [1,0], [-1,0],
[0,1], [0,-1], [0,1], [0,-1]
], dtype=np.float32)
@njit
def fastfloor(x):
return int(x) if x > 0 else int(x) - 1
@njit
def dot(g, x, y):
return g[0] * x + g[1] * y
@njit(fastmath=True)
def simplex2d(xin, yin, perm):
F2 = 0.5 * (np.sqrt(3.0) - 1.0)
G2 = (3.0 - np.sqrt(3.0)) / 6.0
s = (xin + yin) * F2
i = fastfloor(xin + s)
j = fastfloor(yin + s)
t = (i + j) * G2
X0 = i - t
Y0 = j - t
x0 = xin - X0
y0 = yin - Y0
if x0 > y0:
i1, j1 = 1, 0
else:
i1, j1 = 0, 1
x1 = x0 - i1 + G2
y1 = y0 - j1 + G2
x2 = x0 - 1.0 + 2.0 * G2
y2 = y0 - 1.0 + 2.0 * G2
ii = i & 255
jj = j & 255
gi0 = perm[ii + perm[jj]] % 12
gi1 = perm[ii + i1 + perm[jj + j1]] % 12
gi2 = perm[ii + 1 + perm[jj + 1]] % 12
n0 = 0.0
n1 = 0.0
n2 = 0.0
t0 = 0.5 - x0*x0 - y0*y0
if t0 > 0:
t0 *= t0
n0 = t0 * t0 * dot(grad3[gi0], x0, y0)
t1 = 0.5 - x1*x1 - y1*y1
if t1 > 0:
t1 *= t1
n1 = t1 * t1 * dot(grad3[gi1], x1, y1)
t2 = 0.5 - x2*x2 - y2*y2
if t2 > 0:
t2 *= t2
n2 = t2 * t2 * dot(grad3[gi2], x2, y2)
return 70.0 * (n0 + n1 + n2)
@njit(parallel=True, fastmath=True)
def generate_simplex_noise(size, scale, perm):
mat = np.zeros((size, size), dtype=np.float32)
for y in prange(size):
for x in range(size):
mat[y, x] = simplex2d(x * scale, y * scale, perm)
return mat
@njit(parallel=True, fastmath=True)
def compute_curl(noise):
h, w = noise.shape
curl_x = np.zeros((h, w), dtype=np.float32)
curl_y = np.zeros((h, w), dtype=np.float32)
for y in prange(1, h - 1):
for x in range(1, w - 1):
dx = (noise[y, x+1] - noise[y, x-1]) * 0.5
dy = (noise[y+1, x] - noise[y-1, x]) * 0.5
curl_x[y, x] = dy
curl_y[y, x] = -dx
return curl_x, curl_y
# 14-01-2026
@njit(parallel=True, fastmath=True)
def change_kernel(sc, strength_map, change_map, cx, cy, radius_y, radius_x, k, level):
H = sc.shape[0]
W = sc.shape[1]
cxv = sc[cy, cx, 0]
cyv = sc[cy, cx, 1]
czv = sc[cy, cx, 2]
min_y = cy - radius_y
if min_y < 0:
min_y = 0
max_y = cy + radius_y
if max_y >= H:
max_y = H - 1
for y in prange(min_y, max_y + 1):
for dx in range(-radius_x, radius_x + 1):
x = cx + dx
if x < 0:
x += W
elif x >= W:
x -= W
vx = sc[y, x, 0]
vy = sc[y, x, 1]
vz = sc[y, x, 2]
dot = vx * cxv + vy * cyv + vz * czv
dot = min(1.0, max(-1.0, dot))
influence = 1.0 - (1.0 - dot) * 2.0 * k
if influence > 0.0 and influence > strength_map[y, x]:
strength_map[y, x] = influence
change_map[y, x] = level
@njit(parallel=True, fastmath=True)
def wind_kernel(sc, strength_map, wind, cx, cy, radius_y, radius_x, k, strength, direction):
H = sc.shape[0]
W = sc.shape[1]
cxv = sc[cy, cx, 0]
cyv = sc[cy, cx, 1]
czv = sc[cy, cx, 2]
min_y = cy - radius_y
if min_y < 0:
min_y = 0
max_y = cy + radius_y
if max_y >= H:
max_y = H - 1
for y in prange(min_y, max_y + 1):
for dx in range(-radius_x, radius_x + 1):
x = cx + dx
if x < 0:
x += W
elif x >= W:
x -= W
vx = sc[y, x, 0]
vy = sc[y, x, 1]
vz = sc[y, x, 2]
dot = vx * cxv + vy * cyv + vz * czv
dot = min(1.0, max(-1.0, dot))
influence = 1.0 - (1.0 - dot) * 2.0 * k
if influence <= 0.0:
continue
influence *= strength
if influence > strength_map[y, x]:
strength_map[y, x] = influence
wind[y, x] = direction
@njit(parallel=True, fastmath=True)
def rain_kernel(sc, rain, cx, cy, radius_y, radius_x, k, intensity):
H = sc.shape[0]
W = sc.shape[1]
cxv = sc[cy, cx, 0]
cyv = sc[cy, cx, 1]
czv = sc[cy, cx, 2]
min_y = cy - radius_y
if min_y < 0:
min_y = 0
max_y = cy + radius_y
if max_y >= H:
max_y = H - 1
for y in prange(min_y, max_y + 1):
for dx in range(-radius_x, radius_x + 1):
x = cx + dx
if x < 0:
x += W
elif x >= W:
x -= W
vx = sc[y, x, 0]
vy = sc[y, x, 1]
vz = sc[y, x, 2]
dot = vx * cxv + vy * cyv + vz * czv
dot = min(1.0, max(-1.0, dot))
cloud_val = 1.0 - (1.0 - dot) * 2.0 * k
if cloud_val > 0.0:
rain[y, x] += cloud_val * intensity
@njit(parallel=True, fastmath=True)
def sun_kernel(sc, sun_dir, out):
for i in prange(sc.shape[0]):
for j in range(sc.shape[1]):
d = sc[i, j, 0] * sun_dir[0] + sc[i, j, 1] * sun_dir[1] + sc[i, j, 2] * sun_dir[2]
out[i, j] = d if d > 0 else 0
Agents → rules and behaviors of the agent population.
# agents/clean_backup.py
# 17-01-2026
import os
import re
import time
import threading
from vars.vars import backup_dir
class CleanBackup:
def __init__(self, backup_dir=backup_dir, keep_last=20, interval=600):
self.backup_dir = backup_dir
self.keep_last = keep_last
self.interval = interval
self.running = True
self.pattern = re.compile(
r"(world_energy_state|population)_(\d{4}_\d{2}_\d{2}__\d{2}_\d{2})\.(msgpack|npz)$"
)
def stop(self):
self.running = False
def loop(self):
while self.running:
try:
self.cleanup()
except Exception as e:
print(e)
for _ in range(self.interval):
if not self.running:
break
time.sleep(1)
def cleanup(self):
files = os.listdir(self.backup_dir)
backups = {}
for f in files:
m = self.pattern.match(f)
if not m:
continue
prefix, timestamp, ext = m.groups()
backups.setdefault(timestamp, []).append(f)
timestamps = sorted(backups.keys())
if len(timestamps) <= self.keep_last:
return
to_delete = timestamps[:-self.keep_last]
for ts in to_delete:
for f in backups[ts]:
full_path = os.path.join(self.backup_dir, f)
try:
os.remove(full_path)
except Exception as e:
print(e)
# agents/population.py
# 17-01-2026
import copy
import threading
import msgpack
#18-01-2026
from numba import njit, prange
from vars.vars import type_void, type_seed, type_producer, type_consumer, type_nomad, type_architect, type_predator ,type_defender, type_parasite, type_symbiont
from vars.vars import type_energy, type_speed_l, type_cost, type_speed, type_dead_prob
from vars.vars import agent_energy, agent_index, agent_class_choice, agent_is_died, agent_seed1, agent_seed2, agent_seed3, agent_seed4, agent_seed_prob1, agent_seed_prob2, agent_seed_prob3, agent_seed_prob4, agent_fitness1, agent_fitness2, agent_fitness3, agent_fitness4, agent_fitness_prob1, agent_fitness_prob2, agent_fitness_prob3, agent_fitness_prob4, agent_turn
# 20-01-2026
from vars.vars import max_intent, intent_dtype, move_born, move_move, move_producer, move_consumer, move_nomad_seed, move_nomad_plant, move_architect, move_predator, move_defender, move_parasite, move_symbiont
from vars.vars import sx, sy, tx, ty, move_type, velocity
from vars.vars import sun_channel, rain_channel, air_channel, change_channel
# 21-01-2026
from vars.vars import mutation_rate
# 28-01-2026
from vars.vars import id_src, id_dest
class Population:
def __init__(self, fifo_matrix, fifo_sse, sleep_min=0.001, sleep_max=0.005):
self.fm = fifo_matrix
self.fsse = fifo_sse
self.sleep_min = sleep_min
self.sleep_max = sleep_max
self.running = True
self._last_backup = time.time()
self.backup_interval = backup_interval
self.matrix_a_buffers, self.history_count = Population.load_backup()
self.current_index = 0
self.rng = np.random.default_rng()
self.intentions = np.zeros((256, 256, max_intent),dtype=intent_dtype)
self.intentions_count = np.zeros((256, 256), dtype=np.int8)
self.neigh_x = np.empty(8, np.int32)
self.neigh_y = np.empty(8, np.int32)
self.backup_class = np.empty((256, 256), dtype=np.int8)
def stop(self):
self.running = False
def compute(self):
while self.running:
matrix_f = self.fm.pop_for_population()
if matrix_f is None:
time.sleep(self.rng.uniform(self.sleep_min, self.sleep_max))
continue
matrix_a = None
while matrix_a is None and self.running:
matrix_a = self.fsse.pop_for_population()
if matrix_a is None:
time.sleep(self.rng.uniform(self.sleep_min, self.sleep_max))
try:
matrix_a = self.do_operation(matrix_f, matrix_a)
except Exception as e:
print(f"[Population] ERROR in do_operation: {e}", flush=True)
self.fm.push_from_population(matrix_f)
if matrix_a is not None:
self.fsse.push_from_population(matrix_a)
continue
now = time.time()
if now - self._last_backup >= self.backup_interval:
self._last_backup = now
env_state_copy = copy.deepcopy(matrix_f.env_state)
agent_copy, env_view_copy, type_stats_copy, global_count_copy = self.snapshot_matrix_a()
threading.Thread(
target=self.do_backup,
args=(env_state_copy, agent_copy, env_view_copy, type_stats_copy, global_count_copy),
daemon=True
).start()
self.fm.push_from_population(matrix_f)
self.fsse.push_from_population(matrix_a)
def do_backup(self, env_state_copy, agent_copy, env_view_copy, type_stats_copy, global_count_copy):
os.makedirs(backup_dir, exist_ok=True)
suffix = datetime.now().strftime("%Y_%m_%d__%H_%M")
base_world = f"world_energy_state_{suffix}"
base_pop = f"population_{suffix}"
base_path_world = os.path.join(backup_dir, base_world)
base_path_population = os.path.join(backup_dir, base_pop)
env_state_copy.save_backup(path_base=base_path_world)
with open(base_path_population + ".msgpack", "wb") as f:
f.write(msgpack.packb(
{
"env_view": env_view_copy,
"type_stats": type_stats_copy.tolist(),
"global_count": global_count_copy.tolist(),
},
default=self.encode_numpy,
use_bin_type=True
))
np.savez_compressed(base_path_population + ".npz", agent=agent_copy)
link_msgpack = backup_world_energy_state + ".msgpack"
link_npz = backup_world_energy_state + ".npz"
target_msgpack = base_world + ".msgpack"
target_npz = base_world + ".npz"
link_msgpack_agents = backup_population + ".msgpack"
link_npz_agents = backup_population + ".npz"
target_msgpack_agents = base_pop + ".msgpack"
target_npz_agents = base_pop + ".npz"
try:
for link, target in [
(link_msgpack, target_msgpack),
(link_npz, target_npz),
(link_msgpack_agents, target_msgpack_agents),
(link_npz_agents, target_npz_agents),
]:
full_link = link
full_target = os.path.join(backup_dir, target)
if os.path.islink(full_link) or os.path.exists(full_link):
os.remove(full_link)
os.symlink(full_target, full_link)
except Exception as e:
print(e)
# 06-01-2026
def copy_env_to_matrix_a(self, matrix_f, matrix_a):
env = matrix_f.env_state
clouds_north = [c.copy() for c in env.clouds_north]
clouds_south = [c.copy() for c in env.clouds_south]
snapshot = {
"tick": env.tick,
"day_phase": env.day_phase,
"year_phase": env.year_phase,
"season": env.season,
"sun_intensity": env.sun_intensity,
"sun_day_factor": env.sun_day_factor,
"sun_season_factor": env.sun_season_factor,
"clouds_north": clouds_north,
"clouds_south": clouds_south,
"axial_tilt": env.axial_tilt,
"day_length": env.day_length,
"year_length": env.year_length,
"simulation_years": env.simulation_years
}
matrix_a.env_view = snapshot
def do_operation(self, matrix_f, matrix_a):
current_buffer = self.matrix_a_buffers[self.current_index]
next_index = (self.current_index + 1) % 3
next_buffer = self.matrix_a_buffers[next_index]
matrix_a.env_view = current_buffer.env_view
matrix_a.agent = current_buffer.agent
matrix_a.type_stats = current_buffer.type_stats
self.copy_env_to_matrix_a(matrix_f, next_buffer)
self.intentions_count.fill(0)
self.backup_class[:] = matrix_a.agent[agent_class_choice]
update_agents_and_intentions(
matrix_a.agent,
matrix_f.data,
matrix_a.agent_class,
self.intentions,
self.intentions_count,
matrix_a.type_stats,
self.neigh_x,
self.neigh_y
)
apply_intentions(
matrix_a.agent,
matrix_f.data,
self.intentions,
self.intentions_count,
next_buffer.agent,
self.history_count,
matrix_a.agent_class,
matrix_f.env_state.tick
)
matrix_a.agent[agent_class_choice] = self.backup_class
matrix_a.global_count[:] = self.history_count
self.current_index = next_index
return matrix_a
def snapshot_matrix_a(self):
current = self.matrix_a_buffers[self.current_index]
agent_copy = current.agent.copy()
env_view_copy = copy.deepcopy(current.env_view)
type_stats_copy = current.type_stats.copy()
global_count_copy = self.history_count.copy()#current.global_count.copy()
return agent_copy, env_view_copy, type_stats_copy, global_count_copy
def encode_numpy(self, obj):
if isinstance(obj, (np.float32, np.float64)):
return float(obj)
if isinstance(obj, (np.int32, np.int64)):
return int(obj)
return obj
# door
@classmethod
def load_backup(cls):
base_pop = backup_population
path_msgpack = base_pop + ".msgpack"
path_npz = base_pop + ".npz"
if not os.path.exists(path_msgpack) or not os.path.exists(path_npz):
buffers = [MatrixA(), MatrixA(), MatrixA()]
i = genesis_matrix(buffers[0].agent, buffers[0].agent_class, population_density=0.90)
buffers[0].global_count[0] = i
return buffers, buffers[0].global_count
loaded = MatrixA.load_backup(base_pop)
buffers = [MatrixA(), MatrixA(), MatrixA()]
buffers[0].agent[:] = loaded.agent
buffers[0].env_view = copy.deepcopy(loaded.env_view)
buffers[0].type_stats[:] = loaded.type_stats
buffers[0].global_count[:] = loaded.global_count
return buffers, buffers[0].global_count
# 18-01-2026 ~ 23-01-2026
@njit
def genesis_matrix(agent_matrix, class_params, population_density):
size_y, size_x = agent_matrix.shape
n_cells = size_y * size_x
genesis_agents = int(n_cells * population_density)
for y in prange(size_y):
for x in range(size_x):
row = agent_matrix[y, x]
row[agent_class_choice] = 0
row[agent_energy] = 0.0
row[agent_index] = 0
row[agent_is_died] = 0
row[agent_seed1] = 0
row[agent_seed2] = 0
row[agent_seed3] = 0
row[agent_seed4] = 0
row[agent_seed_prob1] = 0.0
row[agent_seed_prob2] = 0.0
row[agent_seed_prob3] = 0.0
row[agent_seed_prob4] = 0.0
row[agent_fitness1] = 0
row[agent_fitness2] = 0
row[agent_fitness3] = 0
row[agent_fitness4] = 0
row[agent_fitness_prob1] = 0.0
row[agent_fitness_prob2] = 0.0
row[agent_fitness_prob3] = 0.0
row[agent_fitness_prob4] = 0.0
row[agent_turn] = 0
base_genes = np.array([2,3,4,5,6,7,8,9], dtype=np.int8)
groups = 8
per_group = genesis_agents // groups
agent_id = 1
for g in range(groups):
base_gene = base_genes[g]
s1 = base_gene
s2 = base_gene
s3 = base_gene
if np.random.rand() < mutation_rate:
s4 = np.random.randint(2, 10)
else:
s4 = base_gene
generated = 0
while generated < per_group:
y = int(np.random.rand() * size_y)
x = int(np.random.rand() * size_x)
row = agent_matrix[y, x]
if row[agent_class_choice] != 0:
continue
row[agent_class_choice] = type_seed
row[agent_index] = agent_id
row[agent_is_died] = 0
row[agent_fitness_prob1] = 0.0
row[agent_fitness_prob2] = 0.0
row[agent_fitness_prob3] = 0.0
row[agent_fitness_prob4] = 0.0
row[agent_seed1] = s1
row[agent_seed2] = s2
row[agent_seed3] = s3
row[agent_seed4] = s4
row[agent_seed_prob1] = 0.25
row[agent_seed_prob2] = 0.25
row[agent_seed_prob3] = 0.25
row[agent_seed_prob4] = 0.25
row[agent_fitness1] = 0
row[agent_fitness2] = 0
row[agent_fitness3] = 0
row[agent_fitness4] = 0
row[agent_energy] = class_params[type_seed, 0]
generated += 1
agent_id += 1
return agent_id - 1
@njit
def update_agents_and_intentions(matrix_a_input, matrix_f, agent_class_params, intentions_matrix, intentions_count, type_stats, neigh_x, neigh_y):
size = matrix_a_input.shape[0]
for i in range(type_stats.shape[0]):
type_stats[i] = 0
for y in range(size):
for x in range(size):
row = matrix_a_input[y, x]
if row[agent_index] == 0 or row[agent_class_choice] == type_seed:
continue
rand = np.random.rand()
p1 = row[agent_fitness_prob1]
p2 = row[agent_fitness_prob2]
p3 = row[agent_fitness_prob3]
p4 = row[agent_fitness_prob4]
t1 = p1
t2 = p1 + p2
t3 = p1 + p2 + p3
if rand < t1:
row[agent_class_choice] = row[agent_fitness1]
elif rand < t2:
row[agent_class_choice] = row[agent_fitness2]
elif rand < t3:
row[agent_class_choice] = row[agent_fitness3]
else:
row[agent_class_choice] = row[agent_fitness4]
for y in range(size):
for x in range(size):
row = matrix_a_input[y, x]
if row[agent_index] == 0:
continue
class_choice = row[agent_class_choice]
if class_choice == type_seed:
type_stats[type_seed - 1] += 1
energy = row[agent_energy] - agent_class_params[type_seed, type_cost]
row[agent_energy] = energy
if energy <= 0.0:
row[agent_index] = 0
row[agent_class_choice] = type_void
continue
sun = matrix_f[sun_channel][y][x]
rain = matrix_f[rain_channel][y][x]
if sun <= 0.01 and rain <= 0.01:
move_flag = move_move
else:
move_flag = move_born
row[agent_turn] = 0
write_intention(intentions_matrix, intentions_count, x, y, x, y, move_flag, agent_class_params[type_seed, type_speed], 0, 0)
continue
type_stats[class_choice - 1] += 1
energy = row[agent_energy] - agent_class_params[class_choice, type_cost]
if energy <= 0.0:
row[agent_index] = 0
row[agent_class_choice] = type_void
continue
row[agent_energy] = energy
if np.random.rand() < agent_class_params[class_choice, type_dead_prob] and row[agent_is_died] == 0:
if row[agent_seed1] != 0:
row[agent_class_choice] = type_seed
row[agent_energy] = agent_class_params[type_seed, type_energy]
continue
row[agent_is_died] = 1
continue
if row[agent_is_died] == 1:
continue
row[agent_turn] += 1
if class_choice == type_producer:
generate_producer_intention(matrix_a_input, matrix_f, intentions_matrix, intentions_count, x, y, agent_class_params, size, neigh_x, neigh_y)
elif class_choice == type_consumer:
generate_consumer_intention(matrix_a_input, matrix_f, intentions_matrix, intentions_count, x, y, size, neigh_x, neigh_y, agent_class_params)
elif class_choice == type_nomad:
generate_nomad_intention(matrix_a_input, matrix_f, intentions_matrix, intentions_count, x, y, size, neigh_x, neigh_y, agent_class_params)
elif class_choice == type_architect:
generate_architect_intention(matrix_a_input, matrix_f, intentions_matrix, intentions_count, x, y, size, neigh_x, neigh_y, agent_class_params)
elif class_choice == type_predator:
generate_predator_intention(matrix_a_input, matrix_f, intentions_matrix, intentions_count, x, y, size, neigh_x, neigh_y,agent_class_params)
elif class_choice == type_defender:
generate_defender_intention(matrix_a_input, matrix_f, intentions_matrix, intentions_count, x, y, agent_class_params)
elif class_choice == type_parasite:
generate_parasite_intention(matrix_a_input, matrix_f, intentions_matrix, intentions_count, x, y, size, neigh_x, neigh_y, agent_class_params)
elif class_choice == type_symbiont:
generate_symbiont_intention(matrix_a_input, matrix_f, intentions_matrix, intentions_count, x, y, size, neigh_x, neigh_y, agent_class_params)
@njit(inline='always')
def generate_producer_intention(matrix_a_input, matrix_f, intentions_matrix, intentions_count, x, y, agent_class_params, size, neigh_x, neigh_y):
get_spherical_neighbors(x, y, size, neigh_x, neigh_y)
empty_i = -1
id_src_value = matrix_a_input[y, x][agent_index]
id_dest_value = 0
for i in range(8):
nx = neigh_x[i]
ny = neigh_y[i]
agent_type = matrix_a_input[ny, nx][agent_class_choice]
if agent_type == type_void:
if empty_i == -1:
empty_i = i
ttx = x
tty = y
if matrix_a_input[y, x][agent_seed1] != 0 and empty_i != -1:
ttx = neigh_x[empty_i]
tty = neigh_y[empty_i]
vel = agent_class_params[type_producer, type_speed] * matrix_a_input[y, x][agent_turn]
write_intention(intentions_matrix, intentions_count, x, y, ttx, tty, move_producer, vel, id_src_value, id_dest_value)
reproduce_agent(matrix_a_input, x, y, agent_class_params[type_producer, type_speed_l])
@njit(inline='always')
def generate_consumer_intention(matrix_a_input, matrix_f, intentions_matrix, intentions_count, x, y, size, neigh_x, neigh_y, agent_class_params):
get_spherical_neighbors(x, y, size, neigh_x, neigh_y)
best_energy = -1.0
best_i = -1
empty_i = -1
defender_present = 0
id_src_value = matrix_a_input[y, x][agent_index]
id_dest_value = 0
for i in range(8):
nx = neigh_x[i]
ny = neigh_y[i]
agent_type = matrix_a_input[ny, nx][agent_class_choice]
if agent_type == type_producer:
e = matrix_a_input[ny, nx][agent_energy]
if e > best_energy:
best_energy = e
best_i = i
id_dest_value = matrix_a_input[ny, nx][agent_index]
elif agent_type == type_void:
if empty_i == -1:
empty_i = i
elif agent_type == type_defender:
if np.random.rand() < 0.90:
defender_present = 1
if best_i != -1 and defender_present == 0:
ttx = neigh_x[best_i]
tty = neigh_y[best_i]
move_flag = move_consumer
else:
if empty_i == -1:
ttx = x
tty = y
else:
wind_dir = matrix_f[air_channel][y][x]
wind_tx, wind_ty = wind_target(x, y, size, wind_dir)
wind_type = matrix_a_input[wind_ty, wind_tx][agent_class_choice]
if wind_type == type_void:
if np.random.rand() < 0.90:
ttx = wind_tx
tty = wind_ty
else:
ttx = neigh_x[empty_i]
tty = neigh_y[empty_i]
else:
ttx = neigh_x[empty_i]
tty = neigh_y[empty_i]
move_flag = move_move
vel = agent_class_params[type_consumer, type_speed] * matrix_a_input[y, x][agent_turn]
write_intention(intentions_matrix, intentions_count, x, y, ttx, tty, move_flag, vel, id_src_value, id_dest_value)
reproduce_agent(matrix_a_input, x, y, agent_class_params[type_consumer, type_speed_l])
@njit(inline='always')
def generate_nomad_intention(matrix_a_input, matrix_f, intentions_matrix, intentions_count, x, y, size, neigh_x, neigh_y, agent_class_params):
get_spherical_neighbors(x, y, size, neigh_x, neigh_y)
has_seed = matrix_a_input[y, x][agent_seed1]
target_architect = -1
target_seed_carrier = -1
empty_i = -1
id_src_value = matrix_a_input[y, x][agent_index]
id_dest_value = 0
for i in range(8):
nx = neigh_x[i]
ny = neigh_y[i]
agent_type = matrix_a_input[ny, nx][agent_class_choice]
if agent_type == type_void and empty_i == -1:
empty_i = i
if has_seed != 0:
if agent_type == type_architect:
target_architect = i
id_dest_value = matrix_a_input[ny, nx][agent_index]
break
else:
if matrix_a_input[ny, nx][agent_seed1] != 0 and matrix_a_input[ny, nx][agent_class_choice] != type_nomad and matrix_a_input[ny, nx][agent_class_choice] != type_architect and matrix_a_input[ny, nx][agent_class_choice] != type_seed :
target_seed_carrier = i
id_dest_value = matrix_a_input[ny, nx][agent_index]
break
if has_seed != 0:
if target_architect != -1:
ttx = neigh_x[target_architect]
tty = neigh_y[target_architect]
move_flag = move_nomad_plant
else:
if empty_i == -1:
ttx = x
tty = y
else:
wind_dir = matrix_f[air_channel][y][x]
wind_tx, wind_ty = wind_target(x, y, size, wind_dir)
wind_type = matrix_a_input[wind_ty, wind_tx][agent_class_choice]
if wind_type == type_void:
if np.random.rand() < 0.90:
ttx = wind_tx
tty = wind_ty
else:
ttx = neigh_x[empty_i]
tty = neigh_y[empty_i]
else:
ttx = neigh_x[empty_i]
tty = neigh_y[empty_i]
move_flag = move_move
else:
if target_seed_carrier != -1:
ttx = neigh_x[target_seed_carrier]
tty = neigh_y[target_seed_carrier]
move_flag = move_nomad_seed
else:
if empty_i == -1:
ttx = x
tty = y
else:
wind_dir = matrix_f[air_channel][y][x]
wind_tx, wind_ty = wind_target(x, y, size, wind_dir)
wind_type = matrix_a_input[wind_ty, wind_tx][agent_class_choice]
if wind_type == type_void:
if np.random.rand() < 0.90:
ttx = wind_tx
tty = wind_ty
else:
ttx = neigh_x[empty_i]
tty = neigh_y[empty_i]
else:
ttx = neigh_x[empty_i]
tty = neigh_y[empty_i]
move_flag = move_move
reproduce_agent(matrix_a_input, x, y, agent_class_params[type_nomad, type_speed_l])
vel = agent_class_params[type_nomad, type_speed] * matrix_a_input[y, x][agent_turn]
write_intention(intentions_matrix, intentions_count, x, y, ttx, tty, move_flag, vel, id_src_value, id_dest_value)
@njit(inline='always')
def generate_architect_intention(matrix_a_input, matrix_f, intentions_matrix, intentions_count, x, y, size, neigh_x, neigh_y, agent_class_params):
get_spherical_neighbors(x, y, size, neigh_x, neigh_y)
reproduce_agent(matrix_a_input, x, y, agent_class_params[type_architect, type_speed_l])
ttx = x
tty = y
id_src_value = matrix_a_input[y, x][agent_index]
id_dest_value = 0
if matrix_a_input[y, x][agent_seed1] != 0:
empty_i = -1
for i in range(8):
nx = neigh_x[i]
ny = neigh_y[i]
agent_type = matrix_a_input[ny, nx][agent_class_choice]
if agent_type == type_void and empty_i == -1:
empty_i = i
break
if empty_i != -1:
wind_dir = matrix_f[air_channel][y][x]
wind_tx, wind_ty = wind_target(x, y, size, wind_dir)
wind_type = matrix_a_input[wind_ty, wind_tx][agent_class_choice]
if wind_type == type_void:
if np.random.rand() < 0.90:
ttx = wind_tx
tty = wind_ty
else:
ttx = neigh_x[empty_i]
tty = neigh_y[empty_i]
else:
ttx = neigh_x[empty_i]
tty = neigh_y[empty_i]
move_flag = move_architect
else:
move_flag = move_move
else:
move_flag = move_move
vel = agent_class_params[type_architect, type_speed] * matrix_a_input[y, x][agent_turn]
write_intention(intentions_matrix, intentions_count, x, y, ttx, tty, move_flag, vel, id_src_value, id_dest_value)
@njit(inline='always')
def generate_predator_intention(matrix_a_input, matrix_f, intentions_matrix, intentions_count, x, y, size, neigh_x, neigh_y, agent_class_params):
get_spherical_neighbors(x, y, size, neigh_x, neigh_y)
best_energy = -1.0
best_i = -1
empty_i = -1
defender_present = 0
id_src_value = matrix_a_input[y, x][agent_index]
id_dest_value = 0
for i in range(8):
nx = neigh_x[i]
ny = neigh_y[i]
agent_type = matrix_a_input[ny, nx][agent_class_choice]
if agent_type == type_defender:
if np.random.rand() < 0.90:
defender_present = 1
elif agent_type == type_void:
if empty_i == -1:
empty_i = i
elif agent_type == type_consumer or agent_type == type_parasite or agent_type == type_symbiont or agent_type == type_defender:
e = matrix_a_input[ny, nx][agent_energy]
if e > best_energy:
best_energy = e
best_i = i
id_dest_value = matrix_a_input[ny, nx][agent_index]
if best_i != -1 and defender_present == 0:
ttx = neigh_x[best_i]
tty = neigh_y[best_i]
move_flag = move_predator
else:
if empty_i == -1:
ttx = x
tty = y
else:
wind_dir = matrix_f[air_channel][y][x]
wind_tx, wind_ty = wind_target(x, y, size, wind_dir)
wind_type = matrix_a_input[wind_ty, wind_tx][agent_class_choice]
if wind_type == type_void:
if np.random.rand() < 0.90:
ttx = wind_tx
tty = wind_ty
else:
ttx = neigh_x[empty_i]
tty = neigh_y[empty_i]
else:
ttx = neigh_x[empty_i]
tty = neigh_y[empty_i]
move_flag = move_move
slot = intentions_count[tty, ttx]
vel = agent_class_params[type_predator, type_speed] * matrix_a_input[y, x][agent_turn]
write_intention(intentions_matrix, intentions_count, x, y, ttx, tty, move_flag, vel, id_src_value, id_dest_value)
reproduce_agent(matrix_a_input, x, y, agent_class_params[type_predator, type_speed_l])
@njit(inline='always')
def generate_defender_intention(matrix_a_input, matrix_f, intentions_matrix, intentions_count, x, y, agent_class_params):
ttx = x
tty = y
id_src_value = matrix_a_input[y, x][agent_index]
id_dest_value = 0
move_flag = move_defender
vel = agent_class_params[type_defender, type_speed] * matrix_a_input[y, x][agent_turn]
write_intention(intentions_matrix, intentions_count, x, y, ttx, tty, move_flag, vel, id_src_value, id_dest_value)
reproduce_agent(matrix_a_input, x, y, agent_class_params[type_defender, type_speed_l])
# loop
@njit(inline='always')
def generate_parasite_intention(matrix_a_input, matrix_f, intentions_matrix, intentions_count, x, y, size, neigh_x, neigh_y, agent_class_params):
get_spherical_neighbors(x, y, size, neigh_x, neigh_y)
worst_energy = 1_000_000_000
worst_i = -1
empty_i = -1
id_src_value = matrix_a_input[y, x][agent_index]
id_dest_value = 0
for i in range(8):
nx = neigh_x[i]
ny = neigh_y[i]
agent_type = matrix_a_input[ny, nx][agent_class_choice]
if agent_type == type_void:
if empty_i == -1:
empty_i = i
elif agent_type != type_defender:
e = matrix_a_input[ny, nx][agent_energy]
if e < worst_energy:
worst_energy = e
worst_i = i
id_dest_value = matrix_a_input[ny, nx][agent_index]
if worst_i != -1:
ttx = neigh_x[worst_i]
tty = neigh_y[worst_i]
move_flag = move_parasite
else:
if empty_i == -1:
ttx = x
tty = y
else:
wind_dir = matrix_f[air_channel][y][x]
wind_tx, wind_ty = wind_target(x, y, size, wind_dir)
wind_type = matrix_a_input[wind_ty, wind_tx][agent_class_choice]
if wind_type == type_void:
if np.random.rand() < 0.90:
ttx = wind_tx
tty = wind_ty
else:
ttx = neigh_x[empty_i]
tty = neigh_y[empty_i]
else:
ttx = neigh_x[empty_i]
tty = neigh_y[empty_i]
move_flag = move_move
vel = agent_class_params[type_parasite, type_speed] * matrix_a_input[y, x][agent_turn]
write_intention(intentions_matrix, intentions_count, x, y, ttx, tty, move_flag, vel, id_src_value, id_dest_value)
reproduce_agent(matrix_a_input, x, y, agent_class_params[type_parasite, type_speed_l])
@njit(inline='always')
def generate_symbiont_intention(matrix_a_input, matrix_f, intentions_matrix, intentions_count, x, y, size, neigh_x, neigh_y, agent_class_params):
get_spherical_neighbors(x, y, size, neigh_x, neigh_y)
have_neigh = 0
id_src_value = matrix_a_input[y, x][agent_index]
id_dest_value = 0
for i in range(8):
nx = neigh_x[i]
ny = neigh_y[i]
agent_type = matrix_a_input[ny, nx][agent_class_choice]
if agent_type != type_void and agent_type != type_symbiont:
have_neigh = 1
break
if have_neigh == 1:
move_flag = move_symbiont
else:
move_flag = move_move
ttx = x
tty = y
vel = agent_class_params[type_symbiont, type_speed] * matrix_a_input[y, x][agent_turn]
write_intention(intentions_matrix, intentions_count, x, y, ttx, tty, move_flag, vel, id_src_value, id_dest_value)
reproduce_agent(matrix_a_input, x, y, agent_class_params[type_symbiont, type_speed_l])
@njit(inline='always')
def get_spherical_neighbors(x, y, size, xs, ys):
last = size - 1
dirs = ((-1, -1), (0, -1), (1, -1),
(-1, 0), (1, 0),
(-1, 1), (0, 1), (1, 1))
for i in range(8):
dx, dy = dirs[i]
nx = (x + dx) % size
ny = y + dy
if ny < 0:
ny = 0
nx = (nx + size//2) % size
elif ny > last:
ny = last
nx = (nx + size//2) % size
xs[i] = nx
ys[i] = ny
return xs, ys
@njit(inline="always")
def write_intention(intentions_matrix, intentions_count, x, y, ttx, tty, move_flag, tvelocity, id_src_value, id_dest_value):
slot = intentions_count[tty, ttx]
row = intentions_matrix[tty, ttx, slot]
row[sx] = x
row[sy] = y
row[tx] = ttx
row[ty] = tty
row[move_type] = move_flag
row[velocity] = tvelocity
row[id_src] = id_src_value
row[id_dest] = id_dest_value
intentions_count[tty, ttx] = slot + 1
@njit(inline='always')
def reproduce_agent(matrix_a_input, x, y, energy_limit):
row = matrix_a_input[y, x]
if row[agent_seed1] == 0 and row[agent_energy] > energy_limit:
row[agent_seed1] = row[agent_fitness1]
row[agent_seed2] = row[agent_fitness2]
row[agent_seed3] = row[agent_fitness3]
row[agent_seed4] = row[agent_fitness4]
row[agent_seed_prob1] = row[agent_fitness_prob1]
row[agent_seed_prob2] = row[agent_fitness_prob2]
row[agent_seed_prob3] = row[agent_fitness_prob3]
row[agent_seed_prob4] = row[agent_fitness_prob4]
row[agent_energy] *= 0.5
if np.random.rand() < mutation_rate:
idx = np.random.randint(0, 4)
if idx == 0:
row[agent_seed1] = np.random.randint(2, 10)
elif idx == 1:
row[agent_seed2] = np.random.randint(2, 10)
elif idx == 2:
row[agent_seed3] = np.random.randint(2, 10)
else:
row[agent_seed4] = np.random.randint(2, 10)
@njit(inline='always')
def dir_to_offset(direction):
if direction == 1:
return -1, -1
elif direction == 2:
return 0, -1
elif direction == 3:
return 1, -1
elif direction == 4:
return -1, 0
elif direction == 5:
return 0, 0
elif direction == 6:
return 1, 0
elif direction == 7:
return -1, 1
elif direction == 8:
return 0, 1
else:
return 1, 1
@njit(inline='always')
def apply_spherical_offset(x, y, dx, dy, size):
last = size - 1
nx = (x + dx) % size
ny = y + dy
if ny < 0:
ny = 0
nx = (nx + size//2) % size
elif ny > last:
ny = last
nx = (nx + size//2) % size
return nx, ny
@njit(inline='always')
def wind_target(x, y, size, wind_dir):
dx, dy = dir_to_offset(wind_dir)
return apply_spherical_offset(x, y, dx, dy, size)
# 24-01-2026
@njit(parallel=True)
def apply_intentions(matrix_a_agent, matrix_f, intentions_matrix, intentions_count, next_agent, global_count, agent_class_params, tick):
size = matrix_a_agent.shape[0]
if tick % 2 == 0:
y_range = np.arange(size)
x_range = np.arange(size)
else:
y_range = np.arange(size - 1, -1, -1)
x_range = np.arange(size - 1, -1, -1)
for yy in prange(size):
y = y_range[yy]
for xx in range(size):
x = x_range[xx]
row = next_agent[y, x]
row[agent_index] = 0
row[agent_class_choice] = type_void
for yy in prange(size):
y = y_range[yy]
for xx in range(size):
x = x_range[xx]
count = intentions_count[y, x]
if count <= 0:
continue
cell_intentions = intentions_matrix[y, x]
sort_intentions_for_cell(cell_intentions, count)
for i in range(count):
row_int = cell_intentions[i]
ssx = row_int[sx]
ssy = row_int[sy]
ttx = row_int[tx]
tty = row_int[ty]
mtype = row_int[move_type]
agent_source = row_int[id_src]
agent_destination = row_int[id_dest]
if mtype == move_born:
global_count[0] += 1
row_src = matrix_a_agent[tty, ttx]
row_next_tgt = next_agent[tty, ttx]
row_next_tgt[agent_class_choice] = row_src[agent_fitness1]
row_next_tgt[agent_index] = global_count[0]
row_next_tgt[agent_is_died] = 0
born_copy_fitness_and_energy(next_agent, matrix_a_agent, ttx, tty)
born_copy_seed_probs_to_fitness_probs(next_agent, matrix_a_agent, ttx, tty)
born_reset_seed_fields(next_agent, ttx, tty)
row_src[agent_index] = 0
row_src[agent_class_choice] = type_void
elif mtype == move_move:
row_src = matrix_a_agent[ssy, ssx]
row_next_src = next_agent[ssy, ssx]
row_src_tgt = matrix_a_agent[tty, ttx]
row_next_tgt = next_agent[tty, ttx]
if row_next_src[agent_class_choice] != type_void:
row_src[agent_index] = 0
row_src[agent_class_choice] = type_void
continue
if row_next_tgt[agent_class_choice] == type_void:
if row_src_tgt[agent_index] == row_src[agent_index]:
src_x, src_y = ttx, tty
dst_x, dst_y = ttx, tty
else:
src_x, src_y = ssx, ssy
dst_x, dst_y = ttx, tty
else:
src_x, src_y = ssx, ssy
dst_x, dst_y = ssx, ssy
row_src_final = matrix_a_agent[src_y, src_x]
class_choice = row_src_final[agent_class_choice]
energy_old = row_src_final[agent_energy]
energy_new = move_compute_energy(matrix_a_agent, matrix_f, src_x, src_y, class_choice)
cost = agent_class_params[class_choice, type_cost]
delta = compute_energy_delta(energy_old, energy_new, cost)
move_copy_and_clean(next_agent, matrix_a_agent, dst_x, dst_y, src_x, src_y, energy_new)
if class_choice != type_seed:
update_fitness_probs(next_agent[dst_y, dst_x], delta)
elif mtype == move_producer:
row_src = matrix_a_agent[ssy, ssx]
row_next_src = next_agent[ssy, ssx]
row_next_tgt = next_agent[tty, ttx]
if row_next_src[agent_class_choice] != type_void:
row_src[agent_index] = 0
row_src[agent_class_choice] = type_void
continue
class_choice = row_src[agent_class_choice]
if (ttx != ssx or tty != ssy) and row_next_tgt[agent_class_choice] == type_void:
global_count[0] += 1
producer_architect_overflow_seed(next_agent, matrix_a_agent, agent_class_params, global_count, ssx, ssy, ttx, tty)
energy_new = ext_compute_energy(matrix_a_agent, matrix_f, ssx, ssy)
energy_old = row_src[agent_energy]
cost = agent_class_params[class_choice, type_cost]
delta = compute_energy_delta(energy_old, energy_new, cost)
move_copy_and_clean(next_agent, matrix_a_agent, ssx, ssy, ssx, ssy, energy_new)
update_fitness_probs(next_agent[ssy, ssx], delta)
elif mtype == move_consumer:
row_src = matrix_a_agent[ssy, ssx]
row_next_src = next_agent[ssy, ssx]
row_src_tgt = matrix_a_agent[tty, ttx]
row_next_tgt = next_agent[tty, ttx]
if row_next_src[agent_class_choice] != type_void:
row_src[agent_index] = 0
row_src[agent_class_choice] = type_void
continue
class_choice = row_src[agent_class_choice]
energy_old = row_src[agent_energy]
if row_next_tgt[agent_class_choice] != type_void and row_next_tgt[agent_index] != agent_destination:
cost = agent_class_params[class_choice, type_cost]
energy_new = energy_old
move_copy_and_clean(next_agent, matrix_a_agent, ssx, ssy, ssx, ssy, energy_new)
delta = compute_energy_delta(energy_old, energy_new, cost)
update_fitness_probs(next_agent[ssy, ssx], delta)
continue
gained = eat_compute_energy(matrix_a_agent, ttx, tty)
energy_new = energy_old + gained
cost = agent_class_params[class_choice, type_cost]
delta = compute_energy_delta(energy_old, energy_new, cost)
move_copy_and_clean(next_agent, matrix_a_agent, ttx, tty, ssx, ssy, energy_new)
update_fitness_probs(next_agent[tty, ttx], delta)
elif mtype == move_nomad_seed:
row_src = matrix_a_agent[ssy, ssx]
row_next_src = next_agent[ssy, ssx]
row_src_tgt = matrix_a_agent[tty, ttx]
row_next_tgt = next_agent[tty, ttx]
if row_next_src[agent_class_choice] != type_void:
row_src[agent_index] = 0
row_src[agent_class_choice] = type_void
continue
class_choice = row_src[agent_class_choice]
energy_old = row_src[agent_energy]
use_new_target = False
if row_next_tgt[agent_class_choice] == type_void:
if row_src_tgt[agent_index] != agent_destination:
cost = agent_class_params[class_choice, type_cost]
energy_new = energy_old
move_copy_and_clean(next_agent, matrix_a_agent, ssx, ssy, ssx, ssy, energy_new)
delta = compute_energy_delta(energy_old, energy_new, cost)
update_fitness_probs(next_agent[ssy, ssx], delta)
continue
else:
if row_next_tgt[agent_index] != agent_destination:
cost = agent_class_params[class_choice, type_cost]
energy_new = energy_old
move_copy_and_clean(next_agent, matrix_a_agent, ssx, ssy, ssx, ssy, energy_new)
delta = compute_energy_delta(energy_old, energy_new, cost)
update_fitness_probs(next_agent[ssy, ssx], delta)
continue
use_new_target = True
if use_new_target:
nomad_take_seed(next_agent, ssx, ssy, ttx, tty)
else:
nomad_take_seed(matrix_a_agent, ssx, ssy, ttx, tty)
energy_new = move_compute_energy(matrix_a_agent, matrix_f, ssx, ssy, class_choice)
cost = agent_class_params[class_choice, type_cost]
delta = compute_energy_delta(energy_old, energy_new, cost)
move_copy_and_clean(next_agent, matrix_a_agent, ssx, ssy, ssx, ssy, energy_new)
update_fitness_probs(next_agent[ssy, ssx], delta)
elif mtype == move_nomad_plant:
row_src = matrix_a_agent[ssy, ssx]
row_next_src = next_agent[ssy, ssx]
row_src_tgt = matrix_a_agent[tty, ttx]
row_next_tgt = next_agent[tty, ttx]
if row_next_src[agent_class_choice] != type_void:
row_src[agent_index] = 0
row_src[agent_class_choice] = type_void
continue
class_choice = row_src[agent_class_choice]
energy_old = row_src[agent_energy]
fallback = False
use_new_target = False
if row_next_tgt[agent_class_choice] == type_void:
if row_src_tgt[agent_index] != agent_destination:
fallback = True
elif row_src_tgt[agent_class_choice] != type_architect:
fallback = True
else:
if row_next_tgt[agent_index] == agent_destination:
use_new_target = True
else:
fallback = True
if fallback:
cost = agent_class_params[class_choice, type_cost]
energy_new = energy_old
move_copy_and_clean(next_agent, matrix_a_agent, ssx, ssy, ssx, ssy, energy_new)
delta = compute_energy_delta(energy_old, energy_new, cost)
update_fitness_probs(next_agent[ssy, ssx], delta)
continue
if use_new_target:
nomad_plant_seed(next_agent, agent_class_params, ssx, ssy, ttx, tty)
gained = nomad_plant_compute_energy(next_agent, ttx, tty)
else:
nomad_plant_seed(matrix_a_agent, agent_class_params, ssx, ssy, ttx, tty)
gained = nomad_plant_compute_energy(matrix_a_agent, ttx, tty)
energy_new = energy_old + gained
cost = agent_class_params[class_choice, type_cost]
delta = compute_energy_delta(energy_old, energy_new, cost)
move_copy_and_clean(next_agent, matrix_a_agent, ssx, ssy, ssx, ssy, energy_new)
update_fitness_probs(next_agent[ssy, ssx], delta)
elif mtype == move_architect:
row_src = matrix_a_agent[ssy, ssx]
row_next_src = next_agent[ssy, ssx]
row_src_tgt = matrix_a_agent[tty, ttx]
row_next_tgt = next_agent[tty, ttx]
if row_next_src[agent_class_choice] != type_void:
row_src[agent_index] = 0
row_src[agent_class_choice] = type_void
continue
if row_src[agent_class_choice] != type_architect:
class_choice_new = row_src[agent_class_choice]
energy_new = move_compute_energy(matrix_a_agent, matrix_f, ssx, ssy, class_choice_new)
move_copy_and_clean(next_agent, matrix_a_agent, ssx, ssy, ssx, ssy, energy_new)
continue
global_count[0] += 1
class_choice = row_src[agent_class_choice]
energy_old = row_src[agent_energy]
if row_next_tgt[agent_class_choice] != type_void:
energy_new = move_compute_energy(matrix_a_agent, matrix_f, ssx, ssy, class_choice)
cost = agent_class_params[class_choice, type_cost]
delta = compute_energy_delta(energy_old, energy_new, cost)
move_copy_and_clean(next_agent, matrix_a_agent, ssx, ssy, ssx, ssy, energy_new)
update_fitness_probs(next_agent[ssy, ssx], delta)
continue
producer_architect_overflow_seed(next_agent, matrix_a_agent, agent_class_params, global_count, ssx, ssy, ttx, tty)
energy_new = move_compute_energy(matrix_a_agent, matrix_f, ssx, ssy, class_choice)
cost = agent_class_params[class_choice, type_cost]
delta = compute_energy_delta(energy_old, energy_new, cost)
move_copy_and_clean(next_agent, matrix_a_agent, ssx, ssy, ssx, ssy, energy_new)
update_fitness_probs(next_agent[ssy, ssx], delta)
elif mtype == move_predator:
row_src = matrix_a_agent[ssy, ssx]
row_next_src = next_agent[ssy, ssx]
row_src_tgt = matrix_a_agent[tty, ttx]
row_next_tgt = next_agent[tty, ttx]
if row_next_src[agent_class_choice] != type_void:
row_src[agent_index] = 0
row_src[agent_class_choice] = type_void
continue
class_choice = row_src[agent_class_choice]
energy_old = row_src[agent_energy]
fallback = False
use_new_target = False
if row_next_tgt[agent_class_choice] != type_void:
if row_next_tgt[agent_index] != agent_destination:
fallback = True
else:
use_new_target = True
if fallback:
energy_new = energy_old
cost = agent_class_params[class_choice, type_cost]
delta = compute_energy_delta(energy_old, energy_new, cost)
move_copy_and_clean(next_agent, matrix_a_agent, ssx, ssy, ssx, ssy, energy_new)
update_fitness_probs(next_agent[ssy, ssx], delta)
continue
if use_new_target:
gained = eat_compute_energy(next_agent, ttx, tty)
else:
gained = eat_compute_energy(matrix_a_agent, ttx, tty)
energy_new = energy_old + gained
cost = agent_class_params[class_choice, type_cost]
delta = compute_energy_delta(energy_old, energy_new, cost)
move_copy_and_clean(next_agent, matrix_a_agent, ttx, tty, ssx, ssy, energy_new)
update_fitness_probs(next_agent[tty, ttx], delta)
elif mtype == move_defender:
row_src = matrix_a_agent[ssy, ssx]
row_next_src = next_agent[ssy, ssx]
if row_next_src[agent_class_choice] != type_void:
row_src[agent_index] = 0
row_src[agent_class_choice] = type_void
continue
class_choice = row_src[agent_class_choice]
energy_old = row_src[agent_energy]
energy_new = defender_compute_energy(matrix_a_agent, matrix_f, ssx, ssy)
cost = agent_class_params[class_choice, type_cost]
delta = compute_energy_delta(energy_old, energy_new, cost)
move_copy_and_clean(next_agent, matrix_a_agent, ssx, ssy, ssx, ssy, energy_new)
update_fitness_probs(next_agent[ssy, ssx], delta)
elif mtype == move_parasite:
row_src = matrix_a_agent[ssy, ssx]
row_next_src = next_agent[ssy, ssx]
row_src_tgt = matrix_a_agent[tty, ttx]
row_next_tgt = next_agent[tty, ttx]
if row_next_src[agent_class_choice] != type_void:
row_src[agent_index] = 0
row_src[agent_class_choice] = type_void
continue
class_choice = row_src[agent_class_choice]
energy_old = row_src[agent_energy]
fallback = False
use_new_target = False
if row_next_tgt[agent_class_choice] != type_void:
if row_next_tgt[agent_index] != agent_destination:
fallback = True
else:
use_new_target = True
else:
if row_src_tgt[agent_index] != agent_destination:
fallback = True
if fallback:
energy_new = energy_old
cost = agent_class_params[class_choice, type_cost]
delta = compute_energy_delta(energy_old, energy_new, cost)
move_copy_and_clean(next_agent, matrix_a_agent, ssx, ssy, ssx, ssy, energy_new)
update_fitness_probs(next_agent[ssy, ssx], delta)
continue
if use_new_target:
gained = parasite_compute_energy(next_agent, agent_class_params, ttx, tty)
else:
gained = parasite_compute_energy(matrix_a_agent, agent_class_params, ttx, tty)
energy_new = energy_old + gained
cost = agent_class_params[class_choice, type_cost]
delta = compute_energy_delta(energy_old, energy_new, cost)
move_copy_and_clean(next_agent, matrix_a_agent, ssx, ssy, ssx, ssy, energy_new)
update_fitness_probs(next_agent[ssy, ssx], delta)
elif mtype == move_symbiont:
row_src = matrix_a_agent[ssy, ssx]
row_next_src = next_agent[ssy, ssx]
row_src_tgt = matrix_a_agent[tty, ttx]
row_next_tgt = next_agent[tty, ttx]
if row_next_src[agent_class_choice] != type_void:
row_src[agent_index] = 0
row_src[agent_class_choice] = type_void
continue
class_choice = row_src[agent_class_choice]
energy_old = row_src[agent_energy]
fallback = False
use_new_target = False
if row_next_tgt[agent_class_choice] != type_void:
if row_next_tgt[agent_index] != agent_destination:
fallback = True
else:
use_new_target = True
else:
if row_src_tgt[agent_index] != agent_destination:
fallback = True
if fallback:
energy_new = energy_old
cost = agent_class_params[class_choice, type_cost]
delta = compute_energy_delta(energy_old, energy_new, cost)
move_copy_and_clean(next_agent, matrix_a_agent, ssx, ssy, ssx, ssy, energy_new)
update_fitness_probs(next_agent[ssy, ssx], delta)
continue
extracted = ext_compute_energy(matrix_a_agent, matrix_f, ssx, ssy)
gained_symbiont = extracted * 0.5
gained_target = extracted * 0.5
if use_new_target:
row_next_tgt[agent_energy] += gained_target
else:
row_src_tgt[agent_energy] += gained_target
energy_new = energy_old + gained_symbiont
cost = agent_class_params[class_choice, type_cost]
delta = compute_energy_delta(energy_old, energy_new, cost)
move_copy_and_clean(next_agent, matrix_a_agent, ssx, ssy, ssx, ssy, energy_new)
update_fitness_probs(next_agent[ssy, ssx], delta)
else:
continue
@njit(inline='always')
def sort_intentions_for_cell(intentions, count):
for i in range(1, count):
key = intentions[i]
j = i - 1
while j >= 0:
if intentions[j][move_type] > key[move_type]:
pass
elif intentions[j][move_type] == key[move_type] and intentions[j][velocity] < key[velocity]:
pass
else:
break
intentions[j + 1] = intentions[j]
j -= 1
intentions[j + 1] = key
# 26-01-2026
@njit(inline="always")
def born_copy_fitness_and_energy(na, ma, ttx, tty):
row_na = na[tty, ttx]
row_ma = ma[tty, ttx]
row_na[agent_fitness1] = row_ma[agent_seed1]
row_na[agent_fitness2] = row_ma[agent_seed2]
row_na[agent_fitness3] = row_ma[agent_seed3]
row_na[agent_fitness4] = row_ma[agent_seed4]
row_na[agent_energy] = row_ma[agent_energy]
row_na[agent_turn] = row_ma[agent_turn]
@njit(inline="always")
def born_copy_seed_probs_to_fitness_probs(na, ma, ttx, tty):
row_na = na[tty, ttx]
row_ma = ma[tty, ttx]
row_na[agent_fitness_prob1] = row_ma[agent_seed_prob1]
row_na[agent_fitness_prob2] = row_ma[agent_seed_prob2]
row_na[agent_fitness_prob3] = row_ma[agent_seed_prob3]
row_na[agent_fitness_prob4] = row_ma[agent_seed_prob4]
@njit(inline="always")
def born_reset_seed_fields(na, ttx, tty):
row_na = na[tty, ttx]
row_na[agent_seed1] = 0
row_na[agent_seed2] = 0
row_na[agent_seed3] = 0
row_na[agent_seed4] = 0
@njit(inline="always")
def move_copy_and_clean(na, ma, dx, dy, sx, sy, energy):
row_na = na[dy, dx]
row_ma = ma[sy, sx]
row_na[agent_class_choice] = row_ma[agent_class_choice]
row_na[agent_index] = row_ma[agent_index]
row_na[agent_is_died] = row_ma[agent_is_died]
row_na[agent_fitness_prob1] = row_ma[agent_fitness_prob1]
row_na[agent_fitness_prob2] = row_ma[agent_fitness_prob2]
row_na[agent_fitness_prob3] = row_ma[agent_fitness_prob3]
row_na[agent_fitness_prob4] = row_ma[agent_fitness_prob4]
row_na[agent_seed1] = row_ma[agent_seed1]
row_na[agent_seed2] = row_ma[agent_seed2]
row_na[agent_seed3] = row_ma[agent_seed3]
row_na[agent_seed4] = row_ma[agent_seed4]
row_na[agent_seed_prob1] = row_ma[agent_seed_prob1]
row_na[agent_seed_prob2] = row_ma[agent_seed_prob2]
row_na[agent_seed_prob3] = row_ma[agent_seed_prob3]
row_na[agent_seed_prob4] = row_ma[agent_seed_prob4]
row_na[agent_fitness1] = row_ma[agent_fitness1]
row_na[agent_fitness2] = row_ma[agent_fitness2]
row_na[agent_fitness3] = row_ma[agent_fitness3]
row_na[agent_fitness4] = row_ma[agent_fitness4]
row_na[agent_energy] = energy
row_na[agent_turn] = row_ma[agent_turn]
row_ma[agent_index] = 0
row_ma[agent_class_choice] = type_void
@njit(inline="always")
def move_compute_energy(ma, matrix_f, sx, sy, class_choice):
energy = ma[sy, sx][agent_energy]
if class_choice == type_architect or class_choice == type_nomad:
change = matrix_f[change_channel][sy][sx]
rain = matrix_f[rain_channel][sy][sx]
energy += change * rain
return energy
# 26-01-2026
@njit(inline="always")
def update_fitness_probs(row, delta):
p1 = row[agent_fitness_prob1]
p2 = row[agent_fitness_prob2]
p3 = row[agent_fitness_prob3]
p4 = row[agent_fitness_prob4]
class_choice = row[agent_class_choice]
if class_choice == row[agent_fitness1]:
p1 *= (1.0 + delta)
elif class_choice == row[agent_fitness2]:
p2 *= (1.0 + delta)
elif class_choice == row[agent_fitness3]:
p3 *= (1.0 + delta)
else:
p4 *= (1.0 + delta)
s = p1 + p2 + p3 + p4
inv = 1.0 / s
row[agent_fitness_prob1] = p1 * inv
row[agent_fitness_prob2] = p2 * inv
row[agent_fitness_prob3] = p3 * inv
row[agent_fitness_prob4] = p4 * inv
@njit(inline="always")
def compute_energy_delta(energy_old, energy_new, cost):
base = energy_old
if base < 0.0001:
base = 0.0001
return (energy_new - cost - energy_old) / base
# 28-01-2026
@njit(inline="always")
def ext_compute_energy(ma, matrix_f, sx, sy):
row = ma[sy, sx]
energy = row[agent_energy]
change = matrix_f[change_channel][sy][sx]
rain = matrix_f[rain_channel][sy][sx]
sun = matrix_f[sun_channel][sy][sx]
energy += change * (rain + sun)
return energy
@njit(inline="always")
def eat_compute_energy(ma, tx, ty):
row_producer = ma[ty, tx]
energy_producer = row_producer[agent_energy]
absorbed = energy_producer * 0.90
return absorbed
@njit(inline="always")
def nomad_take_seed(ma, sx, sy, tx, ty):
row_nomad = ma[sy, sx]
row_target = ma[ty, tx]
row_nomad[agent_seed1] = row_target[agent_seed1]
row_nomad[agent_seed2] = row_target[agent_seed2]
row_nomad[agent_seed3] = row_target[agent_seed3]
row_nomad[agent_seed4] = row_target[agent_seed4]
row_nomad[agent_seed_prob1] = row_target[agent_seed_prob1]
row_nomad[agent_seed_prob2] = row_target[agent_seed_prob2]
row_nomad[agent_seed_prob3] = row_target[agent_seed_prob3]
row_nomad[agent_seed_prob4] = row_target[agent_seed_prob4]
row_target[agent_seed1] = 0
row_target[agent_seed2] = 0
row_target[agent_seed3] = 0
row_target[agent_seed4] = 0
row_target[agent_seed_prob1] = 0.0
row_target[agent_seed_prob2] = 0.0
row_target[agent_seed_prob3] = 0.0
row_target[agent_seed_prob4] = 0.0
@njit(inline="always")
def nomad_plant_seed(ma, agent_class_params, sx, sy, tx, ty):
row_nomad = ma[sy, sx]
row_target = ma[ty, tx]
row_target[agent_seed1] = row_nomad[agent_seed1]
row_target[agent_seed2] = row_nomad[agent_seed2]
row_target[agent_seed3] = row_nomad[agent_seed3]
row_target[agent_seed4] = row_nomad[agent_seed4]
row_target[agent_seed_prob1] = row_nomad[agent_seed_prob1]
row_target[agent_seed_prob2] = row_nomad[agent_seed_prob2]
row_target[agent_seed_prob3] = row_nomad[agent_seed_prob3]
row_target[agent_seed_prob4] = row_nomad[agent_seed_prob4]
row_target[agent_class_choice] = type_seed
row_target[agent_is_died] = 0
row_target[agent_turn] = 0
row_target[agent_energy] = agent_class_params[row_target[agent_seed1], 0]
row_nomad[agent_seed1] = 0
row_nomad[agent_seed2] = 0
row_nomad[agent_seed3] = 0
row_nomad[agent_seed4] = 0
row_nomad[agent_seed_prob1] = 0.0
row_nomad[agent_seed_prob2] = 0.0
row_nomad[agent_seed_prob3] = 0.0
row_nomad[agent_seed_prob4] = 0.0
@njit(inline="always")
def nomad_plant_compute_energy(ma, tx, ty):
row_architect = ma[ty, tx]
energy_architect = row_architect[agent_energy]
absorbed = energy_architect * 0.50
return absorbed
@njit(inline="always")
def producer_architect_overflow_seed(next_agent, matrix_old, agent_class_params, global_count, sx, sy, tx, ty):
row_arch_old = matrix_old[sy, sx]
row_target_new = next_agent[ty, tx]
if row_target_new[agent_class_choice] != type_void:
return
row_target_new[agent_seed1] = row_arch_old[agent_seed1]
row_target_new[agent_seed2] = row_arch_old[agent_seed2]
row_target_new[agent_seed3] = row_arch_old[agent_seed3]
row_target_new[agent_seed4] = row_arch_old[agent_seed4]
row_target_new[agent_seed_prob1] = row_arch_old[agent_seed_prob1]
row_target_new[agent_seed_prob2] = row_arch_old[agent_seed_prob2]
row_target_new[agent_seed_prob3] = row_arch_old[agent_seed_prob3]
row_target_new[agent_seed_prob4] = row_arch_old[agent_seed_prob4]
row_target_new[agent_class_choice] = type_seed
row_target_new[agent_is_died] = 0
row_target_new[agent_turn] = 0
row_target_new[agent_index] = global_count[0]
row_target_new[agent_energy] = agent_class_params[row_target_new[agent_seed1], 0]
row_arch_new = next_agent[sy, sx]
row_arch_new[agent_seed1] = 0
row_arch_new[agent_seed2] = 0
row_arch_new[agent_seed3] = 0
row_arch_new[agent_seed4] = 0
row_arch_new[agent_seed_prob1] = 0.0
row_arch_new[agent_seed_prob2] = 0.0
row_arch_new[agent_seed_prob3] = 0.0
row_arch_new[agent_seed_prob4] = 0.0
@njit(inline="always")
def defender_compute_energy(ma, matrix_f, sx, sy):
row = ma[sy, sx]
energy = row[agent_energy]
change = matrix_f[change_channel][sy][sx]
sun = matrix_f[sun_channel][sy][sx]
energy += change * sun
return energy
#30-01-2026
@njit(inline="always")
def parasite_compute_energy(ma, agent_class_params, tx, ty):
row_target = ma[ty, tx]
class_choice = type_parasite
cost = agent_class_params[class_choice, type_cost]
absorbed = cost * 2.0
row_target[agent_energy] -= absorbed
return absorbed * 0.90
sse webserver → reverse 4444 for sse
# sse/sse_server.py
# 06-01-2026
import time
import random
# 07-01-2026
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from collections import deque
# 10-01-2026
import base64
# 18-01-2026
from vars.vars import agent_class_choice
import zstandard as zstd
# 26-01-2026
import numpy as np
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
daemon_threads = True
class SSEServer:
def __init__(self, fifo_sse, sleep_min=0.001, sleep_max=0.005):
self.fsse = fifo_sse
self.sleep_min = sleep_min
self.sleep_max = sleep_max
self.running = True
self.snapshots = deque(maxlen=1)
self.cctx = zstd.ZstdCompressor()
def stop(self):
self.running = False
def compute(self):
last_fps_time = time.time()
frame_counter = 0
server_fps = 0.0
while self.running:
matrix = self.fsse.pop_for_sse()
if matrix is None:
time.sleep(random.uniform(self.sleep_min, self.sleep_max))
continue
frame_counter += 1
now = time.time()
if now - last_fps_time >= 1.0:
server_fps = frame_counter / (now - last_fps_time)
frame_counter = 0
last_fps_time = now
env = matrix.env_view
matrix_bytes = matrix.agent[agent_class_choice].tobytes()
matrix_zstd = self.cctx.compress(matrix_bytes)
matrix_b64 = base64.b64encode(matrix_zstd).decode()
snapshot = {
"env": {
"tick": env["tick"],
"day_phase": env["day_phase"],
"year_phase": env["year_phase"],
"season": env["season"],
"sun_intensity": env["sun_intensity"],
"sun_day_factor": env["sun_day_factor"],
"sun_season_factor": env["sun_season_factor"],
"clouds_north": [
{"x": float(c["x"]), "y": float(c["y"]), "radius": c["radius"]}
for c in env["clouds_north"]
],
"clouds_south": [
{"x": float(c["x"]), "y": float(c["y"]), "radius": c["radius"]}
for c in env["clouds_south"]
],
"axial_tilt": env["axial_tilt"],
"day_length": env["day_length"],
"year_length": env["year_length"],
"simulation_years": env["simulation_years"],
"serverFPS": int(server_fps),
"global_count": int(matrix.global_count[0])
},
"matrix": matrix_b64
}
snapshot_json = json.dumps(snapshot)
self.snapshots.append(snapshot_json)
self.fsse.push_from_sse(matrix)
def start_http_server(self, host="127.0.0.1", port=4444):
sse_server = self
class Handler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == "/debug":
if sse_server.snapshots:
snap_json = sse_server.snapshots[0]
payload = snap_json.encode()
else:
payload = json.dumps({"status": "waiting"}).encode()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(payload)))
self.end_headers()
self.wfile.write(payload)
return
if self.path == "/stream":
self.send_response(200)
self.send_header("Content-Type", "text/event-stream")
self.send_header("Cache-Control", "no-cache")
self.send_header("Connection", "keep-alive")
self.end_headers()
last_sent = None
last_heartbeat = time.time()
try:
while True:
if sse_server.snapshots:
snap_json = sse_server.snapshots[0]
if snap_json != last_sent:
msg = f"data: {snap_json}\n\n"
self.wfile.write(msg.encode())
self.wfile.flush()
last_sent = snap_json
now = time.time()
if now - last_heartbeat > 5:
try:
self.wfile.write(b": ping\n\n")
self.wfile.flush()
except (BrokenPipeError, ConnectionResetError):
return
last_heartbeat = now
time.sleep(0.04)
except (BrokenPipeError, ConnectionResetError):
return
self.send_response(404)
self.end_headers()
def log_message(self, *args):
return
def do_HEAD(self):
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
def do_OPTIONS(self):
self.send_response(200)
self.send_header("Allow", "GET, HEAD, OPTIONS")
self.send_header("Content-Length", "0")
self.end_headers()
def do_POST(self):
self.send_response(405)
self.end_headers()
def do_PUT(self):
self.send_response(405)
self.end_headers()
def do_DELETE(self):
self.send_response(405)
self.end_headers()
httpd = ThreadingHTTPServer((host, port), Handler)
httpd.serve_forever()
Main → orchestrates execution and connects all scripts.
# main.py
# 28-12-2025
import threading
import time
from matrix.fifo_matrix import FifoMatrix
from time_energy.time_energy import TimeEnergy
from agents.population import Population
from matrix.fifo_population_sse import FifoPopulationSSE
from webserver.sse_server import SSEServer
# 17-01-2026
from agents.clean_backup import CleanBackup
from vars.vars import backup_dir
def main():
fm = FifoMatrix(buffer_size=3, size=256, channels=4)
fsse = FifoPopulationSSE(buffer_size=3, size=256)
te = TimeEnergy(fifo_matrix=fm)
pop = Population(fifo_matrix=fm, fifo_sse=fsse)
sse = SSEServer(fifo_sse=fsse)
clr = CleanBackup(backup_dir=backup_dir, keep_last=1440, interval=600)
te_thread = threading.Thread(target=te.compute, daemon=True)
te_thread.start()
pop_thread = threading.Thread(target=pop.compute, daemon=True)
pop_thread.start()
sse_thread = threading.Thread(target=sse.compute, daemon=True)
sse_thread.start()
sse_http_thread = threading.Thread(target=sse.start_http_server, kwargs={"port": 4444}, daemon=True)
sse_http_thread.start()
clr_thread = threading.Thread( target=clr.loop, daemon=True )
clr_thread.start()
try:
while True:
time.sleep(10)
except KeyboardInterrupt:
print("Stopping...")
te.stop()
pop.stop()
sse.stop()
clr.stop()
te_thread.join()
pop_thread.join()
sse_thread.join()
clr_thread.join()
if __name__ == "__main__":
main()
# vars/vars.py
# 28-12-2025
matrix_size = 256
backup_dir = "/home/bitnami/tcondue.online/backup/"
backup_world_energy_state = "/home/bitnami/tcondue.online/backup/world_energy_state"
# 29-12-2025
sun_channel = 0
rain_channel = 1
air_channel = 2
change_channel = 3
# 05-01-2026
backup_population = "/home/bitnami/tcondue.online/backup/population"
backup_interval = 60
# 16-01-2026
import numpy as np
type_void = 0
type_seed = 1
type_producer = 2
type_consumer = 3
type_nomad = 4
type_architect = 5
type_predator = 6
type_defender = 7
type_parasite = 8
type_symbiont = 9
type_energy = 0
type_speed_l = 1
type_cost = 2
type_speed = 3
type_dead_prob = 4
agent_class = np.array([
#type
# energy #seed_l #cost #speed #dead_prob
[0.000, 0.000000, 0.0000000, 0.000, 0.00000000 ], # void
[10.00, 0.000000, 0.0010000, 1.000, 0.00000000 ], # seed
[0.250, 0.500000, 0.0000200, 1.000, 0.00000010 ], # producer
[1.250, 2.500000, 0.0005000, 0.500, 0.00001000 ], # consumer
[1.500, 3.000000, 0.0000010, 0.750, 0.00005000 ], # nomad
[0.250, 0.500000, 0.0000010, 1.000, 0.00000010 ], # architect
[3.500, 7.000000, 0.0005000, 0.250, 0.00100000 ], # predator
[9.999, 128.0000, 0.0000010, 1.000, 0.00100000 ], # defender
[1.500, 3.000000, 0.0010000, 0.500, 0.00100000 ], # parasite
[1.000, 2.000000, 0.0001000, 0.500, 0.00100000 ], # symbiont
], dtype=np.float32)
agent_energy = "agent_energy"
agent_index = "agent_index"
agent_class_choice = "agent_class_choice"
agent_is_died = "agent_is_died"
agent_seed1 = "agent_seed1"
agent_seed2 = "agent_seed2"
agent_seed3 = "agent_seed3"
agent_seed4 = "agent_seed4"
agent_seed_prob1 = "agent_seed_prob1"
agent_seed_prob2 = "agent_seed_prob2"
agent_seed_prob3 = "agent_seed_prob3"
agent_seed_prob4 = "agent_seed_prob4"
agent_fitness1 = "agent_fitness1"
agent_fitness2 = "agent_fitness2"
agent_fitness3 = "agent_fitness3"
agent_fitness4 = "agent_fitness4"
agent_fitness_prob1 = "agent_fitness_prob1"
agent_fitness_prob2 = "agent_fitness_prob2"
agent_fitness_prob3 = "agent_fitness_prob3"
agent_fitness_prob4 = "agent_fitness_prob4"
agent_turn = "agent_turn"
agent_fields = {
agent_energy: np.float32,
agent_index: np.int64,
agent_class_choice: np.int8,
agent_is_died: np.uint8,
agent_seed1: np.int8,
agent_seed2: np.int8,
agent_seed3: np.int8,
agent_seed4: np.int8,
agent_seed_prob1: np.float32,
agent_seed_prob2: np.float32,
agent_seed_prob3: np.float32,
agent_seed_prob4: np.float32,
agent_fitness1: np.int8,
agent_fitness2: np.int8,
agent_fitness3: np.int8,
agent_fitness4: np.int8,
agent_fitness_prob1: np.float32,
agent_fitness_prob2: np.float32,
agent_fitness_prob3: np.float32,
agent_fitness_prob4: np.float32,
agent_turn: np.int32
}
# 20-01-2026
max_intent = 9
sx = "sx"
sy = "sy"
tx = "tx"
ty = "ty"
move_type = "move_type"
velocity = "velocity"
id_src = "id_src"
id_dest = "id_dest"
intent_dtype = np.dtype([
(sx, np.int16),
(sy, np.int16),
(tx, np.int16),
(ty, np.int16),
(move_type, np.int8),
(velocity, np.float32),
(id_src, np.int64),
(id_dest, np.int64),
])
move_born = 0
move_producer = 1
move_architect = 2
move_defender = 3
move_move = 4
move_nomad_seed = 5
move_nomad_plant = 6
move_consumer = 7
move_predator = 8
move_parasite = 9
move_symbiont = 10
# 21-01-2026
mutation_rate = 0.01
note:
# ..
# 1-12-2025
class Joypad: #oracle
def __init__(self, name="t2"):
self.name = name
def sendCommand(self, pubkey, message, sign, command):
return True