Prototype Solar Mini Grid Layout (1st Implementation)




Pay Notebook Creator: Roy Hyunjin Han250
Set Container: Numerical CPU with TINY Memory for 10 Minutes 0
Total0

Prototype Solar Mini Grid Layout Without Roads Without Obstacles

This tool was commissioned by Electric Vine Industries, an innovative solar mini grid company.

{ customer_geotable : Customers ? Remember to specify demand_in_kwh_per_day for each customer }

{ minimum_solar_pole_count_per_kwh : Minimum Solar Pole Count per kWh ? Set the minimum number of solar poles required per kWh of demand }

{ maximum_drop_line_length_in_meters : Maximum Service Drop Line Length in Meters ? Set the maximum distance of a service drop pole to a customer }

{ maximum_drop_line_count_per_pole : Maximum Service Drop Line Count per Pole ? Set the maximum number of service drop lines per pole }

{ maximum_battery_line_length_in_meters : Maximum Battery Line Length in Meters ? Set the maximum distance of a battery to a service drop pole as measured along a distribution line }

{ maximum_lamp_pole_distance_in_meters : Maximum Lamp Pole Distance in Meters ? Set the maximum distance of a lamp pole to a customer }

{ maximum_angle_for_pole_type_a : Maximum Angle for Pole Type A ? Set the maximum angle between the incoming and outgoing distribution lines for pole type A }

{ maximum_distribution_pole_interval_in_meters : Maximum Distribution Pole Interval in Meters ? Set the maximum distance between distribution poles }

In [ ]:
# Press the Blue Button to preview this as a CrossCompute tool
customer_geotable_path = 'customers.shp.zip'
minimum_solar_pole_count_per_kwh = 5
maximum_drop_line_length_in_meters = 50
maximum_drop_line_count_per_pole = 5
maximum_battery_line_length_in_meters = 100
maximum_lamp_pole_distance_in_meters = 100
maximum_angle_for_pole_type_a = 120
maximum_distribution_pole_interval_in_meters = 75
target_folder = '/tmp'

Define Classes and Functions

In [ ]:
import attr
from shapely.geometry.base import BaseGeometry

@attr.s
class GeometryMixin(object):
    id = attr.ib(default=0)
    geometry = attr.ib(default=attr.Factory(BaseGeometry))
    attributes = attr.ib(default=attr.Factory(dict))

class PointMixin(GeometryMixin):
    @property
    def xy(self):
        return self.geometry.x, self.geometry.y
In [ ]:
class Customer(GeometryMixin):
    demand_in_kwh_per_day = 0
    
class Battery(PointMixin):
    type_id = None
    drop_poles = None
    demand_in_kwh_per_day = 0

class Line(GeometryMixin):
    pole1_id = None
    pole2_id = None
    length = 0
    
class Pole(PointMixin):
    type_id = None
    customers = None
    has_solar_panel = False
    has_street_lamp = False
    has_corner_angle = False
In [ ]:
from geotable import ColorfulGeometryCollection
from shapely.geometry import GeometryCollection

def get_geometries(instances):
    return [x.geometry for x in instances]

def visualize_geometries(*instance_lists, colors=None):
    return ColorfulGeometryCollection([
        GeometryCollection(get_geometries(x)) for x in instance_lists
    ], colors)

Load Customers

  • Choose UTM zone.
  • Load geometries using the chosen UTM projection.
In [ ]:
def load_instances(
        geotable_path, target_proj4, TargetClass, default_value_by_name=None):
    geometries, field_packs, field_definitions = geometryIO.load(
        geotable_path, targetProj4=target_proj4)[1:]
    instances = []
    for index, (geometry, field_pack) in enumerate(zip(geometries, field_packs)):
        d = {}
        for field_value, (field_name, field_type) in zip(field_pack, field_definitions):
            d[field_name] = field_value
        instance = TargetClass(
            id=d.pop('id', index),
            geometry=geometry,
            attributes=d)
        for k, v in (default_value_by_name or {}).items():
            setattr(instance, k, d.pop(k, v))
        instances.append(instance)
    return instances
In [ ]:
import utm
from shapely.geometry import GeometryCollection

def get_utm_zone_from_lonlat_geometries(lonlat_geometries):
    lonlat_point = GeometryCollection(lonlat_geometries).centroid
    longitude = lonlat_point.x
    latitude = lonlat_point.y
    x, y, zone_number, zone_letter = utm.from_latlon(
        latitude, longitude)
    return zone_number, zone_letter

def get_utm_proj4(zone_number, zone_letter):
    parts = []
    parts.extend([
        '+proj=utm',
        '+zone=%s' % zone_number,        
    ])
    if zone_letter.upper() < 'N':
        parts.append('+south')
    parts.extend([
        '+ellps=WGS84',
        '+datum=WGS84',
        '+units=m',
        '+no_defs',
    ])
    return ' '.join(parts)
In [ ]:
import geometryIO
from geotable import ColorfulGeometryCollection

def load_customers(geotable_path, target_proj4, demand_in_kwh_per_day):
    return load_instances(geotable_path, target_proj4, Customer, {
        'demand_in_kwh_per_day': demand_in_kwh_per_day,
    })

lonlat_geometries = geometryIO.load(
    customer_geotable_path, targetProj4=geometryIO.proj4LL)[1]
zone_number, zone_letter = get_utm_zone_from_lonlat_geometries(
    lonlat_geometries)
utm_proj4 = get_utm_proj4(zone_number, zone_letter)
customers = load_customers(customer_geotable_path, utm_proj4, 100)
visualize_geometries(customers, colors=['yellow'])

Place Service Drop Poles

  • Maximize the percentage of customers who are within a maximum distance to a service drop pole.
  • Minimize the number of service drop poles.
  • Minimize the distance from a customer to the nearest service drop pole.
  • Limit the maximum distance from a customer to the nearest service drop pole.
  • Limit the maximum number of customers per service drop pole.
In [ ]:
from shapely.geometry import LineString
from shapely.ops import polygonize, unary_union

def get_disjoint_polygons(overlapping_polygons):
    'Split overlapping polygons into disjoint polygons'
    rings = [LineString(list(
        x.exterior.coords)) for x in overlapping_polygons]
    return list(polygonize(unary_union(rings)))
In [ ]:
from copy import copy

def get_source_polygons_with_connections(target_instances, maximum_distance):
    target_geometries = get_geometries(target_instances)
    # Convert target_geometries into target_polygons using maximum_distance
    target_polygons = [x.buffer(maximum_distance) for x in target_geometries]
    # Identify overlapping areas
    sliced_polygons = get_disjoint_polygons(target_polygons)
    for polygon in sliced_polygons:
        candidates = []
        for target_instance, target_polygon in zip(target_instances, target_polygons):
            if target_polygon.contains(polygon.centroid):
                candidates.append(target_instance)
        polygon.candidates = candidates
    # Sort overlapping areas by overlap count
    sorted_polygons = sorted(sliced_polygons, key=lambda x: -len(x.candidates))
    # Assign target_polygons to each sorted_polygon
    target_instances = copy(target_instances)
    social_polygons, lonely_polygons = [], []
    for polygon in sorted_polygons:
        connections = []
        for target_instance in polygon.candidates:
            try:
                target_instances.remove(target_instance)
            except ValueError:
                continue
            connections.append(target_instance)
        connection_count = len(connections)
        if connection_count > 1:
            social_polygons.append(polygon)
        elif connection_count == 1:
            lonely_polygons.append(polygon)
        polygon.connections = connections
    return social_polygons + lonely_polygons
In [ ]:
from math import ceil

def estimate_drop_pole_count(
        drop_pole_polygon, maximum_drop_line_count_per_pole):
    connection_count = len(drop_pole_polygon.connections)
    if not connection_count:
        return 1
    return ceil(connection_count / float(
        maximum_drop_line_count_per_pole))
In [ ]:
import numpy as np
from collections import defaultdict
from shapely.geometry import Point
from sklearn.cluster import KMeans

def get_instance_clusters(instances, cluster_count):
    geometries = get_geometries(instances)
    points = [x.centroid for x in geometries]
    xys = np.array([(point.x, point.y) for point in points])
    kmeans = KMeans(n_clusters=cluster_count).fit(xys)
    instances_by_label = defaultdict(list)
    for instance, label in zip(instances, kmeans.labels_):
        instances_by_label[label].append(instance)
    return list(instances_by_label.values())
In [ ]:
from scipy.optimize import minimize

def place_store(geometries):
    'Return the point that minimizes the sum of distances to each geometry'
    def sum_distances(xy):
        return sum(Point(xy).distance(g) for g in geometries)
    xy = geometries[0].centroid.coords[0]
    return Point(minimize(sum_distances, xy, method='L-BFGS-B').x)
In [ ]:
def make_drop_pole(customers):
    pole_point = place_store(get_geometries(customers))
    pole = Pole(geometry=pole_point)
    pole.customers = customers
    return pole
In [ ]:
def get_drop_poles(
        customers,
        maximum_drop_line_length_in_meters,
        maximum_drop_line_count_per_pole):
    drop_pole_polygons = get_source_polygons_with_connections(
        customers, maximum_drop_line_length_in_meters)

    drop_poles = []
    for polygon in drop_pole_polygons:
        connection_count = len(polygon.connections)
        pole_count = estimate_drop_pole_count(
            polygon, maximum_drop_line_count_per_pole)
        if pole_count > 1:
            clusters = get_instance_clusters(
                polygon.connections, cluster_count=pole_count)
            for customers in clusters:
                drop_poles.append(make_drop_pole(customers))
        else:
            drop_poles.append(make_drop_pole(polygon.connections))

    for index, pole in enumerate(drop_poles):
        pole.id = index
        for customer in pole.customers:
            customer.pole_id = pole.id
            customer.drop_line_length = Point(
                pole.xy).distance(customer.geometry.centroid)
    return drop_poles

drop_poles = get_drop_poles(
    customers,
    maximum_drop_line_length_in_meters,
    maximum_drop_line_count_per_pole)
visualize_geometries(drop_poles, customers, colors=['red', 'yellow'])

Place Distribution Poles (1st Implementation)

  • Maximize the percentage of service drop poles connected by distribution cable.
  • Minimize the distribution cable length.
  • Limit the maximum distance between poles.
In [ ]:
import networkx as nx
from itertools import combinations
from scipy.spatial.distance import euclidean as get_distance
from shapely.geometry import LineString

def get_distribution_lines(drop_poles):
    drop_pole_points = get_geometries(drop_poles)
    graph = nx.Graph()
    for pole1, pole2 in combinations(drop_poles, 2):
        distance = get_distance(pole1.geometry, pole2.geometry)
        graph.add_edge(pole1.id, pole2.id, weight=distance)
    tree = nx.minimum_spanning_tree(graph)
    distribution_lines = []
    pole_by_id = {x.id: x for x in drop_poles}
    for pole1_id, pole2_id in tree.edges():
        pole1_coordinates = pole_by_id[pole1_id].geometry.coords[0]
        pole2_coordinates = pole_by_id[pole2_id].geometry.coords[0]
        line_coordinates = pole1_coordinates, pole2_coordinates
        l = LineString(line_coordinates)
        line = Line(
            id='%s-%s' % (pole1_id, pole2_id),
            geometry=l)
        line.pole1_id = pole1_id
        line.pole2_id = pole2_id
        line.length = l.length
        distribution_lines.append(line)
    return distribution_lines

distribution_lines = get_distribution_lines(drop_poles)
visualize_geometries(
    distribution_lines, drop_poles, colors=['orange', 'red'])
In [ ]:
from shapely.geometry import LineString, MultiPoint, Point

def get_distribution_poles(
        distribution_lines,
        maximum_distribution_pole_interval_in_meters):
    distribution_poles = []
    for distribution_line in distribution_lines:
        line = distribution_line.geometry
        line_length_in_meters = line.length
        distribution_pole_count = int(line_length_in_meters / float(
            maximum_distribution_pole_interval_in_meters))
        
        segment_count = distribution_pole_count + 1
        adjusted_interval_in_meters = line_length_in_meters / float(
            segment_count)
        
        def make_pole(index, point):
            return Pole(
                id='%s-%s-%s' % (
                    distribution_line.pole1_id,
                    distribution_line.pole2_id, index),
                geometry=point)
        
        for segment_index in range(1, segment_count):
            distribution_poles.append(make_pole(
                segment_index,
                line.interpolate(
                    adjusted_interval_in_meters * segment_index)))
    return distribution_poles

distribution_poles = get_distribution_poles(
    distribution_lines,
    maximum_distribution_pole_interval_in_meters)
visualize_geometries(
    distribution_poles, drop_poles, colors=['orange', 'red'])

Place Batteries (1st Implementation)

  • Minimize the number of batteries.
  • Minimize the distance of a battery to each service drop pole.
  • Limit the maximum distance of a battery to a service drop pole.
In [ ]:
def get_batteries(drop_poles, maximum_battery_line_length_in_meters):
    battery_polygons = get_source_polygons_with_connections(
        drop_poles, maximum_battery_line_length_in_meters)
    batteries = []
    for index, battery_polygon in enumerate(battery_polygons):
        connections = battery_polygon.connections
        battery_point = place_store(get_geometries(connections))
        battery = Battery(id=index, geometry=battery_point)
        battery.drop_poles = connections
        batteries.append(battery)
    return batteries

batteries = get_batteries(
    drop_poles, maximum_battery_line_length_in_meters)
visualize_geometries(
    distribution_lines, drop_poles, batteries, colors=[
        'orange', 'red', 'blue'])

Place Street Lamps

  • Minimize the summed distance of a customer to a street lamp over all customers.
  • Place a street lamp on selected poles to satisfy the above constraint.
  • Limit the maximum distance of a street lamp to a customer.
In [ ]:
poles = drop_poles + distribution_poles
In [ ]:
import numpy as np

def get_best_pole(
        customer_tree, poles, maximum_lamp_pole_distance_in_meters):
    best_metric = np.inf, np.inf
    best_pole = None
    for pole in poles:
        metric = compute_lamp_pole_metric(
            customer_tree, pole,
            maximum_lamp_pole_distance_in_meters)
        if metric < best_metric:
            best_metric = metric
            best_pole = pole
    return best_pole
In [ ]:
def compute_lamp_pole_metric(
        customer_tree, pole, maximum_lamp_pole_distance_in_meters):
    customer_count = len(customer_tree.data)
    pole_point = pole.geometry
    pole_xy = pole_point.x, pole_point.y
    distances, indices = customer_tree.query(
        pole_xy,
        k=customer_count,
        distance_upper_bound=maximum_lamp_pole_distance_in_meters)
    if not hasattr(distances, '__iter__'):
        distances = [distances]
        indices = [indices]
    distances = [x for x in distances if x < np.inf]
    indices = [x for x in indices if x != customer_count]
    return -len(indices), sum(distances)
In [ ]:
from copy import copy
from scipy.spatial import KDTree

def get_lamp_poles(
        customers, poles, maximum_lamp_pole_distance_in_meters):
    remaining_customers = copy(customers)
    remaining_poles = copy(poles)
    lamp_poles = []
    while remaining_customers:
        customer_tree = KDTree([(_.x, _.y) for _ in get_geometries(
            remaining_customers)])
        pole = get_best_pole(
            customer_tree,
            remaining_poles,
            maximum_lamp_pole_distance_in_meters)
        lamp_poles.append(pole)
        # Remove lamp pole from remaining_poles
        remaining_poles.remove(pole)
        # Remove satisfied customers from remaining_customers
        pole_point = pole.geometry
        pole_xy = pole_point.x, pole_point.y
        indices = customer_tree.query_ball_point(
            pole_xy,
            maximum_lamp_pole_distance_in_meters)
        for customer in np.array(remaining_customers)[indices]:
            remaining_customers.remove(customer)
    for pole in lamp_poles:
        pole.has_street_lamp = True
    return lamp_poles
In [ ]:
lamp_poles = get_lamp_poles(
    customers, poles, maximum_lamp_pole_distance_in_meters)
visualize_geometries(
    customers, lamp_poles, colors=['yellow', 'cyan'])

Place Equipment (1st implementation)

  • Estimate the demand in kWh/day for each battery.
  • Estimate the number of solar panels needed to satisfy the daily demand for each battery.
  • Place a solar panel on poles closest to a battery.
In [ ]:
# Estimate the demand in kWh/day for each battery
for battery in batteries:
    demand_in_kwh_per_day = 0
    for pole in battery.drop_poles:
        for customer in pole.customers:
            demand_in_kwh_per_day += customer.demand_in_kwh_per_day
    battery.demand_in_kwh_per_day = demand_in_kwh_per_day
In [ ]:
# Estimate the number of solar panels needed
# Place a solar panel on poles closest to a battery
import networkx as nx
from networkx.algorithms.shortest_paths import shortest_path_length
from scipy.spatial import KDTree

distribution_graph = nx.Graph()
for x in distribution_lines:
    distribution_graph.add_edge(x.pole1_id, x.pole2_id)
pole_by_id = {x.id: x for x in poles}    
    
def get_nearest_poles_on_distribution_graph(pole, k):
    packs = []
    for pole_id in distribution_graph:
        path_length = shortest_path_length(
            distribution_graph, pole_id, pole.id)
        packs.append((path_length, pole_id))
    sorted_packs = sorted(packs)
    selected_packs = sorted_packs[:k]
    return [
        pole_by_id[pole_id] for path_length, pole_id in selected_packs]

get_nearest_poles_on_distribution_graph(poles[0], 2)
In [ ]:
solar_poles = []
pole_tree = KDTree([_.xy for _ in poles])
for battery in batteries:
    solar_pole_count = \
        minimum_solar_pole_count_per_kwh * \
        battery.demand_in_kwh_per_day
    distance, index = pole_tree.query(battery.xy)
    battery_pole = poles[index]
    nearest_poles = get_nearest_poles_on_distribution_graph(
        battery_pole, solar_pole_count)
    for pole in nearest_poles:
        pole.has_solar_panel = True
        if pole not in solar_poles:
            solar_poles.append(pole)
    battery.solar_poles = nearest_poles
In [ ]:
visualize_geometries(
    distribution_lines, batteries, solar_poles, colors=[
        'orange', 'red', 'cyan'])

Decide Pole Types

  • Use pole type A when the angle of the incoming and outgoing distribution line is smaller than X degrees.
  • Use pole type A for solar panel poles.
  • Use pole type B otherwise.
In [ ]:
import numpy as np

def compute_angle(a_xy, b_xy, c_xy):
    # https://stackoverflow.com/a/31735642/192092
    b_xy = np.array(b_xy)
    angle1 = np.arctan2(*(a_xy - b_xy)[::-1])
    angle2 = np.arctan2(*(c_xy - b_xy)[::-1])
    angle_in_degrees = np.rad2deg(angle1 - angle2)
    if angle_in_degrees < 0:
        angle_in_degrees *= -1
    if angle_in_degrees > 180:
        angle_in_degrees = 360 - angle_in_degrees
    return angle_in_degrees
In [ ]:
def find_pole_lines(distribution_lines, pole):
    pole_lines = []
    for distribution_line in distribution_lines:
        pole1_id = distribution_line.pole1_id
        pole2_id = distribution_line.pole2_id
        if pole.id in (pole1_id, pole2_id):
            pole_lines.append(distribution_line)
    return pole_lines

def get_pole_angle_points(pole_lines):
    l1 = pole_lines[0].geometry
    l2 = pole_lines[1].geometry
    b_point = l1.intersection(l2)
    b_xy = b_point.x, b_point.y
    a_xy = get_other_endpoint_xy(l1, b_xy)
    c_xy = get_other_endpoint_xy(l2, b_xy)
    return a_xy, b_xy, c_xy

def get_other_endpoint_xy(line, endpoint_xy):
    endpoint_xys = list(line.coords)
    endpoint_xys.remove(endpoint_xy)
    return endpoint_xys[0]
In [ ]:
for pole in poles:
    pole_lines = find_pole_lines(distribution_lines, pole)
    if len(pole_lines) < 2:
        continue
    angle = compute_angle(*get_pole_angle_points(pole_lines))
    if angle < maximum_angle_for_pole_type_a:
        pole.has_corner_angle = True
        
for pole in poles:
    if pole.has_corner_angle or pole.has_solar_panel:
        pole.type_id = 'A'
    else:
        pole.type_id = 'B'

Save Poles, Batteries, Customers, Distribution Lines

In [ ]:
import utm
from os.path import join
from pandas import DataFrame

def get_lonlat(xy):
    return utm.to_latlon(*xy, zone_number, zone_letter)[::-1]
In [ ]:
target_path = join(target_folder, 'poles.csv')    
rows = []
for pole in poles:
    longitude, latitude = get_lonlat(pole.xy)
    rows.append([
        pole.id,
        pole.type_id,
        len(pole.customers or []),
        pole.has_solar_panel,
        pole.has_street_lamp,
        pole.has_corner_angle,
        longitude,
        latitude,
    ])
DataFrame(rows, columns=[
    'pole_id',
    'type_id',
    'customer_count',
    'has_solar_panel',
    'has_street_lamp',
    'has_corner_angle',
    'longitude',
    'latitude',
]).to_csv(target_path, index=False) 
print('pole_table_path = %s' % target_path)
In [ ]:
target_path = join(target_folder, 'batteries.csv')        
rows = []
for battery in batteries:
    longitude, latitude = get_lonlat(battery.xy)
    rows.append([
        battery.id,
        battery.demand_in_kwh_per_day,
        len(battery.solar_poles),
        longitude,
        latitude,
    ])
DataFrame(rows, columns=[
    'battery_id',
    'demand_in_kwh_per_day',
    'solar_panel_count',
    'longitude',
    'latitude',
]).to_csv(target_path, index=False)
print('battery_table_path = %s' % target_path)
In [ ]:
target_path = join(target_folder, 'customers.csv')        
rows = []
for customer in customers:
    point = customer.geometry.centroid
    customer_xy = point.x, point.y
    longitude, latitude = get_lonlat(customer_xy)
    rows.append([
        customer.id,
        customer.demand_in_kwh_per_day,
        customer.pole_id,
        customer.drop_line_length,
        longitude,
        latitude,
    ])
DataFrame(rows, columns=[
    'customer_id',
    'demand_in_kwh_per_day',
    'pole_id',
    'drop_line_length',
    'longitude',
    'latitude',
]).to_csv(target_path, index=False)
print('customer_table_path = %s' % target_path)
In [ ]:
target_path = join(target_folder, 'lines.csv')
rows = []
for distribution_line in distribution_lines:
    rows.append([
        distribution_line.pole1_id,
        distribution_line.pole2_id,
        distribution_line.length,
    ])
DataFrame(rows, columns=[
    'pole1_id',
    'pole2_id',
    'length',
]).to_csv(target_path, index=False)
print('line_table_path = %s' % target_path)
In [ ]:
target_path = join(target_folder, 'map.csv')
rows = []
for distribution_line in distribution_lines:
    l = distribution_line.geometry
    rows.append([
        'Distribution Line %s' % distribution_line.id,
        LineString([
            get_lonlat(l.coords[0]),
            get_lonlat(l.coords[1])]).wkt,
    ])
for customer in customers:
    p = customer.geometry.centroid
    customer_xy = p.x, p.y
    rows.append([
        'Customer %s' % customer.id,
        Point(get_lonlat(customer_xy)).wkt,
    ])
DataFrame(rows, columns=[
    'Description',
    'WKT',
]).to_csv(target_path, index=False)
print('map_geotable_path = %s' % target_path)

Proposed Solar Mini Grid Layout

{ map_geotable : Distribution Lines and Customers }

{ customer_table : Customers }

{ pole_table : Poles }

{ line_table : Distribution Lines }

{ battery_table : Batteries }