diff --git a/assembly/index.ts b/assembly/index.ts index 79be913..953dbbf 100644 --- a/assembly/index.ts +++ b/assembly/index.ts @@ -13,6 +13,7 @@ export * from "./net/udp"; export * from "./net/util"; export * from "./process"; export * from "./process/bindings"; +export * from "./process/supervisor"; export * from "./registry" export * from "./registry/bindings"; export * from "./util"; diff --git a/assembly/managed/index.ts b/assembly/managed/index.ts index c419949..3409c8b 100644 --- a/assembly/managed/index.ts +++ b/assembly/managed/index.ts @@ -2,94 +2,105 @@ import { Mailbox } from "../message"; import { MessageType } from "../message/util"; import { Process } from "../process"; +/** Represents a shared map using the std lib hashmap. */ export class SharedMap { - private self: Process> = Process.inheritSpawn((mb: Mailbox>) => { + + /** The process that runs the hashmap. */ + private self: Process = Process.inheritSpawn((mb: Mailbox) => { let map = new Map(); while (true) { let message = mb.receive(); if (message.type == MessageType.Data) { let event = message.box!.value; - if (event instanceof GetSharedMapEvent) { - let key = (>event).key; + if (event instanceof GetSharedMapEvent) { + let key = (event).key; message.reply(map.get(key)); } else if (event instanceof SetSharedMapEvent) { let key = (>event).key; let value = (>event).value; map.set(key, value); - } else if (event instanceof DeleteSharedMapEvent) { - let key = (>event).key; + } else if (event instanceof DeleteSharedMapEvent) { + let key = (event).key; map.delete(key); - } else if (event instanceof ClearSharedMapEvent) { + } else if (event instanceof ClearSharedMapEvent) { map.clear(); - } else if (event instanceof HasSharedMapEvent) { - const key = (>event).key; + } else if (event instanceof HasSharedMapEvent) { + const key = (event).key; message.reply(map.has(key)); - } else if (event instanceof SizeSharedMapEvent) { + } else if (event instanceof SizeSharedMapEvent) { message.reply(map.size); - } else if (event instanceof KeysSharedMapEvent) { + } else if (event instanceof KeysSharedMapEvent) { message.reply(map.keys()); - } else if (event instanceof ValuesSharedMapEvent) { + } else if (event instanceof ValuesSharedMapEvent) { message.reply(map.values()); } } } }).expect(); + /** Get a value based on the given key. */ get(key: string): TValue { - let event = new GetSharedMapEvent(key); - let message = this.self.request, TValue>(event); + let event = new GetSharedMapEvent(key); + let message = this.self.request(event); assert(message.type == MessageType.Data); return message.box!.value; } + /** Set a value with a given key. */ set(key: string, value: TValue): this { let event = new SetSharedMapEvent(key, value); this.self.send(event); return this; } + /** Delete a key. */ delete(key: string): void { - let event = new DeleteSharedMapEvent(key); + let event = new DeleteSharedMapEvent(key); this.self.send(event); } + /** Clear the hashmap. */ clear(): void { - const event = new ClearSharedMapEvent(); + const event = new ClearSharedMapEvent(); this.self.send(event); } + /** Check to see if it has the key. */ has(key: string): bool { - const event = new HasSharedMapEvent(key); - const message = this.self.request, bool>(event); + const event = new HasSharedMapEvent(key); + const message = this.self.request(event); assert(message.type == MessageType.Data); return message.box!.value; } + /** Get the size of the hashmap. */ get size(): i32 { - const event = new SizeSharedMapEvent(); - const message = this.self.request, i32>(event); + const event = new SizeSharedMapEvent(); + const message = this.self.request(event); assert(message.type == MessageType.Data); return message.box!.value; } + /** Get all the keys in the hashmap. */ keys(): string[] { - const event = new KeysSharedMapEvent(); - const message = this.self.request, string[]>(event); + const event = new KeysSharedMapEvent(); + const message = this.self.request(event); assert(message.type == MessageType.Data); return message.box!.value; } + /** Get all the values in the hashmap. */ values(): TValue[] { - const event = new ValuesSharedMapEvent(); - const message = this.self.request, TValue[]>(event); + const event = new ValuesSharedMapEvent(); + const message = this.self.request(event); assert(message.type == MessageType.Data); return message.box!.value; } } -abstract class SharedMapEvent {} +abstract class SharedMapEvent {} -class GetSharedMapEvent extends SharedMapEvent { +class GetSharedMapEvent extends SharedMapEvent { constructor( public key: string ) { @@ -97,7 +108,7 @@ class GetSharedMapEvent extends SharedMapEvent { } } -class DeleteSharedMapEvent extends SharedMapEvent { +class DeleteSharedMapEvent extends SharedMapEvent { constructor( public key: string ) { @@ -105,7 +116,7 @@ class DeleteSharedMapEvent extends SharedMapEvent { } } -class SetSharedMapEvent extends SharedMapEvent { +class SetSharedMapEvent extends SharedMapEvent { constructor( public key: string, public value: TValue, @@ -114,16 +125,16 @@ class SetSharedMapEvent extends SharedMapEvent { } } -class ClearSharedMapEvent extends SharedMapEvent {} +class ClearSharedMapEvent extends SharedMapEvent {} -class HasSharedMapEvent extends SharedMapEvent { +class HasSharedMapEvent extends SharedMapEvent { constructor(public key: string) { super(); } } -class SizeSharedMapEvent extends SharedMapEvent {} +class SizeSharedMapEvent extends SharedMapEvent {} -class KeysSharedMapEvent extends SharedMapEvent {} +class KeysSharedMapEvent extends SharedMapEvent {} -class ValuesSharedMapEvent extends SharedMapEvent {} \ No newline at end of file +class ValuesSharedMapEvent extends SharedMapEvent {} diff --git a/assembly/process/supervisor.ts b/assembly/process/supervisor.ts new file mode 100644 index 0000000..87bc7cf --- /dev/null +++ b/assembly/process/supervisor.ts @@ -0,0 +1,192 @@ +import { Process } from "."; +import { Mailbox, Message } from "../message"; +import { MessageType } from "../message/util"; + +/** This class represents an abstract Supervisor Event that the supervisor must handle. */ +export abstract class SupervisorEvent { + abstract handle( + ctx: HandleSupervisorContext, + message: Message>, + ): void; +} + +/** + * This class represents an intention to spawn a single process. It requires a start value, + * and has a handle method that is called by the handleSupervisor process. + */ +export class SupervisorSpawnMessage extends SupervisorEvent { + constructor( + public start: TStart, + ) { + super(); + } + + handle(ctx: HandleSupervisorContext, message: Message>): void { + + // spawn the process + let process = Process.inheritSpawnWith(this.start, ctx.supervisorContext.spawnProcess) + .expect("Could not spawn process"); + + // then reply back to the caller that the process was created + message.reply>(process); + let tag = Process.link(process); + ctx.processes.set(tag, new ProcessState(this.start, process)); + ctx.processList.push(process); + + // and if there's no delegate, delegate it + if (ctx.delegate == null) { + ctx.supervisorContext.onDelegate(process); + ctx.delegate = process; + } + } +} + +/** + * This class represents an intention to send a message to one of the child processes. This + * behavior is configured via the supervisor constructor. + */ +export class SendSupervisorMessage extends SupervisorEvent { + constructor( + public message: TMessage + ) { + super(); + } + + handle(ctx: HandleSupervisorContext, _message: Message>): void { + ctx.supervisorContext.onSend(ctx, this.message); + } +} + +/** This is a helper class for the supervisor process. */ +export class SupervisorContext { + + constructor( + public onDelegate: (process: Process) => void, + public spawnProcess: (start: TStart, mb: Mailbox) => void, + public onSend: (ctx: HandleSupervisorContext, message: TMessage) => void, + ) {} +} + +/** This is the wrapper class that represents a supervisor process. */ +export class Supervisor { + self: Process>; + + constructor( + onDelegate: (process: Process) => void, + spawnProcess: (start: TStart, mb: Mailbox) => void, + /** This callback is responsible for actually sending the messages to the processes in the process list. */ + onSend: (ctx: HandleSupervisorContext, message: TMessage) => void + = (ctx: HandleSupervisorContext, message: TMessage) => { + + // round robin + let length = ctx.processList.length; + let index = ctx.roundRobinIndex; + index++; + if (index >= length) { + index = 0; + } + ctx.processList[index].send(message); + ctx.roundRobinIndex = index; + }, + ) { + // spawn the process + let context = new SupervisorContext(onDelegate, spawnProcess, onSend); + this.self = Process.inheritSpawnWith< + SupervisorContext, + SupervisorEvent + > ( + context, + handleSupervisor + ) + .expect("Could not spawn supervisor."); + } + + /** + * Spawn a process that will be supervised and restarted with the same start value + * each time the process fails. + */ + spawn(startValue: TStart): Process { + // request the process from the supervisor handler + let message = this.self + .request, Process>( + new SupervisorSpawnMessage(startValue) + ); + assert(message.type == MessageType.Data); + return message.unbox(); + } + + /** Send a message to the child processes. */ + send(message: TMessage): void { + this.self.send(new SendSupervisorMessage(message)); + } +} +/** This is a helper class for the supervisor process. */ +export class ProcessState { + constructor( + public startValue: TStart, + public process: Process, + ) {} +} + +/** This is a helper class for the handle supervisor process. */ +export class HandleSupervisorContext { + public roundRobinIndex: i32 = 0; + + constructor( + /** A map of processes that exist based on their tag. */ + public processes: Map>, + /** A list of all the processes. */ + public processList: Process[], + /** The delegated process. */ + public delegate: Process | null, + /** The supervisor context that contains all the callback events. */ + public supervisorContext: SupervisorContext, + ) {} +} + +/** This method is the entry point for a given supervisor. */ +export function handleSupervisor( + context: SupervisorContext, + mb: Mailbox>, +): void { + let handleSupervisorContext = new HandleSupervisorContext( + new Map>(), + [], + null, + context, + ); + + // process loop + while (true) { + let message = mb.receive(); + + if (message.type == MessageType.Data) { + // handle supervisor requests here + let event = message.unbox(); + event.handle(handleSupervisorContext, message); + } else if (message.type == MessageType.Signal) { + // a process died, + let tag = message.tag; + + // get the old process by it's tag + let processContext = handleSupervisorContext.processes.get(tag); + let childProcess = Process.inheritSpawnWith( + processContext.startValue, + context.spawnProcess, + ) + .expect(); + + // delete the old process + handleSupervisorContext.processes.delete(tag); + let index = handleSupervisorContext.processList.indexOf(childProcess); + if (index != -1) { + handleSupervisorContext.processList.splice(index, 1); + } + + // create a new process and set it to the old process context + tag = Process.link(childProcess); + processContext.process = childProcess; + handleSupervisorContext.processes.set(tag, processContext); + } + } +} diff --git a/assembly/process/util.ts b/assembly/process/util.ts index 1309cd4..ac3d255 100644 --- a/assembly/process/util.ts +++ b/assembly/process/util.ts @@ -63,4 +63,4 @@ export class StartWrapper { public start: TStart, public index: usize, ) {} -} \ No newline at end of file +} diff --git a/readme.md b/readme.md index 059dc88..710e9ce 100644 --- a/readme.md +++ b/readme.md @@ -36,15 +36,18 @@ Finally, export a `_start()` function from the entry file, so that the main thre Lunatic errors are represented by an id. They *must* be freed after creation, so `as-lunatic` will bundle it's function return types in a `Result` class. This allows for lunatic errors to be managed by `as-disposable`, and helps prevent memory leaks. For example, when accepting a `TcpStream` from a `TcpListener`, it's possible to check to see if the `TcpListener` is in an errored state. ```ts +import { TCPServer } from "./net/tcp"; +import { IPAddress } from "./net/util"; + export function _start(): void { // bind a tcp server and expect it to open - let server = TCPServer.bindIPv4([127, 0, 0, 1], 10000).expect()!; + let server = TCPServer.bind(IPAddress.v4([127, 0, 0, 1], 10000)).expect(); while (true) { // accept sockets forever let socketResult = server.accept(); // this is a Result if (socketResult.isOk()) { // we can now use this socket resource - let socket = socketResult.expect()!; + let socket = socketResult.expect(); } else { // we can log out the error trace(socketResult.errorString); @@ -99,21 +102,23 @@ Lunatic will create another `Process`, instantiate the current WebAssembly modul To open a TCP server, use the static methods on the `TCPServer` class. ```ts -import { TCPServer, TCPStream } from "as-lunatic"; +import { TCPServer, TCPSocket } from "as-lunatic/assembly"; + -function processSocket(socket: TCPStream, mailbox: Mailbox): void { +function processSocket(socket: TCPSocket, mailbox: Mailbox): void { // do something with the accepted tcp socket here on another thread } export function _start(): void { // bind the server to an ip address and a port - let server = TCPServer.bindIPv4([127, 0, 0, 1], TCP_PORT); + let server = TCPServer.bind(IPAddress.v4([127, 0, 0, 1], 1234)) + .expect(); // blocks until a socket is accepted while (true) { - let socket = server.accept().expect()!;W + let socket = server.accept().expect(); // pass the socket off to another process - Process.spawnInheritWith(stream, processSocket); + Process.inheritSpawnWith(socket, processSocket); } } ``` @@ -121,26 +126,26 @@ export function _start(): void { To open a TCP connection to another server, use a `TCPSocket` connection. ```ts -import { TCPSocket, TCPResultType } from "as-lunatic"; +import { TCPSocket, IPAddress, NetworkResultType } from "as-lunatic/assembly"; export function _start(): void { // connect to an ip and a port - let connection = TCPSocket.connectIPv4(ipAddress, port).expect()!; + let connection = TCPSocket.connect(IPAddress.v4(ipAddress, port)).expect(); - // send a message using a write method - let result = socket.writeBuffer(String.UTF8.encode("Hello world!")); + // send a message using the write method + let result = connection.write(String.UTF8.encode("Hello world!")); - // returns a `Result` + // returns a `Result` switch (result.value) { - case TCPResultType.Error: { + case NetworkResultType.Error: { trace(result.errorString); break; } - case TCPResultType.Closed: { + case NetworkResultType.Closed: { trace("Socket closed"); break; } - case TCPResultType.Success: { + case NetworkResultType.Success: { // bytes written is stored on byteCount trace("Bytes Written", 1, socket.byteCount); break;