Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 43 additions & 6 deletions src/lean_spec/subspecs/networking/client/event_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,12 @@
GossipsubMessageEvent,
)
from lean_spec.subspecs.networking.gossipsub.parameters import GossipsubParameters
from lean_spec.subspecs.networking.gossipsub.topic import GossipTopic, TopicKind
from lean_spec.subspecs.networking.gossipsub.topic import (
ForkMismatchError,
GossipTopic,
TopicKind,
)
from lean_spec.subspecs.networking.peer import PeerInfo
from lean_spec.subspecs.networking.reqresp.handler import (
REQRESP_PROTOCOL_IDS,
BlockLookup,
Expand Down Expand Up @@ -347,13 +352,15 @@ def decode_message(
Raises:
GossipMessageError: If the message cannot be decoded.
"""
# Step 1: Parse topic to determine message type.
# Step 1: Parse topic and validate fork compatibility.
#
# The topic string contains the fork digest and message kind.
# Invalid topics are rejected before any decompression work.
# This prevents wasting CPU on malformed messages.
# Invalid topics and wrong-fork messages are rejected before decompression.
# This prevents wasting CPU on malformed or incompatible messages.
try:
topic = GossipTopic.from_string(topic_str)
topic = GossipTopic.from_string_validated(topic_str, self.fork_digest)
except ForkMismatchError:
raise # Re-raise ForkMismatchError without wrapping
except ValueError as e:
raise GossipMessageError(f"Invalid topic: {e}") from e

Expand Down Expand Up @@ -399,10 +406,13 @@ def get_topic(self, topic_str: str) -> GossipTopic:
Parsed GossipTopic.

Raises:
ForkMismatchError: If the topic's fork_digest doesn't match.
GossipMessageError: If the topic is invalid.
"""
try:
return GossipTopic.from_string(topic_str)
return GossipTopic.from_string_validated(topic_str, self.fork_digest)
except ForkMismatchError:
raise # Re-raise ForkMismatchError without wrapping
except ValueError as e:
raise GossipMessageError(f"Invalid topic: {e}") from e

Expand Down Expand Up @@ -617,6 +627,12 @@ class LiveNetworkEventSource:
Supports both yamux (TCP) and QUIC connection types.
"""

_peer_info: dict[PeerId, PeerInfo] = field(default_factory=dict)
"""Cache of peer information including status and ENR.

Populated after status exchange. Used for fork compatibility checks.
"""

_our_status: Status | None = None
"""Our current chain status for handshakes.

Expand Down Expand Up @@ -737,6 +753,18 @@ def set_block_lookup(self, lookup: BlockLookup) -> None:
"""
self._reqresp_handler.block_lookup = lookup

def get_peer_info(self, peer_id: PeerId) -> PeerInfo | None:
    """
    Look up cached information for a peer.

    Args:
        peer_id: Identifier of the peer to look up.

    Returns:
        The cached PeerInfo, or None when nothing is cached for this peer.
    """
    cached = self._peer_info.get(peer_id, None)
    return cached

def subscribe_gossip_topic(self, topic: str) -> None:
"""
Subscribe to a gossip topic.
Expand Down Expand Up @@ -805,6 +833,8 @@ async def _handle_gossipsub_message(self, event: GossipsubMessageEvent) -> None:

logger.debug("Processed gossipsub message %s from %s", topic.kind.value, event.peer_id)

except ForkMismatchError as e:
logger.warning("Rejected gossip from wrong fork: %s", e)
except GossipMessageError as e:
logger.warning("Failed to process gossipsub message: %s", e)

Expand Down Expand Up @@ -1041,6 +1071,12 @@ async def _exchange_status(
peer_status = await self.reqresp_client.send_status(peer_id, self._our_status)

if peer_status is not None:
# Cache peer's status for fork compatibility checks.
if peer_id not in self._peer_info:
self._peer_info[peer_id] = PeerInfo(peer_id=peer_id)
self._peer_info[peer_id].status = peer_status
self._peer_info[peer_id].update_last_seen()

await self._events.put(PeerStatusEvent(peer_id=peer_id, status=peer_status))
logger.debug(
"Received status from %s: head=%s",
Expand Down Expand Up @@ -1089,6 +1125,7 @@ async def disconnect(self, peer_id: PeerId) -> None:
peer_id: Peer to disconnect.
"""
conn = self._connections.pop(peer_id, None)
self._peer_info.pop(peer_id, None) # Clean up peer info cache
if conn is not None:
self.reqresp_client.unregister_connection(peer_id)
await conn.close()
Expand Down
54 changes: 51 additions & 3 deletions src/lean_spec/subspecs/networking/discovery/routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,16 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Iterator
from typing import TYPE_CHECKING, Iterator

from lean_spec.subspecs.networking.types import NodeId, SeqNumber
from lean_spec.subspecs.networking.types import ForkDigest, NodeId, SeqNumber

from .config import BUCKET_COUNT, K_BUCKET_SIZE
from .messages import Distance

if TYPE_CHECKING:
from lean_spec.subspecs.networking.enr import ENR


def xor_distance(a: NodeId, b: NodeId) -> int:
"""
Expand Down Expand Up @@ -130,6 +133,9 @@ class NodeEntry:
verified: bool = False
"""True if node has responded to at least one PING. Required for relay."""

enr: ENR | None = None
"""Full ENR record. Contains fork data for compatibility checks."""


@dataclass
class KBucket:
Expand Down Expand Up @@ -250,6 +256,12 @@ class RoutingTable:
Organizes nodes into 256 k-buckets by XOR distance from the local node.
Bucket i contains nodes with log2(distance) == i + 1.

Fork Filtering
--------------
When local_fork_digest is set, only peers with matching fork_digest
in their eth2 ENR data are accepted. This prevents storing peers
on different forks.

Lookup Algorithm
----------------
A 'lookup' locates the k closest nodes to a target ID. The initiator
Expand All @@ -274,6 +286,9 @@ class RoutingTable:
buckets: list[KBucket] = field(default_factory=lambda: [KBucket() for _ in range(BUCKET_COUNT)])
"""256 k-buckets indexed by log2 distance minus one."""

local_fork_digest: ForkDigest | None = None
"""Our fork_digest for filtering incompatible peers. None disables filtering."""

def bucket_index(self, node_id: NodeId) -> int:
"""
Get bucket index for a node ID.
Expand All @@ -293,19 +308,52 @@ def get_bucket(self, node_id: NodeId) -> KBucket:
"""Get the k-bucket containing nodes at this distance."""
return self.buckets[self.bucket_index(node_id)]

def is_fork_compatible(self, entry: NodeEntry) -> bool:
    """
    Decide whether a node entry passes the fork filter.

    Filtering only applies when local_fork_digest is configured; the entry
    then needs an ENR carrying eth2 data whose fork_digest equals ours.

    Args:
        entry: Candidate node entry.

    Returns:
        - True when filtering is disabled or the digests match,
        - False when ENR/eth2 data is missing or the digests differ.
    """
    expected = self.local_fork_digest
    if expected is None:
        # Filtering disabled: accept everything.
        return True

    record = entry.enr
    if record is None or record.eth2_data is None:
        # No fork information available - treat as incompatible.
        return False

    return record.eth2_data.fork_digest == expected

def add(self, entry: NodeEntry) -> bool:
    """
    Insert or refresh a node in the routing table.

    Self-entries and fork-incompatible peers (when local_fork_digest is
    set) are rejected before touching any bucket.

    Args:
        entry: Node entry to insert.

    Returns:
        - True if the entry was added or refreshed,
        - False if it is ourselves, fork-incompatible, or the bucket is full.
    """
    # Never store our own identity.
    if entry.node_id == self.local_id:
        return False

    # Drop peers on a different fork when filtering is enabled.
    if not self.is_fork_compatible(entry):
        return False

    target_bucket = self.get_bucket(entry.node_id)
    return target_bucket.add(entry)

def remove(self, node_id: NodeId) -> bool:
Expand Down
2 changes: 2 additions & 0 deletions src/lean_spec/subspecs/networking/gossipsub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
BLOCK_TOPIC_NAME,
ENCODING_POSTFIX,
TOPIC_PREFIX,
ForkMismatchError,
GossipTopic,
TopicKind,
format_topic_string,
Expand All @@ -116,6 +117,7 @@
"ENCODING_POSTFIX",
"BLOCK_TOPIC_NAME",
"ATTESTATION_TOPIC_NAME",
"ForkMismatchError",
"format_topic_string",
"parse_topic_string",
# Parameters
Expand Down
58 changes: 58 additions & 0 deletions src/lean_spec/subspecs/networking/gossipsub/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@
from dataclasses import dataclass
from enum import Enum


class ForkMismatchError(ValueError):
    """Signals that a topic carried a fork_digest other than the one expected."""

    def __init__(self, expected: str, actual: str) -> None:
        """Store both digests and build a descriptive message."""
        self.expected = expected
        self.actual = actual
        message = f"Fork mismatch: expected {expected}, got {actual}"
        super().__init__(message)


TOPIC_PREFIX: str = "leanconsensus"
"""Network prefix for Lean consensus gossip topics.

Expand Down Expand Up @@ -146,6 +157,31 @@ def __bytes__(self) -> bytes:
"""
return str(self).encode("utf-8")

def validate_fork(self, expected_fork_digest: str) -> None:
    """
    Ensure this topic belongs to the expected fork.

    Args:
        expected_fork_digest: Fork digest we require (0x-prefixed hex).

    Raises:
        ForkMismatchError: When the topic's digest differs from expected.
    """
    if self.fork_digest == expected_fork_digest:
        return
    raise ForkMismatchError(expected_fork_digest, self.fork_digest)

def is_fork_compatible(self, expected_fork_digest: str) -> bool:
    """
    Report whether this topic belongs to the expected fork.

    Args:
        expected_fork_digest: Fork digest we require (0x-prefixed hex).

    Returns:
        True when the topic's digest equals the expected one, else False.
    """
    matches = self.fork_digest == expected_fork_digest
    return matches

@classmethod
def from_string(cls, topic_str: str) -> GossipTopic:
"""Parse a topic string into a GossipTopic.
Expand Down Expand Up @@ -183,6 +219,28 @@ def from_string(cls, topic_str: str) -> GossipTopic:

return cls(kind=kind, fork_digest=fork_digest)

@classmethod
def from_string_validated(cls, topic_str: str, expected_fork_digest: str) -> GossipTopic:
    """Parse a topic string, then reject it if it is on the wrong fork.

    A convenience wrapper over from_string plus validate_fork, intended
    for inbound gossip where wrong-fork topics should fail fast - before
    any decompression or decoding work is spent on them.

    Args:
        topic_str: Raw topic string to parse.
        expected_fork_digest: Fork digest we require (0x-prefixed hex).

    Returns:
        The parsed GossipTopic instance.

    Raises:
        ValueError: When topic_str cannot be parsed.
        ForkMismatchError: When the parsed fork_digest is not the expected one.
    """
    parsed = cls.from_string(topic_str)
    parsed.validate_fork(expected_fork_digest)
    return parsed

@classmethod
def block(cls, fork_digest: str) -> GossipTopic:
"""Create a block topic for the given fork.
Expand Down
42 changes: 36 additions & 6 deletions src/lean_spec/subspecs/networking/peer/info.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
"""Peer Information"""

from __future__ import annotations

from dataclasses import dataclass, field
from enum import IntEnum, auto
from time import time
from typing import TYPE_CHECKING

from ..transport import PeerId
from ..types import ConnectionState, Multiaddr

if TYPE_CHECKING:
from ..enr import ENR
from ..reqresp import Status


class Direction(IntEnum):
"""
Expand All @@ -27,14 +34,16 @@ class Direction(IntEnum):
@dataclass
class PeerInfo:
"""
Minimal information about a known peer.
Information about a known peer.

Tracks identity, connection state, and cached protocol data.

The enr and status fields cache fork data from discovery and handshake:

Tracks only the essential data needed to manage peer connections:
identity, connection state, direction, and last activity timestamp.
- enr: Populated from discovery, contains eth2 fork_digest
- status: Populated after Status handshake, contains finalized/head checkpoints

This is intentionally minimal - additional fields (scoring, subnet
subscriptions, protocol metadata) can be added as features are
implemented.
These cached values enable fork compatibility checks at multiple layers.
"""

peer_id: PeerId
Expand All @@ -52,10 +61,31 @@ class PeerInfo:
last_seen: float = field(default_factory=time)
"""Unix timestamp of last successful interaction."""

enr: ENR | None = None
"""Cached ENR from discovery. Contains eth2 fork_digest for compatibility checks."""

status: Status | None = None
"""Cached Status from handshake. Contains finalized/head checkpoints."""

def is_connected(self) -> bool:
    """Return True while the peer has an active connection."""
    connected = self.state == ConnectionState.CONNECTED
    return connected

def update_last_seen(self) -> None:
    """Record that the peer was just seen by stamping the current time."""
    now = time()
    self.last_seen = now

@property
def fork_digest(self) -> bytes | None:
    """
    The peer's fork_digest, read from the cached ENR.

    Returns:
        The 4-byte fork_digest, or None when no ENR or eth2 data is cached.
    """
    record = self.enr
    if record is None:
        return None
    data = record.eth2_data
    if data is None:
        return None
    return bytes(data.fork_digest)
Loading
Loading