Prototype Solar Mini Grid Layout (1st Implementation)




Pay Notebook Creator: Roy Hyunjin Han250
Set Container: Numerical CPU with TINY Memory for 10 Minutes 0
Total0

Vision

Inspire a community of professionals skilled in spatiotemporal analysis for community health and safety.

Mission

Assemble 1st implementation of an algorithm for prototyping solar mini grid configurations according to specifications provided by Electric Vine Industries.

Owner

Roy Hyunjin Han

Context

Automating mini grid site configuration will make it possible to automate mini grid site selection.

Timeframe

20171115-1415 - 20171122-1415: 1 week estimated

20171115-1415 - 20171129-1415: 2 weeks actual

Objectives

+ Draft 1st implementation
+ Deploy 1st implementation

Log

Press SHIFT-ENTER to run each cell.

Load Customers, Roads, Obstacles

20171115-1415 - 20171115-1445: 30 minutes

In [ ]:
import attr
from shapely.geometry.base import BaseGeometry

@attr.s
class GeometryMixin(object):
    id = attr.ib(default=0)
    geometry = attr.ib(default=attr.Factory(BaseGeometry))
    attributes = attr.ib(default=attr.Factory(dict))

class PointMixin(GeometryMixin):
    
    @property
    def xy(self):
        return self.geometry.x, self.geometry.y
In [ ]:
class Customer(GeometryMixin):
    demand_in_kwh_per_day = 0

class Road(GeometryMixin):
    pass

class Obstacle(GeometryMixin):
    pass
In [ ]:
class Battery(PointMixin):
    type_id = None
    drop_poles = None
    demand_in_kwh_per_day = 0

class Line(GeometryMixin):
    pole1_id = None
    pole2_id = None
    length = 0
    
class Pole(PointMixin):
    type_id = None
    customers = None
    has_solar_panel = False
    has_street_lamp = False
    has_corner_angle = False
    has_wireless_internet = False
+ Define classes
+ Gather datasets
In [ ]:
from os.path import exists, join
dataset_folder = '../Tools'

customer_geotable_path = join(dataset_folder, 'customers.shp.zip')
road_geotable_path = join(dataset_folder, 'roads.shp.zip')
obstacle_geotable_path = join(dataset_folder, 'obstacles.shp.zip')

for path in customer_geotable_path, road_geotable_path, obstacle_geotable_path:
    assert exists(customer_geotable_path)

I really need to rewrite geometryIO one of these days.

In [ ]:
import geometryIO
In [ ]:
proj4, geometries, field_packs, field_definitions = geometryIO.load(
    customer_geotable_path, targetProj4=geometryIO.proj4LL)
customer_geometries = geometries
In [ ]:
field_definitions
In [ ]:
proj4, geometries, field_packs, field_definitions = geometryIO.load(
    road_geotable_path, targetProj4=geometryIO.proj4LL)
road_geometries = geometries

proj4, geometries, field_packs, field_definitions = geometryIO.load(
    obstacle_geotable_path, targetProj4=geometryIO.proj4LL)
obstacle_geometries = geometries
In [ ]:
# This visualization is probably not oriented properly
from geotable import ColorfulGeometryCollection
from shapely.geometry import GeometryCollection

ColorfulGeometryCollection([
    GeometryCollection(obstacle_geometries),
    GeometryCollection(road_geometries),
    GeometryCollection(customer_geometries),
], colors=[
    'gray',
    'black',
    'yellow',
])
In [ ]:
proj4
+ Load datasets
+ Examine datasets

_ Option 1: Load everything in WGS84 (longitude, latitude) and use vincenty distance
_ Option 2: Convert everything to spherical mercator and use euclidean distance
Option 3: Convert everything to UTM and use euclidean distance

The problem is that I think most of shapely's commands assume the euclidean metric space. And I think distances are more accurate in UTM than spherical mercator (which is optimized for visualization). Distances are important here, so let's use UTM.

20171115-1530 - 20171115-1630: 60 minutes

A solar mini grid is contained in a local region, thus it is safe to use a single UTM zone number. The distances represented by UTM coordinates become skewed the farther they are from their zone.

In [ ]:
geometries = customer_geometries + road_geometries + obstacle_geometries
geometry_collection = GeometryCollection(geometries)
centroid_lonlat = geometry_collection.centroid
centroid_latlon = centroid_lonlat.y, centroid_lonlat.x
centroid_latlon
In [ ]:
import utm
x, y, zone_number, zone_letter = utm.from_latlon(*centroid_latlon)
print(x, y, zone_number, zone_letter)
+ Get the average of all geometries
+ Convert that average into utm
+ Set the zone number from the average utm point as the official zone number    
In [ ]:
zone_letter.upper() < 'N'
In [ ]:
'S' < 'N'
In [ ]:
def get_utm_proj4(zone_number, zone_letter):
    parts = []
    parts.extend([
        '+proj=utm',
        '+zone=%s' % zone_number,        
    ])
    if zone_letter.upper() < 'N':
        parts.append('+south')
    parts.extend([
        '+ellps=WGS84',
        '+datum=WGS84',
        '+units=m',
        '+no_defs',
    ])
    return ' '.join(parts)

target_proj4 = get_utm_proj4(zone_number, zone_letter)
target_proj4
In [ ]:
import geometryIO
proj4, geometries, field_packs, field_definitions = geometryIO.load(
    customer_geotable_path, targetProj4=target_proj4)
customer_geometries = geometries

proj4, geometries, field_packs, field_definitions = geometryIO.load(
    road_geotable_path, targetProj4=target_proj4)
road_geometries = geometries

proj4, geometries, field_packs, field_definitions = geometryIO.load(
    obstacle_geotable_path, targetProj4=target_proj4)
obstacle_geometries = geometries
+ Get proj4 in utm
+ Convert coordinates to utm
In [ ]:
str(customer_geometries[0])
In [ ]:
# This visualization is probably not oriented properly
from geotable import ColorfulGeometryCollection
from shapely.geometry import GeometryCollection

ColorfulGeometryCollection([
    GeometryCollection(obstacle_geometries),
    GeometryCollection(road_geometries),
    GeometryCollection(customer_geometries),
], colors=[
    'gray',
    'black',
    'yellow',
])
In [ ]:
import utm
from shapely.geometry import GeometryCollection

def get_utm_proj4_from_lonlat_geometries(lonlat_geometries):
    lonlat_point = GeometryCollection(lonlat_geometries).centroid
    longitude = lonlat_point.x
    latitude = lonlat_point.y
    return get_utm_proj4_from_latlon(latitude, longitude)

def get_utm_proj4_from_latlon(latitude, longitude):
    x, y, zone_number, zone_letter = utm.from_latlon(latitude, longitude)
    return get_utm_proj4(zone_number, zone_letter)

def get_utm_proj4(zone_number, zone_letter):
    parts = []
    parts.extend([
        '+proj=utm',
        '+zone=%s' % zone_number,        
    ])
    if zone_letter.upper() < 'N':
        parts.append('+south')
    parts.extend([
        '+ellps=WGS84',
        '+datum=WGS84',
        '+units=m',
        '+no_defs',
    ])
    return ' '.join(parts)
In [ ]:
def load_instances(geotable_path, target_proj4, TargetClass, default_value_by_name=None):
    geometries, field_packs, field_definitions = geometryIO.load(
        geotable_path, targetProj4=target_proj4)[1:]
    instances = []
    for index, (geometry, field_pack) in enumerate(zip(geometries, field_packs)):
        d = {}
        for field_value, (field_name, field_type) in zip(field_pack, field_definitions):
            d[field_name] = field_value
        instance = TargetClass(
            id=d.pop('id', index),
            geometry=geometry,
            attributes=d)
        for k, v in (default_value_by_name or {}).items():
            setattr(instance, k, d.pop(k, v))
        instances.append(instance)
    return instances
In [ ]:
import geometryIO

def load_customers(geotable_path, target_proj4, demand_in_kwh_per_day):
    return load_instances(geotable_path, target_proj4, Customer, {
        'demand_in_kwh_per_day': demand_in_kwh_per_day,
    })

utm_proj4 = get_utm_proj4_from_lonlat_geometries(geometryIO.load(
    customer_geotable_path, targetProj4=geometryIO.proj4LL)[1])
customers = load_customers(customer_geotable_path, utm_proj4, 100)
customers[0]
In [ ]:
def load_roads(geotable_path, target_proj4):
    return load_instances(geotable_path, target_proj4, Road)

roads = load_roads(road_geotable_path, utm_proj4)
In [ ]:
def load_obstacles(geotable_path, target_proj4):
    return load_instances(geotable_path, target_proj4, Obstacle)

obstacles = load_obstacles(obstacle_geotable_path, target_proj4)
In [ ]:
from geotable import ColorfulGeometryCollection
from shapely.geometry import GeometryCollection

def get_geometries(instances):
    return [x.geometry for x in instances]

def visualize_geometries(*instance_lists, colors=None):
    return ColorfulGeometryCollection([
        GeometryCollection(get_geometries(x)) for x in instance_lists
    ], colors)

visualize_geometries(obstacles, roads, customers, colors=[
    'gray', 'black', 'yellow'])

Place Service Drop Poles

  • Maximize the percentage of customers who are within a maximum distance to a service drop pole.
  • Minimize the number of service drop poles.
  • Minimize the distance from a customer to the nearest service drop pole.
  • Limit the maximum distance from a customer to the nearest service drop pole.
  • Limit the maximum number of customers per service drop pole.
In [ ]:
from copy import copy
from shapely.geometry import LineString
from shapely.ops import polygonize, unary_union

def get_source_polygons(target_geometries, maximum_distance):
    # Convert target_geometries into target_polygons using maximum_distance
    target_polygons = [x.buffer(maximum_distance) for x in target_geometries]
    # Identify overlapping areas
    sliced_polygons = get_disjoint_polygons(target_polygons)
    for polygon in sliced_polygons:
        candidate_polygons = []
        for target_polygon in target_polygons:
            if target_polygon.contains(polygon.centroid):
                candidate_polygons.append(target_polygon)
        polygon.candidate_polygons = candidate_polygons
    # Sort overlapping areas by overlap count
    sorted_polygons = sorted(
        sliced_polygons, key=lambda x: -len(x.candidate_polygons))
    # Assign target_polygons to each sorted_polygon
    social_polygons, lonely_polygons = [], []
    for polygon in sorted_polygons:
        connected_polygons = []
        for candidate_polygon in polygon.candidate_polygons:
            try:
                target_polygons.remove(candidate_polygon)
            except ValueError:
                continue
            connected_polygons.append(candidate_polygon)
        connection_count = len(connected_polygons)
        if connection_count > 1:
            social_polygons.append(polygon)
        elif connection_count == 1:
            lonely_polygons.append(polygon)
        polygon.connected_polygons = connected_polygons
    return social_polygons + lonely_polygons

def get_disjoint_polygons(overlapping_polygons):
    'Split overlapping polygons into disjoint polygons'
    rings = [LineString(list(
        x.exterior.coords)) for x in overlapping_polygons]
    return list(polygonize(unary_union(rings)))
In [ ]:
maximum_drop_line_length_in_meters = 50
maximum_drop_line_count_per_pole = 5
In [ ]:
service_drop_pole_polygons = get_source_polygons(
    customer_geometries, maximum_drop_line_length_in_meters)
In [ ]:
from geotable import ColorfulGeometryCollection
from shapely.geometry import GeometryCollection

ColorfulGeometryCollection([
    GeometryCollection(service_drop_pole_polygons),
    GeometryCollection(customer_geometries),    
], colors=['green', 'yellow'])
In [ ]:
from math import ceil

def get_service_drop_pole_count(
        source_polygons, maximum_connection_count_per_pole):
    service_drop_pole_count = 0
    for polygon in source_polygons:
        connection_count = len(polygon.connected_polygons)
        pole_count = ceil(connection_count / float(
            maximum_connection_count_per_pole))
        service_drop_pole_count += pole_count
    return service_drop_pole_count
    
get_service_drop_pole_count(
    service_drop_pole_polygons,
    maximum_drop_line_count_per_pole)
In [ ]:
len(service_drop_pole_polygons)

How can we convert service drop polygons into service drop points? I think we can use k-means.

In [ ]:
def get_source_polygons_with_connections(target_instances, maximum_distance):
    target_geometries = get_geometries(target_instances)
    # Convert target_geometries into target_polygons using maximum_distance
    target_polygons = [x.buffer(maximum_distance) for x in target_geometries]
    # Identify overlapping areas
    sliced_polygons = get_disjoint_polygons(target_polygons)
    for polygon in sliced_polygons:
        candidates = []
        for target_instance, target_polygon in zip(target_instances, target_polygons):
            if target_polygon.contains(polygon.centroid):
                candidates.append(target_instance)
        polygon.candidates = candidates
    # Sort overlapping areas by overlap count
    sorted_polygons = sorted(sliced_polygons, key=lambda x: -len(x.candidates))
    # Assign target_polygons to each sorted_polygon
    target_instances = copy(target_instances)
    social_polygons, lonely_polygons = [], []
    for polygon in sorted_polygons:
        connections = []
        for target_instance in polygon.candidates:
            try:
                target_instances.remove(target_instance)
            except ValueError:
                continue
            connections.append(target_instance)
        connection_count = len(connections)
        if connection_count > 1:
            social_polygons.append(polygon)
        elif connection_count == 1:
            lonely_polygons.append(polygon)
        polygon.connections = connections
    return social_polygons + lonely_polygons
In [ ]:
from geotable import ColorfulGeometryCollection
from shapely.geometry import GeometryCollection, Point

example_customers = [
    Customer(geometry=Point(-1, 0)),
    Customer(geometry=Point(+1, 0)),
    Customer(geometry=Point(0, +2)),
]
ColorfulGeometryCollection([
    GeometryCollection(get_source_polygons_with_connections(
        example_customers, 1.5)),
    GeometryCollection(get_geometries(
        example_customers)),
], colors=['green', 'yellow'])
In [ ]:
maximum_drop_line_length_in_meters = 50

drop_pole_polygons = get_source_polygons_with_connections(
    customers, maximum_drop_line_length_in_meters)
In [ ]:
len(drop_pole_polygons[0].connections)
In [ ]:
import numpy as np
from sklearn.cluster import KMeans

kmeans = KMeans(n_clusters=2).fit(np.array([
    [0, 0],
    [1, 1],
    [9, 9],
]))
kmeans.labels_
In [ ]:
import numpy as np
from collections import defaultdict
from shapely.geometry import Point
from sklearn.cluster import KMeans

def get_instance_clusters(instances, cluster_count):
    geometries = get_geometries(instances)
    points = [x.centroid for x in geometries]
    xys = np.array([(point.x, point.y) for point in points])
    kmeans = KMeans(n_clusters=cluster_count).fit(xys)
    instances_by_label = defaultdict(list)
    for instance, label in zip(instances, kmeans.labels_):
        instances_by_label[label].append(instance)
    return list(instances_by_label.values())

get_instance_clusters([
    Customer(id=1, geometry=Point(0, 0)),
    Customer(id=2, geometry=Point(1, 1)),
    Customer(id=3, geometry=Point(9, 9)),
], 2)
In [ ]:
def estimate_drop_pole_count(
        drop_pole_polygon, maximum_drop_line_count_per_pole):
    connection_count = len(drop_pole_polygon.connections)
    if not connection_count:
        return 1
    return ceil(connection_count / float(
        maximum_drop_line_count_per_pole))

estimate_drop_pole_count(
    drop_pole_polygons[0], maximum_drop_line_count_per_pole)
_ Nelder-Mead
_ Powell
_ CG
_ BFGS
_ Newton-CG
L-BFGS-B
_ TNC
_ COBYLA
_ SLSQP
_ dogleg
_ trust-ncg
_ trust-exact
_ trust-krylov

+ Choose an optimization algorithm
In [ ]:
from scipy.optimize import minimize

f = lambda x: x ** 2
minimize(f, 10, method='L-BFGS-B').x[0]
In [ ]:
from shapely.geometry import Point
Point(0, 0).coords[0]
In [ ]:
x = Point(0, 0)
x.distance(Point(1, 1))
In [ ]:
from shapely.geometry import Point

points = [
    Point(-1, 0),
    Point(0, +2),
    Point(+1, 0),
]

def sum_distances(xy):
    return sum(Point(xy).distance(p) for p in points)

sum_distances((0, 0))
In [ ]:
minimize(sum_distances, points[0], method='L-BFGS-B').x
In [ ]:
from scipy.optimize import minimize

def place_store(geometries):
    'Return the point that minimizes the sum of distances to each geometry'
    def sum_distances(xy):
        return sum(Point(xy).distance(g) for g in geometries)
    xy = geometries[0].centroid.coords[0]
    return Point(minimize(sum_distances, xy, method='L-BFGS-B').x)

place_store(points).coords[0]
In [ ]:
def make_drop_pole(customers):
    pole_point = place_store(get_geometries(customers))
    pole = Pole(geometry=pole_point)
    pole.customers = customers
    return pole
In [ ]:
def get_drop_poles(
        customers,
        maximum_drop_line_length_in_meters,
        maximum_drop_line_count_per_pole):
    drop_pole_polygons = get_source_polygons_with_connections(
        customers, maximum_drop_line_length_in_meters)

    drop_poles = []
    for polygon in drop_pole_polygons:
        connection_count = len(polygon.connections)
        pole_count = estimate_drop_pole_count(
            polygon, maximum_drop_line_count_per_pole)
        if pole_count > 1:
            clusters = get_instance_clusters(
                polygon.connections, cluster_count=pole_count)
            for customers in clusters:
                drop_poles.append(make_drop_pole(customers))
        else:
            drop_poles.append(make_drop_pole(polygon.connections))

    for index, pole in enumerate(drop_poles):
        pole.id = index
        for customer in pole.customers:
            customer.pole_id = pole.id
            customer.drop_line_length = Point(
                pole.xy).distance(customer.geometry.centroid)
    return drop_poles

drop_poles = get_drop_poles(
    customers,
    maximum_drop_line_length_in_meters,
    maximum_drop_line_count_per_pole)
visualize_geometries(drop_poles, customers, colors=['red', 'yellow'])
In [ ]:
drop_poles[:3]
In [ ]:
len(drop_poles[0].customers)

Place Distribution Poles (1st Implementation)

  • Maximize the percentage of service drop poles connected by distribution cable.
  • Minimize the distribution cable length.
  • Limit the maximum distance between poles.
In [ ]:
import networkx as nx
from itertools import combinations
from scipy.spatial.distance import euclidean as get_distance

drop_pole_points = get_geometries(drop_poles)
graph = nx.Graph()
for p1, p2 in combinations(drop_pole_points, 2):
    distance = get_distance(p1, p2)
    graph.add_edge(
        tuple(p1.coords[0]),
        tuple(p2.coords[0]), weight=distance)
tree = nx.minimum_spanning_tree(graph)
edges = tree.edges()
len(edges)
In [ ]:
list(edges)[0]
In [ ]:
drop_poles[0].geometry.coords[0]
In [ ]:
import networkx as nx
from itertools import combinations
from scipy.spatial.distance import euclidean as get_distance
from shapely.geometry import LineString

def get_distribution_lines(drop_poles):
    drop_pole_points = get_geometries(drop_poles)
    graph = nx.Graph()
    for pole1, pole2 in combinations(drop_poles, 2):
        distance = get_distance(pole1.geometry, pole2.geometry)
        graph.add_edge(pole1.id, pole2.id, weight=distance)
    tree = nx.minimum_spanning_tree(graph)
    distribution_lines = []
    pole_by_id = {x.id: x for x in drop_poles}
    for pole1_id, pole2_id in tree.edges():
        pole1_coordinates = pole_by_id[pole1_id].geometry.coords[0]
        pole2_coordinates = pole_by_id[pole2_id].geometry.coords[0]
        line_coordinates = pole1_coordinates, pole2_coordinates
        l = LineString(line_coordinates)
        line = Line(
            id='%s-%s' % (pole1_id, pole2_id),
            geometry=l)
        line.pole1_id = pole1_id
        line.pole2_id = pole2_id
        line.length = l.length
        distribution_lines.append(line)
    return distribution_lines

distribution_lines = get_distribution_lines(drop_poles)
visualize_geometries(
    distribution_lines, drop_poles, colors=['orange', 'red'])
In [ ]:
print(list(distribution_lines[0].geometry.coords))
print(list(distribution_lines[0].geometry.coords[-1]))
In [ ]:
line = LineString([(0, 0), (1, 1)])
line.coords[-1]
In [ ]:
line.interpolate(line.length).coords[-1]
In [ ]:
from shapely.geometry import LineString, MultiPoint, Point

def get_distribution_poles(
        distribution_lines, distribution_pole_interval_in_meters):
    distribution_poles = []
    for distribution_line in distribution_lines:
        line = distribution_line.geometry
        line_length_in_meters = line.length
        distribution_pole_count = int(
            line_length_in_meters / distribution_pole_interval_in_meters)
        
        segment_count = distribution_pole_count + 1
        adjusted_interval_in_meters = line_length_in_meters / float(
            segment_count)
        
        def make_pole(index, point):
            return Pole(
                id='%s-%s-%s' % (
                    distribution_line.pole1_id,
                    distribution_line.pole2_id, index),
                geometry=point)
        
        for segment_index in range(1, segment_count):
            distribution_poles.append(make_pole(
                segment_index,
                line.interpolate(
                    adjusted_interval_in_meters * segment_index)))
    return distribution_poles

distribution_poles = get_distribution_poles(distribution_lines, 75)
visualize_geometries(
    distribution_poles, drop_poles, colors=['orange', 'red'])

20171116-0000 - 20171116-0100: 60 minutes

+ Sort pieces by overlap
+ Write small assertion test for the bug pinpointed by Deril
+ Replace centroid with scipy.optimize.minimize

Place Batteries (1st Implementation)

  • Minimize the number of batteries.
  • Minimize the distance of a battery to each service drop pole.
  • Limit the maximum distance of a battery to a service drop pole.
In [ ]:
def get_batteries(drop_poles, maximum_battery_line_length_in_meters):
    battery_polygons = get_source_polygons_with_connections(
        drop_poles, maximum_battery_line_length_in_meters)
    batteries = []
    for index, battery_polygon in enumerate(battery_polygons):
        connections = battery_polygon.connections
        battery_point = place_store(get_geometries(connections))
        battery = Battery(id=index, geometry=battery_point)
        battery.drop_poles = connections
        batteries.append(battery)
    return batteries

batteries = get_batteries(
    drop_poles, maximum_battery_line_length_in_meters=100)
visualize_geometries(
    distribution_lines, drop_poles, batteries, colors=[
        'orange', 'red', 'blue'])

20171117-1815 - 20171117-1845: 30 minutes

_ Project each battery to the nearest distribution line

Place Street Lamps

  • Minimize the summed distance of a customer to a street lamp over all customers.
  • Place a street lamp on selected poles to satisfy the above constraint.
  • Limit the maximum distance of a street lamp to a customer.
In [ ]:
maximum_lamp_pole_distance_in_meters = 200
poles = drop_poles + distribution_poles
In [ ]:
import numpy as np

def get_best_pole(
        customer_tree, poles, maximum_lamp_pole_distance_in_meters):
    best_metric = np.inf, np.inf
    best_pole = None
    for pole in poles:
        metric = compute_lamp_pole_metric(customer_tree, pole)
        if metric < best_metric:
            best_metric = metric
            best_pole = pole
    return best_pole

def compute_lamp_pole_metric(customer_tree, pole):
    customer_count = len(customer_tree.data)
    pole_point = pole.geometry
    pole_xy = pole_point.x, pole_point.y
    distances, indices = customer_tree.query(
        pole_xy,
        k=customer_count,
        distance_upper_bound=maximum_lamp_pole_distance_in_meters)
    if not hasattr(distances, '__iter__'):
        distances = [distances]
        indices = [indices]
    distances = [x for x in distances if x < np.inf]
    indices = [x for x in indices if x != customer_count]
    return -len(indices), sum(distances)
In [ ]:
from copy import copy
from scipy.spatial import KDTree

def get_lamp_poles(customers, poles):
    remaining_customers = copy(customers)
    remaining_poles = copy(poles)
    lamp_poles = []
    while remaining_customers:
        customer_tree = KDTree([(_.x, _.y) for _ in get_geometries(
            remaining_customers)])
        pole = get_best_pole(
            customer_tree,
            remaining_poles,
            maximum_lamp_pole_distance_in_meters)
        lamp_poles.append(pole)
        # Remove lamp pole from remaining_poles
        remaining_poles.remove(pole)
        # Remove satisfied customers from remaining_customers
        pole_point = pole.geometry
        pole_xy = pole_point.x, pole_point.y
        indices = customer_tree.query_ball_point(
            pole_xy,
            maximum_lamp_pole_distance_in_meters)
        for customer in np.array(remaining_customers)[indices]:
            remaining_customers.remove(customer)
    for pole in lamp_poles:
        pole.has_street_lamp = True
    return lamp_poles

get_lamp_poles(customers, poles)

Place Equipment (1st implementation)

  • Estimate the demand in kWh/day for each battery.
  • Estimate the number of solar panels needed to satisfy the daily demand for each battery.
  • Place a solar panel on poles closest to a battery.
In [ ]:
# Estimate the demand in kWh/day for each battery
for battery in batteries:
    demand_in_kwh_per_day = 0
    for pole in battery.drop_poles:
        for customer in pole.customers:
            demand_in_kwh_per_day += customer.demand_in_kwh_per_day
    battery.demand_in_kwh_per_day = demand_in_kwh_per_day
batteries[0].demand_in_kwh_per_day
In [ ]:
minimum_solar_pole_count_per_kwh = 5
In [ ]:
distribution_lines[0].__dict__
In [ ]:
# Estimate the number of solar panels needed
# Place a solar panel on poles closest to a battery
import networkx as nx
from networkx.algorithms.shortest_paths import shortest_path_length
from scipy.spatial import KDTree

distribution_graph = nx.Graph()
for x in distribution_lines:
    distribution_graph.add_edge(x.pole1_id, x.pole2_id)
pole_by_id = {x.id: x for x in poles}    
    
def get_nearest_poles_on_distribution_graph(pole, k):
    packs = []
    for pole_id in distribution_graph:
        path_length = shortest_path_length(
            distribution_graph, pole_id, pole.id)
        packs.append((path_length, pole_id))
    sorted_packs = sorted(packs)
    selected_packs = sorted_packs[:k]
    return [
        pole_by_id[pole_id] for path_length, pole_id in selected_packs]

get_nearest_poles_on_distribution_graph(poles[0], 2)
In [ ]:
solar_poles = []
pole_tree = KDTree([_.xy for _ in poles])
for battery in batteries:
    solar_pole_count = \
        minimum_solar_pole_count_per_kwh * \
        battery.demand_in_kwh_per_day
    distance, index = pole_tree.query(battery.xy)
    battery_pole = poles[index]
    nearest_poles = get_nearest_poles_on_distribution_graph(
        battery_pole, solar_pole_count)
    for pole in nearest_poles:
        pole.has_solar_panel = True
        if pole not in solar_poles:
            solar_poles.append(pole)
    battery.solar_poles = nearest_poles
    
len(solar_poles)    
In [ ]:
# !!!
expected_solar_pole_count = sum(
    len(x.solar_poles) for x in batteries)
expected_solar_pole_count

Decide Pole Types

  • Use pole type A when the angle of the incoming and outgoing distribution line is smaller than X degrees.
  • Use pole type A for solar panel poles.
  • Use pole type B otherwise.
In [ ]:
import numpy as np

def compute_angle(a_xy, b_xy, c_xy):
    # https://stackoverflow.com/a/31735642/192092
    b_xy = np.array(b_xy)
    angle1 = np.arctan2(*(a_xy - b_xy)[::-1])
    angle2 = np.arctan2(*(c_xy - b_xy)[::-1])
    angle_in_degrees = np.rad2deg(angle1 - angle2)
    if angle_in_degrees < 0:
        angle_in_degrees *= -1
    if angle_in_degrees > 180:
        angle_in_degrees = 360 - angle_in_degrees
    return angle_in_degrees

print(compute_angle((2, 0), (0, 0), (-1, 1)))
print(compute_angle((2, 0), (0, 0), (1, 1)))
print(compute_angle((3, 1), (1, 1), (0, 2)))
In [ ]:
pole = poles[0]

# Find distribution lines corresponding to pole
pole_lines = []
for distribution_line in distribution_lines:
    pole1_id = distribution_line.pole1_id
    pole2_id = distribution_line.pole2_id
    if pole.id in (pole1_id, pole2_id):
        pole_lines.append(distribution_line)
pole_lines
In [ ]:
# Compute angle
l1 = pole_lines[0].geometry
l2 = pole_lines[1].geometry
b_point = l1.intersection(l2)
b_xy = b_point.x, b_point.y
b_xy
In [ ]:
def get_other_endpoint_xy(line, endpoint_xy):
    endpoint_xys = list(line.coords)
    endpoint_xys.remove(endpoint_xy)
    return endpoint_xys[0]

a_xy = get_other_endpoint_xy(l1, b_xy)
c_xy = get_other_endpoint_xy(l2, b_xy)
print(a_xy)
print(b_xy)
print(c_xy)
In [ ]:
l1_xys = list(l1.coords)
l1_xys.remove(b_xy)
a_xy = l1_xys[0]
a_xy
In [ ]:
compute_angle(a_xy, b_xy, c_xy)
In [ ]:
def find_pole_lines(distribution_lines, pole):
    pole_lines = []
    for distribution_line in distribution_lines:
        pole1_id = distribution_line.pole1_id
        pole2_id = distribution_line.pole2_id
        if pole.id in (pole1_id, pole2_id):
            pole_lines.append(distribution_line)
    return pole_lines

def get_pole_angle_points(pole_lines):
    l1 = pole_lines[0].geometry
    l2 = pole_lines[1].geometry
    b_point = l1.intersection(l2)
    b_xy = b_point.x, b_point.y
    a_xy = get_other_endpoint_xy(l1, b_xy)
    c_xy = get_other_endpoint_xy(l2, b_xy)
    return a_xy, b_xy, c_xy

def get_other_endpoint_xy(line, endpoint_xy):
    endpoint_xys = list(line.coords)
    endpoint_xys.remove(endpoint_xy)
    return endpoint_xys[0]
In [ ]:
# Use pole type A when the angle of the incoming and outcoming distribution line is smaller than X degrees
# Use pole type A for solar panel poles
# Use pole type B otherwise

target_angle = 120

for pole in poles:
    pole_lines = find_pole_lines(distribution_lines, pole)
    if len(pole_lines) < 2:
        continue
    angle = compute_angle(*get_pole_angle_points(pole_lines))
    if angle < target_angle:
        pole.has_corner_angle = True
        
for pole in poles:
    if pole.has_corner_angle or pole.has_solar_panel:
        pole.type_id = 'A'
    else:
        pole.type_id = 'B'

Save Poles, Batteries, Customers, Distribution Lines

  • Table of poles
    • Pole ID
    • Pole type
    • Customer count
    • Has solar panel
    • Has street lamp
    • Has wireless internet
    • Has corner angle
    • Longitude
    • Latitude
In [ ]:
import utm
from os.path import join
from pandas import DataFrame
target_folder = '/tmp'
In [ ]:
x, y = poles[0].xy
utm.to_latlon(x, y, zone_number, zone_letter)
In [ ]:
def get_lonlat(xy):
    x, y = xy
    lat, lon = utm.to_latlon(x, y, zone_number, zone_letter)
    return lon, lat
In [ ]:
rows = []
for pole in poles:
    longitude, latitude = get_lonlat(pole.xy)
    rows.append([
        pole.id,
        pole.type_id,
        len(pole.customers or []),
        pole.has_solar_panel,
        pole.has_street_lamp,
        pole.has_wireless_internet,
        pole.has_corner_angle,
        longitude,
        latitude,
    ])
target_path = join(target_folder, 'poles.csv')    
DataFrame(rows, columns=[
    'pole_id',
    'type_id',
    'customer_count',
    'has_solar_panel',
    'has_street_lamp',
    'has_wireless_internet',
    'has_corner_angle',
    'longitude',
    'latitude',
]).to_csv(target_path, index=False) 
  • Table of batteries
    • Battery type
    • Longitude
    • Latitude
In [ ]:
rows = []
for battery in batteries:
    longitude, latitude = get_lonlat(battery.xy)
    rows.append([
        battery.id,
        battery.demand_in_kwh_per_day,
        len(battery.solar_poles),
        longitude,
        latitude,
    ])
target_path = join(target_folder, 'batteries.csv')        
DataFrame(rows, columns=[
    'battery_id',
    'demand_in_kwh_per_day',
    'solar_panel_count',
    'longitude',
    'latitude',
]).to_csv(target_path, index=False)    
  • Table of customers
    • Customer ID
    • Demand in kWh/day
    • Pole ID
    • Service drop line length
    • Longitude
    • Latitude
In [ ]:
rows = []
for customer in customers:
    point = customer.geometry.centroid
    customer_xy = point.x, point.y
    longitude, latitude = get_lonlat(customer_xy)
    rows.append([
        customer.id,
        customer.demand_in_kwh_per_day,
        customer.pole_id,
        customer.drop_line_length,
        longitude,
        latitude,
    ])
target_path = join(target_folder, 'customers.csv')        
DataFrame(rows, columns=[
    'customer_id',
    'demand_in_kwh_per_day',
    'pole_id',
    'drop_line_length',
    'longitude',
    'latitude',
]).to_csv(target_path, index=False)    
  • Table of service distribution lines
    • Pole ID 1
    • Pole ID 2
    • Service distribution line length
In [ ]:
rows = []
for distribution_line in distribution_lines:
    rows.append([
        distribution_line.pole1_id,
        distribution_line.pole2_id,
        distribution_line.length,
    ])
target_path = join(target_folder, 'lines.csv')
DataFrame(rows, columns=[
    'pole1_id',
    'pole2_id',
    'length',
]).to_csv(target_path, index=False)    

20171128-2300 - 20171128-2315: 15 minutes

Last week, we confirmed that there should be enough solar panels to satisfy the demand per day.

+ Ask about discrepancy between actual solar pole count and expected solar pole count

Last week, we confirmed that the end poles should use the shortest pole type.

+ Ask about pole type for end poles

20171129-1230 - 20171129-1315: 45 minutes

Tasks

Finalize public logs
Make sure all parameters are in the first code cell
Make sure that 1st implementation works with all provided datasets
Re-deploy tool