import os
import re
import math
import time
import shutil
import numpy as np
import pathos.multiprocessing as mp
from numpy.polynomial.polynomial import polyfit
from scipy import stats
from scipy.stats import gaussian_kde, variation
from sklearn.metrics import mean_squared_error, r2_score
from astropy.convolution import convolve, CustomKernel
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
from matplotlib.offsetbox import AnchoredText
from matplotlib import patches
from osgeo import gdal, ogr
from osgeo.gdalconst import GA_Update
from spatialist import haversine, Raster, Vector, crsConvert, gdalwarp, gdal_translate, intersect
from spatialist.ancillary import finder
[docs]def scatter(x, y, z=None, xlab='', ylab='', title='', nsamples=1000, mask=None,
measures=None, regline=False, o2o=False, denscol=False, grid=False,
xlim=None, ylim=None, sort_z=False, legend=False,
regline_label='regression', o2o_label='1-to-1'):
"""
general function for creating scatter plots.
Parameters
----------
x: numpy.ndarray
dataset I
y: numpy.ndarray
dataset II
z: numpy.ndarray
dataset III for coloring the data points; overrides parameter `denscol`
xlab: str
the x-axis label
ylab: str
the y-axis label
title: str
the plot title
nsamples: int
the number of data samples to plot
mask: numpy.ndarray
an optional array for masking the datasets
measures: list
additional measures to be printed in a text box; current options:
- `eq`: the linear regression equation
- `rmse`
- `r2`
- `n`: the number of samples
- `cv_x`, `cv_y`: the coefficient of variation of either `x` or `y`
- `mean_x`, `mean_y`: the mean value of either `x` or `y`
regline: bool
draw a linear regression line?
o2o: bool
draw a data one-to-one line?
denscol: bool
color the points by Gaussian density?; overridden by parameter `z`
grid: bool
add a mesh grid to the plot?
xlim: tuple
the x-axis limits
ylim: tuple
the y-axis limits
sort_z: bool
if `z` is not None, sort its values so that points with high `z`
values are plotted last?
legend: bool
add a legend for the regression line and one-to-one line if they exist?
regline_label: str
the legend label for the regression line
o2o_label: str
the legend label for the one-to-one line
Returns
-------
"""
nanmask = np.isfinite(x) & np.isfinite(y)
if mask is None:
mask = nanmask
else:
mask = mask & nanmask
sample_ids = sampler(mask, nsamples)
x = x.flatten()[sample_ids]
y = y.flatten()[sample_ids]
measures = [] if measures is None else measures
fields = []
if regline or 'eq' in measures:
b, m = polyfit(x, y, 1)
if 'eq' in measures:
sign = '+' if m > 0 else '-'
fields.append('y = {:.2f} {} {:.2f} * x'.format(b, sign, abs(m)))
if 'rmse' in measures:
rmse = round(math.sqrt(mean_squared_error(x, y)), 2)
fields.append('RMSE = {:.2f}'.format(rmse))
if 'r2' in measures:
r2 = round(r2_score(x, y), 2)
fields.append('$R^2$ = {:.2f}'.format(r2))
if 'n' in measures:
fields.append('n = {}'.format(len(x)))
if 'cv_x' in measures:
cv = round(float(variation(x)), 4)
fields.append('CV(x) = {}'.format(cv))
if 'cv_y' in measures:
# cv = variation(y)
cv = round(float(variation(y)), 4)
fields.append('CV(y) = {}'.format(cv))
if 'mean_x' in measures:
mean = np.mean(x)
fields.append(r'$\mu$(x) = {:.2f}'.format(mean))
if 'mean_y' in measures:
mean = np.mean(y)
fields.append(r'$\mu$(y) = {:.2f}'.format(mean))
text = '\n'.join(fields)
if z is not None:
z = z.flatten()[sample_ids]
denscol = False
# # Sort the points by z, so that points with high z values are plotted last
if sort_z:
idx = z.argsort()
x, y, z = x[idx], y[idx], z[idx]
if denscol:
# # Calculate the point density
xy = np.vstack([x, y])
z = gaussian_kde(xy)(xy)
# # Sort the points by density, so that the densest points are plotted last
idx = z.argsort()
x, y, z = x[idx], y[idx], z[idx]
plt.scatter(x, y, c=z, s=20, edgecolor=None, zorder=3)
# set axes range
if xlim is not None:
plt.xlim(*xlim)
if ylim is not None:
plt.ylim(*ylim)
# determine x-axis limits and compute border offset for lines
xmin, xmax = plt.xlim()
xo = (xmax - xmin) / 100 * 2
if len(text) > 0:
text_box = AnchoredText(text, frameon=True, loc='lower right')
plt.setp(text_box.patch, facecolor='white') # , alpha=0.5
plt.gca().add_artist(text_box)
if regline:
ffit = np.poly1d((m, b))
x_new = np.linspace(xmin + xo, xmax - xo, num=2)
plt.plot(x_new, ffit(x_new), color='red', zorder=2, label=regline_label)
if o2o:
ax = plt.gca()
line = Line2D([0, 1], [0, 1], color='black', zorder=1, label=o2o_label)
transform = ax.transAxes
line.set_transform(transform)
ax.add_line(line)
plt.title(title)
plt.xlabel(xlab)
plt.ylabel(ylab)
if legend:
ax = plt.gca()
handles, labels = ax.get_legend_handles_labels()
if len(handles) > 0:
plt.legend(handles, labels, loc='upper left')
if grid:
plt.grid()
def one2oneSARplotsratio(in_sar1_img, in_sar2_img, vv_band, vh_band, sar1_ref, sar2_ref, lc, lc_ints, lc_string):
# Read data.
sar1 = readimg2array(in_sar1_img, vv_band, 2) / readimg2array(in_sar1_img, vh_band, 2)
sar2 = readimg2array(in_sar2_img, vv_band, 2) / readimg2array(in_sar2_img, vh_band, 2)
sar1 = 10 * np.log10(sar1)
sar2 = 10 * np.log10(sar2)
for p, i in enumerate(lc_ints):
start_time = time.time()
s1_c = sar1[lc == i]
s2_c = sar2[lc == i]
sar3 = s1_c[(np.isfinite(s1_c)) & (np.isfinite(s2_c))]
sar4 = s2_c[(np.isfinite(s1_c)) & (np.isfinite(s2_c))]
print(np.shape(sar3))
print(np.shape(sar4))
lowest = min([min(sar3), min(sar4)])
highest = max([max(sar3), max(sar4)])
o2o_min = [lowest, highest]
o2o_max = [lowest, highest]
num = len(sar3)
rmse = math.sqrt(mean_squared_error(sar3, sar4))
print(rmse)
r2 = r2_score(sar3, sar4)
print(r2)
slope, intercept, r_value, p_value, std_err = stats.linregress(sar3, sar4)
line = slope * s1_c + intercept
print(r_value ** 2, p_value, std_err, intercept, slope)
# Plot SAR vars sliced to pixels of land cover.
plt.scatter(sar1[lc == i], sar2[lc == i], s=0.1)
plt.xlabel('%s' % (sar1_ref))
plt.ylabel('%s' % (sar2_ref))
plt.title('%s \n %s vs %s \n N=%s' % (lc_string[p], sar1_ref, sar2_ref, num))
plt.xlim(lowest, highest)
plt.ylim(lowest, highest)
# Add one to one line.
plt.plot(o2o_min, o2o_max, color="black")
# Add best fit line.
# plt.plot(np.unique(s1_c), np.poly1d(np.polyfit(s1_c, s2_c, 1))(np.unique(s1_c)))
plt.plot(s1_c, line, color="red")
plt.grid()
plt.show()
print("--- %s seconds ---" % (time.time() - start_time))
sar1 = None
sar2 = None
sar3 = None
sar4 = None
# Function to read first band of image and convert to dB is SAR input is specified.
def readimg2array(in_img, band, sar):
start_time = time.time()
ds = gdal.Open(in_img)
band = ds.GetRasterBand(band)
cols = ds.RasterXSize
rows = ds.RasterYSize
print(cols, rows)
print("--- %s seconds ---" % (time.time() - start_time))
if sar == 1:
x = band.ReadAsArray()
x[(x == -32768) | (x == 0.0)] = 'NaN'
return 10 * np.log10(x)
elif sar == 2:
x = band.ReadAsArray()
x[(x == -32768) | (x == 0.0)] = 'NaN'
return x
else:
x = band.ReadAsArray()
return x
def simplify_lc(in_lc):
in_lc = np.where((in_lc <= 11), 1, in_lc)
in_lc = np.where((in_lc >= 12) & (in_lc <= 22), 12, in_lc)
in_lc = np.where((in_lc >= 23) & (in_lc <= 34), 23, in_lc)
in_lc = np.where((in_lc >= 35) & (in_lc <= 39), 35, in_lc)
in_lc = np.where((in_lc >= 40) & (in_lc <= 44), 40, in_lc)
return in_lc
[docs]def dem_aspect(img):
"""
compute the aspect of a DEM.
Parameters
----------
img: numpy.ndarray
the DEM array
Returns
-------
numpy.ndarray
the computed aspect array
"""
boundary = 'extend'
kernel = CustomKernel(np.array([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]]) / 8.)
xchangerate_array = -convolve(img, kernel, normalize_kernel=False, boundary=boundary,
nan_treatment='fill', fill_value=np.nan)
kernel = CustomKernel(np.array([[-1, -2, -1], [0, 0, 0], [1, 2, 1]]) / 8.)
ychangerate_array = -convolve(img, kernel, normalize_kernel=False, boundary=boundary,
nan_treatment='fill', fill_value=np.nan)
aspect = np.rad2deg(np.arctan2(ychangerate_array, -xchangerate_array))
aspect_value = np.copy(aspect)
# numpy raises a warning if np.nan values are set to False in a mask
with np.errstate(invalid='ignore'):
mask = aspect < 0.0
aspect_value[mask] = 90.0 - aspect[mask]
mask = aspect > 90.0
aspect_value[mask] = 360.0 - aspect[mask] + 90.0
mask = (aspect >= 0.0) & (aspect < 90.0)
aspect_value[mask] = 90.0 - aspect[mask]
return aspect_value
[docs]def dem_distribution(slope, aspect, head_angle, inc_angle, look_dir='right',
nsamples=1000, title='', mask=None):
"""
create a polar slope-aspect DEM plot superimposed with the area visible to a SAR sensor.
Parameters
----------
slope: numpy.ndarray
aspect: numpy.ndarray
head_angle: float
the SAR sensor heading
inc_angle: float
the SAR sensor's incident angle
look_dir: str
the SAR sensor look direction; either `left` or `right`
nsamples: int
the number of samples to select from the `slope` and `aspect` arrays
using function :func:`~S1_ARD.util.sampler`
title: str
the plot's title
mask: numpy.ndarray
an additional binary array to mask the slope and aspect values
Returns
-------
See Also
--------
visible_sar_angle_map
"""
nanmask = np.isfinite(slope) & np.isfinite(aspect)
if mask is None:
mask = nanmask
else:
mask = mask & nanmask
sample_ids = sampler(mask, nsamples)
slope = slope.flatten()[sample_ids]
aspect = aspect.flatten()[sample_ids]
# # Calculate the point density
xy = np.vstack([slope, aspect])
z = gaussian_kde(xy)(xy)
# # Sort the points by density, so that the densest points are plotted last
idx = z.argsort()
x, y, z = aspect[idx], slope[idx], z[idx]
# create visible angle map mesh grid
xx, yy = np.meshgrid(np.radians(np.arange(0, 361)),
np.arange(0, 91),
sparse=False)
vis_map = visible_sar_angle_map(head_angle, inc_angle, look_dir)
# get current axis if a figure exists
ax = plt.gca() if plt.get_fignums() else None
# create a new figure if there is no axis or the projection of the current axis is not polar
if ax is None or not ax.__class__.__name__ == 'PolarAxesSubplot':
fig = plt.figure()
ax = fig.add_subplot(111, projection='polar')
# ax.set_theta_zero_location('N')
ax.set_theta_direction(-1)
c = ax.scatter(x, y, c=z, s=10, alpha=0.75)
ax.contour(xx, yy, vis_map, colors='red')
ax.set_title(title)
ax.set_ylim(0, 95)
[docs]def dem_slope(img, xres_m, yres_m):
"""
compute the slope of a DEM.
Parameters
----------
img: numpy.ndarray
the input DEM
xres_m: int or float
the x resolution of the DEM in same units as the height values
yres_m: int or float
the y resolution of the DEM in same units as the height values
Returns
-------
"""
boundary = 'extend'
kernel = CustomKernel(np.array([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]]) / (8. * xres_m))
xchangerate_array = convolve(img, kernel, normalize_kernel=False, boundary=boundary,
nan_treatment='fill', fill_value=np.nan)
kernel = CustomKernel(np.array([[-1, -2, -1], [0, 0, 0], [1, 2, 1]]) / (8. * yres_m))
ychangerate_array = convolve(img, kernel, normalize_kernel=False, boundary=boundary,
nan_treatment='fill', fill_value=np.nan)
slope_radians = np.arctan(np.sqrt(np.square(xchangerate_array) + np.square(ychangerate_array)))
slope_degrees = np.rad2deg(slope_radians)
return slope_degrees
[docs]def dem_degree2meter(demfile):
"""
compute the spatial resolution in meters for a DEM with WGS84 degree coordinates.
Parameters
----------
demfile: str
the DEM file
Returns
-------
tuple
(posting_east, posting_north)
See Also
--------
spatialist.auxil.haversine
"""
with Raster(demfile) as ras:
res_lon, res_lat = ras.res
lat = (ras.geo['ymin'] + ras.geo['ymax']) / 2
lon = (ras.geo['xmin'] + ras.geo['xmax']) / 2
post_north = haversine(lat, lon, lat + res_lat, lon)
post_east = haversine(lat, lon, lat, lon + res_lon)
return post_east, post_north
[docs]def sampler(nanmask, nsamples=None, seed=42):
"""
central function to select random samples from arrays.
Parameters
----------
nanmask: numpy.ndarray
a mask to limit the sample selection
nsamples: int
the number of samples to select
seed: int
seed used to initialize the pseudo-random number generator
Returns
-------
numpy.ndarray
the generated random samples
See Also
--------
numpy.random.seed
numpy.random.choice
"""
indices = np.where(nanmask.flatten())[0]
samplesize = min(indices.size, nsamples) if nsamples is not None else indices.size
np.random.seed(seed)
sample_ids = np.random.choice(a=indices, size=samplesize, replace=False)
return sample_ids
[docs]def visible_sar_angle_map(head_angle, inc_angle, look_dir='right'):
"""
create a SAR sensor slope-aspect visibility mask;
used by :func:`~S1_ARD.util.dem_distribution`.
Parameters
----------
head_angle: float
the SAR sensor heading
inc_angle: float
the SAR sensor's incident angle
look_dir: str
the SAR sensor look direction; either `left` or `right`
Returns
-------
numpy.ndarray
the binary map with aspect-slope coordinates
"""
# convert deg to rad and geographic heading to mathematical angle
head_ang = math.radians(90. - head_angle)
inc_ang = math.radians(inc_angle)
# flight direction vector
vel_vec = np.array([math.cos(head_ang), math.sin(head_ang), 0])
# viewing plane (look vector)
if look_dir == 'right':
look_vec = np.array([math.sin(inc_ang) * math.sin(head_ang),
-math.sin(inc_ang) * math.cos(head_ang),
-math.cos(inc_ang)])
viewplane = np.cross(look_vec, vel_vec)
elif look_dir == 'left':
look_vec = np.array([-math.sin(inc_ang) * math.sin(head_ang),
math.sin(inc_ang) * math.cos(head_ang),
-math.cos(inc_ang)])
viewplane = np.cross(vel_vec, look_vec)
else:
raise ValueError("look_dir must either be 'left' or 'right'")
# map in aspect and slope coordinates
angle, radius = np.meshgrid(np.radians(np.arange(0, 361)),
np.radians(np.arange(0, 91)),
sparse=False)
# surface normal vector
normal = np.array([np.sin(radius) * np.cos(angle),
np.sin(radius) * np.sin(angle),
np.cos(radius)])
# condition that facet is visible by radar
view_prod = normal[0, :, :] * look_vec[0] + \
normal[1, :, :] * look_vec[1] + \
normal[2, :, :] * look_vec[2]
# condition that facet is tilted positive in viewplane
mont_prod = normal[0, :, :] * viewplane[0] + \
normal[1, :, :] * viewplane[1] + \
normal[2, :, :] * viewplane[2]
# visible map_array
visible_map_array = (view_prod < 0) & (mont_prod > 0)
return visible_map_array
[docs]def wkt2shp(wkt, srs, outname):
"""
convert a well-known text string geometry to a shapefile.
Parameters
----------
wkt: str
the well-known text description
srs: int, str
the spatial reference system; see :func:`spatialist.auxil.crsConvert` for options.
outname: str
the name of the shapefile to write
Returns
-------
"""
geom = ogr.CreateGeometryFromWkt(wkt)
geom.FlattenTo2D()
srs = crsConvert(srs, 'osr')
layername = os.path.splitext(os.path.basename(outname))[0]
with Vector(driver='Memory') as bbox:
bbox.addlayer(layername, srs, geom.GetGeometryType())
bbox.addfield('area', ogr.OFTReal)
bbox.addfeature(geom, fields={'area': geom.Area()})
bbox.write(outname, format='ESRI Shapefile')
geom = None
[docs]def parallel_apply_along_axis(func1d, axis, arr, cores=4, *args, **kwargs):
"""
Like :func:`numpy.apply_along_axis()`, but takes advantage of multiple cores.
Adapted from `here <https://stackoverflow.com/questions/45526700/
easy-parallelization-of-numpy-apply-along-axis>`_.
Parameters
----------
func1d: function
the function to be applied
axis: int
the axis along which to apply `func1d`
arr: numpy.ndarray
the input array
cores: int
the number of parallel cores
args: any
Additional arguments to `func1d`.
kwargs: any
Additional named arguments to `func1d`.
Returns
-------
numpy.ndarray
"""
# Effective axis where apply_along_axis() will be applied by each
# worker (any non-zero axis number would work, so as to allow the use
# of `np.array_split()`, which is only done on axis 0):
effective_axis = 1 if axis == 0 else axis
if effective_axis != axis:
arr = arr.swapaxes(axis, effective_axis)
def unpack(arguments):
func1d, axis, arr, args, kwargs = arguments
return np.apply_along_axis(func1d, axis, arr, *args, **kwargs)
chunks = [(func1d, effective_axis, sub_arr, args, kwargs)
for sub_arr in np.array_split(arr, mp.cpu_count())]
pool = mp.Pool(cores)
individual_results = pool.map(unpack, chunks)
# Freeing the workers:
pool.close()
pool.join()
return np.concatenate(individual_results)
def inc_stack(small, gamma, snap, outdir, prefix=''):
outnames_base = ['small', 'gamma', 'snap']
outnames = [os.path.join(outdir, prefix + x) + '.tif' for x in outnames_base]
if all([os.path.isfile(x) for x in outnames]):
return outnames
# set SMALL product nodata GeoTiff value
with Raster(small)[0:100, 0:100] as ras:
if ras.nodata is None:
print('setting nodata value for SMALL product')
mat = ras.matrix()
nodata = float(mat[0, 0])
ras2 = gdal.Open(small, GA_Update)
ras2.GetRasterBand(1).SetNoDataValue(nodata)
ras2 = None
tmpdir = os.path.join(outdir, 'tmp')
if not os.path.isdir(tmpdir):
os.makedirs(tmpdir)
small_edit = os.path.join(tmpdir, os.path.basename(small).replace('.tif', '_edit.tif'))
if not os.path.isfile(small_edit):
print('reducing resolution of SMALL product')
gdal_translate(small, small_edit,
options={'xRes': 90, 'yRes': 90, 'resampleAlg': 'average',
'format': 'GTiff'})
# subtract 90 degrees from SMALL product
small_out = outnames[0]
if not os.path.isfile(small_out):
print('subtracting 90 degrees from SMALL product')
with Raster(small_edit) as ras:
mat = ras.matrix() - 90
ras.assign(mat, 0)
print('creating {}'.format(small_out))
ras.write(small_out, format='GTiff', nodata=-99)
# set GAMMA product nodata value
with Raster(gamma) as ras:
if ras.nodata != 0:
print('setting nodata value of GAMMA product')
ras2 = gdal.Open(gamma, GA_Update)
ras2.GetRasterBand(1).SetNoDataValue(0)
ras2 = None
# convert GAMMA product from radians to degrees
gamma_deg = os.path.join(tmpdir, os.path.basename(gamma).replace('.tif', '_deg.tif'))
if not os.path.isfile(gamma_deg):
print('converting GAMMA product from radians to degrees')
with Raster(gamma) as ras:
mat = np.rad2deg(ras.matrix())
ras.assign(mat, 0)
ras.write(gamma_deg, format='GTiff')
gamma = gamma_deg
# use extent of SMALL product as reference
ext = Raster(small_out).bbox().extent
# create new directory for the stacked files
if not os.path.isdir(outdir):
os.makedirs(outdir)
# warp the products to their common extent
warp_opts = {'options': ['-q'], 'format': 'GTiff', 'multithread': True,
'outputBounds': (ext['xmin'], ext['ymin'], ext['xmax'], ext['ymax']),
'dstNodata': -99, 'xRes': 90, 'yRes': 90, 'resampleAlg': 'bilinear',
'dstSRS': 'EPSG:32632'}
for i, item in enumerate([gamma, snap]):
outfile = outnames[i + 1]
if not os.path.isfile(outfile):
print('creating {}'.format(outfile))
gdalwarp(src=item, dst=outfile, options=warp_opts)
shutil.rmtree(tmpdir)
return outnames
[docs]def commonextent(*args):
"""
compute the common extent of multiple extent dictionaries.
Parameters
----------
args: dict
an extent dictionary, see e.g. :attr:`spatialist.vector.Vector.extent`
Returns
-------
dict:
the common extent
"""
ext_new = {}
for ext in args:
if len(ext_new.keys()) == 0:
ext_new = ext
else:
for key in ['xmin', 'ymin']:
if ext[key] > ext_new[key]:
ext_new[key] = ext[key]
for key in ['xmax', 'ymax']:
if ext[key] < ext_new[key]:
ext_new[key] = ext[key]
return ext_new
[docs]def clc_legend(filename):
"""
read clc meta data from the dedicated CSV file available
`here <https://www.eea.europa.eu/data-and-maps/data/
corine-land-cover-3/corine-land-cover-classes-and/clc_legend.csv>`_.
Parameters
----------
filename: str
the CSV file to be read
Returns
-------
dict
the CSV values in a dictionary
"""
pattern = r'([0-9]{1,2}),' \
r'([0-9]{3}),' \
r'([a-zA-Z ]*),' \
r'(["]*[a-zA-Z ,-/]*["]*),' \
r'(["]*[a-zA-Z ,-/]*["]*),' \
r'([0-9-]{11})\n'
with open(filename, 'r') as clc:
keys = clc.readline().strip().split(',')
out = dict(zip(keys, [[] for x in keys]))
for line in clc:
match = re.search(pattern, line)
if match:
items = match.groups()
for i in range(0, len(keys)):
key = keys[i]
if key == 'RGB':
value = tuple([int(x) / 255. for x in items[i].split('-')])
else:
value = items[i].replace('"', '')
out[key].append(value)
return out
[docs]def clc_prep(clc, reference, outname):
"""
resample and crop the corine product to the resolution and extent of a reference image.
Parameters
----------
clc: str
the name of the CLC input file
reference: str
the name of the reference file
outname: str
the named of the output image
Returns
-------
"""
with Raster(reference).bbox() as box_ras:
with Raster(clc).bbox() as box_clc:
if intersect(box_ras, box_clc) is None:
print('no intersect')
return
if not os.path.isfile(outname):
with Raster(reference) as ras:
ref_crs = ras.projection
xres, yres = ras.res
with ras.bbox() as box:
ref_ext = box.extent
outputBounds = (ref_ext['xmin'], ref_ext['ymin'], ref_ext['xmax'], ref_ext['ymax'])
gdalwarp_opt = {'format': 'GTiff', 'outputBounds': outputBounds, 'multithread': True,
'xRes': xres, 'yRes': yres, 'dstSRS': ref_crs, 'resampleAlg': 'mode'}
gdalwarp(src=clc, dst=outname, options=gdalwarp_opt)
else:
print('outfile already exists')
[docs]def clc_prepare(reference, outdir, source):
"""
create a CLC subset resampled to a reference image.
Parameters
----------
reference: str
the reference file with the target CRS and extent
outdir: str
the directory to write the new file to;
new files are named clc{index}.tif, e.g. clc1.tif.
source: str
the original product to be subsetted
Returns
-------
str
the name of the file written to `outdir`
"""
with Raster(reference) as ras:
xRes, yRes = ras.res
epsg = ras.epsg
ext = ras.extent
#########################################################################
warp_opts = {'options': ['-q'], 'format': 'GTiff', 'multithread': True,
'dstNodata': -99, 'resampleAlg': 'mode'}
if not os.path.isdir(outdir):
os.makedirs(outdir)
clc_subs = finder(outdir, ['clc[0-9].tif'], regex=True)
match = False
if len(clc_subs) > 0:
for j, sub in enumerate(clc_subs):
with Raster(sub) as ras:
if ras.extent == ext:
clc_sub = sub
match = True
if not match:
clc_sub = os.path.join(outdir, 'clc{}.tif'.format(len(clc_subs)))
print('creating', clc_sub)
warp_opts['dstSRS'] = 'EPSG:{}'.format(epsg)
warp_opts['xRes'] = xRes
warp_opts['yRes'] = yRes
warp_opts['outputBounds'] = (ext['xmin'], ext['ymin'],
ext['xmax'], ext['ymax'])
gdalwarp(src=source, dst=clc_sub, options=warp_opts)
return clc_sub
[docs]def uzh_prepare(reference, outdir, source):
"""
create an UZH incident angle subset resampled to a reference image.
Parameters
----------
reference: str
the reference file with the target extent
outdir: str
the directory to write the new file to;
new files are named uzh_{epsg}_{index}.tif, e.g. uzh_4326_1.tif.
source: str
the original product to be subsetted
Returns
-------
numpy.ndarray
the content of the file written to `outdir`
"""
with Raster(reference) as ras:
xRes, yRes = ras.res
epsg = ras.epsg
ext = ras.extent
warp_opts = {'options': ['-q'], 'format': 'GTiff', 'multithread': True,
'dstNodata': -99, 'resampleAlg': 'bilinear'}
if not os.path.isdir(outdir):
os.makedirs(outdir)
# find existing files
uzh_subs = finder(outdir, ['uzh_[0-9]{4,5}_[0-9].tif'], regex=True)
# check if any of the existing files matches the extent of the reference
match = False
if len(uzh_subs) > 0:
for j, sub in enumerate(uzh_subs):
with Raster(sub) as ras:
if ras.extent == ext:
uzh_sub = sub
match = True
if not match:
with Raster(source) as ras:
if ras.epsg != epsg:
raise RuntimeError('CRS mismatch')
basename = 'uzh_{}_{}.tif'.format(epsg, len(uzh_subs))
uzh_sub = os.path.join(outdir, basename)
print('creating', uzh_sub)
warp_opts['dstSRS'] = 'EPSG:{}'.format(epsg)
warp_opts['xRes'] = xRes
warp_opts['yRes'] = yRes
warp_opts['outputBounds'] = (ext['xmin'], ext['ymin'],
ext['xmax'], ext['ymax'])
gdalwarp(src=source, dst=uzh_sub, options=warp_opts)
return uzh_sub
[docs]def dev_max(arr):
"""
compute the maximum deviation from the median of all array
values and the corresponding ID.
Parameters
----------
arr: numpy.ndarray
the 1D array
Returns
-------
tuple
(maximum deviation, ID)
"""
if len(arr[~np.isnan(arr)]) > 0:
med = np.nanmedian(arr)
absdev = abs(arr - med)
maxdev = np.nanmax(absdev)
if maxdev > 0:
maxdev_id = np.nanargmax(absdev)
else:
maxdev_id = np.nan
return maxdev, maxdev_id
else:
return np.nan, np.nan
[docs]def extent2patch(extent, edgecolor='r'):
"""
create a matplotlib rectangle patch from an extent dictionary.
Parameters
----------
extent: dict
an extent dictionary, see e.g. :attr:`spatialist.vector.Vector.extent`
edgecolor: str
the edge color of the path
Returns
-------
matplotlib.patches.Rectangle
"""
xdiff = extent['xmax'] - extent['xmin']
ydiff = extent['ymax'] - extent['ymin']
rect = patches.Rectangle((extent['xmin'], extent['ymin']),
xdiff, ydiff, linewidth=2,
edgecolor=edgecolor, facecolor='none')
return rect