
Automated Compacting Streams #4049

@ewilliams0305


I have a use case with some very long-lived streams containing LOTS of events (an IoT device monitoring application).

This issue proposes a way to add IHostedService instances for specific stream types, intended to compact those streams periodically. Below is a very rough draft of some code I whipped up in our application to accomplish this. Should the Marten team feel this is suitable for direct integration into Marten, I'm happy to make a PR referencing this issue.

Final API (Usage)

Here we are saying: once a day, check every stream of type DeviceStatus for compacting, and if a stream includes events older than 30 days, compact it.

builder.Services.AddMarten(options =>
{
    options.Projections.Snapshot<DeviceStatus>(SnapshotLifecycle.Inline);
})
.AddStreamCompactor<DeviceStatus>(ops =>
{
    ops.Frequency = TimeSpan.FromDays(1);
    ops.Strategy = new AgeCompactingStrategy(TimeSpan.FromDays(30));
});

Compacting Options

The configuration options could look something like this, defining how and when compacting should run.

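using System;

public abstract record CompactingStrategy;

// Compact events older than the given age, measured from the moment the compactor runs.
public sealed record AgeCompactingStrategy(TimeSpan EventsOlderThan) : CompactingStrategy;

// Compact events below the given stream version.
public sealed record VersionCompactingStrategy(long BelowVersion) : CompactingStrategy;

public sealed record CompactingOptions<TStream>
{
    // How often the compactor sweeps the streams.
    public TimeSpan Frequency { get; set; } = TimeSpan.FromDays(1);

    // Intended time of day for the sweep; not yet used by the hosted service below.
    public TimeOnly ExecutionTime { get; set; } = TimeOnly.MinValue;

    // Optional archiver that receives events before they are compacted away.
    public IEventsArchiver? Archiver { get; set; }

    public CompactingStrategy Strategy { get; set; } = new VersionCompactingStrategy(1000);

    // Settable delegates (rather than fixed methods) so callers can override them.
    public Func<TStream, bool> ShouldArchiveStream { get; set; } = _ => true;

    // Default: read an "Id" property via reflection, accepting a Guid or a Guid string.
    public Func<TStream, Guid> StreamIdSelector { get; set; } = stream =>
        typeof(TStream).GetProperty("Id")?.GetValue(stream) switch
        {
            Guid id => id,
            string text when Guid.TryParse(text, out var parsed) => parsed,
            _ => throw new InvalidOperationException("No default ID located. Please provide a stream ID selector.")
        };
}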

This should probably use (or extend) the existing StreamCompactingRequest<T> class directly.
I would also let the caller define a delegate that inspects the stream's state to determine whether the stream should be compacted.
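As a sketch of the stream-state delegate idea, the helpers below build predicates that decide from a projected snapshot whether a stream is worth compacting. The `DeviceStatus` shape (`Id`, `Version`, `LastEventAt`) and the `CompactionPredicates` class are hypothetical; the issue does not show the real snapshot's members.

```csharp
using System;

// Hypothetical snapshot shape for illustration only; the real DeviceStatus members are not shown in the issue.
public sealed record DeviceStatus(Guid Id, long Version, DateTimeOffset LastEventAt);

// Hypothetical helpers for building "should this stream be compacted?" delegates.
public static class CompactionPredicates
{
    // Only compact streams that have grown past minVersion, so short streams are skipped.
    public static Func<DeviceStatus, bool> LongerThan(long minVersion) =>
        status => status.Version > minVersion;

    // Combine several predicates; a stream is compacted only if every predicate agrees.
    public static Func<T, bool> All<T>(params Func<T, bool>[] predicates) =>
        item => Array.TrueForAll(predicates, predicate => predicate(item));
}
```

In the proposed options, a delegate such as `CompactionPredicates.LongerThan(1000)` would take the place of the `ShouldArchiveStream` check, which currently always returns true.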

Hosted Service

Hosted Background Service

The hosted service would look something like this.

I'm still playing around with different looping strategies. It's also probably a good idea to page through the streams with some sort of strategy rather than loading them all at once.

using Marten;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

public sealed class EventStreamCompactor<TStream> : BackgroundService where TStream : class
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly TimeProvider _timeProvider;
    private readonly CompactingOptions<TStream> _options;
    
    public EventStreamCompactor(IServiceScopeFactory scopeFactory, TimeProvider timeProvider, Action<CompactingOptions<TStream>>? optionsAction = null)
    {
        _scopeFactory = scopeFactory;
        _timeProvider = timeProvider;

        _options = new CompactingOptions<TStream>();
        optionsAction?.Invoke(_options);
    }

    /// <inheritdoc />
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            await using var scope = _scopeFactory.CreateAsyncScope();
            var session = scope.ServiceProvider.GetRequiredService<IDocumentSession>();

            var streams = await session.Query<TStream>().ToListAsync(token: stoppingToken);

            foreach (var stream in streams)         
            {
                if (!_options.ShouldArchiveStream(stream))
                {
                    continue;
                }

                await CompactStream(session, _options.StreamIdSelector(stream), stoppingToken, _options.Archiver);
            }
            
            // Not sure if this is the correct approach.
            // I would almost prefer a fast loop check a specific time.
            // Starting this at startup could be VERY slow.
            // And waiting after the loop also isn't great.
            await Task.Delay(_options.Frequency, stoppingToken);
        }
    }

    private async Task CompactStream(IDocumentSession session, Guid streamId, CancellationToken cancellation, IEventsArchiver? archiver)
    {
        await session.Events.CompactStreamAsync<TStream>(streamId, x =>
        {
            switch (_options.Strategy)
            {
                case AgeCompactingStrategy age:
                    // Compact everything recorded before (now - age).
                    x.Timestamp = _timeProvider.GetUtcNow().Subtract(age.EventsOlderThan);
                    break;
                case VersionCompactingStrategy version:
                    // Compact everything below the given version.
                    x.Version = version.BelowVersion;
                    break;
            }

            x.Archiver = archiver;
            x.CancellationToken = cancellation;
        });

        // Commit the session so any operations queued by the compacting call are persisted.
        await session.SaveChangesAsync(cancellation);
    }
}
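One way to address the "check a specific time" idea in the loop comments is to compute the delay until the next configured `ExecutionTime` instead of sleeping for `Frequency` after each pass. The sketch below assumes `ExecutionTime` is interpreted as UTC and that the compactor runs once a day; `CompactorSchedule` and `DelayUntilNextRun` are hypothetical names.

```csharp
using System;

public static class CompactorSchedule
{
    // Returns how long to wait until the next occurrence of executionTime (treated as UTC).
    // If that time has already passed today, or is exactly now, the next run is tomorrow.
    public static TimeSpan DelayUntilNextRun(DateTimeOffset now, TimeOnly executionTime)
    {
        var today = DateOnly.FromDateTime(now.UtcDateTime);
        var next = new DateTimeOffset(today.ToDateTime(executionTime), TimeSpan.Zero);

        if (next <= now)
        {
            next = next.AddDays(1);
        }

        return next - now;
    }
}
```

At the top of each loop iteration, `ExecuteAsync` could then await `Task.Delay(CompactorSchedule.DelayUntilNextRun(_timeProvider.GetUtcNow(), _options.ExecutionTime), _timeProvider, stoppingToken)`, so nothing heavy runs at startup. Passing the injected `TimeProvider` to `Task.Delay` (.NET 8 and later) also makes the schedule testable with a fake clock. A `Frequency` other than one day would need different logic.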

Register the Hosted Service using the Marten configuration expression (MartenConfigurationExpression)

using Marten;
using Microsoft.Extensions.DependencyInjection;

public static class EventStreamCompactorServiceExtensions
{
    public static MartenServiceCollectionExtensions.MartenConfigurationExpression AddStreamCompactor<TStream>(
        this MartenServiceCollectionExtensions.MartenConfigurationExpression builder,
        Action<CompactingOptions<TStream>>? optionsAction = null)
        where TStream : class
    {
        builder.Services.AddHostedService<EventStreamCompactor<TStream>>(sp =>
        {
            var scopeFactory = sp.GetRequiredService<IServiceScopeFactory>();

            // TimeProvider is not registered in the container by default, so fall back to the system clock.
            var timeProvider = sp.GetService<TimeProvider>() ?? TimeProvider.System;

            return new EventStreamCompactor<TStream>(scopeFactory, timeProvider, optionsAction);
        });

        return builder;
    }
}
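On the paging concern raised before the hosted service, a generic helper could walk the snapshot documents in fixed-size pages instead of calling `ToListAsync` on the whole table. This is a sketch: `Paging.PagesAsync` is a hypothetical helper, and the page source is passed in as a delegate so it can be backed by a Marten query or an in-memory list.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class Paging
{
    // Repeatedly asks fetchPage for (skip, take) windows and yields each non-empty page,
    // stopping once a page comes back shorter than pageSize.
    public static async IAsyncEnumerable<IReadOnlyList<T>> PagesAsync<T>(
        Func<int, int, CancellationToken, Task<IReadOnlyList<T>>> fetchPage,
        int pageSize,
        [EnumeratorCancellation] CancellationToken cancellation = default)
    {
        if (pageSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(pageSize));
        }

        var skip = 0;
        while (true)
        {
            cancellation.ThrowIfCancellationRequested();

            var page = await fetchPage(skip, pageSize, cancellation);
            if (page.Count > 0)
            {
                yield return page;
            }

            // A short (or empty) page means there is nothing left to read.
            if (page.Count < pageSize)
            {
                yield break;
            }

            skip += page.Count;
        }
    }
}
```

With Marten, the delegate could be something like `(skip, take, ct) => session.Query<DeviceStatus>().OrderBy(x => x.Id).Skip(skip).Take(take).ToListAsync(ct)`, using a stable sort key. Offset paging assumes the set of documents does not shift during the sweep; keyset paging on the sort key is a sturdier alternative if it does.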
