Inspire a community of professionals skilled in spatiotemporal analysis for community health and safety.
Assemble 1st implementation of an algorithm for prototyping solar mini grid configurations according to specifications provided by Electric Vine Industries.
Roy Hyunjin Han
Automating mini grid site configuration will make it possible to automate mini grid site selection.
20171115-1415 - 20171122-1415: 1 week estimated
20171115-1415 - 20171129-1415: 2 weeks actual
+ Draft 1st implementation
+ Deploy 1st implementation
Press SHIFT-ENTER to run each cell.
20171115-1415 - 20171115-1445: 30 minutes
import attr
from shapely.geometry.base import BaseGeometry
@attr.s
class GeometryMixin(object):
id = attr.ib(default=0)
geometry = attr.ib(default=attr.Factory(BaseGeometry))
attributes = attr.ib(default=attr.Factory(dict))
class PointMixin(GeometryMixin):
@property
def xy(self):
return self.geometry.x, self.geometry.y
class Customer(GeometryMixin):
demand_in_kwh_per_day = 0
class Road(GeometryMixin):
pass
class Obstacle(GeometryMixin):
pass
class Battery(PointMixin):
type_id = None
drop_poles = None
demand_in_kwh_per_day = 0
class Line(GeometryMixin):
pole1_id = None
pole2_id = None
length = 0
class Pole(PointMixin):
type_id = None
customers = None
has_solar_panel = False
has_street_lamp = False
has_corner_angle = False
has_wireless_internet = False
+ Define classes
+ Gather datasets
from os.path import exists, join
dataset_folder = '../Tools'
customer_geotable_path = join(dataset_folder, 'customers.shp.zip')
road_geotable_path = join(dataset_folder, 'roads.shp.zip')
obstacle_geotable_path = join(dataset_folder, 'obstacles.shp.zip')
for path in customer_geotable_path, road_geotable_path, obstacle_geotable_path:
    assert exists(path)
I really need to rewrite geometryIO one of these days.
import geometryIO
proj4, geometries, field_packs, field_definitions = geometryIO.load(
customer_geotable_path, targetProj4=geometryIO.proj4LL)
customer_geometries = geometries
field_definitions
proj4, geometries, field_packs, field_definitions = geometryIO.load(
road_geotable_path, targetProj4=geometryIO.proj4LL)
road_geometries = geometries
proj4, geometries, field_packs, field_definitions = geometryIO.load(
obstacle_geotable_path, targetProj4=geometryIO.proj4LL)
obstacle_geometries = geometries
# This visualization is probably not oriented properly
from geotable import ColorfulGeometryCollection
from shapely.geometry import GeometryCollection
ColorfulGeometryCollection([
GeometryCollection(obstacle_geometries),
GeometryCollection(road_geometries),
GeometryCollection(customer_geometries),
], colors=[
'gray',
'black',
'yellow',
])
proj4
+ Load datasets
+ Examine datasets
_ Option 1: Load everything in WGS84 (longitude, latitude) and use vincenty distance
_ Option 2: Convert everything to spherical mercator and use euclidean distance
Option 3: Convert everything to UTM and use euclidean distance
The problem is that most of shapely's operations assume a euclidean metric space, and I think distances are more accurate in UTM than in spherical mercator, which is optimized for visualization rather than measurement. Distances are important here, so let's use UTM.
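For example, here is a quick check of the distortion, assuming pyproj is installed and using hypothetical coordinates that are not from the provided datasets: spherical mercator stretches distances by roughly 1/cos(latitude), while UTM stays within a fraction of a percent of the true ground distance.
import pyproj
# Two hypothetical points about one kilometer apart at 40 degrees south
lon1, lat1 = 145.000, -40.0
lon2, lat2 = 145.012, -40.0
spherical_mercator = pyproj.Proj(
    '+proj=merc +a=6378137 +b=6378137 +lat_ts=0 +units=m +no_defs')
utm_zone_55_south = pyproj.Proj(
    '+proj=utm +zone=55 +south +ellps=WGS84 +datum=WGS84 +units=m +no_defs')
def get_euclidean_distance(xy1, xy2):
    return ((xy1[0] - xy2[0]) ** 2 + (xy1[1] - xy2[1]) ** 2) ** 0.5
# Inflated by roughly 1 / cos(40 degrees) relative to the ground distance
print(get_euclidean_distance(
    spherical_mercator(lon1, lat1), spherical_mercator(lon2, lat2)))
# Within a fraction of a percent of the ground distance (~1 km)
print(get_euclidean_distance(
    utm_zone_55_south(lon1, lat1), utm_zone_55_south(lon2, lat2)))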
20171115-1530 - 20171115-1630: 60 minutes
A solar mini grid is contained in a small local region, so it is safe to use a single UTM zone. Distances computed from UTM coordinates become increasingly skewed the farther a point lies from its zone.
geometries = customer_geometries + road_geometries + obstacle_geometries
geometry_collection = GeometryCollection(geometries)
centroid_lonlat = geometry_collection.centroid
centroid_latlon = centroid_lonlat.y, centroid_lonlat.x
centroid_latlon
import utm
x, y, zone_number, zone_letter = utm.from_latlon(*centroid_latlon)
print(x, y, zone_number, zone_letter)
+ Get the average of all geometries
+ Convert that average into utm
+ Set the zone number from the average utm point as the official zone number
zone_letter.upper() < 'N'
'S' < 'N'
def get_utm_proj4(zone_number, zone_letter):
parts = []
parts.extend([
'+proj=utm',
'+zone=%s' % zone_number,
])
if zone_letter.upper() < 'N':
parts.append('+south')
parts.extend([
'+ellps=WGS84',
'+datum=WGS84',
'+units=m',
'+no_defs',
])
return ' '.join(parts)
target_proj4 = get_utm_proj4(zone_number, zone_letter)
target_proj4
import geometryIO
proj4, geometries, field_packs, field_definitions = geometryIO.load(
customer_geotable_path, targetProj4=target_proj4)
customer_geometries = geometries
proj4, geometries, field_packs, field_definitions = geometryIO.load(
road_geotable_path, targetProj4=target_proj4)
road_geometries = geometries
proj4, geometries, field_packs, field_definitions = geometryIO.load(
obstacle_geotable_path, targetProj4=target_proj4)
obstacle_geometries = geometries
+ Get proj4 in utm
+ Convert coordinates to utm
str(customer_geometries[0])
# This visualization is probably not oriented properly
from geotable import ColorfulGeometryCollection
from shapely.geometry import GeometryCollection
ColorfulGeometryCollection([
GeometryCollection(obstacle_geometries),
GeometryCollection(road_geometries),
GeometryCollection(customer_geometries),
], colors=[
'gray',
'black',
'yellow',
])
import utm
from shapely.geometry import GeometryCollection
def get_utm_proj4_from_lonlat_geometries(lonlat_geometries):
lonlat_point = GeometryCollection(lonlat_geometries).centroid
longitude = lonlat_point.x
latitude = lonlat_point.y
return get_utm_proj4_from_latlon(latitude, longitude)
def get_utm_proj4_from_latlon(latitude, longitude):
x, y, zone_number, zone_letter = utm.from_latlon(latitude, longitude)
return get_utm_proj4(zone_number, zone_letter)
def get_utm_proj4(zone_number, zone_letter):
parts = []
parts.extend([
'+proj=utm',
'+zone=%s' % zone_number,
])
if zone_letter.upper() < 'N':
parts.append('+south')
parts.extend([
'+ellps=WGS84',
'+datum=WGS84',
'+units=m',
'+no_defs',
])
return ' '.join(parts)
def load_instances(geotable_path, target_proj4, TargetClass, default_value_by_name=None):
geometries, field_packs, field_definitions = geometryIO.load(
geotable_path, targetProj4=target_proj4)[1:]
instances = []
for index, (geometry, field_pack) in enumerate(zip(geometries, field_packs)):
d = {}
for field_value, (field_name, field_type) in zip(field_pack, field_definitions):
d[field_name] = field_value
instance = TargetClass(
id=d.pop('id', index),
geometry=geometry,
attributes=d)
for k, v in (default_value_by_name or {}).items():
setattr(instance, k, d.pop(k, v))
instances.append(instance)
return instances
import geometryIO
def load_customers(geotable_path, target_proj4, demand_in_kwh_per_day):
return load_instances(geotable_path, target_proj4, Customer, {
'demand_in_kwh_per_day': demand_in_kwh_per_day,
})
utm_proj4 = get_utm_proj4_from_lonlat_geometries(geometryIO.load(
customer_geotable_path, targetProj4=geometryIO.proj4LL)[1])
customers = load_customers(customer_geotable_path, utm_proj4, 100)
customers[0]
def load_roads(geotable_path, target_proj4):
return load_instances(geotable_path, target_proj4, Road)
roads = load_roads(road_geotable_path, utm_proj4)
def load_obstacles(geotable_path, target_proj4):
return load_instances(geotable_path, target_proj4, Obstacle)
obstacles = load_obstacles(obstacle_geotable_path, utm_proj4)
from geotable import ColorfulGeometryCollection
from shapely.geometry import GeometryCollection
def get_geometries(instances):
return [x.geometry for x in instances]
def visualize_geometries(*instance_lists, colors=None):
return ColorfulGeometryCollection([
GeometryCollection(get_geometries(x)) for x in instance_lists
], colors)
visualize_geometries(obstacles, roads, customers, colors=[
'gray', 'black', 'yellow'])
from copy import copy
from shapely.geometry import LineString
from shapely.ops import polygonize, unary_union
def get_source_polygons(target_geometries, maximum_distance):
# Convert target_geometries into target_polygons using maximum_distance
target_polygons = [x.buffer(maximum_distance) for x in target_geometries]
# Identify overlapping areas
sliced_polygons = get_disjoint_polygons(target_polygons)
for polygon in sliced_polygons:
candidate_polygons = []
for target_polygon in target_polygons:
if target_polygon.contains(polygon.centroid):
candidate_polygons.append(target_polygon)
polygon.candidate_polygons = candidate_polygons
# Sort overlapping areas by overlap count
sorted_polygons = sorted(
sliced_polygons, key=lambda x: -len(x.candidate_polygons))
# Assign target_polygons to each sorted_polygon
social_polygons, lonely_polygons = [], []
for polygon in sorted_polygons:
connected_polygons = []
for candidate_polygon in polygon.candidate_polygons:
try:
target_polygons.remove(candidate_polygon)
except ValueError:
continue
connected_polygons.append(candidate_polygon)
connection_count = len(connected_polygons)
if connection_count > 1:
social_polygons.append(polygon)
elif connection_count == 1:
lonely_polygons.append(polygon)
polygon.connected_polygons = connected_polygons
return social_polygons + lonely_polygons
def get_disjoint_polygons(overlapping_polygons):
'Split overlapping polygons into disjoint polygons'
rings = [LineString(list(
x.exterior.coords)) for x in overlapping_polygons]
return list(polygonize(unary_union(rings)))
maximum_drop_line_length_in_meters = 50
maximum_drop_line_count_per_pole = 5
service_drop_pole_polygons = get_source_polygons(
customer_geometries, maximum_drop_line_length_in_meters)
from geotable import ColorfulGeometryCollection
from shapely.geometry import GeometryCollection
ColorfulGeometryCollection([
GeometryCollection(service_drop_pole_polygons),
GeometryCollection(customer_geometries),
], colors=['green', 'yellow'])
from math import ceil
def get_service_drop_pole_count(
source_polygons, maximum_connection_count_per_pole):
service_drop_pole_count = 0
for polygon in source_polygons:
connection_count = len(polygon.connected_polygons)
pole_count = ceil(connection_count / float(
maximum_connection_count_per_pole))
service_drop_pole_count += pole_count
return service_drop_pole_count
get_service_drop_pole_count(
service_drop_pole_polygons,
maximum_drop_line_count_per_pole)
len(service_drop_pole_polygons)
How can we convert service drop polygons into service drop points? I think we can use k-means.
def get_source_polygons_with_connections(target_instances, maximum_distance):
target_geometries = get_geometries(target_instances)
# Convert target_geometries into target_polygons using maximum_distance
target_polygons = [x.buffer(maximum_distance) for x in target_geometries]
# Identify overlapping areas
sliced_polygons = get_disjoint_polygons(target_polygons)
for polygon in sliced_polygons:
candidates = []
for target_instance, target_polygon in zip(target_instances, target_polygons):
if target_polygon.contains(polygon.centroid):
candidates.append(target_instance)
polygon.candidates = candidates
# Sort overlapping areas by overlap count
sorted_polygons = sorted(sliced_polygons, key=lambda x: -len(x.candidates))
# Assign target_polygons to each sorted_polygon
target_instances = copy(target_instances)
social_polygons, lonely_polygons = [], []
for polygon in sorted_polygons:
connections = []
for target_instance in polygon.candidates:
try:
target_instances.remove(target_instance)
except ValueError:
continue
connections.append(target_instance)
connection_count = len(connections)
if connection_count > 1:
social_polygons.append(polygon)
elif connection_count == 1:
lonely_polygons.append(polygon)
polygon.connections = connections
return social_polygons + lonely_polygons
from geotable import ColorfulGeometryCollection
from shapely.geometry import GeometryCollection, Point
example_customers = [
Customer(geometry=Point(-1, 0)),
Customer(geometry=Point(+1, 0)),
Customer(geometry=Point(0, +2)),
]
ColorfulGeometryCollection([
GeometryCollection(get_source_polygons_with_connections(
example_customers, 1.5)),
GeometryCollection(get_geometries(
example_customers)),
], colors=['green', 'yellow'])
maximum_drop_line_length_in_meters = 50
drop_pole_polygons = get_source_polygons_with_connections(
customers, maximum_drop_line_length_in_meters)
len(drop_pole_polygons[0].connections)
import numpy as np
from sklearn.cluster import KMeans
kmeans = KMeans(n_clusters=2).fit(np.array([
[0, 0],
[1, 1],
[9, 9],
]))
kmeans.labels_
import numpy as np
from collections import defaultdict
from shapely.geometry import Point
from sklearn.cluster import KMeans
def get_instance_clusters(instances, cluster_count):
geometries = get_geometries(instances)
points = [x.centroid for x in geometries]
xys = np.array([(point.x, point.y) for point in points])
kmeans = KMeans(n_clusters=cluster_count).fit(xys)
instances_by_label = defaultdict(list)
for instance, label in zip(instances, kmeans.labels_):
instances_by_label[label].append(instance)
return list(instances_by_label.values())
get_instance_clusters([
Customer(id=1, geometry=Point(0, 0)),
Customer(id=2, geometry=Point(1, 1)),
Customer(id=3, geometry=Point(9, 9)),
], 2)
def estimate_drop_pole_count(
drop_pole_polygon, maximum_drop_line_count_per_pole):
connection_count = len(drop_pole_polygon.connections)
if not connection_count:
return 1
return ceil(connection_count / float(
maximum_drop_line_count_per_pole))
estimate_drop_pole_count(
drop_pole_polygons[0], maximum_drop_line_count_per_pole)
_ Nelder-Mead
_ Powell
_ CG
_ BFGS
_ Newton-CG
L-BFGS-B
_ TNC
_ COBYLA
_ SLSQP
_ dogleg
_ trust-ncg
_ trust-exact
_ trust-krylov
+ Choose an optimization algorithm
from scipy.optimize import minimize
f = lambda x: x ** 2
minimize(f, 10, method='L-BFGS-B').x[0]
from shapely.geometry import Point
Point(0, 0).coords[0]
x = Point(0, 0)
x.distance(Point(1, 1))
from shapely.geometry import Point
points = [
Point(-1, 0),
Point(0, +2),
Point(+1, 0),
]
def sum_distances(xy):
return sum(Point(xy).distance(p) for p in points)
sum_distances((0, 0))
minimize(sum_distances, points[0], method='L-BFGS-B').x
from scipy.optimize import minimize
def place_store(geometries):
'Return the point that minimizes the sum of distances to each geometry'
def sum_distances(xy):
return sum(Point(xy).distance(g) for g in geometries)
xy = geometries[0].centroid.coords[0]
return Point(minimize(sum_distances, xy, method='L-BFGS-B').x)
place_store(points).coords[0]
def make_drop_pole(customers):
pole_point = place_store(get_geometries(customers))
pole = Pole(geometry=pole_point)
pole.customers = customers
return pole
def get_drop_poles(
customers,
maximum_drop_line_length_in_meters,
maximum_drop_line_count_per_pole):
drop_pole_polygons = get_source_polygons_with_connections(
customers, maximum_drop_line_length_in_meters)
drop_poles = []
for polygon in drop_pole_polygons:
connection_count = len(polygon.connections)
pole_count = estimate_drop_pole_count(
polygon, maximum_drop_line_count_per_pole)
if pole_count > 1:
clusters = get_instance_clusters(
polygon.connections, cluster_count=pole_count)
for customers in clusters:
drop_poles.append(make_drop_pole(customers))
else:
drop_poles.append(make_drop_pole(polygon.connections))
for index, pole in enumerate(drop_poles):
pole.id = index
for customer in pole.customers:
customer.pole_id = pole.id
customer.drop_line_length = Point(
pole.xy).distance(customer.geometry.centroid)
return drop_poles
drop_poles = get_drop_poles(
customers,
maximum_drop_line_length_in_meters,
maximum_drop_line_count_per_pole)
visualize_geometries(drop_poles, customers, colors=['red', 'yellow'])
drop_poles[:3]
len(drop_poles[0].customers)
import networkx as nx
from itertools import combinations
from scipy.spatial.distance import euclidean as get_distance
drop_pole_points = get_geometries(drop_poles)
graph = nx.Graph()
for p1, p2 in combinations(drop_pole_points, 2):
distance = get_distance(p1, p2)
graph.add_edge(
tuple(p1.coords[0]),
tuple(p2.coords[0]), weight=distance)
tree = nx.minimum_spanning_tree(graph)
edges = tree.edges()
len(edges)
list(edges)[0]
drop_poles[0].geometry.coords[0]
import networkx as nx
from itertools import combinations
from scipy.spatial.distance import euclidean as get_distance
from shapely.geometry import LineString
def get_distribution_lines(drop_poles):
drop_pole_points = get_geometries(drop_poles)
graph = nx.Graph()
for pole1, pole2 in combinations(drop_poles, 2):
distance = get_distance(pole1.geometry, pole2.geometry)
graph.add_edge(pole1.id, pole2.id, weight=distance)
tree = nx.minimum_spanning_tree(graph)
distribution_lines = []
pole_by_id = {x.id: x for x in drop_poles}
for pole1_id, pole2_id in tree.edges():
pole1_coordinates = pole_by_id[pole1_id].geometry.coords[0]
pole2_coordinates = pole_by_id[pole2_id].geometry.coords[0]
line_coordinates = pole1_coordinates, pole2_coordinates
l = LineString(line_coordinates)
line = Line(
id='%s-%s' % (pole1_id, pole2_id),
geometry=l)
line.pole1_id = pole1_id
line.pole2_id = pole2_id
line.length = l.length
distribution_lines.append(line)
return distribution_lines
distribution_lines = get_distribution_lines(drop_poles)
visualize_geometries(
distribution_lines, drop_poles, colors=['orange', 'red'])
print(list(distribution_lines[0].geometry.coords))
print(list(distribution_lines[0].geometry.coords[-1]))
line = LineString([(0, 0), (1, 1)])
line.coords[-1]
line.interpolate(line.length).coords[-1]
from shapely.geometry import LineString, MultiPoint, Point
def get_distribution_poles(
distribution_lines, distribution_pole_interval_in_meters):
distribution_poles = []
for distribution_line in distribution_lines:
line = distribution_line.geometry
line_length_in_meters = line.length
distribution_pole_count = int(
line_length_in_meters / distribution_pole_interval_in_meters)
segment_count = distribution_pole_count + 1
adjusted_interval_in_meters = line_length_in_meters / float(
segment_count)
def make_pole(index, point):
return Pole(
id='%s-%s-%s' % (
distribution_line.pole1_id,
distribution_line.pole2_id, index),
geometry=point)
for segment_index in range(1, segment_count):
distribution_poles.append(make_pole(
segment_index,
line.interpolate(
adjusted_interval_in_meters * segment_index)))
return distribution_poles
distribution_poles = get_distribution_poles(distribution_lines, 75)
visualize_geometries(
distribution_poles, drop_poles, colors=['orange', 'red'])
20171116-0000 - 20171116-0100: 60 minutes
+ Sort pieces by overlap
+ Write small assertion test for the bug pinpointed by Deril
+ Replace centroid with scipy.optimize.minimize
def get_batteries(drop_poles, maximum_battery_line_length_in_meters):
battery_polygons = get_source_polygons_with_connections(
drop_poles, maximum_battery_line_length_in_meters)
batteries = []
for index, battery_polygon in enumerate(battery_polygons):
connections = battery_polygon.connections
battery_point = place_store(get_geometries(connections))
battery = Battery(id=index, geometry=battery_point)
battery.drop_poles = connections
batteries.append(battery)
return batteries
batteries = get_batteries(
drop_poles, maximum_battery_line_length_in_meters=100)
visualize_geometries(
distribution_lines, drop_poles, batteries, colors=[
'orange', 'red', 'blue'])
20171117-1815 - 20171117-1845: 30 minutes
_ Project each battery to the nearest distribution line
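This projection is not implemented above; here is a minimal sketch of how it could work with shapely (project_point_to_nearest_line is a hypothetical helper, not used elsewhere in this notebook).
def project_point_to_nearest_line(point, lines):
    'Return the point on the nearest line geometry (sketch only)'
    nearest_line = min(lines, key=lambda line: line.geometry.distance(point))
    line_geometry = nearest_line.geometry
    return line_geometry.interpolate(line_geometry.project(point))
# For example, snap each battery onto the distribution network:
# for battery in batteries:
#     battery.geometry = project_point_to_nearest_line(
#         battery.geometry, distribution_lines)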
maximum_lamp_pole_distance_in_meters = 200
poles = drop_poles + distribution_poles
import numpy as np
def get_best_pole(
customer_tree, poles, maximum_lamp_pole_distance_in_meters):
best_metric = np.inf, np.inf
best_pole = None
for pole in poles:
        metric = compute_lamp_pole_metric(
            customer_tree, pole, maximum_lamp_pole_distance_in_meters)
if metric < best_metric:
best_metric = metric
best_pole = pole
return best_pole
def compute_lamp_pole_metric(
        customer_tree, pole, maximum_lamp_pole_distance_in_meters):
customer_count = len(customer_tree.data)
pole_point = pole.geometry
pole_xy = pole_point.x, pole_point.y
distances, indices = customer_tree.query(
pole_xy,
k=customer_count,
distance_upper_bound=maximum_lamp_pole_distance_in_meters)
if not hasattr(distances, '__iter__'):
distances = [distances]
indices = [indices]
distances = [x for x in distances if x < np.inf]
indices = [x for x in indices if x != customer_count]
return -len(indices), sum(distances)
from copy import copy
from scipy.spatial import KDTree
def get_lamp_poles(customers, poles):
remaining_customers = copy(customers)
remaining_poles = copy(poles)
lamp_poles = []
while remaining_customers:
customer_tree = KDTree([(_.x, _.y) for _ in get_geometries(
remaining_customers)])
pole = get_best_pole(
customer_tree,
remaining_poles,
maximum_lamp_pole_distance_in_meters)
lamp_poles.append(pole)
# Remove lamp pole from remaining_poles
remaining_poles.remove(pole)
# Remove satisfied customers from remaining_customers
pole_point = pole.geometry
pole_xy = pole_point.x, pole_point.y
indices = customer_tree.query_ball_point(
pole_xy,
maximum_lamp_pole_distance_in_meters)
for customer in np.array(remaining_customers)[indices]:
remaining_customers.remove(customer)
for pole in lamp_poles:
pole.has_street_lamp = True
return lamp_poles
get_lamp_poles(customers, poles)
# Estimate the demand in kWh/day for each battery
for battery in batteries:
demand_in_kwh_per_day = 0
for pole in battery.drop_poles:
for customer in pole.customers:
demand_in_kwh_per_day += customer.demand_in_kwh_per_day
battery.demand_in_kwh_per_day = demand_in_kwh_per_day
batteries[0].demand_in_kwh_per_day
minimum_solar_pole_count_per_kwh = 5
distribution_lines[0].__dict__
# Estimate the number of solar panels needed
# Place a solar panel on poles closest to a battery
import networkx as nx
from networkx.algorithms.shortest_paths import shortest_path_length
from scipy.spatial import KDTree
distribution_graph = nx.Graph()
for x in distribution_lines:
distribution_graph.add_edge(x.pole1_id, x.pole2_id)
pole_by_id = {x.id: x for x in poles}
def get_nearest_poles_on_distribution_graph(pole, k):
packs = []
for pole_id in distribution_graph:
path_length = shortest_path_length(
distribution_graph, pole_id, pole.id)
packs.append((path_length, pole_id))
sorted_packs = sorted(packs)
selected_packs = sorted_packs[:k]
return [
pole_by_id[pole_id] for path_length, pole_id in selected_packs]
get_nearest_poles_on_distribution_graph(poles[0], 2)
solar_poles = []
pole_tree = KDTree([_.xy for _ in poles])
for battery in batteries:
solar_pole_count = \
minimum_solar_pole_count_per_kwh * \
battery.demand_in_kwh_per_day
distance, index = pole_tree.query(battery.xy)
battery_pole = poles[index]
nearest_poles = get_nearest_poles_on_distribution_graph(
battery_pole, solar_pole_count)
for pole in nearest_poles:
pole.has_solar_panel = True
if pole not in solar_poles:
solar_poles.append(pole)
battery.solar_poles = nearest_poles
len(solar_poles)
# !!!
expected_solar_pole_count = sum(
len(x.solar_poles) for x in batteries)
expected_solar_pole_count
import numpy as np
def compute_angle(a_xy, b_xy, c_xy):
# https://stackoverflow.com/a/31735642/192092
b_xy = np.array(b_xy)
angle1 = np.arctan2(*(a_xy - b_xy)[::-1])
angle2 = np.arctan2(*(c_xy - b_xy)[::-1])
angle_in_degrees = np.rad2deg(angle1 - angle2)
if angle_in_degrees < 0:
angle_in_degrees *= -1
if angle_in_degrees > 180:
angle_in_degrees = 360 - angle_in_degrees
return angle_in_degrees
print(compute_angle((2, 0), (0, 0), (-1, 1)))
print(compute_angle((2, 0), (0, 0), (1, 1)))
print(compute_angle((3, 1), (1, 1), (0, 2)))
pole = poles[0]
# Find distribution lines corresponding to pole
pole_lines = []
for distribution_line in distribution_lines:
pole1_id = distribution_line.pole1_id
pole2_id = distribution_line.pole2_id
if pole.id in (pole1_id, pole2_id):
pole_lines.append(distribution_line)
pole_lines
# Compute angle
l1 = pole_lines[0].geometry
l2 = pole_lines[1].geometry
b_point = l1.intersection(l2)
b_xy = b_point.x, b_point.y
b_xy
def get_other_endpoint_xy(line, endpoint_xy):
endpoint_xys = list(line.coords)
endpoint_xys.remove(endpoint_xy)
return endpoint_xys[0]
a_xy = get_other_endpoint_xy(l1, b_xy)
c_xy = get_other_endpoint_xy(l2, b_xy)
print(a_xy)
print(b_xy)
print(c_xy)
l1_xys = list(l1.coords)
l1_xys.remove(b_xy)
a_xy = l1_xys[0]
a_xy
compute_angle(a_xy, b_xy, c_xy)
def find_pole_lines(distribution_lines, pole):
pole_lines = []
for distribution_line in distribution_lines:
pole1_id = distribution_line.pole1_id
pole2_id = distribution_line.pole2_id
if pole.id in (pole1_id, pole2_id):
pole_lines.append(distribution_line)
return pole_lines
def get_pole_angle_points(pole_lines):
l1 = pole_lines[0].geometry
l2 = pole_lines[1].geometry
b_point = l1.intersection(l2)
b_xy = b_point.x, b_point.y
a_xy = get_other_endpoint_xy(l1, b_xy)
c_xy = get_other_endpoint_xy(l2, b_xy)
return a_xy, b_xy, c_xy
def get_other_endpoint_xy(line, endpoint_xy):
endpoint_xys = list(line.coords)
endpoint_xys.remove(endpoint_xy)
return endpoint_xys[0]
# Use pole type A when the angle between the incoming and outgoing distribution lines is smaller than X degrees
# Use pole type A for solar panel poles
# Use pole type B otherwise
target_angle = 120
for pole in poles:
pole_lines = find_pole_lines(distribution_lines, pole)
if len(pole_lines) < 2:
continue
angle = compute_angle(*get_pole_angle_points(pole_lines))
if angle < target_angle:
pole.has_corner_angle = True
for pole in poles:
if pole.has_corner_angle or pole.has_solar_panel:
pole.type_id = 'A'
else:
pole.type_id = 'B'
import utm
from os.path import join
from pandas import DataFrame
target_folder = '/tmp'
x, y = poles[0].xy
utm.to_latlon(x, y, zone_number, zone_letter)
def get_lonlat(xy):
x, y = xy
lat, lon = utm.to_latlon(x, y, zone_number, zone_letter)
return lon, lat
rows = []
for pole in poles:
longitude, latitude = get_lonlat(pole.xy)
rows.append([
pole.id,
pole.type_id,
len(pole.customers or []),
pole.has_solar_panel,
pole.has_street_lamp,
pole.has_wireless_internet,
pole.has_corner_angle,
longitude,
latitude,
])
target_path = join(target_folder, 'poles.csv')
DataFrame(rows, columns=[
'pole_id',
'type_id',
'customer_count',
'has_solar_panel',
'has_street_lamp',
'has_wireless_internet',
'has_corner_angle',
'longitude',
'latitude',
]).to_csv(target_path, index=False)
rows = []
for battery in batteries:
longitude, latitude = get_lonlat(battery.xy)
rows.append([
battery.id,
battery.demand_in_kwh_per_day,
len(battery.solar_poles),
longitude,
latitude,
])
target_path = join(target_folder, 'batteries.csv')
DataFrame(rows, columns=[
'battery_id',
'demand_in_kwh_per_day',
'solar_panel_count',
'longitude',
'latitude',
]).to_csv(target_path, index=False)
rows = []
for customer in customers:
point = customer.geometry.centroid
customer_xy = point.x, point.y
longitude, latitude = get_lonlat(customer_xy)
rows.append([
customer.id,
customer.demand_in_kwh_per_day,
customer.pole_id,
customer.drop_line_length,
longitude,
latitude,
])
target_path = join(target_folder, 'customers.csv')
DataFrame(rows, columns=[
'customer_id',
'demand_in_kwh_per_day',
'pole_id',
'drop_line_length',
'longitude',
'latitude',
]).to_csv(target_path, index=False)
rows = []
for distribution_line in distribution_lines:
rows.append([
distribution_line.pole1_id,
distribution_line.pole2_id,
distribution_line.length,
])
target_path = join(target_folder, 'lines.csv')
DataFrame(rows, columns=[
'pole1_id',
'pole2_id',
'length',
]).to_csv(target_path, index=False)
20171128-2300 - 20171128-2315: 15 minutes
Last week, we confirmed that there should be enough solar panels to satisfy the demand per day.
+ Ask about discrepancy between actual solar pole count and expected solar pole count
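For reference, the gap between len(solar_poles) and expected_solar_pole_count comes from poles that are nearest to more than one battery: each such pole appears once in solar_poles but once per battery in the expected count. A quick check, sketched against the lists built above:
# Count battery-to-pole assignments that reuse a pole already assigned elsewhere
assigned_pole_ids = [
    pole.id for battery in batteries for pole in battery.solar_poles]
print(len(assigned_pole_ids) - len(set(assigned_pole_ids)))
# This should match the discrepancy
print(expected_solar_pole_count - len(solar_poles))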
Last week, we confirmed that the end poles should use the shortest pole type.
+ Ask about pole type for end poles
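For the follow-up on end poles, they can be identified as the nodes of degree one in the distribution network; here is a minimal sketch assuming the distribution_lines built earlier (the actual pole type assignment is left until Electric Vine Industries confirms the specification).
import networkx as nx
def get_end_pole_ids(distribution_lines):
    'Return the ids of poles that terminate a branch of the distribution network'
    graph = nx.Graph()
    for line in distribution_lines:
        graph.add_edge(line.pole1_id, line.pole2_id)
    return [
        pole_id for pole_id, degree in dict(graph.degree()).items()
        if degree == 1]
# end_pole_ids = get_end_pole_ids(distribution_lines)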
20171129-1230 - 20171129-1315: 45 minutes
Finalize public logs
Make sure all parameters are in the first code cell
Make sure that 1st implementation works with all provided datasets
Re-deploy tool