Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions yarn-project/foundation/src/collection/array.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
stdDev,
times,
unique,
uniqueBy,
variance,
} from './array.js';

Expand Down Expand Up @@ -143,6 +144,24 @@ describe('unique', () => {
});
});

describe('uniqueBy', () => {
it('keeps the first occurrence per key', () => {
const items = [
{ id: 'a', n: 1 },
{ id: 'b', n: 2 },
{ id: 'a', n: 3 },
];
expect(uniqueBy(items, x => x.id)).toEqual([
{ id: 'a', n: 1 },
{ id: 'b', n: 2 },
]);
});

it('returns an empty array for an empty input', () => {
expect(uniqueBy([], x => x)).toEqual([]);
});
});

describe('maxBy', () => {
it('returns the max value', () => {
expect(maxBy([1, 2, 3], x => x)).toEqual(3);
Expand Down
20 changes: 20 additions & 0 deletions yarn-project/foundation/src/collection/array.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,26 @@ export function unique<T>(arr: T[]): T[] {
return [...new Set(arr)];
}

/**
* Removes duplicates from the given array using a key function. The first occurrence of each key is kept.
* @param arr - The array.
* @param keyFn - A function that returns a primitive key for each element. Elements with the same key are
* considered duplicates.
* @returns A new array.
*/
export function uniqueBy<T, K extends string | number | bigint>(arr: T[], keyFn: (item: T) => K): T[] {
const seen = new Set<K>();
const result: T[] = [];
for (const item of arr) {
const key = keyFn(item);
if (!seen.has(key)) {
seen.add(key);
result.push(item);
}
}
return result;
}

/**
* Removes all undefined elements from the array.
* @param arr - The array.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { ARCHIVE_HEIGHT, NOTE_HASH_TREE_HEIGHT } from '@aztec/constants';
import type { BlockNumber } from '@aztec/foundation/branded-types';
import { uniqueBy } from '@aztec/foundation/collection';
import { Aes128 } from '@aztec/foundation/crypto/aes128';
import { Fr } from '@aztec/foundation/curves/bn254';
import { Point } from '@aztec/foundation/curves/grumpkin';
Expand Down Expand Up @@ -28,7 +29,7 @@ import { MessageContext, deriveAppSiloedSharedSecret } from '@aztec/stdlib/logs'
import { getNonNullifiedL1ToL2MessageWitness } from '@aztec/stdlib/messaging';
import type { NoteStatus } from '@aztec/stdlib/note';
import { MerkleTreeId, type NullifierMembershipWitness, PublicDataWitness } from '@aztec/stdlib/trees';
import type { BlockHeader, Capsule, OffchainEffect } from '@aztec/stdlib/tx';
import type { BlockHeader, Capsule, IndexedTxEffect, OffchainEffect, TxHash } from '@aztec/stdlib/tx';

import { createContractLogger, logContractMessage, stripAztecnrLogPrefix } from '../../contract_logging.js';
import type { ContractSyncService } from '../../contract_sync/contract_sync_service.js';
Expand Down Expand Up @@ -639,36 +640,18 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra
eventValidationRequests: EventValidationRequest[],
scope: AztecAddress,
) {
const noteService = new NoteService(this.noteStore, this.aztecNode, this.anchorBlockHeader, this.jobId);
const noteStorePromises = noteValidationRequests.map(request =>
noteService.validateAndStoreNote(
request.contractAddress,
request.owner,
request.storageSlot,
request.randomness,
request.noteNonce,
request.content,
request.noteHash,
request.nullifier,
request.txHash,
scope,
),
);
const txEffects = await this.#fetchTxEffects([
...noteValidationRequests.map(r => r.txHash),
...eventValidationRequests.map(r => r.txHash),
]);

const noteService = new NoteService(this.noteStore, this.aztecNode, this.anchorBlockHeader, this.jobId);
const eventService = new EventService(this.anchorBlockHeader, this.aztecNode, this.privateEventStore, this.jobId);
const eventStorePromises = eventValidationRequests.map(request =>
eventService.validateAndStoreEvent(
request.contractAddress,
request.eventTypeId,
request.randomness,
request.serializedEvent,
request.eventCommitment,
request.txHash,
scope,
),
);

await Promise.all([...noteStorePromises, ...eventStorePromises]);
await Promise.all([
noteService.validateAndStoreNotes(noteValidationRequests, scope, txEffects),
eventService.validateAndStoreEvents(eventValidationRequests, scope, txEffects),
]);
}

public async getLogsByTag(
Expand Down Expand Up @@ -976,6 +959,20 @@ export class UtilityExecutionOracle implements IMiscOracle, IUtilityExecutionOra
return this.offchainEffects;
}

/**
* Fetches tx effects for the given hashes in parallel, deduplicating repeated hashes so each tx is only requested
* once. Returns a map keyed by `TxHash.toString()`; hashes for which the node has no tx effect are omitted.
*/
async #fetchTxEffects(txHashes: TxHash[]): Promise<Map<string, IndexedTxEffect>> {
const uniqueTxHashes = uniqueBy(txHashes, h => h.toString());
const fetched = await Promise.all(uniqueTxHashes.map(h => this.aztecNode.getTxEffect(h)));
return new Map(
uniqueTxHashes
.map((h, i): [string, IndexedTxEffect | undefined] => [h.toString(), fetched[i]])
.filter((entry): entry is [string, IndexedTxEffect] => entry[1] !== undefined),
);
}

/** Runs a query concurrently with a validation that the block hash is not ahead of the anchor block. */
async #queryWithBlockHashNotAfterAnchor<T>(blockHash: BlockHash, query: () => Promise<T>): Promise<T> {
const [response] = await Promise.all([
Expand Down
37 changes: 20 additions & 17 deletions yarn-project/pxe/src/events/event_service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import { type IndexedTxEffect, TxEffect } from '@aztec/stdlib/tx';

import { mock } from 'jest-mock-extended';

import { EventValidationRequest } from '../contract_function_simulator/noir-structs/event_validation_request.js';
import { PrivateEventStore } from '../storage/private_event_store/private_event_store.js';
import { EventService } from './event_service.js';

describe('validateAndStoreEvent', () => {
describe('validateAndStoreEvents', () => {
let blockNumber: BlockNumber;
let eventSelector: EventSelector;
let randomness: Fr;
Expand Down Expand Up @@ -66,12 +67,11 @@ describe('validateAndStoreEvent', () => {

/* Happy path context conditions:
** - PXE is sync'd to _at least_ block including tx
** - Node returns the corresponding tx effect and the tx effect includes the event commitment
** - Caller provides the corresponding tx effect via the prefetched map and the tx effect includes the event
** commitment.
*/
const anchorBlockHeader = makeBlockHeader(0, { blockNumber });

aztecNode.getTxEffect.mockImplementation(() => Promise.resolve(indexedTxEffect));

logger = mock<Logger>();
eventService = new EventService(anchorBlockHeader, aztecNode, privateEventStore, 'test', logger);
});
Expand All @@ -80,34 +80,33 @@ describe('validateAndStoreEvent', () => {
overrides: {
eventContent?: Fr[];
eventCommitment?: Fr;
txEffectsMap?: Map<string, IndexedTxEffect>;
} = {},
) {
await eventService.validateAndStoreEvent(
const request = new EventValidationRequest(
contractAddress,
eventSelector,
randomness,
overrides.eventContent || eventContent,
overrides.eventCommitment || eventCommitment,
overrides.eventContent ?? eventContent,
overrides.eventCommitment ?? eventCommitment,
txEffect.txHash,
recipient,
);

const map = overrides.txEffectsMap ?? defaultTxEffectsMap();
await eventService.validateAndStoreEvents([request], recipient, map);

await privateEventStore.commit('test');
}

it('should throw when tx does not exist or has no effects', async () => {
aztecNode.getTxEffect.mockImplementation(() => Promise.resolve(undefined));
await expect(runStoreEvent).rejects.toThrow(/Could not find tx effect for tx hash/);
const txEffectsMap = new Map();
await expect(() => runStoreEvent({ txEffectsMap })).rejects.toThrow(/Could not find tx effect for tx hash/);
});

it('should throw when tx block has not yet been synchronized', async () => {
indexedTxEffect = {
...indexedTxEffect,
l2BlockNumber: BlockNumber(blockNumber + 1),
};
aztecNode.getTxEffect.mockImplementation(() => Promise.resolve(indexedTxEffect));

await expect(runStoreEvent).rejects.toThrow(
const laterIndexedTxEffect = { ...indexedTxEffect, l2BlockNumber: BlockNumber(blockNumber + 1) };
const txEffectsMap = new Map([[txEffect.txHash.toString(), laterIndexedTxEffect]]);
await expect(() => runStoreEvent({ txEffectsMap })).rejects.toThrow(
/Obtained a newer tx effect for .* for an event validation request than the anchor block/,
);
});
Expand Down Expand Up @@ -159,4 +158,8 @@ describe('validateAndStoreEvent', () => {
expect(result.length).toEqual(1);
expect(result[0].packedEvent).toEqual(eventContent);
});

function defaultTxEffectsMap() {
return new Map([[txEffect.txHash.toString(), indexedTxEffect]]);
}
});
55 changes: 39 additions & 16 deletions yarn-project/pxe/src/events/event_service.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import type { Fr } from '@aztec/foundation/curves/bn254';
import { createLogger } from '@aztec/foundation/log';
import type { EventSelector } from '@aztec/stdlib/abi';
import type { AztecAddress } from '@aztec/stdlib/aztec-address';
import { computePrivateEventCommitment, siloNullifier } from '@aztec/stdlib/hash';
import type { AztecNode } from '@aztec/stdlib/interfaces/server';
import type { BlockHeader, TxHash } from '@aztec/stdlib/tx';
import type { BlockHeader, IndexedTxEffect } from '@aztec/stdlib/tx';

import type { EventValidationRequest } from '../contract_function_simulator/noir-structs/event_validation_request.js';
import { PrivateEventStore } from '../storage/private_event_store/private_event_store.js';

export class EventService {
Expand All @@ -17,15 +16,43 @@ export class EventService {
private readonly log = createLogger('pxe:event_service'),
) {}

public async validateAndStoreEvent(
contractAddress: AztecAddress,
selector: EventSelector,
randomness: Fr,
content: Fr[],
eventCommitment: Fr,
txHash: TxHash,
/**
* Validates and stores a batch of private events against pre-fetched tx effects.
*
* @param requests - The events to validate and store.
* @param scope - The scope under which the events are being stored.
* @param txEffects - Pre-fetched tx effects keyed by `TxHash.toString()`. Must contain entries for every request's
* txHash; missing entries are treated as a node bug and cause an error.
*/
public async validateAndStoreEvents(
requests: EventValidationRequest[],
scope: AztecAddress,
txEffects: Map<string, IndexedTxEffect>,
): Promise<void> {
if (requests.length === 0) {
return;
}

const anchorBlockNumber = this.anchorBlockHeader.getBlockNumber();

await Promise.all(requests.map(req => this.#validateAndStoreEvent(req, scope, txEffects, anchorBlockNumber)));
}

async #validateAndStoreEvent(
request: EventValidationRequest,
scope: AztecAddress,
txEffects: Map<string, IndexedTxEffect>,
anchorBlockNumber: number,
): Promise<void> {
const {
contractAddress,
eventTypeId: selector,
randomness,
serializedEvent: content,
eventCommitment,
txHash,
} = request;

// Defense-in-depth: the built-in private-event path derives this commitment from content before enqueueing, but
// unconstrained PXE-side code (e.g. a custom message handler) can reach this oracle with arbitrary
// (content, commitment) pairs. Without this check it could bind arbitrary content to a legitimate tx nullifier,
Expand All @@ -42,13 +69,9 @@ export class EventService {
// (and thus we're less concerned about being ahead of the synced block), we use the synced block number to
// maintain consistent behavior in the PXE. Additionally, events should never be ahead of the synced block here
// since `fetchTaggedLogs` only processes logs up to the synced block.
const [siloedEventCommitment, txEffect] = await Promise.all([
siloNullifier(contractAddress, eventCommitment),
this.aztecNode.getTxEffect(txHash),
]);

const anchorBlockNumber = this.anchorBlockHeader.getBlockNumber();
const siloedEventCommitment = await siloNullifier(contractAddress, eventCommitment);

const txEffect = txEffects.get(txHash.toString());
if (!txEffect) {
// We error out instead of just logging a warning and skipping the event because this would indicate a bug. This
// is because the node has already served info about this tx either when obtaining the log (TxScopedL2Log contain
Expand Down
Loading
Loading