This tool was commissioned by Electric Vine Industries, an innovative solar mini grid company.
{ customer_geotable : Customers ? Remember to specify demand_in_kwh_per_day for each customer }
{ minimum_solar_pole_count_per_kwh : Minimum Solar Pole Count per kWh ? Set the minimum number of solar poles required per kWh of demand }
{ maximum_drop_line_length_in_meters : Maximum Service Drop Line Length in Meters ? Set the maximum distance of a service drop pole to a customer }
{ maximum_drop_line_count_per_pole : Maximum Service Drop Line Count per Pole ? Set the maximum number of service drop lines per pole }
{ maximum_battery_line_length_in_meters : Maximum Battery Line Length in Meters ? Set the maximum distance of a battery to a service drop pole as measured along a distribution line }
{ maximum_lamp_pole_distance_in_meters : Maximum Lamp Pole Distance in Meters ? Set the maximum distance of a lamp pole to a customer }
{ maximum_angle_for_pole_type_a : Maximum Angle for Pole Type A ? Set the maximum angle between the incoming and outgoing distribution lines for pole type A }
{ maximum_distribution_pole_interval_in_meters : Maximum Distribution Pole Interval in Meters ? Set the maximum distance between distribution poles }
# Press the Blue Button to preview this as a CrossCompute tool
customer_geotable_path = 'customers.shp.zip'
minimum_solar_pole_count_per_kwh = 5
maximum_drop_line_length_in_meters = 50
maximum_drop_line_count_per_pole = 5
maximum_battery_line_length_in_meters = 100
maximum_lamp_pole_distance_in_meters = 100
maximum_angle_for_pole_type_a = 120
maximum_distribution_pole_interval_in_meters = 75
target_folder = '/tmp'
import attr
from shapely.geometry.base import BaseGeometry
@attr.s
class GeometryMixin(object):
id = attr.ib(default=0)
geometry = attr.ib(default=attr.Factory(BaseGeometry))
attributes = attr.ib(default=attr.Factory(dict))
class PointMixin(GeometryMixin):
@property
def xy(self):
return self.geometry.x, self.geometry.y
class Customer(GeometryMixin):
demand_in_kwh_per_day = 0
class Battery(PointMixin):
type_id = None
drop_poles = None
demand_in_kwh_per_day = 0
class Line(GeometryMixin):
pole1_id = None
pole2_id = None
length = 0
class Pole(PointMixin):
type_id = None
customers = None
has_solar_panel = False
has_street_lamp = False
has_corner_angle = False
from geotable import ColorfulGeometryCollection
from shapely.geometry import GeometryCollection
def get_geometries(instances):
return [x.geometry for x in instances]
def visualize_geometries(*instance_lists, colors=None):
return ColorfulGeometryCollection([
GeometryCollection(get_geometries(x)) for x in instance_lists
], colors)
def load_instances(
geotable_path, target_proj4, TargetClass, default_value_by_name=None):
geometries, field_packs, field_definitions = geometryIO.load(
geotable_path, targetProj4=target_proj4)[1:]
instances = []
for index, (geometry, field_pack) in enumerate(zip(geometries, field_packs)):
d = {}
for field_value, (field_name, field_type) in zip(field_pack, field_definitions):
d[field_name] = field_value
instance = TargetClass(
id=d.pop('id', index),
geometry=geometry,
attributes=d)
for k, v in (default_value_by_name or {}).items():
setattr(instance, k, d.pop(k, v))
instances.append(instance)
return instances
import utm
from shapely.geometry import GeometryCollection
def get_utm_zone_from_lonlat_geometries(lonlat_geometries):
lonlat_point = GeometryCollection(lonlat_geometries).centroid
longitude = lonlat_point.x
latitude = lonlat_point.y
x, y, zone_number, zone_letter = utm.from_latlon(
latitude, longitude)
return zone_number, zone_letter
def get_utm_proj4(zone_number, zone_letter):
parts = []
parts.extend([
'+proj=utm',
'+zone=%s' % zone_number,
])
if zone_letter.upper() < 'N':
parts.append('+south')
parts.extend([
'+ellps=WGS84',
'+datum=WGS84',
'+units=m',
'+no_defs',
])
return ' '.join(parts)
import geometryIO
from geotable import ColorfulGeometryCollection
def load_customers(geotable_path, target_proj4, demand_in_kwh_per_day):
return load_instances(geotable_path, target_proj4, Customer, {
'demand_in_kwh_per_day': demand_in_kwh_per_day,
})
lonlat_geometries = geometryIO.load(
customer_geotable_path, targetProj4=geometryIO.proj4LL)[1]
zone_number, zone_letter = get_utm_zone_from_lonlat_geometries(
lonlat_geometries)
utm_proj4 = get_utm_proj4(zone_number, zone_letter)
customers = load_customers(customer_geotable_path, utm_proj4, 100)
visualize_geometries(customers, colors=['yellow'])
from shapely.geometry import LineString
from shapely.ops import polygonize, unary_union
def get_disjoint_polygons(overlapping_polygons):
'Split overlapping polygons into disjoint polygons'
rings = [LineString(list(
x.exterior.coords)) for x in overlapping_polygons]
return list(polygonize(unary_union(rings)))
from copy import copy
def get_source_polygons_with_connections(target_instances, maximum_distance):
target_geometries = get_geometries(target_instances)
# Convert target_geometries into target_polygons using maximum_distance
target_polygons = [x.buffer(maximum_distance) for x in target_geometries]
# Identify overlapping areas
sliced_polygons = get_disjoint_polygons(target_polygons)
for polygon in sliced_polygons:
candidates = []
for target_instance, target_polygon in zip(target_instances, target_polygons):
if target_polygon.contains(polygon.centroid):
candidates.append(target_instance)
polygon.candidates = candidates
# Sort overlapping areas by overlap count
sorted_polygons = sorted(sliced_polygons, key=lambda x: -len(x.candidates))
# Assign target_polygons to each sorted_polygon
target_instances = copy(target_instances)
social_polygons, lonely_polygons = [], []
for polygon in sorted_polygons:
connections = []
for target_instance in polygon.candidates:
try:
target_instances.remove(target_instance)
except ValueError:
continue
connections.append(target_instance)
connection_count = len(connections)
if connection_count > 1:
social_polygons.append(polygon)
elif connection_count == 1:
lonely_polygons.append(polygon)
polygon.connections = connections
return social_polygons + lonely_polygons
from math import ceil
def estimate_drop_pole_count(
drop_pole_polygon, maximum_drop_line_count_per_pole):
connection_count = len(drop_pole_polygon.connections)
if not connection_count:
return 1
return ceil(connection_count / float(
maximum_drop_line_count_per_pole))
import numpy as np
from collections import defaultdict
from shapely.geometry import Point
from sklearn.cluster import KMeans
def get_instance_clusters(instances, cluster_count):
geometries = get_geometries(instances)
points = [x.centroid for x in geometries]
xys = np.array([(point.x, point.y) for point in points])
kmeans = KMeans(n_clusters=cluster_count).fit(xys)
instances_by_label = defaultdict(list)
for instance, label in zip(instances, kmeans.labels_):
instances_by_label[label].append(instance)
return list(instances_by_label.values())
from scipy.optimize import minimize
def place_store(geometries):
'Return the point that minimizes the sum of distances to each geometry'
def sum_distances(xy):
return sum(Point(xy).distance(g) for g in geometries)
xy = geometries[0].centroid.coords[0]
return Point(minimize(sum_distances, xy, method='L-BFGS-B').x)
def make_drop_pole(customers):
pole_point = place_store(get_geometries(customers))
pole = Pole(geometry=pole_point)
pole.customers = customers
return pole
def get_drop_poles(
customers,
maximum_drop_line_length_in_meters,
maximum_drop_line_count_per_pole):
drop_pole_polygons = get_source_polygons_with_connections(
customers, maximum_drop_line_length_in_meters)
drop_poles = []
for polygon in drop_pole_polygons:
connection_count = len(polygon.connections)
pole_count = estimate_drop_pole_count(
polygon, maximum_drop_line_count_per_pole)
if pole_count > 1:
clusters = get_instance_clusters(
polygon.connections, cluster_count=pole_count)
for customers in clusters:
drop_poles.append(make_drop_pole(customers))
else:
drop_poles.append(make_drop_pole(polygon.connections))
for index, pole in enumerate(drop_poles):
pole.id = index
for customer in pole.customers:
customer.pole_id = pole.id
customer.drop_line_length = Point(
pole.xy).distance(customer.geometry.centroid)
return drop_poles
drop_poles = get_drop_poles(
customers,
maximum_drop_line_length_in_meters,
maximum_drop_line_count_per_pole)
visualize_geometries(drop_poles, customers, colors=['red', 'yellow'])
import networkx as nx
from itertools import combinations
from scipy.spatial.distance import euclidean as get_distance
from shapely.geometry import LineString
def get_distribution_lines(drop_poles):
drop_pole_points = get_geometries(drop_poles)
graph = nx.Graph()
for pole1, pole2 in combinations(drop_poles, 2):
distance = get_distance(pole1.geometry, pole2.geometry)
graph.add_edge(pole1.id, pole2.id, weight=distance)
tree = nx.minimum_spanning_tree(graph)
distribution_lines = []
pole_by_id = {x.id: x for x in drop_poles}
for pole1_id, pole2_id in tree.edges():
pole1_coordinates = pole_by_id[pole1_id].geometry.coords[0]
pole2_coordinates = pole_by_id[pole2_id].geometry.coords[0]
line_coordinates = pole1_coordinates, pole2_coordinates
l = LineString(line_coordinates)
line = Line(
id='%s-%s' % (pole1_id, pole2_id),
geometry=l)
line.pole1_id = pole1_id
line.pole2_id = pole2_id
line.length = l.length
distribution_lines.append(line)
return distribution_lines
distribution_lines = get_distribution_lines(drop_poles)
visualize_geometries(
distribution_lines, drop_poles, colors=['orange', 'red'])
from shapely.geometry import LineString, MultiPoint, Point
def get_distribution_poles(
distribution_lines,
maximum_distribution_pole_interval_in_meters):
distribution_poles = []
for distribution_line in distribution_lines:
line = distribution_line.geometry
line_length_in_meters = line.length
distribution_pole_count = int(line_length_in_meters / float(
maximum_distribution_pole_interval_in_meters))
segment_count = distribution_pole_count + 1
adjusted_interval_in_meters = line_length_in_meters / float(
segment_count)
def make_pole(index, point):
return Pole(
id='%s-%s-%s' % (
distribution_line.pole1_id,
distribution_line.pole2_id, index),
geometry=point)
for segment_index in range(1, segment_count):
distribution_poles.append(make_pole(
segment_index,
line.interpolate(
adjusted_interval_in_meters * segment_index)))
return distribution_poles
distribution_poles = get_distribution_poles(
distribution_lines,
maximum_distribution_pole_interval_in_meters)
visualize_geometries(
distribution_poles, drop_poles, colors=['orange', 'red'])
def get_batteries(drop_poles, maximum_battery_line_length_in_meters):
battery_polygons = get_source_polygons_with_connections(
drop_poles, maximum_battery_line_length_in_meters)
batteries = []
for index, battery_polygon in enumerate(battery_polygons):
connections = battery_polygon.connections
battery_point = place_store(get_geometries(connections))
battery = Battery(id=index, geometry=battery_point)
battery.drop_poles = connections
batteries.append(battery)
return batteries
batteries = get_batteries(
drop_poles, maximum_battery_line_length_in_meters)
visualize_geometries(
distribution_lines, drop_poles, batteries, colors=[
'orange', 'red', 'blue'])
poles = drop_poles + distribution_poles
import numpy as np
def get_best_pole(
customer_tree, poles, maximum_lamp_pole_distance_in_meters):
best_metric = np.inf, np.inf
best_pole = None
for pole in poles:
metric = compute_lamp_pole_metric(
customer_tree, pole,
maximum_lamp_pole_distance_in_meters)
if metric < best_metric:
best_metric = metric
best_pole = pole
return best_pole
def compute_lamp_pole_metric(
customer_tree, pole, maximum_lamp_pole_distance_in_meters):
customer_count = len(customer_tree.data)
pole_point = pole.geometry
pole_xy = pole_point.x, pole_point.y
distances, indices = customer_tree.query(
pole_xy,
k=customer_count,
distance_upper_bound=maximum_lamp_pole_distance_in_meters)
if not hasattr(distances, '__iter__'):
distances = [distances]
indices = [indices]
distances = [x for x in distances if x < np.inf]
indices = [x for x in indices if x != customer_count]
return -len(indices), sum(distances)
from copy import copy
from scipy.spatial import KDTree
def get_lamp_poles(
customers, poles, maximum_lamp_pole_distance_in_meters):
remaining_customers = copy(customers)
remaining_poles = copy(poles)
lamp_poles = []
while remaining_customers:
customer_tree = KDTree([(_.x, _.y) for _ in get_geometries(
remaining_customers)])
pole = get_best_pole(
customer_tree,
remaining_poles,
maximum_lamp_pole_distance_in_meters)
lamp_poles.append(pole)
# Remove lamp pole from remaining_poles
remaining_poles.remove(pole)
# Remove satisfied customers from remaining_customers
pole_point = pole.geometry
pole_xy = pole_point.x, pole_point.y
indices = customer_tree.query_ball_point(
pole_xy,
maximum_lamp_pole_distance_in_meters)
for customer in np.array(remaining_customers)[indices]:
remaining_customers.remove(customer)
for pole in lamp_poles:
pole.has_street_lamp = True
return lamp_poles
lamp_poles = get_lamp_poles(
customers, poles, maximum_lamp_pole_distance_in_meters)
visualize_geometries(
customers, lamp_poles, colors=['yellow', 'cyan'])
# Estimate the demand in kWh/day for each battery
for battery in batteries:
demand_in_kwh_per_day = 0
for pole in battery.drop_poles:
for customer in pole.customers:
demand_in_kwh_per_day += customer.demand_in_kwh_per_day
battery.demand_in_kwh_per_day = demand_in_kwh_per_day
# Estimate the number of solar panels needed
# Place a solar panel on poles closest to a battery
import networkx as nx
from networkx.algorithms.shortest_paths import shortest_path_length
from scipy.spatial import KDTree
distribution_graph = nx.Graph()
for x in distribution_lines:
distribution_graph.add_edge(x.pole1_id, x.pole2_id)
pole_by_id = {x.id: x for x in poles}
def get_nearest_poles_on_distribution_graph(pole, k):
packs = []
for pole_id in distribution_graph:
path_length = shortest_path_length(
distribution_graph, pole_id, pole.id)
packs.append((path_length, pole_id))
sorted_packs = sorted(packs)
selected_packs = sorted_packs[:k]
return [
pole_by_id[pole_id] for path_length, pole_id in selected_packs]
get_nearest_poles_on_distribution_graph(poles[0], 2)
solar_poles = []
pole_tree = KDTree([_.xy for _ in poles])
for battery in batteries:
solar_pole_count = \
minimum_solar_pole_count_per_kwh * \
battery.demand_in_kwh_per_day
distance, index = pole_tree.query(battery.xy)
battery_pole = poles[index]
nearest_poles = get_nearest_poles_on_distribution_graph(
battery_pole, solar_pole_count)
for pole in nearest_poles:
pole.has_solar_panel = True
if pole not in solar_poles:
solar_poles.append(pole)
battery.solar_poles = nearest_poles
visualize_geometries(
distribution_lines, batteries, solar_poles, colors=[
'orange', 'red', 'cyan'])
import numpy as np
def compute_angle(a_xy, b_xy, c_xy):
# https://stackoverflow.com/a/31735642/192092
b_xy = np.array(b_xy)
angle1 = np.arctan2(*(a_xy - b_xy)[::-1])
angle2 = np.arctan2(*(c_xy - b_xy)[::-1])
angle_in_degrees = np.rad2deg(angle1 - angle2)
if angle_in_degrees < 0:
angle_in_degrees *= -1
if angle_in_degrees > 180:
angle_in_degrees = 360 - angle_in_degrees
return angle_in_degrees
def find_pole_lines(distribution_lines, pole):
pole_lines = []
for distribution_line in distribution_lines:
pole1_id = distribution_line.pole1_id
pole2_id = distribution_line.pole2_id
if pole.id in (pole1_id, pole2_id):
pole_lines.append(distribution_line)
return pole_lines
def get_pole_angle_points(pole_lines):
l1 = pole_lines[0].geometry
l2 = pole_lines[1].geometry
b_point = l1.intersection(l2)
b_xy = b_point.x, b_point.y
a_xy = get_other_endpoint_xy(l1, b_xy)
c_xy = get_other_endpoint_xy(l2, b_xy)
return a_xy, b_xy, c_xy
def get_other_endpoint_xy(line, endpoint_xy):
endpoint_xys = list(line.coords)
endpoint_xys.remove(endpoint_xy)
return endpoint_xys[0]
for pole in poles:
pole_lines = find_pole_lines(distribution_lines, pole)
if len(pole_lines) < 2:
continue
angle = compute_angle(*get_pole_angle_points(pole_lines))
if angle < maximum_angle_for_pole_type_a:
pole.has_corner_angle = True
for pole in poles:
if pole.has_corner_angle or pole.has_solar_panel:
pole.type_id = 'A'
else:
pole.type_id = 'B'
import utm
from os.path import join
from pandas import DataFrame
def get_lonlat(xy):
return utm.to_latlon(*xy, zone_number, zone_letter)[::-1]
target_path = join(target_folder, 'poles.csv')
rows = []
for pole in poles:
longitude, latitude = get_lonlat(pole.xy)
rows.append([
pole.id,
pole.type_id,
len(pole.customers or []),
pole.has_solar_panel,
pole.has_street_lamp,
pole.has_corner_angle,
longitude,
latitude,
])
DataFrame(rows, columns=[
'pole_id',
'type_id',
'customer_count',
'has_solar_panel',
'has_street_lamp',
'has_corner_angle',
'longitude',
'latitude',
]).to_csv(target_path, index=False)
print('pole_table_path = %s' % target_path)
target_path = join(target_folder, 'batteries.csv')
rows = []
for battery in batteries:
longitude, latitude = get_lonlat(battery.xy)
rows.append([
battery.id,
battery.demand_in_kwh_per_day,
len(battery.solar_poles),
longitude,
latitude,
])
DataFrame(rows, columns=[
'battery_id',
'demand_in_kwh_per_day',
'solar_panel_count',
'longitude',
'latitude',
]).to_csv(target_path, index=False)
print('battery_table_path = %s' % target_path)
target_path = join(target_folder, 'customers.csv')
rows = []
for customer in customers:
point = customer.geometry.centroid
customer_xy = point.x, point.y
longitude, latitude = get_lonlat(customer_xy)
rows.append([
customer.id,
customer.demand_in_kwh_per_day,
customer.pole_id,
customer.drop_line_length,
longitude,
latitude,
])
DataFrame(rows, columns=[
'customer_id',
'demand_in_kwh_per_day',
'pole_id',
'drop_line_length',
'longitude',
'latitude',
]).to_csv(target_path, index=False)
print('customer_table_path = %s' % target_path)
target_path = join(target_folder, 'lines.csv')
rows = []
for distribution_line in distribution_lines:
rows.append([
distribution_line.pole1_id,
distribution_line.pole2_id,
distribution_line.length,
])
DataFrame(rows, columns=[
'pole1_id',
'pole2_id',
'length',
]).to_csv(target_path, index=False)
print('line_table_path = %s' % target_path)
target_path = join(target_folder, 'map.csv')
rows = []
for distribution_line in distribution_lines:
l = distribution_line.geometry
rows.append([
'Distribution Line %s' % distribution_line.id,
LineString([
get_lonlat(l.coords[0]),
get_lonlat(l.coords[1])]).wkt,
])
for customer in customers:
p = customer.geometry.centroid
customer_xy = p.x, p.y
rows.append([
'Customer %s' % customer.id,
Point(get_lonlat(customer_xy)).wkt,
])
DataFrame(rows, columns=[
'Description',
'WKT',
]).to_csv(target_path, index=False)
print('map_geotable_path = %s' % target_path)
{ map_geotable : Distribution Lines and Customers }
{ customer_table : Customers }
{ pole_table : Poles }
{ line_table : Distribution Lines }
{ battery_table : Batteries }