Skip to content
287 changes: 237 additions & 50 deletions ts/fast-sync/channels.ts

Large diffs are not rendered by default.

24 changes: 21 additions & 3 deletions ts/fast-sync/chunking.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
getStringChunk,
receiveInChucks,
} from './chunking'
import Interruptable from './interruptable'

describe('Fast sync channel package chunking', () => {
it('should calculate the right chunk count for strings exactly fitting the chunk size', () => {
Expand Down Expand Up @@ -31,8 +32,25 @@ describe('Fast sync channel package chunking', () => {
it('should correctly receive data in chunks', async () => {
const chunks = [`chunk:0:3:ab`, `chunk:1:3:cde`, `chunk:2:3:fghij`]

expect(await receiveInChucks(async () => chunks.shift()!)).toEqual(
'abcdefghij',
)
const interruptable = new Interruptable()
expect(
await receiveInChucks(async () => chunks.shift()!, interruptable),
).toEqual('abcdefghij')
})

it('should throw an exception when cancelled', async () => {
const chunks = [`chunk:0:3:ab`, `chunk:1:3:cde`, `chunk:2:3:fghij`]
let index = -1

const interruptable = new Interruptable({ throwOnCancelled: true })
await expect(
receiveInChucks(async () => {
++index
if (index === 1) {
await interruptable.cancel()
}
return chunks.shift()!
}, interruptable),
).rejects.toThrow('Tried to execute code on a cancelled interruptable')
})
})
48 changes: 41 additions & 7 deletions ts/fast-sync/chunking.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { splitWithTail } from './utils'
import Interruptable from './interruptable'

export function calculateStringChunkCount(
s: string,
Expand All @@ -17,21 +18,20 @@ export function getStringChunk(

export async function receiveInChucks(
receiveChunk: () => Promise<string>,
interruptable: Interruptable,
): Promise<string> {
let data: string[] = []
let expectedChunkCount: null | number = null

while (true) {
const chunk = await receiveChunk()

const processChunk = (chunk: string) => {
const [
chunkConfirmation,
chunkIndexString,
chunkCountString,
chunkContent,
] = splitWithTail(chunk, ':', 4)
if (chunkConfirmation !== 'chunk') {
throw new Error(`Invalid WebRTC package received: ${chunk}`)
throw new Error(`Invalid WebRTC chunk package received: ${chunk}`)
}

const chunkIndex = parseInt(chunkIndexString)
Expand Down Expand Up @@ -67,10 +67,44 @@ export async function receiveInChucks(
}

data.push(chunkContent)
if (data.length === expectedChunkCount) {
break
}
return { finished: data.length === expectedChunkCount }
}

let running = true
await interruptable.whileLoop(
async () => running,
async () => {
const chunk = (await interruptable.execute(receiveChunk))!
const result = await interruptable.execute(async () =>
processChunk(chunk),
)
if (result?.finished) {
running = false
}
},
)

return data.join('')
}

export async function sendInChunks(
message: string,
send: (chunk: string) => Promise<void>,
options: {
interruptable: Interruptable
chunkSize: number
},
) {
const chunkCount = calculateStringChunkCount(message, options)
let chunkIndex = -1
await options.interruptable.whileLoop(
async () => chunkIndex < chunkCount,
async () => {
chunkIndex += 1
const chunkContent = getStringChunk(message, chunkIndex, {
chunkSize: options.chunkSize,
})
await send(`chunk:${chunkIndex}:${chunkCount}:${chunkContent}`)
},
)
}
4 changes: 3 additions & 1 deletion ts/fast-sync/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ async function createWebRTCSyncChannels(options: {
])

return {
senderChannel: new WebRTCFastSyncChannel({ peer: peers[0] }),
senderChannel: new WebRTCFastSyncChannel({
peer: peers[0],
}),
receiverChannel: new WebRTCFastSyncChannel({
peer: peers[1],
}),
Expand Down
23 changes: 15 additions & 8 deletions ts/fast-sync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import TypedEmitter from 'typed-emitter'
import StorageManager from '@worldbrain/storex'
import {
FastSyncInfo,
FastSyncChannelEvents,
FastSyncProgress,
FastSyncChannel,
FastSyncRole,
Expand All @@ -29,15 +30,12 @@ export interface FastSyncPreSendProcessorParams {
object: any
}

export interface FastSyncEvents {
export interface FastSyncEvents extends FastSyncChannelEvents {
prepared: (event: { syncInfo: FastSyncInfo; role: FastSyncRole }) => void
progress: (event: {
progress: FastSyncProgress
role: FastSyncRole
}) => void
stalled: () => void
paused: () => void
resumed: () => void
roleSwitch: (event: { before: FastSyncRole; after: FastSyncRole }) => void
error: (event: { error: string }) => void
}
Expand All @@ -59,6 +57,7 @@ export class FastSync {

constructor(private options: FastSyncOptions) {
this.totalObjectsProcessed = 0
this.forwardReconnectEvents()
}

get state() {
Expand Down Expand Up @@ -104,9 +103,9 @@ export class FastSync {
async send(options: { role: FastSyncRole; fastSyncInfo?: FastSyncInfo }) {
const { channel } = this.options

channel.events.on('stalled', () => this.events.emit('stalled'))
const interruptable = (this.interruptable = new Interruptable())
this._state = 'running'

try {
const syncInfo =
options.fastSyncInfo ||
Expand Down Expand Up @@ -185,6 +184,15 @@ export class FastSync {
})
}

private forwardReconnectEvents() {
const events: any[] = ['stalled', 'reconnect', 'reconnected']
for (const event of events) {
this.options.channel.events.on(event, (e: any) =>
this.events.emit(event, e),
)
}
}

async _preproccesObjects(params: { collection: string; objects: any[] }) {
const preSendProcessor = this.options.preSendProcessor
if (!preSendProcessor) {
Expand Down Expand Up @@ -213,11 +221,10 @@ export class FastSync {
this._state = state === 'paused' ? 'paused' : 'running'
this.events.emit(state)
}
this.options.channel.events.on('stalled', () =>
this.events.emit('stalled'),
)

this.options.channel.events.on('paused', stateChangeHandler('paused'))
this.options.channel.events.on('resumed', stateChangeHandler('resumed'))

try {
const syncInfo = await this.options.channel.receiveSyncInfo()
this.events.emit('prepared', { syncInfo, role: options.role })
Expand Down
11 changes: 10 additions & 1 deletion ts/fast-sync/interruptable.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import { ResolvablePromise, resolvablePromise } from './utils'

export class InterruptableCancelledError extends Error {}

export default class Interruptable {
cancelled: boolean = false
private pausePromise: ResolvablePromise<void> | null = null // only set if paused, resolved when pause ends

constructor(private options?: { throwOnCancelled: boolean }) {}

get paused(): boolean {
return !!this.pausePromise
}
Expand Down Expand Up @@ -73,7 +77,7 @@ export default class Interruptable {
}
}

async execute(f: () => Promise<void>) {
async execute<ReturnValue>(f: () => Promise<ReturnValue | undefined>) {
if (await this._shouldCancelAfterWaitingForPause()) {
return
}
Expand All @@ -85,6 +89,11 @@ export default class Interruptable {
if (this.pausePromise) {
await this.pausePromise.promise
}
if (this.cancelled && this.options?.throwOnCancelled) {
throw new InterruptableCancelledError(
'Tried to execute code on a cancelled interruptable',
)
}
return this.cancelled
}
}
22 changes: 14 additions & 8 deletions ts/fast-sync/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,23 @@ export type FastSyncRole = 'sender' | 'receiver'
export const flippedRole = (role: FastSyncRole): FastSyncRole =>
role === 'sender' ? 'receiver' : 'sender'
export type FastSyncOrder = 'receive-first' | 'send-first'
export type FastSyncPackage<UserPackageType = any> =
| { type: 'batch'; batch: any }
| { type: 'confirm' }
| { type: 'state-change'; state: 'paused' | 'running' }
| { type: 'sync-info'; info: FastSyncInfo }
| { type: 'finish' }
| { type: 'user-package'; package: UserPackageType }
export type FastSyncPackage<
UserPackageType = any,
WithIndex extends boolean = true
> = (WithIndex extends true ? { index: number } : {}) &
(
| { type: 'sync-info'; info: FastSyncInfo }
| { type: 'batch'; batch: any }
| { type: 'finish' }
| { type: 'state-change'; state: 'paused' | 'running' }
| { type: 'user-package'; package: UserPackageType }
| { type: 'confirm' }
)

export interface FastSyncChannelEvents {
stalled: () => void
paused: () => void
resumed: () => void
paused: () => void
}
export interface FastSyncChannel<UserPackageType = any> {
timeoutInMiliseconds: number
Expand Down
2 changes: 1 addition & 1 deletion ts/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1366,7 +1366,7 @@ if (process.env.TEST_SYNC_FIRESTORE === 'true') {
{
sharedSyncLog: ({ storageManager }) =>
new SharedSyncLogStorage({
storageManager,
storageManager: storageManager as any,
autoPkType: 'string',
excludeTimestampChecks:
!options || !options.includeTimestampChecks,
Expand Down
Loading