Skip to content

Commit 65c19d8

Browse files
committed
Merge branch 'main' into feat/event-manager
2 parents d7e9f00 + 38005f4 commit 65c19d8

File tree

5 files changed

+169
-48
lines changed

5 files changed

+169
-48
lines changed

PowerSync/PowerSync.Common/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ foreach (PowerSyncDBEvents.StatusChangedEvent update in listener)
2525
}
2626
```
2727

28+
- Pool read connections in `MDSQLiteAdapter`, improving performance in any case where multiple queries run simultaneously (eg. via `Watch`). The number of connections can be set via `MDSQLiteOptions.ReadPoolSize` and defaults to 5.
2829
- Updated to the latest version (0.4.11) of the core extension.
2930
- `MDSQLiteConnection` now runs query operations on another thread, which stops the caller thread from blocking.
3031
- Removed the `RunListener` and `RunListenerAsync` APIs from `IEventStream`. Users are encouraged to use the `Listen` or `ListenAsync` APIs instead (`RunListener` itself was implemented using the `Listen` API).

PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,7 @@ public async Task Close()
520520
ConnectionManager.Close();
521521
BucketStorageAdapter?.Close();
522522

523-
Database.Close();
523+
await Database.Close();
524524
Closed = true;
525525

526526
Events.Emit(new PowerSyncDBEvents.ClosedEvent());

PowerSync/PowerSync.Common/DB/IDBAdapter.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,12 @@ public static string[] ExtractTableUpdates(INotification update)
123123
}
124124
}
125125

126-
public interface IDBAdapter : ILockContext, ICloseable
126+
public interface IDBAdapter : ILockContext, ICloseableAsync
127127
{
128128
/// <summary>
129129
/// Closes the adapter.
130130
/// </summary>
131-
new void Close();
131+
new Task Close();
132132

133133
/// <summary>
134134
/// The name of the adapter.

PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs

Lines changed: 155 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
namespace PowerSync.Common.MDSQLite;
22

33
using System;
4+
using System.Collections.Generic;
5+
using System.Threading.Channels;
46
using System.Threading.Tasks;
57

68
using Microsoft.Data.Sqlite;
@@ -15,7 +17,6 @@ public class MDSQLiteAdapterOptions()
1517
public string Name { get; set; } = null!;
1618

1719
public MDSQLiteOptions? SqliteOptions;
18-
1920
}
2021

2122
public class MDSQLiteAdapter : IDBAdapter
@@ -24,28 +25,29 @@ public class MDSQLiteAdapter : IDBAdapter
2425

2526
public DBAdapterEvents Events { get; } = new();
2627

27-
public MDSQLiteConnection? writeConnection;
28-
public MDSQLiteConnection? readConnection;
28+
// One writer
29+
private MDSQLiteConnection writeConnection = null!;
30+
private readonly AsyncLock writeMutex = new();
31+
32+
// Many readers
33+
private MDSQLiteConnectionPool readPool = null!;
2934

3035
private readonly Task initialized;
3136

3237
protected MDSQLiteAdapterOptions options;
3338

34-
protected RequiredMDSQLiteOptions resolvedMDSQLiteOptions;
39+
protected RequiredMDSQLiteOptions resolvedOptions;
3540
private CancellationTokenSource? tablesUpdatedCts;
3641
private Task? tablesUpdatedTask;
3742

38-
private readonly AsyncLock writeMutex = new();
39-
private readonly AsyncLock readMutex = new();
40-
4143
public MDSQLiteAdapter(MDSQLiteAdapterOptions options)
4244
{
4345
this.options = options;
44-
resolvedMDSQLiteOptions = resolveMDSQLiteOptions(options.SqliteOptions);
46+
resolvedOptions = ResolveMDSQLiteOptions(options.SqliteOptions);
4547
initialized = Init();
4648
}
4749

48-
private RequiredMDSQLiteOptions resolveMDSQLiteOptions(MDSQLiteOptions? options)
50+
private RequiredMDSQLiteOptions ResolveMDSQLiteOptions(MDSQLiteOptions? options)
4951
{
5052
var defaults = RequiredMDSQLiteOptions.DEFAULT_SQLITE_OPTIONS;
5153
return new RequiredMDSQLiteOptions
@@ -57,28 +59,26 @@ private RequiredMDSQLiteOptions resolveMDSQLiteOptions(MDSQLiteOptions? options)
5759
TemporaryStorage = options?.TemporaryStorage ?? defaults.TemporaryStorage,
5860
LockTimeoutMs = options?.LockTimeoutMs ?? defaults.LockTimeoutMs,
5961
EncryptionKey = options?.EncryptionKey ?? defaults.EncryptionKey,
60-
Extensions = options?.Extensions ?? defaults.Extensions
62+
Extensions = options?.Extensions ?? defaults.Extensions,
63+
ReadPoolSize = options?.ReadPoolSize ?? defaults.ReadPoolSize,
6164
};
6265
}
6366

6467
private async Task Init()
6568
{
66-
writeConnection = await OpenConnection(options.Name);
67-
readConnection = await OpenConnection(options.Name);
68-
6969
string[] baseStatements =
7070
[
71-
$"PRAGMA busy_timeout = {resolvedMDSQLiteOptions.LockTimeoutMs}",
72-
$"PRAGMA cache_size = -{resolvedMDSQLiteOptions.CacheSizeKb}",
73-
$"PRAGMA temp_store = {resolvedMDSQLiteOptions.TemporaryStorage}"
71+
$"PRAGMA busy_timeout = {resolvedOptions.LockTimeoutMs}",
72+
$"PRAGMA cache_size = -{resolvedOptions.CacheSizeKb}",
73+
$"PRAGMA temp_store = {resolvedOptions.TemporaryStorage}"
7474
];
7575

7676
string[] writeConnectionStatements =
7777
[
7878
.. baseStatements,
79-
$"PRAGMA journal_mode = {resolvedMDSQLiteOptions.JournalMode}",
80-
$"PRAGMA journal_size_limit = {resolvedMDSQLiteOptions.JournalSizeLimit}",
81-
$"PRAGMA synchronous = {resolvedMDSQLiteOptions.Synchronous}",
79+
$"PRAGMA journal_mode = {resolvedOptions.JournalMode}",
80+
$"PRAGMA journal_size_limit = {resolvedOptions.JournalSizeLimit}",
81+
$"PRAGMA synchronous = {resolvedOptions.Synchronous}",
8282
];
8383

8484
string[] readConnectionStatements =
@@ -87,20 +87,32 @@ private async Task Init()
8787
"PRAGMA query_only = true",
8888
];
8989

90+
91+
// Prepare write connection
92+
writeConnection = await OpenConnection(options.Name);
9093
foreach (var statement in writeConnectionStatements)
9194
{
9295
await writeConnection!.Execute(statement);
9396
}
9497

95-
foreach (var statement in readConnectionStatements)
98+
// Prepare read pool and create connection factory
99+
Func<Task<MDSQLiteConnection>> readConnectionFactory = async () =>
96100
{
97-
await readConnection!.Execute(statement);
98-
}
101+
var readConnection = await OpenConnection(options.Name);
102+
foreach (var statement in readConnectionStatements)
103+
{
104+
await readConnection.Execute(statement);
105+
}
106+
return readConnection;
107+
};
108+
readPool = new MDSQLiteConnectionPool(resolvedOptions, readConnectionFactory);
109+
await readPool.Init();
99110

111+
// Register TablesUpdated listener
100112
tablesUpdatedCts = new CancellationTokenSource();
101113
tablesUpdatedTask = Task.Run(async () =>
102114
{
103-
await foreach (var notification in writeConnection!.ListenAsync(tablesUpdatedCts.Token))
115+
await foreach (var notification in writeConnection.ListenAsync(tablesUpdatedCts.Token))
104116
{
105117
if (notification.TablesUpdated != null)
106118
{
@@ -123,7 +135,8 @@ protected async Task<MDSQLiteConnection> OpenConnection(string dbFilename)
123135

124136
private static SqliteConnection OpenDatabase(string dbFilename)
125137
{
126-
var connection = new SqliteConnection($"Data Source={dbFilename}");
138+
string connectionString = $"Data Source={dbFilename};Pooling=False;";
139+
var connection = new SqliteConnection(connectionString);
127140
connection.Open();
128141
return connection;
129142
}
@@ -135,12 +148,12 @@ protected virtual void LoadExtension(SqliteConnection db)
135148
db.LoadExtension(extensionPath, "sqlite3_powersync_init");
136149
}
137150

138-
public void Close()
151+
public async Task Close()
139152
{
140153
tablesUpdatedCts?.Cancel();
141-
try { tablesUpdatedTask?.Wait(TimeSpan.FromSeconds(2)); } catch { }
154+
try { tablesUpdatedTask?.Wait(2000); } catch { /* expected */ }
142155
writeConnection?.Close();
143-
readConnection?.Close();
156+
await readPool.Close();
144157
Events.Close();
145158
}
146159

@@ -186,20 +199,13 @@ public async Task<dynamic> Get(string sql, object?[]? parameters = null)
186199

187200
public async Task<T> ReadTransaction<T>(Func<ITransaction, Task<T>> fn, DBLockOptions? options = null)
188201
{
189-
return await ReadLock((ctx) => InternalTransaction(new MDSQLiteTransaction(readConnection!)!, fn));
202+
return await ReadLock((ctx) => InternalTransaction(new MDSQLiteTransaction((MDSQLiteConnection)ctx), fn));
190203
}
191204

192205
public async Task<T> ReadLock<T>(Func<ILockContext, Task<T>> fn, DBLockOptions? options = null)
193206
{
194207
await initialized;
195-
196-
T result;
197-
using (await readMutex.LockAsync())
198-
{
199-
result = await fn(readConnection!);
200-
}
201-
202-
return result;
208+
return await readPool.Lease(fn);
203209
}
204210

205211
public async Task WriteLock(Func<ILockContext, Task> fn, DBLockOptions? options = null)
@@ -208,10 +214,10 @@ public async Task WriteLock(Func<ILockContext, Task> fn, DBLockOptions? options
208214

209215
using (await writeMutex.LockAsync())
210216
{
211-
await fn(writeConnection!);
217+
await fn(writeConnection);
212218
}
213219

214-
writeConnection!.FlushUpdates();
220+
writeConnection.FlushUpdates();
215221

216222
}
217223

@@ -222,22 +228,22 @@ public async Task<T> WriteLock<T>(Func<ILockContext, Task<T>> fn, DBLockOptions?
222228
T result;
223229
using (await writeMutex.LockAsync())
224230
{
225-
result = await fn(writeConnection!);
231+
result = await fn(writeConnection);
226232
}
227233

228-
writeConnection!.FlushUpdates();
234+
writeConnection.FlushUpdates();
229235

230236
return result;
231237
}
232238

233239
public async Task WriteTransaction(Func<ITransaction, Task> fn, DBLockOptions? options = null)
234240
{
235-
await WriteLock(ctx => InternalTransaction(new MDSQLiteTransaction(writeConnection!)!, fn));
241+
await WriteLock(ctx => InternalTransaction(new MDSQLiteTransaction(writeConnection), fn));
236242
}
237243

238244
public async Task<T> WriteTransaction<T>(Func<ITransaction, Task<T>> fn, DBLockOptions? options = null)
239245
{
240-
return await WriteLock((ctx) => InternalTransaction(new MDSQLiteTransaction(writeConnection!)!, fn));
246+
return await WriteLock((ctx) => InternalTransaction(new MDSQLiteTransaction(writeConnection), fn));
241247
}
242248

243249
protected static async Task InternalTransaction(
@@ -285,8 +291,113 @@ private static async Task RunTransaction(
285291
public async Task RefreshSchema()
286292
{
287293
await initialized;
288-
await writeConnection!.RefreshSchema();
289-
await readConnection!.RefreshSchema();
294+
await writeConnection.RefreshSchema();
295+
await readPool.LeaseAll(async (connections) =>
296+
{
297+
foreach (var conn in connections) await conn.RefreshSchema();
298+
});
299+
}
300+
}
301+
302+
class MDSQLiteConnectionPool
303+
{
304+
private readonly RequiredMDSQLiteOptions _options;
305+
private readonly Channel<MDSQLiteConnection> _channel;
306+
private readonly int _poolSize;
307+
private readonly Func<Task<MDSQLiteConnection>> _connectionFactory;
308+
309+
private readonly Task _initialized;
310+
311+
public MDSQLiteConnectionPool(RequiredMDSQLiteOptions options, Func<Task<MDSQLiteConnection>> connectionFactory)
312+
{
313+
_options = options;
314+
_channel = Channel.CreateBounded<MDSQLiteConnection>(options.ReadPoolSize);
315+
_poolSize = options.ReadPoolSize;
316+
_connectionFactory = connectionFactory;
317+
_initialized = Initialize();
318+
}
319+
320+
public async Task Init() => await _initialized;
321+
322+
private async Task Initialize()
323+
{
324+
for (int i = 0; i < _poolSize; i++)
325+
{
326+
var connection = await _connectionFactory();
327+
await _channel.Writer.WriteAsync(connection);
328+
}
329+
}
330+
331+
public async Task<T> Lease<T>(Func<MDSQLiteConnection, Task<T>> callback)
332+
{
333+
await _initialized;
334+
var connection = await _channel.Reader.ReadAsync();
335+
try
336+
{
337+
return await callback(connection);
338+
}
339+
finally
340+
{
341+
await _channel.Writer.WriteAsync(connection);
342+
}
343+
}
344+
345+
public async Task LeaseAll(Func<List<MDSQLiteConnection>, Task> callback)
346+
{
347+
await _initialized;
348+
var connections = new List<MDSQLiteConnection>(_poolSize);
349+
for (int i = 0; i < _poolSize; i++)
350+
{
351+
connections.Add(await _channel.Reader.ReadAsync());
352+
}
353+
354+
try
355+
{
356+
await callback(connections);
357+
}
358+
finally
359+
{
360+
foreach (var conn in connections)
361+
{
362+
_channel.Writer.TryWrite(conn);
363+
}
364+
}
365+
}
366+
367+
private async Task<MDSQLiteConnection> OpenConnection(string dbFilename)
368+
{
369+
var db = OpenDatabase(dbFilename);
370+
LoadExtension(db);
371+
372+
var connection = new MDSQLiteConnection(new MDSQLiteConnectionOptions(db));
373+
await connection.Execute("SELECT powersync_init()");
374+
375+
return connection;
376+
}
377+
378+
private static SqliteConnection OpenDatabase(string dbFilename)
379+
{
380+
string connectionString = $"Data Source={dbFilename};Pooling=False;";
381+
var connection = new SqliteConnection(connectionString);
382+
connection.Open();
383+
return connection;
384+
}
385+
386+
private void LoadExtension(SqliteConnection db)
387+
{
388+
string extensionPath = PowerSyncPathResolver.GetNativeLibraryPath(AppContext.BaseDirectory);
389+
db.EnableExtensions(true);
390+
db.LoadExtension(extensionPath, "sqlite3_powersync_init");
391+
}
392+
393+
public async Task Close()
394+
{
395+
await LeaseAll((connections) =>
396+
{
397+
foreach (var conn in connections) conn.Close();
398+
return Task.CompletedTask;
399+
});
400+
_channel.Writer.TryComplete();
290401
}
291402
}
292403

PowerSync/PowerSync.Common/MDSQLite/MDSQLiteOptions.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,11 @@ public class MDSQLiteOptions
102102
/// Load extensions using the path and entryPoint.
103103
/// </summary>
104104
public SqliteExtension[]? Extensions { get; set; }
105+
106+
/// <summary>
107+
/// The number of MDSQLiteConnection objects to create for the read pool.
108+
/// </summary>
109+
public int? ReadPoolSize { get; set; }
105110
}
106111

107112
public class RequiredMDSQLiteOptions : MDSQLiteOptions
@@ -115,7 +120,8 @@ public class RequiredMDSQLiteOptions : MDSQLiteOptions
115120
TemporaryStorage = TemporaryStorageOption.MEMORY,
116121
LockTimeoutMs = 30000,
117122
EncryptionKey = null,
118-
Extensions = []
123+
Extensions = [],
124+
ReadPoolSize = 5,
119125
};
120126

121127
public new SqliteJournalMode JournalMode { get; set; } = null!;
@@ -131,5 +137,8 @@ public class RequiredMDSQLiteOptions : MDSQLiteOptions
131137
public new TemporaryStorageOption TemporaryStorage { get; set; } = null!;
132138

133139
public new int CacheSizeKb { get; set; }
140+
134141
public new SqliteExtension[] Extensions { get; set; } = null!;
142+
143+
public new int ReadPoolSize { get; set; }
135144
}

0 commit comments

Comments
 (0)