Skip to content

Commit 21d12dc

Browse files
authored
Avoid reading partial lines on unexpected EOF (#341)
1 parent c47e417 commit 21d12dc

File tree

3 files changed

+31
-14
lines changed

3 files changed

+31
-14
lines changed

common/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,16 @@ import com.powersync.utils.JsonParam
3131
import com.powersync.utils.JsonUtil
3232
import io.kotest.matchers.collections.shouldHaveSingleElement
3333
import io.kotest.matchers.collections.shouldHaveSize
34+
import io.kotest.matchers.nulls.shouldNotBeNull
3435
import io.kotest.matchers.shouldBe
3536
import io.kotest.matchers.shouldNotBe
3637
import io.kotest.matchers.string.shouldContain
3738
import io.ktor.http.ContentType
39+
import io.ktor.utils.io.core.toByteArray
3840
import kotlinx.coroutines.CompletableDeferred
3941
import kotlinx.coroutines.DelicateCoroutinesApi
4042
import kotlinx.coroutines.Dispatchers
43+
import kotlinx.coroutines.channels.Channel
4144
import kotlinx.coroutines.withContext
4245
import kotlinx.serialization.json.jsonObject
4346
import kotlinx.serialization.json.jsonPrimitive
@@ -1056,4 +1059,24 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) {
10561059
turbine.cancelAndIgnoreRemainingEvents()
10571060
}
10581061
}
1062+
1063+
@Test
1064+
fun `reconnects on unexpected end of response`() =
1065+
databaseTest {
1066+
turbineScope(timeout = 10.0.seconds) {
1067+
val turbine = database.currentStatus.asFlow().testIn(this)
1068+
database.connect(TestConnector(), options = getOptions())
1069+
turbine.waitFor { it.connected }
1070+
1071+
syncLines.send("unterminated line".toByteArray())
1072+
syncLines.close()
1073+
turbine.waitFor { !it.connected }
1074+
database.currentStatus.downloadError shouldNotBeNull {
1075+
}
1076+
1077+
syncLines = Channel()
1078+
turbine.waitFor { it.connected }
1079+
turbine.cancelAndIgnoreRemainingEvents()
1080+
}
1081+
}
10591082
}

common/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import com.powersync.utils.JsonUtil
2020
import io.ktor.client.HttpClient
2121
import io.ktor.client.HttpClientConfig
2222
import io.ktor.client.call.body
23-
import io.ktor.client.plugins.pluginOrNull
2423
import io.ktor.client.request.accept
2524
import io.ktor.client.request.get
2625
import io.ktor.client.request.headers
@@ -36,19 +35,17 @@ import io.ktor.http.contentType
3635
import io.ktor.utils.io.ByteReadChannel
3736
import io.ktor.utils.io.readAvailable
3837
import io.ktor.utils.io.readBuffer
39-
import io.ktor.utils.io.readUTF8Line
38+
import io.ktor.utils.io.readLineStrict
4039
import io.rsocket.kotlin.RSocketError
4140
import kotlinx.coroutines.CancellationException
4241
import kotlinx.coroutines.CompletableDeferred
4342
import kotlinx.coroutines.CoroutineName
4443
import kotlinx.coroutines.CoroutineScope
45-
import kotlinx.coroutines.Dispatchers
4644
import kotlinx.coroutines.Job
4745
import kotlinx.coroutines.NonCancellable
4846
import kotlinx.coroutines.cancelAndJoin
4947
import kotlinx.coroutines.channels.BufferOverflow
5048
import kotlinx.coroutines.channels.Channel
51-
import kotlinx.coroutines.channels.ProducerScope
5249
import kotlinx.coroutines.coroutineScope
5350
import kotlinx.coroutines.delay
5451
import kotlinx.coroutines.flow.Flow
@@ -58,16 +55,14 @@ import kotlinx.coroutines.flow.buffer
5855
import kotlinx.coroutines.flow.channelFlow
5956
import kotlinx.coroutines.flow.emitAll
6057
import kotlinx.coroutines.flow.flow
61-
import kotlinx.coroutines.flow.flowOn
6258
import kotlinx.coroutines.flow.map
63-
import kotlinx.coroutines.isActive
6459
import kotlinx.coroutines.launch
6560
import kotlinx.coroutines.withContext
61+
import kotlinx.io.EOFException
6662
import kotlinx.io.readByteArray
6763
import kotlinx.io.readIntLe
6864
import kotlinx.serialization.json.JsonElement
6965
import kotlinx.serialization.json.JsonObject
70-
import kotlinx.serialization.json.JsonPrimitive
7166
import kotlinx.serialization.json.encodeToJsonElement
7267
import kotlin.time.Clock
7368

@@ -855,7 +850,7 @@ internal class StreamingSyncClient(
855850
fun ByteReadChannel.lines(): Flow<String> =
856851
flow {
857852
while (!isClosedForRead) {
858-
val line = readUTF8Line()
853+
val line = readLineStrict()
859854
if (line != null) {
860855
emit(line)
861856
}
@@ -894,10 +889,7 @@ internal class StreamingSyncClient(
894889
if (bytesRead == -1) {
895890
// No bytes available, wait for more
896891
if (isClosedForRead || !awaitContent(1)) {
897-
throw PowerSyncException(
898-
"Unexpected end of response in middle of BSON sync line",
899-
null,
900-
)
892+
throw EOFException()
901893
}
902894
} else {
903895
remaining -= bytesRead

common/src/commonTest/kotlin/powersync/sync/StreamingSyncClientTest.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import dev.mokkery.everySuspend
2323
import dev.mokkery.mock
2424
import io.kotest.matchers.collections.shouldHaveSize
2525
import io.kotest.matchers.shouldBe
26+
import io.kotest.matchers.types.shouldBeTypeOf
2627
import io.ktor.client.HttpClient
2728
import io.ktor.client.engine.mock.MockEngine
2829
import io.ktor.utils.io.ByteChannel
@@ -32,6 +33,7 @@ import kotlinx.coroutines.flow.MutableStateFlow
3233
import kotlinx.coroutines.launch
3334
import kotlinx.coroutines.test.runTest
3435
import kotlinx.coroutines.withTimeout
36+
import kotlinx.io.EOFException
3537
import kotlinx.serialization.json.JsonObject
3638
import kotlin.test.BeforeTest
3739
import kotlin.test.Test
@@ -261,7 +263,7 @@ class StreamingSyncClientTest {
261263
channel.close()
262264

263265
// Still two bytes missing for length
264-
objects.awaitError()
266+
objects.awaitError().shouldBeTypeOf<EOFException>()
265267
}
266268
}
267269

@@ -277,7 +279,7 @@ class StreamingSyncClientTest {
277279
channel.close()
278280

279281
// Still two bytes missing for content
280-
objects.awaitError()
282+
objects.awaitError().shouldBeTypeOf<EOFException>()
281283
}
282284
}
283285
}

0 commit comments

Comments
 (0)