Skip to content
Draft
15 changes: 13 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2018-2025 Status Research & Development GmbH. Licensed under
# Copyright (c) 2018-2026 Status Research & Development GmbH. Licensed under
# either of:
# - Apache License, version 2.0
# - MIT license
Expand Down Expand Up @@ -433,8 +433,19 @@ evmstate_test: | build deps evmstate
txparse: | build deps
$(ENV_SCRIPT) nim c $(NIM_PARAMS) "tools/txparse/$@.nim"

# build syncer debugging and analysis tools
SYNCER_TOOLS_DIR := tools/syncer
SYNCER_TOOLS := $(foreach name,trace inspect replay,syncer_test_client_$(name))
.PHONY: syncer-tools syncer-tools-clean $(SYNCER_TOOLS)
syncer-tools: $(SYNCER_TOOLS)
syncer-tools-clean:
rm -f $(foreach exe,$(SYNCER_TOOLS),build/$(exe))
$(SYNCER_TOOLS): | build deps rocksdb
echo -e $(BUILD_MSG) "build/$@"
$(ENV_SCRIPT) nim c $(NIM_PARAMS) -o:build/$@ "$(SYNCER_TOOLS_DIR)/$@.nim"

# usual cleaning
clean: | clean-common
clean: | clean-common syncer-tools-clean
rm -rf build/{nimbus,nimbus_execution_client,nimbus_portal_client,fluffy,portal_bridge,libverifproxy,nimbus_verified_proxy,$(TOOLS_CSV),$(PORTAL_TOOLS_CSV),all_tests,test_kvstore_rocksdb,test_rpc,all_portal_tests,all_history_network_custom_chain_tests,test_portal_testnet,utp_test_app,utp_test,*.dSYM}
rm -rf tools/t8n/{t8n,t8n_test}
rm -rf tools/evmstate/{evmstate,evmstate_test}
Expand Down
79 changes: 68 additions & 11 deletions execution_chain/sync/beacon.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import
pkg/stew/[interval_set, sorted_set],
../core/chain,
../networking/p2p,
./beacon/worker/headers/headers_target,
./beacon/[beacon_desc, worker],
./beacon/worker/classify,
./beacon/worker/blocks/[blocks_fetch, blocks_import],
./beacon/worker/headers/[headers_fetch, headers_target],
./beacon/worker/[classify, update],
./[sync_sched, wire_protocol]

export
Expand Down Expand Up @@ -49,34 +50,64 @@ proc addBeaconSyncProtocol(desc: BeaconSyncRef; PROTO: type) =

desc.addSyncProtocol(PROTO, acceptPeer, initWorker)

# ------------------------------------------------------------------------------
# Interceptable handlers
# ------------------------------------------------------------------------------

proc schedDaemonCB(
ctx: BeaconCtxRef;
): Future[Duration]
{.async: (raises: []).} =
return worker.runDaemon(ctx, "Daemon") # async/template

proc schedStartCB(buddy: BeaconPeerRef): bool =
return worker.start(buddy, "Start")

proc schedStopCB(buddy: BeaconPeerRef) =
worker.stop(buddy, "Stop")

proc schedPoolCB(buddy: BeaconPeerRef; last: bool; laps: int): bool =
return worker.runPool(buddy, last, laps, "SyncMode")

proc schedPeerCB(
buddy: BeaconPeerRef;
rank: PeerRanking;
): Future[Duration]
{.async: (raises: []).} =
return worker.runPeer(buddy, rank, "Peer") # async/template

proc noOpFn(buddy: BeaconPeerRef) = discard
proc noOpEx(self: BeaconHandlersSyncRef) = discard

# ------------------------------------------------------------------------------
# Virtual methods/interface, `mixin` functions
# ------------------------------------------------------------------------------

proc runSetup(ctx: BeaconCtxRef): bool =
worker.setup(ctx, "Setup")
return worker.setup(ctx, "Setup")

proc runRelease(ctx: BeaconCtxRef) =
worker.release(ctx, "Release")

proc runDaemon(ctx: BeaconCtxRef): Future[Duration] {.async: (raises: []).} =
return worker.runDaemon(ctx, "Daemon")

proc runTicker(ctx: BeaconCtxRef) =
worker.runTicker(ctx, "Ticker")


proc runDaemon(ctx: BeaconCtxRef): Future[Duration] {.async: (raises: []).} =
return await ctx.handler.schedDaemon(ctx)

proc runStart(buddy: BeaconPeerRef): bool =
worker.start(buddy, "Start")
return buddy.ctx.handler.schedStart(buddy)

proc runStop(buddy: BeaconPeerRef) =
worker.stop(buddy, "Stop")
buddy.ctx.handler.schedStop(buddy)

proc runPool(buddy: BeaconPeerRef; last: bool; laps: int): bool =
worker.runPool(buddy, last, laps, "SyncMode")
return buddy.ctx.handler.schedPool(buddy, last, laps)

proc runPeer(buddy: BeaconPeerRef): Future[Duration] {.async: (raises: []).} =
let rank = buddy.classifyForFetching()
return worker.runPeer(buddy, rank, "Peer")
return await buddy.ctx.handler.schedPeer(buddy, rank)

# ------------------------------------------------------------------------------
# Public functions
Expand Down Expand Up @@ -124,6 +155,25 @@ proc config*(

desc.ctx.pool.chain = chain

# Set up handlers so they can be overlayed
desc.ctx.pool.handlers = BeaconHandlersSyncRef(
version: 0,
activate: updateActivateCB,
suspend: updateSuspendCB,
schedDaemon: schedDaemonCB,
schedStart: schedStartCB,
schedStop: schedStopCB,
schedPool: schedPoolCB,
schedPeer: schedPeerCB,
getBlockHeaders: getBlockHeadersCB,
syncBlockHeaders: noOpFn,
getBlockBodies: getBlockBodiesCB,
syncBlockBodies: noOpFn,
importBlock: importBlockCB,
syncImportBlock: noOpFn,
startSync: noOpEx,
stopSync: noOpEx)

if not desc.lazyConfigHook.isNil:
desc.lazyConfigHook(desc)
desc.lazyConfigHook = nil
Expand All @@ -149,14 +199,21 @@ proc start*(desc: BeaconSyncRef; standBy = false): bool =
doAssert not desc.ctx.isNil
let save = desc.ctx.pool.standByMode
desc.ctx.pool.standByMode = standBy # the ticker sees this when starting
if desc.startSync(standBy):
if desc.isRunning:
if desc.startSync(standBy):
return true
elif desc.startSync(standBy):
let w = BeaconHandlersSyncRef(desc.ctx.pool.handlers)
w.startSync(w)
return true
desc.ctx.pool.standByMode = save
# false

proc stop*(desc: BeaconSyncRef) {.async.} =
doAssert not desc.ctx.isNil
if desc.isRunning:
let w = BeaconHandlersSyncRef(desc.ctx.pool.handlers)
w.stopSync(w)
await desc.stopSync()
desc.ctx.pool.reset # also clears stand-by mode

Expand Down
8 changes: 7 additions & 1 deletion execution_chain/sync/beacon/beacon_desc.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Nimbus
# Copyright (c) 2023-2025 Status Research & Development GmbH
# Copyright (c) 2023-2026 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
Expand All @@ -25,4 +25,10 @@ type
## Instance descriptor, extends scheduler object
lazyConfigHook*: BeaconSyncConfigHook

BeaconHandlersSyncRef* = ref object of BeaconHandlersRef
## Add start/stop helpers to function list. By default, this functiona
## are no-ops.
startSync*: proc(self: BeaconHandlersSyncRef) {.gcsafe, raises: [].}
stopSync*: proc(self: BeaconHandlersSyncRef) {.gcsafe, raises: [].}

# End
19 changes: 17 additions & 2 deletions execution_chain/sync/beacon/worker/blocks/blocks_blocks.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,27 @@ import
../../../../networking/p2p,
../../../wire_protocol/types,
../[helpers, update, worker_desc],
./[blocks_fetch, blocks_helpers, blocks_import, blocks_unproc]
./[blocks_fetch, blocks_helpers, blocks_unproc]

# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------

template importBlock(
buddy: BeaconPeerRef;
blk: EthBlock;
effPeerID: Hash;
): Result[Duration,BeaconError] =
## Async/template
##
## Wrapper around `importBlock()` handler
##
let
ctx = buddy.ctx
rc = await ctx.handler.importBlock(buddy, blk, effPeerID)
ctx.handler.syncImportBlock(buddy) # debugging, trace, replay
rc

proc getNthHash(ctx: BeaconCtxRef; blocks: seq[EthBlock]; n: int): Hash32 =
ctx.hdrCache.getHash(blocks[n].header.number).valueOr:
return zeroHash32
Expand Down Expand Up @@ -208,7 +223,7 @@ template blocksImport*(

for n in 0 ..< blocks.len:
let nthBn = blocks[n].header.number
discard (await buddy.importBlock(blocks[n], peerID)).valueOr:
buddy.importBlock(blocks[n], peerID).isOkOr:
if error.excp != ECancelledError:
isError = true

Expand Down
24 changes: 18 additions & 6 deletions execution_chain/sync/beacon/worker/blocks/blocks_fetch.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Nimbus
# Copyright (c) 2023-2025 Status Research & Development GmbH
# Copyright (c) 2023-2026 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
Expand All @@ -19,9 +19,21 @@ import
./blocks_helpers

# ------------------------------------------------------------------------------
# Private helpers
# Private helper
# -----------------------------------------------------------------------------

template getBlockBodies(
buddy: BeaconPeerRef;
req: BlockBodiesRequest;
): Result[FetchBodiesData,BeaconError] =
## Async/template
##
## Wrapper around `getBlockBodies()` handler
##
let rc = await buddy.ctx.handler.getBlockBodies(buddy, req)
buddy.ctx.handler.syncBlockBodies(buddy) # debugging, sync, replay
rc

proc maybeSlowPeerError(
buddy: BeaconPeerRef;
elapsed: Duration;
Expand Down Expand Up @@ -51,10 +63,10 @@ func errStr(rc: Result[FetchBodiesData,BeaconError]): string =
result = "n/a"

# ------------------------------------------------------------------------------
# Private function(s)
# ------------------------------------------------------------------------------
# Public handler
# -----------------------------------------------------------------------------

proc getBlockBodies(
proc getBlockBodiesCB*(
buddy: BeaconPeerRef;
req: BlockBodiesRequest;
): Future[Result[FetchBodiesData,BeaconError]]
Expand Down Expand Up @@ -117,7 +129,7 @@ template fetchBodies*(
trace sendInfo, peer, startHash=startHash.short, nReq,
nErrors=buddy.nErrors.fetch.bdy

let rc = await buddy.getBlockBodies(request)
let rc = buddy.getBlockBodies(request)
var elapsed: Duration
if rc.isOk:
elapsed = rc.value.elapsed
Expand Down
9 changes: 6 additions & 3 deletions execution_chain/sync/beacon/worker/blocks/blocks_import.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Nimbus
# Copyright (c) 2023-2025 Status Research & Development GmbH
# Copyright (c) 2023-2026 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
Expand All @@ -16,11 +16,14 @@ import
../../../wire_protocol,
../worker_desc

logScope:
topics = "beacon sync"

# ------------------------------------------------------------------------------
# Public function
# Public handler
# ------------------------------------------------------------------------------

proc importBlock*(
proc importBlockCB*(
buddy: BeaconPeerRef;
blk: EthBlock;
effPeerID: Hash;
Expand Down
21 changes: 17 additions & 4 deletions execution_chain/sync/beacon/worker/headers/headers_fetch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,20 @@ import

# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
# -----------------------------------------------------------------------------

template getBlockHeaders(
buddy: BeaconPeerRef;
req: BlockHeadersRequest;
bn: BlockNumber;
): Result[FetchHeadersData,BeaconError] =
## Async/template
##
## Wrapper around `getBlockHeaders()` handler
##
let rc = await buddy.ctx.handler.getBlockHeaders(buddy, req, bn)
buddy.ctx.handler.syncBlockHeaders(buddy) # debugging, sync, replay
rc

proc maybeSlowPeerError(
buddy: BeaconPeerRef;
Expand Down Expand Up @@ -51,10 +64,10 @@ func errStr(rc: Result[FetchHeadersData,BeaconError]): string =
result = "n/a"

# ------------------------------------------------------------------------------
# Private function(s)
# Public Handler
# ------------------------------------------------------------------------------

proc getBlockHeaders(
proc getBlockHeadersCB*(
buddy: BeaconPeerRef;
req: BlockHeadersRequest;
bn: BlockNumber;
Expand Down Expand Up @@ -129,7 +142,7 @@ template fetchHeadersReversed*(
trace sendInfo & " reverse", peer, req=($ivReq), nReq=req.maxResults, hash,
state=($buddy.syncState), nErrors=buddy.nErrors.fetch.hdr

let rc = await buddy.getBlockHeaders(req, BlockNumber ivReq.maxPt)
let rc = buddy.getBlockHeaders(req, BlockNumber ivReq.maxPt)
var elapsed: Duration
if rc.isOk:
elapsed = rc.value.elapsed
Expand Down
8 changes: 4 additions & 4 deletions execution_chain/sync/beacon/worker/start_stop.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Nimbus
# Copyright (c) 2023-2025 Status Research & Development GmbH
# Copyright (c) 2023-2026 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
Expand All @@ -14,7 +14,7 @@ import
pkg/[chronicles, chronos, eth/common, metrics],
../../../networking/p2p,
../../wire_protocol,
./[blocks, headers, update, worker_desc]
./[blocks, headers, worker_desc]

type
SyncStateData = tuple
Expand Down Expand Up @@ -67,8 +67,8 @@ proc setupServices*(ctx: BeaconCtxRef; info: static[string]) =

# Set up the notifier informing when a new syncer session has started.
ctx.hdrCache.start proc() =
# Activates the syncer. Work will be picked up by peers when available.
ctx.updateActivateSyncer()
# This directive captures `ctx` for calling the activation handler.
ctx.handler.activate(ctx)

# Provide progress info call back handler
ctx.pool.chain.com.beaconSyncerProgress = proc(): SyncStateData =
Expand Down
Loading
Loading