diff --git a/v2/p2p/p2p.go b/v2/p2p/p2p.go index 2c04371..a362a09 100644 --- a/v2/p2p/p2p.go +++ b/v2/p2p/p2p.go @@ -3,6 +3,7 @@ package p2p import ( "context" "fmt" + "slices" "sync" "time" @@ -131,6 +132,8 @@ func (p *P2P) PeerIDs() []PeerID { ids = append(ids, id) } + slices.Sort(ids) + return ids } @@ -266,3 +269,22 @@ func (p *P2P) MessageInfo(peerID PeerID, content string) (map[string]any, error) return info, nil } + +// PeerLog returns a copy of the log entries for the specified peer, allowing for inspection of message flow and events. +func (p *P2P) PeerLog(peerID PeerID, content string) (map[string][]logEntry, error) { + peer := p.peers[peerID] + + if peer == nil { + return nil, fmt.Errorf("peer %s not found", peerID) + } + + peer.mu.Lock() + defer peer.mu.Unlock() + + logCopy := make(map[string][]logEntry) + for k, v := range peer.log { + logCopy[k] = append([]logEntry(nil), v...) + } + + return logCopy, nil +} diff --git a/v2/p2p/peer.go b/v2/p2p/peer.go index 3dee3bf..a69102d 100644 --- a/v2/p2p/peer.go +++ b/v2/p2p/peer.go @@ -21,6 +21,8 @@ type peer struct { mu sync.Mutex // mutex to protect access to the peer's state alive bool // indicates whether the peer is active in the network + + log map[string][]logEntry } // edge represents a connection from one node to another in the P2P network. @@ -29,6 +31,15 @@ type edge struct { networkLatency float64 // latency for a message sent from this peer to the target peer, in milliseconds } +type logEntry struct { + ID PeerID `json:"id"` // ID of the peer + Timestamp string `json:"timestamp"` // timestamp of the log entry + Type string `json:"type"` // type of log entry (e.g., "recv", "send") + From PeerID `json:"from"` // ID of the sender peer + To PeerID `json:"to"` // ID of the target peer + First bool `json:"first"` // indicates if this is the first time the message is seen +} + // newPeer creates a new Node with the given ID and node latency. func newPeer(id PeerID, nodeLatency float64) *peer { return &peer{ @@ -43,18 +54,25 @@ func newPeer(id PeerID, nodeLatency float64) *peer { msgQueue: make(chan Message, 1000), mu: sync.Mutex{}, + + log: make(map[string][]logEntry), } } // eachRun starts the message handling routine for the peer. func (p *peer) eachRun(network *P2P, wg *sync.WaitGroup, ctx context.Context) { go func(ctx context.Context, wg *sync.WaitGroup) { + p.mu.Lock() p.alive = true + p.mu.Unlock() + wg.Done() select { case <-ctx.Done(): + p.mu.Lock() p.alive = false + p.mu.Unlock() return default: for msg := range p.msgQueue { @@ -66,17 +84,39 @@ func (p *peer) eachRun(network *P2P, wg *sync.WaitGroup, ctx context.Context) { } p.recvFrom[msg.Content][msg.From] = struct{}{} + if _, ok := p.log[msg.Content]; !ok { + p.log[msg.Content] = make([]logEntry, 0) + } + if _, ok := p.seenAt[msg.Content]; !ok { p.seenAt[msg.Content] = time.Now() p.firstFrom[msg.Content] = msg.From first = true + + p.log[msg.Content] = append(p.log[msg.Content], logEntry{ + ID: p.id, + Timestamp: timestamp(), + Type: "recv", + From: msg.From, + To: p.id, + First: true, + }) + } else { + p.log[msg.Content] = append(p.log[msg.Content], logEntry{ + ID: p.id, + Timestamp: timestamp(), + Type: "recv", + From: msg.From, + To: p.id, + First: false, + }) } p.mu.Unlock() if first { go func(msg Message) { - time.Sleep(time.Duration(p.processingLatency) * time.Millisecond) - p.eachPublish(network, msg) + currentTime := time.Now() + p.eachPublish(network, msg, currentTime) }(msg) } } @@ -85,7 +125,7 @@ func (p *peer) eachRun(network *P2P, wg *sync.WaitGroup, ctx context.Context) { } // eachPublish sends the message to neighbors, excluding 'exclude' and already-sent targets. -func (p *peer) eachPublish(network *P2P, msg Message) { +func (p *peer) eachPublish(network *P2P, msg Message, start time.Time) { content := msg.Content protocol := msg.Protocol hopCount := msg.HopCount @@ -128,13 +168,36 @@ func (p *peer) eachPublish(network *P2P, msg Message) { } } + delay := time.Duration(p.processingLatency * float64(time.Millisecond)) + if remain := delay - time.Since(start); remain > 0 { + time.Sleep(remain) + } + for _, e := range willSendEdges { edgeCopy := e p.sentTo[content][e.targetID] = struct{}{} + if _, ok := p.log[msg.Content]; !ok { + p.log[msg.Content] = make([]logEntry, 0) + } + + p.log[content] = append(p.log[content], logEntry{ + ID: p.id, + Timestamp: timestamp(), + Type: "send", + From: p.id, + To: e.targetID, + First: false, + }) + go func(e edge) { time.Sleep(time.Duration(e.networkLatency) * time.Millisecond) + targetPeer, ok := network.peers[e.targetID] + if !ok || targetPeer == nil { + return + } + network.peers[e.targetID].msgQueue <- Message{ Publisher: msg.Publisher, From: p.id, @@ -155,3 +218,8 @@ func (p *peer) eachStop() { p.alive = false close(p.msgQueue) } + +// timestamp returns the current time formatted as a string for logging purposes. +func timestamp() string { + return time.Now().Format("2006-01-02 15:04:05.000") +} diff --git a/v2/p2p/protocol.go b/v2/p2p/protocol.go index 85e805b..8ffa8bd 100644 --- a/v2/p2p/protocol.go +++ b/v2/p2p/protocol.go @@ -52,9 +52,12 @@ var Gossip ProtocolFunc = func(id PeerID, msg Message, neighbors []PeerID, sentP targets = append(targets, neighbor) } - gossipFactor, ok := broadcastParams["gossip_factor"].(float64) - if !ok { - gossipFactor = 0.5 // default gossip factor + gossipFactor, ok1 := broadcastParams["gossip_factor"].(float64) + gossipNode, ok2 := broadcastParams["gossip_node"].(int) + + if !ok1 && !ok2 { + ok1 = true + gossipFactor = 0.5 } if len(targets) > 0 { @@ -62,8 +65,13 @@ var Gossip ProtocolFunc = func(id PeerID, msg Message, neighbors []PeerID, sentP targets[i], targets[j] = targets[j], targets[i] }) - k := int(float64(len(targets)) * gossipFactor) - targets = targets[:k] + if ok1 { + k := int(float64(len(targets)) * gossipFactor) + targets = targets[:k] + } else if ok2 { + k := gossipNode + targets = targets[:k] + } } return &targets