Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions v2/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package p2p
import (
"context"
"fmt"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -131,6 +132,8 @@ func (p *P2P) PeerIDs() []PeerID {
ids = append(ids, id)
}

slices.Sort(ids)

return ids
}

Expand Down Expand Up @@ -266,3 +269,22 @@ func (p *P2P) MessageInfo(peerID PeerID, content string) (map[string]any, error)

return info, nil
}

// PeerLog returns a copy of the log entries for the specified peer, allowing for inspection of message flow and events.
func (p *P2P) PeerLog(peerID PeerID, content string) (map[string][]logEntry, error) {
peer := p.peers[peerID]

if peer == nil {
return nil, fmt.Errorf("peer %s not found", peerID)
}

peer.mu.Lock()
defer peer.mu.Unlock()

logCopy := make(map[string][]logEntry)
for k, v := range peer.log {
logCopy[k] = append([]logEntry(nil), v...)
}

return logCopy, nil
}
74 changes: 71 additions & 3 deletions v2/p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type peer struct {
mu sync.Mutex // mutex to protect access to the peer's state

alive bool // indicates whether the peer is active in the network

log map[string][]logEntry
}

// edge represents a connection from one node to another in the P2P network.
Expand All @@ -29,6 +31,15 @@ type edge struct {
networkLatency float64 // latency for a message sent from this peer to the target peer, in milliseconds
}

type logEntry struct {
ID PeerID `json:"id"` // ID of the peer
Timestamp string `json:"timestamp"` // timestamp of the log entry
Type string `json:"type"` // type of log entry (e.g., "recv", "send")
From PeerID `json:"from"` // ID of the sender peer
To PeerID `json:"to"` // ID of the target peer
First bool `json:"first"` // indicates if this is the first time the message is seen
}

// newPeer creates a new Node with the given ID and node latency.
func newPeer(id PeerID, nodeLatency float64) *peer {
return &peer{
Expand All @@ -43,18 +54,25 @@ func newPeer(id PeerID, nodeLatency float64) *peer {

msgQueue: make(chan Message, 1000),
mu: sync.Mutex{},

log: make(map[string][]logEntry),
}
}

// eachRun starts the message handling routine for the peer.
func (p *peer) eachRun(network *P2P, wg *sync.WaitGroup, ctx context.Context) {
go func(ctx context.Context, wg *sync.WaitGroup) {
p.mu.Lock()
p.alive = true
p.mu.Unlock()

wg.Done()

select {
case <-ctx.Done():
p.mu.Lock()
p.alive = false
p.mu.Unlock()
return
default:
for msg := range p.msgQueue {
Expand All @@ -66,17 +84,39 @@ func (p *peer) eachRun(network *P2P, wg *sync.WaitGroup, ctx context.Context) {
}
p.recvFrom[msg.Content][msg.From] = struct{}{}

if _, ok := p.log[msg.Content]; !ok {
p.log[msg.Content] = make([]logEntry, 0)
}

if _, ok := p.seenAt[msg.Content]; !ok {
p.seenAt[msg.Content] = time.Now()
p.firstFrom[msg.Content] = msg.From
first = true

p.log[msg.Content] = append(p.log[msg.Content], logEntry{
ID: p.id,
Timestamp: timestamp(),
Type: "recv",
From: msg.From,
To: p.id,
First: true,
})
} else {
p.log[msg.Content] = append(p.log[msg.Content], logEntry{
ID: p.id,
Timestamp: timestamp(),
Type: "recv",
From: msg.From,
To: p.id,
First: false,
})
}
p.mu.Unlock()

if first {
go func(msg Message) {
time.Sleep(time.Duration(p.processingLatency) * time.Millisecond)
p.eachPublish(network, msg)
currentTime := time.Now()
p.eachPublish(network, msg, currentTime)
}(msg)
}
}
Expand All @@ -85,7 +125,7 @@ func (p *peer) eachRun(network *P2P, wg *sync.WaitGroup, ctx context.Context) {
}

// eachPublish sends the message to neighbors, excluding 'exclude' and already-sent targets.
func (p *peer) eachPublish(network *P2P, msg Message) {
func (p *peer) eachPublish(network *P2P, msg Message, start time.Time) {
content := msg.Content
protocol := msg.Protocol
hopCount := msg.HopCount
Expand Down Expand Up @@ -128,13 +168,36 @@ func (p *peer) eachPublish(network *P2P, msg Message) {
}
}

delay := time.Duration(p.processingLatency * float64(time.Millisecond))
if remain := delay - time.Since(start); remain > 0 {
time.Sleep(remain)
}

for _, e := range willSendEdges {
edgeCopy := e
p.sentTo[content][e.targetID] = struct{}{}

if _, ok := p.log[msg.Content]; !ok {
p.log[msg.Content] = make([]logEntry, 0)
}

p.log[content] = append(p.log[content], logEntry{
ID: p.id,
Timestamp: timestamp(),
Type: "send",
From: p.id,
To: e.targetID,
First: false,
})

go func(e edge) {
time.Sleep(time.Duration(e.networkLatency) * time.Millisecond)

targetPeer, ok := network.peers[e.targetID]
if !ok || targetPeer == nil {
return
}

network.peers[e.targetID].msgQueue <- Message{
Publisher: msg.Publisher,
From: p.id,
Expand All @@ -155,3 +218,8 @@ func (p *peer) eachStop() {
p.alive = false
close(p.msgQueue)
}

// timestamp returns the current time formatted as a string for logging purposes.
func timestamp() string {
return time.Now().Format("2006-01-02 15:04:05.000")
}
18 changes: 13 additions & 5 deletions v2/p2p/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,26 @@ var Gossip ProtocolFunc = func(id PeerID, msg Message, neighbors []PeerID, sentP
targets = append(targets, neighbor)
}

gossipFactor, ok := broadcastParams["gossip_factor"].(float64)
if !ok {
gossipFactor = 0.5 // default gossip factor
gossipFactor, ok1 := broadcastParams["gossip_factor"].(float64)
gossipNode, ok2 := broadcastParams["gossip_node"].(int)

if !ok1 && !ok2 {
ok1 = true
gossipFactor = 0.5
}

if len(targets) > 0 {
rand.Shuffle(len(targets), func(i, j int) {
targets[i], targets[j] = targets[j], targets[i]
})

k := int(float64(len(targets)) * gossipFactor)
targets = targets[:k]
if ok1 {
k := int(float64(len(targets)) * gossipFactor)
targets = targets[:k]
} else if ok2 {
k := gossipNode
targets = targets[:k]
}
}

return &targets
Expand Down
Loading