Un'Implementazione di Grafi Spaziali Neurale per l'Inferenza Funzionale Urbana utilizzando city2graph, OSMnx e PyTorch Geometric
La tecnologia oggi permette di esprimere nuove possibilità nell'analisi e nell'apprendimento da dati spaziali, grazie alla crescente applicazione dei modelli di Artificial Intelligence e Machine Learning. In questo tutorial, viene sviluppata una pipeline di apprendimento su grafi spaziali end-to-end utilizzando la libreria city2graph. L’obiettivo è costruire un modello che preveda le funzioni di un punto d'interesse (POI) urbano basandosi sulla struttura spaziale.
Premessa
Il tutorial copre diverse fasi: inizialmente i dati POI urbani vengono raccolti da OpenStreetMap. Successivamente vengono ingegnerizzate le funzionalità (featurizzato), costruiti diversi tipi di grafi di prossimità, e create strutture grafiche eterogenee e omogenee. Tutti questi dati vengono convertiti in un formato compatibile con PyTorch Geometric, e infine viene addestrato un modello GraphSAGE per prevedere le categorie POI.
Installazione di city2graph e Importazione delle Librerie
Prima di iniziare, è necessario installare e importare le librerie utilizzate nel tutorial. Il codice mostra come eseguire l'installazione automaticamente, inclusa la verifica della disponibilità di city2graph e di PyTorch Geometric per garantire la corretta esecuzione del workflow.
Il settaggio di un seed casuale assicura la riproducibilità del modello durante la costruzione del grafo e l'addestramento in generale.
!pip -q install "city2graph[cpu]" osmnx contextily scikit-learn 2>/dev/null
import warnings, numpy as np, pandas as pd, geopandas as gpd
warnings.filterwarnings("ignore")
from shapely.geometry import Point
import matplotlib.pyplot as plt
import city2graph as c2g
print("city2graph version:", getattr(c2g, "_version_", "unknown"))
print("PyTorch / PyG available:", c2g.istorchavailable())
import torch
import torch.nn.functional as F
from torchgeometric.nn import SAGEConv, tohetero
from torchgeometric.utils import toundirected
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import NearestNeighbors
from sklearn.metrics import accuracyscore, f1score
from sklearn.decomposition import PCA
SEED = 42
np.random.seed(SEED); torch.manual_seed(SEED)
Raccolta dei Dati POI da OpenStreetMap
Il tutorial raccolta dati reali da OpenStreetMap, concentrandosi sull'area di Shibuya, Tokyo. Vengono raccolti dati per diverse categorie di interesse urbano, come ristoranti, negozi, scuole, ecc. Vengono scaricate anche le informazioni della rete stradale a piedi. Nel caso in cui OpenStreetMap non risponda, il tutorial include un set di dati sintetici generati per non compromettere la funzionalità del codice.
CENTER = (35.6595, 139.7005)
DIST_M = 1100
TAG_QUERIES = {
"food": {"amenity": ["restaurant", "cafe", "fast_food", "bar", "pub"]},
"retail": {"shop": True},
"education": {"amenity": ["school", "university", "college", "kindergarten", "library"]},
"health": {"amenity": ["hospital", "clinic", "pharmacy", "doctors", "dentist"]},
}
def to_points(gdf):
g = gdf.copy()
g["geometry"] = g.geometry.representative_point()
return g
poigdf, segmentsgdf = None, None
try:
import osmnx as ox
ox.settings.use_cache = True
ox.settings.log_console = False
frames = []
for label, tags in TAG_QUERIES.items():
try:
f = ox.featuresfrompoint(CENTER, tags=tags, dist=DIST_M)
f = f[f.geometry.notna()]
if len(f):
f = to_points(f)[["geometry"]].copy()
f["category"] = label
frames.append(f)
except Exception as e:
print(f" (skip {label}: {e})")
if not frames:
raise RuntimeError("No POIs returned from Overpass.")
poigdf = gpd.GeoDataFrame(pd.concat(frames, ignoreindex=True), crs="EPSG:4326")
G = ox.graphfrompoint(CENTER, dist=DISTM, networktype="walk")
segmentsgdf = ox.graphtogdfs(G, nodes=False, edges=True).resetindex(drop=True)[["geometry"]]
print(f"OSM acquisition OK -> {len(poigdf)} POIs, {len(segmentsgdf)} street segments")
except Exception as e:
print(f"OSM unavailable ({e}) -> generating synthetic clustered POIs.")
rng = np.random.default_rng(SEED)
cats = list(TAG_QUERIES.keys())
centers = rng.uniform(-0.01, 0.01, size=(8, 2)) + np.array(CENTER[::-1])
rows = []
for ci, c in enumerate(centers):
dom = cats[ci % len(cats)]
n = rng.integers(40, 90)
pts = c + rng.normal(0, 0.0016, size=(n, 2))
for (lon, lat) in pts:
cat = dom if rng.random() < 0.75 else rng.choice(cats)
rows.append({"geometry": Point(lon, lat), "category": cat})
poi_gdf = gpd.GeoDataFrame(rows, crs="EPSG:4326")
segments_gdf = None
print(f"Synthetic dataset -> {len(poi_gdf)} POIs")
if len(poi_gdf) > 700:
poigdf = poigdf.sample(700, randomstate=SEED).resetindex(drop=True)
metriccrs = poigdf.estimateutmcrs()
poigdf = poigdf.tocrs(metriccrs).reset_index(drop=True)
if segments_gdf is not None:
segmentsgdf = segmentsgdf.tocrs(metriccrs)
print("Class balance:\n", poigdf["category"].valuecounts())
Ingegnerizzazione delle Feature Spaziali
Ogni punti di interesse è dotato di coordinate, calco di densità locale e distanza dalla strada più vicina. Viene anche assegnata una funzione per assegnare un identificativo a ciascuna categoria, necessario per il modello.
poigdf["cx"] = poigdf.geometry.x
poigdf["cy"] = poigdf.geometry.y
coords = poigdf[["cx", "cy"]].tonumpy()
nn = NearestNeighbors(radius=150.0).fit(coords)
poigdf["localdensity"] = [len(idx) - 1 for idx in nn.radiusneighbors(coords, returndistance=False)]
if segmentsgdf is not None and len(segmentsgdf):
try:
joined = gpd.sjoinnearest(poigdf[["geometry"]], segments_gdf[["geometry"]],
distancecol="diststreet")
poigdf["diststreet"] = joined.groupby(level=0)["diststreet"].min().reindex(poigdf.index).fillna(0.0)
except Exception:
poi_g