import numpy as np import matplotlib.pyplot as plt from matplotlib import colors, cm from matplotlib.collections import PatchCollection import scipy.interpolate as interpolate from scipy import ndimage import math from io import BytesIO import pandas as pd from shapely.geometry import Point, MultiPoint from descartes import PolygonPatch from constants import INPUTS, SEASONS from draw import draw from train import A_params from model import Model from utils import * parameters = { 'width': { 'default': 360, 'type': 'int', }, 'height': { 'default': 180, 'type': 'int', }, 'mountain_ratio': { 'default': 0.3, 'type': 'float', 'min': 0, 'max': 1, 'step': 0.01 }, 'sharpness': { 'default': 0.9, 'type': 'float', 'min': 0, 'max': 1, 'step': 0.01 }, 'max_elevation': { 'default': 10000, 'type': 'int', 'min': 0, 'max': 10000, }, 'min_elevation': { 'default': -400, 'type': 'int', 'min': -1000, 'max': 0 }, 'ground_noise': { 'default': 6000, 'type': 'int', 'min': 0, 'max': 10000, }, 'water_proportion': { 'default': 0.3, 'type': 'float', 'min': 0, 'max': 0.99, 'step': 0.01 }, 'mountain_concentration': { 'default': 1, 'type': 'float', 'min': 0, 'max': 5, 'step': 0.1 }, 'mountain_sea_distance': { 'default': 50, 'type': 'int', 'min': 0, 'max': 200, }, 'mountain_sea_threshold': { 'default': 2, 'type': 'int', 'min': 0, 'max': 5, }, 'water_level': { 'default': 0, 'type': 'int', }, 'mountain_area_elevation': { 'default': 0.4, 'type': 'float', 'min': 0, 'max': 1, 'step': 0.01 }, 'mountain_area_elevation_points': { 'default': 5, 'type': 'int', 'min': 0, 'max': 15, }, 'mountain_area_elevation_area': { 'default': 10, 'type': 'int', 'min': 0, 'max': 25, }, 'continents': { 'default': 5, 'type': 'int', }, 'continent_spacing': { 'default': 0.3, 'type': 'float', 'min': 0, 'max': 1, 'step': 0.1 }, 'biomes': { 'default': False, 'type': 'bool' }, 'mean_temperature': { 'default': -4.2, 'type': 'float', 'step': 1, }, 'mean_precipitation': { 'default': 45.24, 'type': 'float', 'step': 1, }, 'seed': { 'default': '', 'type': 'int', 'description': 'Leave empty for a random seed generated from the current timestamp.' }, } p = { k: parameters[k]['default'] for k in parameters } CONTINENT_MAX_TRIALS = 1e4 SEA_COLOR = np.array((53, 179, 220, 255)) / 255 DIRECTIONS = [(-1, -1), (-1, 0), (-1, 1), (1, 1), (1, 0), (1, -1), (0, -1), (0, 1)] def s(x): return -2 * x**3 + 3 * x**2 def is_ground(value): return value > p['water_level'] # TODO: should check as a sphere def in_range(p, m, size): x, y = p mx, my = m return ((x - mx)**2 + (y - my)**2) < size def max_recursion(fn, max_recursion=0): def f(*args, recursion=0, **kwargs): if recursion > max_recursion: return return fn(*args, **kwargs) def bound_check(ground, point): x, y = point w, h = ground.shape if x < 0: x = w + x elif x >= w: x = x - w if y < 0: y = h + y elif y >= h: y = y - h if x < 0 or x >= w or y < 0 or y >= h: return bound_check(ground, (x, y)) return (x, y) def continent_agent(ground, position, size): if size <= 0: return x, y = position w, h = ground.shape trials = 0 while True: # if trials > CONTINENT_MAX_TRIALS: # print('couldnt proceed') if size <= 0 or trials > CONTINENT_MAX_TRIALS: break # if size <= 0: break dx = np.random.randint(2) or -1 dy = np.random.randint(2) or -1 r = np.random.randint(3) new_point = bound_check(ground, (x + dx, y + dy)) if r == 0: x = new_point[0] elif r == 1: y = new_point[1] else: x, y = new_point x, y = bound_check(ground, (x, y)) if not is_ground(ground[x, y]) and in_range((x, y), position, size**2 * np.pi): trials = 0 size -= 1 ground[x, y] = np.random.randint(p['water_level'] + 1, p['ground_noise']) else: trials += 1 def neighbours(ground, position, radius): x, y = position return ground[x-radius:x+radius+1, y-radius:y+radius+1] def away_from_sea(ground, position, radius=p['mountain_sea_distance']): ns = neighbours(ground, position, radius).flatten() sea = len([1 for x in ns if not is_ground(x)]) return sea < p['mountain_sea_threshold'] def random_elevate_agent(ground, position, height, size=p['mountain_area_elevation_points']): position = position + np.random.random_integers(-p['mountain_area_elevation_area'], p['mountain_area_elevation_area'], size=2) for i in range(size): d = DIRECTIONS[np.random.randint(len(DIRECTIONS))] change = height * p['mountain_area_elevation'] new_index = bound_check(ground, position + np.array(d)) if is_ground(ground[new_index]): ground[new_index] += change def mountain_agent(ground, position): if not away_from_sea(ground, position): return x, y = position height = np.random.randint(p['max_elevation']) ground[x, y] = height last_height = height for i in range(1, height): for d in DIRECTIONS: change = np.random.randint(p['mountain_concentration'] + 1) distance = np.array(d)*i new_index = bound_check(ground, position + distance) if is_ground(ground[new_index]): ground[new_index] = last_height - change last_height = last_height - change if last_height < 0: break random_elevate_agent(ground, position, height) # takes an initial position and a list of (direction, probability) tuples to walk on # def split_agent(ground, position, directions): def constant_filter(a): if a[0] > (1 - p['sharpness']): return max(1, a[0]) return 0 def generate_map(biomes=False, **kwargs): plt.clf() p.update(kwargs) rs = np.random.randint(0, 999) np.random.seed(p['seed'] or rs) print('seed', rs) width, height = p['width'], p['height'] continents = p['continents'] ground = np.zeros((width, height)) ground_size = width * height * (1 - p['water_proportion'])**3 print(ground_size / ground.size) # position = (int(width / 2), int(height / 2)) # ground_size = width * height * GROUND_PROPORTION # continent_agent(ground, position, size=ground_size) position = (0, int(height / 2)) ym = 1 for continent in range(continents): position = (position[0] + np.random.randint(p['continent_spacing'] * width * 0.8, p['continent_spacing'] * width * 1.2), position[1] + ym * np.random.randint(p['continent_spacing'] * height * 0.8, p['continent_spacing'] * height * 1.2)) ym = ym * -1 random_size = ground_size / continents continent_agent(ground, position, size=random_size) ground = ndimage.gaussian_filter(ground, sigma=(1 - p['sharpness']) * 20) for i in range(int(ground_size * p['mountain_ratio'] / (p['max_elevation'] / 2))): position = (np.random.randint(0, width), np.random.randint(0, height)) mountain_agent(ground, position) norm = colors.Normalize(vmin=p['water_level'] + 1) greys = cm.get_cmap('Greys') greys.set_under(color=SEA_COLOR) ground = ndimage.gaussian_filter(ground, sigma=1) ground = ndimage.generic_filter(ground, constant_filter, size=1) print(np.min(ground), np.max(ground), p['max_elevation']) print('water proportion', 1 - (np.count_nonzero(ground) / ground.size)) plt.gca().invert_yaxis() plt.imshow(ground.T, cmap=greys, norm=norm) plt.gca().invert_yaxis() if biomes: generate_biomes(ground) figfile = BytesIO() plt.savefig(figfile, format='png') figfile.seek(0) return figfile def generate_biomes(ground): width, height = p['width'], p['height'] height_to_latitude = to_range(0, height, -90, 90) width_to_longitude = to_range(0, width, -180, 180) print('generate_biomes') data = {} for col in ['longitude', 'latitude', 'elevation', 'distance_to_water']: data[col] = [] points = [] for x, y in np.ndindex(ground.shape): v = ground[x,y] if v > p['water_level']: points.append((x, y)) data['longitude'].append(width_to_longitude(x)) data['latitude'].append(height_to_latitude(y)) data['elevation'].append(v) points = MultiPoint(points) boundary = points.convex_hull.boundary # fig = plt.figure() # ax = fig.add_subplot(111) # minx, miny, maxx, maxy = boundary.bounds # w, h = maxx - minx, maxy - miny # ax.set_xlim(minx - 0.2 * w, maxx + 0.2 * w) # ax.set_ylim(miny - 0.2 * h, maxy + 0.2 * h) # ax.set_aspect(1) # ax.add_collection(PatchCollection([PolygonPatch(boundary_buf, fc='red', ec='black', zorder=1)], match_original=True)) # plt.show() for x, y in np.ndindex(ground.shape): if ground[x, y] > p['water_level']: d = Point(x, y).distance(boundary) d = (d - root_df['distance_to_water'].mean()) / root_df['distance_to_water'].std() d = max(0, d) data['distance_to_water'].append(d) df = pd.DataFrame(data) print(df['elevation'].min(), df['elevation'].max()) print('distance_to_water', df['distance_to_water'].min(), df['distance_to_water'].max()) print('dtw', df['distance_to_water'].mean(), df['distance_to_water'].std()) print(df['latitude'].min(), df['latitude'].max()) print('running prediction models') print(p['mean_precipitation'], p['mean_temperature']) result = predict_end_to_end(df) # fig = plt.figure() # ax = fig.add_subplot(111) # minx, miny, maxx, maxy = boundary.bounds # w, h = maxx - minx, maxy - miny # ax.set_xlim(minx - 0.2 * w, maxx + 0.2 * w) # ax.set_ylim(miny - 0.2 * h, maxy + 0.2 * h) # ax.set_aspect(1) # ax.add_collection(PatchCollection([PolygonPatch(boundary.buffer(0.1), fc='red', ec='black', zorder=1)], match_original=True)) # plt.show() df = pd.read_pickle('data.p') root_df = df print(df['elevation'].min(), df['elevation'].max()) print('distance_to_water', df['distance_to_water'].min(), df['distance_to_water'].max()) print('dtw', df['distance_to_water'].mean(), df['distance_to_water'].std()) print(df['latitude'].min(), df['latitude'].max()) def predict_end_to_end(input_df, checkpoint_temp='checkpoints/temp.h5', checkpoint_precip='checkpoints/precip.h5', checkpoint_biomes='checkpoints/b.h5', year=2000): batch_size = A_params['batch_size']['grid_search'][0] layers = A_params['layers']['grid_search'][0] optimizer = A_params['optimizer']['grid_search'][0](A_params['lr']['grid_search'][0]) Temp = Model('temp', epochs=1) Temp.prepare_for_use( batch_size=batch_size, layers=layers, dataset_fn=dataframe_to_dataset_temp, optimizer=optimizer, out_activation=None, loss='mse', metrics=['mae'] ) Temp.restore(checkpoint_temp) Precip = Model('precip', epochs=1) Precip.prepare_for_use( batch_size=batch_size, layers=layers, dataset_fn=dataframe_to_dataset_temp, optimizer=optimizer, out_activation=None, loss='mse', metrics=['mae'] ) Precip.restore(checkpoint_precip) Biomes = Model('b', epochs=1) Biomes.prepare_for_use() Biomes.restore(checkpoint_biomes) inputs = input_df[INPUTS] inputs.loc[:, 'mean_temp'] = p['mean_temperature'] inputs_copy = df[INPUTS] inputs_copy.loc[:, 'mean_temp'] = mean_temperature_over_years(df, size=inputs_copy.shape[0]) inputs = inputs.to_numpy() inputs = normalize_ndarray(inputs, inputs_copy) out_columns = ['temp_{}_{}'.format(season, year) for season in SEASONS] out = Temp.predict(inputs) temp_output = pd.DataFrame(data=denormalize(out, df[out_columns].to_numpy()), columns=out_columns) inputs = input_df[INPUTS] inputs.loc[:, 'mean_precip'] = p['mean_precipitation'] inputs_copy = df[INPUTS] inputs_copy.loc[:, 'mean_precip'] = mean_precipitation_over_years(df, size=inputs_copy.shape[0]) inputs = inputs.to_numpy() inputs = normalize_ndarray(inputs, inputs_copy) out_columns = ['precip_{}_{}'.format(season, year) for season in SEASONS] out = Precip.predict(inputs) precip_output = pd.DataFrame(data=denormalize(out, df[out_columns].to_numpy()), columns=out_columns) inputs = list(INPUTS) frame = input_df[inputs + ['longitude']] for season in SEASONS: tc = 'temp_{}_{}'.format(season, year) pc = 'precip_{}_{}'.format(season, year) frame.loc[:, tc] = temp_output[tc] frame.loc[:, pc] = precip_output[pc] frame.loc[:, 'latitude'] = input_df['latitude'] frame_cp = frame.copy() columns = ['latitude', 'longitude', 'biome_num'] new_data = pd.DataFrame(columns=columns) nframe = pd.DataFrame(columns=frame.columns, data=normalize_ndarray(frame.to_numpy(), df[frame.columns].to_numpy())) for season in SEASONS: inputs += [ 'temp_{}_{}'.format(season, year), 'precip_{}_{}'.format(season, year) ] for i, (chunk, chunk_original) in enumerate(zip(chunker(nframe, Biomes.batch_size), chunker(frame_cp, Biomes.batch_size))): if chunk.shape[0] < Biomes.batch_size: continue input_data = chunk.loc[:, inputs].values out = Biomes.predict_class(input_data) f = pd.DataFrame({ 'longitude': chunk_original.loc[:, 'longitude'], 'latitude': chunk_original.loc[:, 'latitude'], 'biome_num': out }, columns=columns) new_data = new_data.append(f) draw(new_data, earth=False, only_draw=True, longitude_max=p['width'], latitude_max=p['height'], alpha=0.7) if __name__ == "__main__": # p['width'] = 300 # p['height'] = 250 generate_map(True) plt.show()